钟汉良北京演唱会地址:php操作hbase例子

来源:百度文库 编辑:九乡新闻网 时间:2024/05/02 08:51:48
// Base directory under which the Thrift PHP library and the generated
// HBase / HadoopFS bindings are loaded by the require_once lines below.
$GLOBALS['THRIFT_ROOT'] = '/home/thrift';
require_once $GLOBALS['THRIFT_ROOT'].'/Thrift.php';
require_once $GLOBALS['THRIFT_ROOT'].'/protocol/TBinaryProtocol.php';
require_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
require_once $GLOBALS['THRIFT_ROOT'].'/transport/THttpClient.php';
require_once $GLOBALS['THRIFT_ROOT'].'/transport/TBufferedTransport.php';
require_once $GLOBALS['THRIFT_ROOT'].'/packages/Hbase/Hbase.php';
require_once($GLOBALS['THRIFT_ROOT'].'/packages/hadoopfs/ThriftHadoopFileSystem.php');

// Open a buffered, binary-protocol connection to the HBase Thrift server.
// The host has to be a quoted string; a bare 192.168.1.4 is a parse error.
$socket = new TSocket('192.168.1.4', 9091);
// NOTE(review): timeout values are presumably milliseconds - confirm against TSocket.
$socket->setSendTimeOut(20000);
$socket->setRecvTimeOut(20000);
$transport = new TBufferedTransport($socket);
$protocol = new TBinaryProtocol($transport);
$hbase = new HbaseClient($protocol);
$transport->open();

// Implement the actual work here (see the usage samples further down);
// every call on $hbase must happen while the transport is open.

$transport->close();

// Batch submission of several records in one call. The original author
// measured roughly 5000 records/s for small records when submitting every
// 200 records.
//
// Each entry of $records maps a row key to an array with two keys:
//   'timestamp' => timestamp of the record
//   'columns'   => array of 'family:qualifier' => value
$rows = array(
    'timestamp' => $timestamp,
    'columns' => array('txt:col1' => $col1, 'txt:col2' => $col2, 'txt:col3' => $col3),
);
// Add further "$rowkey => $rows" entries here to submit more rows per batch.
$records = array($rowkey => $rows);
$batchrecord = array();
foreach ($records as $rowkey => $rows) {
    // Read for completeness; it is not passed to mutateRows() below.
    $timestamp = $rows['timestamp'];
    $columns = $rows['columns'];
    // Build the list of column mutations for this row.
    $record = array();
    foreach ($columns as $column => $value) {
        $record[] = new Mutation(array('column' => $column, 'value' => $value));
    }
    // Append the row to the batch.
    $batchrecord[] = new BatchMutation(array('row' => $rowkey, 'mutations' => $record));
}
$ret = $hbase->mutateRows('test', $batchrecord);

// Single-record submission (the original author measured roughly 1000 records/s).
// Writes three columns of the 'txt' family to row $rowkey of table 'test'.
$mutation = array(
    new Mutation(array('column' => 'txt:col1', 'value' => $col1)),
    new Mutation(array('column' => 'txt:col2', 'value' => $col2)),
    new Mutation(array('column' => 'txt:col3', 'value' => $col3)),
);
$hbase->mutateRow('test', $rowkey, $mutation);

// Scan rows from $startKey up to $endKey of $table, restricted to $columns,
// and collect one "family:qualifier => value" array per returned row.
$result = $hbase->scannerOpenWithStop($table, $startKey, $endKey, $columns);
// Initialise up front so print_r() below never sees an undefined variable
// when the scan returns no rows.
$resultArray = array();
while (true) {
    $record = $hbase->scannerGet($result);
    // An empty/NULL result signals that the scanner is exhausted.
    if ($record == NULL) {
        break;
    }
    foreach ($record as $TRowResult) {
        // Row key of the current result; not included in the output array.
        $row = $TRowResult->row;
        $column = $TRowResult->columns;
        // Start from an empty array for every row so that columns of one row
        // cannot leak into the next one.
        $recordArray = array();
        foreach ($column as $family_column => $cell) {
            // The cell object carries the stored value (and a timestamp).
            $recordArray[$family_column] = $cell->value;
        }
        $resultArray[] = $recordArray;
    }
}
// Release the server-side scanner once the scan is finished.
$hbase->scannerClose($result);
print_r($resultArray);

// The samples from here on were copied from another author to round out the
// set of API calls; the original poster marked them as untested.

// List all tables in HBase, sorted by name.
// Uses $hbase, the HbaseClient created in the connection setup.
echo( "listing tables...\n" );
$tables = $hbase->getTableNames();
sort( $tables );
foreach ( $tables as $name )
{
    echo $name."\n";
}

// Delete a table. The table is disabled first when it is currently enabled.
// Uses $hbase, the HbaseClient created in the connection setup.
$name = "test2";
if ($hbase->isTableEnabled($name))
{
    echo "关闭".$name."资料表\n";
    $hbase->disableTable($name);
}
echo "刪除中...\n";
$hbase->deleteTable($name);
echo "刪除完成";

// Create a table with two column families, 'name:' and 'scores:'.
// Uses $hbase, the HbaseClient created in the connection setup.
$columns = array(
    new ColumnDescriptor(array('name' => 'name:')),
    new ColumnDescriptor(array('name' => 'scores:')),
);
$t = "test2";
echo("creating table: {$t}\n");
try
{
    $hbase->createTable( $t, $columns );
}
catch (AlreadyExists $ae)
{
    // An already existing table is reported as a warning, not a failure.
    echo( "WARN: {$ae->message}\n" );
}

// List the column families of a table together with their maxVersions setting.
// Uses $hbase, the HbaseClient created in the connection setup.
$t = "results";
echo("column families in {$t}:\n");
$descriptors = $hbase->getColumnDescriptors( $t );
asort( $descriptors );
foreach ( $descriptors as $col )
{
    echo( " column: {$col->name}, maxVer: {$col->maxVersions}\n" );
}

// Fetch the value(s) of a single column of a single row and print each
// returned cell's value and timestamp.
// Uses $hbase, the HbaseClient created in the connection setup.
$table_name = 'results';        // table name
$row_name = 's96113106';        // row key
$fam_col_name = 'scores:math';  // column as "family:qualifier"
$arr = $hbase->get($table_name, $row_name, $fam_col_name);
foreach ($arr as $k => $v) {
    // "\n" yields the same output as the literal line breaks that were
    // embedded in the original string literals.
    echo "value = {$v->value} ,\n";
    echo "timestamp = {$v->timestamp}\n";
}