铆钉卷花图片:java实时获取oracle数据变更
来源:百度文库 编辑:九乡新闻网 时间:2024/05/11 16:25:44
在一个基于数据库的“实时系统”里面,“实时”获取数据库变化是非常重要的,本文主要描述通过Oracle中的捕获进程实时获取数据库变化。
背景:
要做一个车辆GPS监控系统,主要分两块:
1.采集。由GPS厂商提供实时数据,通过UDP包接收
2.展示。前端程序获取到最新GPS数据后,在地图上模拟车辆的运行情况
备选方案:
1.采集程序接收到UDP包并解析后,将数据放入数据库;前端程序轮询数据库以获取最新数据。
2.采集程序接收到UDP包并解析后,将数据放入数据库,同时向前端程序发送一条消息,传递最新数据。
方案评估:
方案1:最简单,最传统;增加不必要的数据库查询,并且非实时,轮询时间间隔不好确定。
方案2:可实现“实时”,但增加采集程序职责,采集程序本不知道前端系统的存在。
最终方案:
最后采取了另一种方案:通过oracle捕获进程捕获数据库变更(采集程序insert或update一条记录时,捕获进程即时获取到该条记录),将变更记录发送到AQ(oracle高级队列,JMS的oracle实现),前端程序只关注AQ,当有新消息到来时,即刻可收到并做相应处理,反映出实时状态。
关于捕获进程,请参考《Streams概述》,《Streams捕获进程》
实现:
SQL代码
创建表空间和用户:
Sql代码
授予相应权限
Sql代码
执行系统存储过程分配权限
Sql代码
以strmadmin帐户登录oracle
创建AQ,类型为JMS消息
Sql代码
为目标表创建捕获进程
Sql代码
初始化scn
Sql代码
为消息队列创建代理
Sql代码
创建存储过程以决定将哪些信息放到消息队列里面
Sql代码
为目标表配置处理器
Sql代码
设定参数及启动捕获进程
Sql代码
至此,捕获进程配置完毕
可update一条myoracle.TEST_GPS_STATUS 中的记录,再查询gpsstatus_queue_table中是否有对应的一条记录。如果有,则配置成功。
下面是java处理代码,可直接使用JMS接口
本例使用oracle提供的API
Java代码
Java代码
开始取消息,本例采取while(true)的方式获取消息。当没有消息的时候,线程会一直阻塞,直到有新的消息到来,立即取出。
Java代码
最后:
本文只是试图探求一种比较好的获取实时数据方法,并不适用于所有场合,但在处理实时告警,订单等方面,应该是有一定的用武之地,若结合comet等技术,完全可以实现真正的实时。
背景:
要做一个车辆GPS监控系统,主要分两块:
1.采集。由GPS厂商提供实时数据,通过UDP包接收
2.展示。前端程序获取到最新GPS数据后,在地图上模拟车辆的运行情况
备选方案:
1.采集程序接收到UDP包并解析后,将数据放入数据库;前端程序轮询数据库以获取最新数据。
2.采集程序接收到UDP包并解析后,将数据放入数据库,同时向前端程序发送一条消息,传递最新数据。
方案评估:
方案1:最简单,最传统;增加不必要的数据库查询,并且非实时,轮询时间间隔不好确定。
方案2:可实现“实时”,但增加采集程序职责,采集程序本不知道前端系统的存在。
最终方案:
最后采取了另一种方案:通过oracle捕获进程捕获数据库变更(采集程序insert或update一条记录时,捕获进程即时获取到该条记录),将变更记录发送到AQ(oracle高级队列,JMS的oracle实现),前端程序只关注AQ,当有新消息到来时,即刻可收到并做相应处理,反映出实时状态。
关于捕获进程,请参考《Streams概述》,《Streams捕获进程》
实现:
SQL代码
创建表空间和用户:
Sql代码
- Create tablespace streams_tbs datafile 'E:\DBSERVER\ORACLE9I\ORADATA\TESTDB\stream_tbs.dbf' size 25M Reuse autoextend on maxsize unlimited;
- --修改目标表(要捕获变更的表)追加日志
- ALTER TABLE myoracle.TEST_GPS_STATUS ADD SUPPLEMENTAL LOG GROUP log_group_gpsstatus_pk (DEVICEID) ALWAYS;
- create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs;
- grant connect, resource, select_catalog_role to strmadmin;
授予相应权限
Sql代码
- grant execute on dbms_aqadm to strmadmin;
- grant execute on dbms_capture_adm to strmadmin;
- grant execute on dbms_propagation_adm to strmadmin;
- grant execute on dbms_streams_adm to strmadmin;
- grant execute on dbms_apply_adm to strmadmin;
- grant execute on dbms_flashback to strmadmin;
- grant execute on dbms_aq to strmadmin;
- grant execute on dbms_aqjms to strmadmin;
- grant execute on dbms_aqin to strmadmin;
- grant execute on dbms_aqjms_internal to strmadmin;
执行系统存储过程分配权限
Sql代码
- BEGIN
- DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
- privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
- grantee => 'strmadmin',
- grant_option => FALSE);
- END;
- /
- BEGIN
- DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
- privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
- grantee => 'strmadmin',
- grant_option => FALSE);
- END;
- /
以strmadmin帐户登录oracle
创建AQ,类型为JMS消息
Sql代码
- BEGIN
- DBMS_AQADM.CREATE_QUEUE_TABLE(
- Queue_table => 'gpsstatus_queue_table',
- Queue_payload_type => 'SYS.AQ$_JMS_MESSAGE',
- multiple_consumers => false,
- compatible => '8.1.5');
- DBMS_AQADM.CREATE_QUEUE(
- Queue_name => 'gpsstatus_queue',
- Queue_table => 'gpsstatus_queue_table');
- DBMS_AQADM.START_QUEUE(
- queue_name => 'gpsstatus_queue');
- END;
- /
- BEGIN
- DBMS_STREAMS_ADM.SET_UP_QUEUE(
- queue_table => 'gps_temp_queue_table',
- queue_name => 'gps_temp_queue');
- END;
- /
为目标表创建捕获进程
Sql代码
- BEGIN
- DBMS_STREAMS_ADM.ADD_TABLE_RULES(
- table_name => 'myoracle.TEST_GPS_STATUS',
- streams_type => 'capture',
- streams_name => 'capture_gps',
- queue_name => 'gps_temp_queue',
- include_dml => true,
- include_ddl => false);
- END;
- /
初始化scn
Sql代码
- DECLARE
- iscn NUMBER; -- Variable to hold instantiation SCN value
- BEGIN
- iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
- DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
- source_object_name => 'myoracle.TEST_GPS_STATUS',
- source_database_name => 'TESTdb',
- instantiation_scn => iscn);
- END;
- /
为消息队列创建代理
Sql代码
- BEGIN
- DBMS_AQADM.CREATE_AQ_AGENT(
- agent_name => 'gpsstatus_agent');
- DBMS_AQADM.ENABLE_DB_ACCESS(
- agent_name => 'gpsstatus_agent',
- db_username => 'strmadmin');
- END;
- /
- DECLARE
- subscriber SYS.AQ$_AGENT;
- BEGIN
- subscriber := SYS.AQ$_AGENT('gpsstatus_agent', NULL, NULL);
- SYS.DBMS_AQADM.ADD_SUBSCRIBER(
- queue_name => 'strmadmin.gpsstatus_queue',
- subscriber => subscriber,
- rule => NULL,
- transformation => NULL);
- END;
- /
创建存储过程以决定将哪些信息放到消息队列里面
Sql代码
- CREATE OR REPLACE PROCEDURE enq_gps_lcr(in_any IN SYS.ANYDATA) IS
- --agent sys.aq$_agent := sys.aq$_agent('gpsstatus_agent', null, 0);
- message sys.aq$_jms_message;
- enqueue_options dbms_aq.enqueue_options_t;
- message_properties dbms_aq.message_properties_t;
- msgid raw(16);
- lcr SYS.LCR$_ROW_RECORD;
- rc PLS_INTEGER;
- DEVICEID varchar2(11);
- GATHERDATETIME date;
- LONGITUDETYPE char(1);
- LONGITUDEVALUE number ;
- LATITUDETYPE char(1);
- LATITUDEVALUE number ;
- SPEED number ;
- DIRECTION number ;
- BEGIN
- rc := in_any.GETOBJECT(lcr);
- DEVICEID:=lcr.get_value('new','DEVICEID').ACCESSvarchar2();
- GATHERDATETIME := lcr.GET_VALUE('new','GATHERDATETIME').ACCESSdate();
- LONGITUDETYPE := lcr.GET_VALUE('new','LONGITUDETYPE').ACCESSchar();
- LONGITUDEVALUE := lcr.GET_VALUE('new','LONGITUDEVALUE').ACCESSnumber();
- LATITUDETYPE := lcr.GET_VALUE('new','LATITUDETYPE').ACCESSchar();
- LATITUDEVALUE := lcr.GET_VALUE('new','LATITUDEVALUE').ACCESSnumber();
- SPEED := lcr.GET_VALUE('new','SPEED').ACCESSnumber();
- DIRECTION := lcr.GET_VALUE('new','DIRECTION').ACCESSnumber();
- message := sys.aq$_jms_message.construct(1);
- --message.set_replyto(agent);
- message.set_type('');
- message.set_userid('strmadmin');
- message.set_appid('');
- message.set_groupid('');
- message.set_groupseq('');
- message.set_string_property('DEVICEID', DEVICEID);
- message.set_string_property('GATHERDATETIME', to_char(GATHERDATETIME,'yyyy-MM-dd hh24:mi:ss'));
- message.set_string_property('LONGITUDETYPE', LONGITUDETYPE);
- message.set_string_property('LONGITUDEVALUE', to_char(LONGITUDEVALUE) );
- message.set_string_property('LATITUDETYPE', LATITUDETYPE);
- message.set_string_property('LATITUDEVALUE', to_char(LATITUDEVALUE));
- message.set_string_property('SPEED', to_char(SPEED) );
- message.set_string_property('DIRECTION', to_char(DIRECTION) );
- --指定消息生存时间
- message_properties.expiration:=60;
- dbms_aq.enqueue(queue_name => 'strmadmin.gpsstatus_queue',
- enqueue_options => enqueue_options,
- message_properties => message_properties,
- payload => message,
- msgid => msgid);
- COMMIT;
- END;
- /
为目标表配置处理器
Sql代码
- BEGIN
- DBMS_APPLY_ADM.SET_DML_HANDLER(
- object_name => 'myoracle.TEST_GPS_STATUS',
- object_type => 'TABLE',
- operation_name => 'UPDATE', --可配置为insert,update,delete等
- error_handler => false,
- user_procedure => 'strmadmin.enq_gps_lcr',
- apply_database_link => NULL);
- END;
- /
设定参数及启动捕获进程
Sql代码
- BEGIN
- DBMS_STREAMS_ADM.ADD_TABLE_RULES(
- table_name => 'myoracle.TEST_GPS_STATUS',
- streams_type => 'apply',
- streams_name => 'apply_gps',
- queue_name => 'strmadmin.gps_temp_queue',
- include_dml => true,
- include_ddl => false,
- source_database => 'TESTdb');
- END;
- /
- BEGIN
- DBMS_APPLY_ADM.SET_PARAMETER(
- apply_name => 'apply_gps',
- parameter => 'disable_on_error',
- value => 'n');
- END;
- /
- BEGIN
- DBMS_APPLY_ADM.START_APPLY(
- apply_name => 'apply_gps');
- END;
- /
- BEGIN
- DBMS_CAPTURE_ADM.START_CAPTURE(
- capture_name => 'capture_gps');
- END;
- /
至此,捕获进程配置完毕
可update一条myoracle.TEST_GPS_STATUS 中的记录,再查询gpsstatus_queue_table中是否有对应的一条记录。如果有,则配置成功。
下面是java处理代码,可直接使用JMS接口
本例使用oracle提供的API
Java代码
- QueueConnectionFactory queueConnectionFactory = null;
- QueueConnection queueConnection = null;
- QueueSession queueSession = null;
- Queue queue = null;
- QueueReceiver subscriber = null;
- Message message = null;
Java代码
- log.info("开始连接 ");
- queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(ip,sid, port, "thin");
- queueConnection = queueConnectionFactory.createQueueConnection(userName, password);
- log.info("创建Queue Connection 成功");
- queueConnection.start();
- log.info("connection started");
- queueSession = queueConnection.createQueueSession(false,
- Session.AUTO_ACKNOWLEDGE);
- .info("Queue session created");
- queue = ((AQjmsSession) queueSession).getQueue(userName, queueName);
- log.info("Queue getted");
- subscriber = queueSession.createReceiver(queue);
- log.info("初始化完成");
开始取消息,本例采取while(true)的方式获取消息。当没有消息的时候,线程会一直阻塞,直到有新的消息到来,立即取出。
Java代码
- while (true) {
- message = subscriber.receive();//receive方法使没有新消息时,线程挂起
- //do something...
- }
最后:
本文只是试图探求一种比较好的获取实时数据方法,并不适用于所有场合,但在处理实时告警,订单等方面,应该是有一定的用武之地,若结合comet等技术,完全可以实现真正的实时。
java实时获取oracle数据变更
获取实时股票数据
实时股票数据获取方式
获取实时股票数据与股票数据接口API
Oracle数据库实现获取前几条数据的方法
Java连接Oracle数据库
java获取当前路径
oracle _数据字典
在Java中获取环境变量
实时在线获取漂亮Flash时钟
jdbc优化[手工原创] java oracle - DAO
Excel动画教程:获取网络数据
java获取当前路径(包含tomcat)2
香港天文台实时公布辐射数据
利用顶牛数据精选个股用实时数据说话 2
Oracle中大批量删除数据的方法
Oracle中的数据字典技术初级入门
Oracle数据的异地自动备份
Oracle 中如何删除重复数据
oracle中如何删除重复数据
将excel数据导入oracle中- -
Oracle数据库数据对象分析(下)
使用Java?API压缩和?解压缩数据
JSON与JAVA的数据转换