铆钉卷花图片:java实时获取oracle数据变更

来源:百度文库 编辑:九乡新闻网 时间:2024/04/28 11:41:19
在一个基于数据库的“实时系统”里面,“实时”获取数据库变化是非常重要的,本文主要描述通过Oracle中的捕获进程实时获取数据库变化。

背景:

       要做一个车辆GPS监控系统,主要分两块:

    1.采集。由GPS厂商提供实时数据,通过UDP包接收
    2.展示。前端程序获取到最新GPS数据后,在地图上模拟车辆的运行情况
      

备选方案:

     1.采集程序接收到UDP包并解析后,将数据放入数据库;前端程序轮询数据库以获取最新数据。
     2.采集程序接收到UDP包并解析后,将数据放入数据库,同时向前端程序发送一条消息,传递最新数据。

方案评估:



    方案1:最简单,最传统;增加不必要的数据库查询,并且非实时,轮询时间间隔不好确定。

    方案2:可实现“实时”,但增加采集程序职责,采集程序本不知道前端系统的存在。



最终方案:



   最后采取了另一种方案:通过oracle捕获进程捕获数据库变更(采集程序insert或update一条记录时,捕获进程即时获取到该条记录),将变更记录发送到AQ(oracle高级队列,JMS的oracle实现),前端程序只关注AQ,当有新消息到来时,即刻可收到并做相应处理,反映出实时状态。

    关于捕获进程,请参考《Streams概述》,《Streams捕获进程》



实现:




  

SQL代码


    创建表空间和用户:

   

Sql代码  
  1. Create tablespace streams_tbs datafile 'E:\DBSERVER\ORACLE9I\ORADATA\TESTDB\stream_tbs.dbf' size 25M Reuse autoextend on maxsize unlimited;  
  2.   
  3. --修改目标表(要捕获变更的表)追加日志  
  4. ALTER TABLE myoracle.TEST_GPS_STATUS ADD SUPPLEMENTAL LOG GROUP log_group_gpsstatus_pk (DEVICEID) ALWAYS;  
  5.   
  6.   
  7. create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs;  
  8.   
  9.   
  10. grant connect, resource, select_catalog_role to strmadmin;  




授予相应权限

Sql代码  
  1. grant execute on dbms_aqadm to strmadmin;  
  2.   
  3. grant execute on dbms_capture_adm to strmadmin;  
  4.   
  5. grant execute on dbms_propagation_adm to strmadmin;  
  6.   
  7. grant execute on dbms_streams_adm to strmadmin;  
  8.   
  9. grant execute on dbms_apply_adm to strmadmin;  
  10.   
  11. grant execute on dbms_flashback to strmadmin;  
  12.   
  13. grant execute on dbms_aq to strmadmin;  
  14.   
  15. grant execute on dbms_aqjms to strmadmin;  
  16.   
  17. grant execute on dbms_aqin to strmadmin;  
  18.   
  19. grant execute on dbms_aqjms_internal to strmadmin;  
  20.    



执行系统存储过程分配权限


   
Sql代码  
  1. BEGIN  
  2. DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(  
  3. privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,  
  4. grantee => 'strmadmin',  
  5. grant_option => FALSE);  
  6. END;  
  7. /  
  8.   
  9.   
  10.   
  11. BEGIN  
  12. DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(  
  13. privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,  
  14. grantee => 'strmadmin',  
  15. grant_option => FALSE);  
  16. END;  
  17. /  
  18.    





以strmadmin帐户登录oracle




创建AQ,类型为JMS消息



Sql代码  
  1. BEGIN  
  2.    DBMS_AQADM.CREATE_QUEUE_TABLE(  
  3.         Queue_table            => 'gpsstatus_queue_table',  
  4.         Queue_payload_type     => 'SYS.AQ$_JMS_MESSAGE',  
  5.         multiple_consumers  => false,  
  6.         compatible             => '8.1.5');  
  7.    DBMS_AQADM.CREATE_QUEUE(  
  8.       Queue_name          => 'gpsstatus_queue',  
  9.       Queue_table         => 'gpsstatus_queue_table');  
  10.    DBMS_AQADM.START_QUEUE(  
  11.       queue_name         => 'gpsstatus_queue');  
  12. END;  
  13. /  
  14. BEGIN  
  15. DBMS_STREAMS_ADM.SET_UP_QUEUE(  
  16.     queue_table  => 'gps_temp_queue_table',  
  17.     queue_name   => 'gps_temp_queue');  
  18. END;  
  19. /  




为目标表创建捕获进程



Sql代码  
  1. BEGIN  
  2. DBMS_STREAMS_ADM.ADD_TABLE_RULES(  
  3. table_name => 'myoracle.TEST_GPS_STATUS',  
  4. streams_type => 'capture',  
  5. streams_name => 'capture_gps',  
  6. queue_name => 'gps_temp_queue',  
  7. include_dml => true,  
  8. include_ddl => false);  
  9. END;  
  10. /  




初始化scn


Sql代码  
  1. DECLARE  
  2. iscn NUMBER; -- Variable to hold instantiation SCN value  
  3. BEGIN  
  4. iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();  
  5. DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(  
  6. source_object_name => 'myoracle.TEST_GPS_STATUS',  
  7. source_database_name => 'TESTdb',  
  8. instantiation_scn => iscn);  
  9. END;  
  10. /  
  11.    



为消息队列创建代理



Sql代码  
  1. BEGIN  
  2. DBMS_AQADM.CREATE_AQ_AGENT(  
  3. agent_name => 'gpsstatus_agent');  
  4. DBMS_AQADM.ENABLE_DB_ACCESS(  
  5. agent_name => 'gpsstatus_agent',  
  6. db_username => 'strmadmin');  
  7. END;  
  8. /  
  9. DECLARE  
  10. subscriber SYS.AQ$_AGENT;  
  11. BEGIN  
  12. subscriber := SYS.AQ$_AGENT('gpsstatus_agent', NULL, NULL);  
  13. SYS.DBMS_AQADM.ADD_SUBSCRIBER(  
  14. queue_name => 'strmadmin.gpsstatus_queue',  
  15. subscriber => subscriber,  
  16. rule => NULL,  
  17. transformation => NULL);  
  18. END;  
  19. /  




创建存储过程以决定将哪些信息放到消息队列里面




Sql代码  
  1. CREATE OR REPLACE PROCEDURE enq_gps_lcr(in_any IN SYS.ANYDATA) IS  
  2. --agent sys.aq$_agent := sys.aq$_agent('gpsstatus_agent', null, 0);  
  3. message sys.aq$_jms_message;  
  4. enqueue_options dbms_aq.enqueue_options_t;  
  5. message_properties dbms_aq.message_properties_t;  
  6. msgid raw(16);  
  7. lcr SYS.LCR$_ROW_RECORD;  
  8. rc PLS_INTEGER;  
  9. DEVICEID varchar2(11);  
  10. GATHERDATETIME date;  
  11. LONGITUDETYPE char(1);  
  12. LONGITUDEVALUE number ;  
  13. LATITUDETYPE char(1);  
  14. LATITUDEVALUE number ;  
  15. SPEED number ;  
  16. DIRECTION number ;  
  17. BEGIN  
  18. rc := in_any.GETOBJECT(lcr);  
  19. DEVICEID:=lcr.get_value('new','DEVICEID').ACCESSvarchar2();  
  20. GATHERDATETIME := lcr.GET_VALUE('new','GATHERDATETIME').ACCESSdate();  
  21. LONGITUDETYPE := lcr.GET_VALUE('new','LONGITUDETYPE').ACCESSchar();  
  22. LONGITUDEVALUE := lcr.GET_VALUE('new','LONGITUDEVALUE').ACCESSnumber();  
  23. LATITUDETYPE := lcr.GET_VALUE('new','LATITUDETYPE').ACCESSchar();  
  24. LATITUDEVALUE := lcr.GET_VALUE('new','LATITUDEVALUE').ACCESSnumber();  
  25. SPEED := lcr.GET_VALUE('new','SPEED').ACCESSnumber();  
  26. DIRECTION := lcr.GET_VALUE('new','DIRECTION').ACCESSnumber();  
  27. message := sys.aq$_jms_message.construct(1);  
  28. --message.set_replyto(agent);  
  29. message.set_type('');  
  30. message.set_userid('strmadmin');  
  31. message.set_appid('');  
  32. message.set_groupid('');  
  33. message.set_groupseq('');  
  34. message.set_string_property('DEVICEID', DEVICEID);  
  35. message.set_string_property('GATHERDATETIME', to_char(GATHERDATETIME,'yyyy-MM-dd hh24:mi:ss'));  
  36. message.set_string_property('LONGITUDETYPE', LONGITUDETYPE);  
  37. message.set_string_property('LONGITUDEVALUE', to_char(LONGITUDEVALUE) );  
  38. message.set_string_property('LATITUDETYPE', LATITUDETYPE);  
  39. message.set_string_property('LATITUDEVALUE', to_char(LATITUDEVALUE));  
  40. message.set_string_property('SPEED', to_char(SPEED) );  
  41. message.set_string_property('DIRECTION', to_char(DIRECTION) );  
  42. --指定消息生存时间  
  43. message_properties.expiration:=60;  
  44. dbms_aq.enqueue(queue_name => 'strmadmin.gpsstatus_queue',  
  45. enqueue_options => enqueue_options,  
  46. message_properties => message_properties,  
  47. payload => message,  
  48. msgid => msgid);  
  49. COMMIT;  
  50. END;  
  51. /  




为目标表配置处理器



Sql代码  
  1. BEGIN  
  2. DBMS_APPLY_ADM.SET_DML_HANDLER(  
  3. object_name => 'myoracle.TEST_GPS_STATUS',  
  4. object_type => 'TABLE',  
  5. operation_name => 'UPDATE',  --可配置为insert,update,delete等  
  6. error_handler => false,  
  7. user_procedure => 'strmadmin.enq_gps_lcr',  
  8. apply_database_link => NULL);  
  9. END;  
  10. /  




设定参数及启动捕获进程





Sql代码  
  1. BEGIN  
  2. DBMS_STREAMS_ADM.ADD_TABLE_RULES(  
  3. table_name => 'myoracle.TEST_GPS_STATUS',  
  4. streams_type => 'apply',  
  5. streams_name => 'apply_gps',  
  6. queue_name => 'strmadmin.gps_temp_queue',  
  7. include_dml => true,  
  8. include_ddl => false,  
  9. source_database => 'TESTdb');  
  10. END;  
  11. /  
  12. BEGIN  
  13. DBMS_APPLY_ADM.SET_PARAMETER(  
  14. apply_name => 'apply_gps',  
  15. parameter => 'disable_on_error',  
  16. value => 'n');  
  17. END;  
  18. /  
  19. BEGIN  
  20. DBMS_APPLY_ADM.START_APPLY(  
  21. apply_name => 'apply_gps');  
  22. END;  
  23. /  
  24. BEGIN  
  25. DBMS_CAPTURE_ADM.START_CAPTURE(  
  26. capture_name => 'capture_gps');  
  27. END;  
  28. /  




至此,捕获进程配置完毕

可update一条myoracle.TEST_GPS_STATUS 中的记录,再查询gpsstatus_queue_table中是否有对应的一条记录。如果有,则配置成功。





下面是java处理代码,可直接使用JMS接口

本例使用oracle提供的API





Java代码  
  1. QueueConnectionFactory queueConnectionFactory = null;  
  2. QueueConnection queueConnection = null;  
  3. QueueSession queueSession = null;  
  4.   
  5. Queue queue = null;  
  6. QueueReceiver subscriber = null;  
  7. Message message = null;  










Java代码  
  1. log.info("开始连接 ");  
  2. queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(ip,sid, port, "thin");  
  3. queueConnection = queueConnectionFactory.createQueueConnection(userName, password);  
  4. log.info("创建Queue Connection 成功");  
  5. queueConnection.start();  
  6. log.info("connection started");  
  7. queueSession = queueConnection.createQueueSession(false,  
  8. Session.AUTO_ACKNOWLEDGE);  
  9. .info("Queue session created");  
  10. queue = ((AQjmsSession) queueSession).getQueue(userName, queueName);  
  11. log.info("Queue getted");  
  12. subscriber = queueSession.createReceiver(queue);  
  13. log.info("初始化完成");  




开始取消息,本例采取while(true)的方式获取消息。当没有消息的时候,线程会一直阻塞,直到有新的消息到来,立即取出。





   

Java代码  
  1. while (true) {  
  2.          message = subscriber.receive();//receive方法使没有新消息时,线程挂起  
  3.       //do something...  
  4. }  
  5.    







最后:



      本文只是试图探求一种比较好的获取实时数据方法,并不适用于所有场合,但在处理实时告警,订单等方面,应该是有一定的用武之地,若结合comet等技术,完全可以实现真正的实时。