跳转至

Python 操作Hbase

1. 介绍

  我们使用 Python 操作 HBase。Apache HBase 是 Hadoop 生态环境中的键值存储系统(Key-value Store)。它构建在 HDFS 之上,可以对大型数据进行高速的读写操作。HBase 的开发语言是 Java,因此提供了原生的 Java 语言客户端。不过,借助于 Thrift 和其丰富的语言扩展,我们可以十分便捷地在任何地方调用 HBase 服务。本文将讲述的就是如何使用 Thrift 和 Python 来读写 HBase。

    1. 安装库
pip install thrift
pip install hbase-thrift
    1. 启动
cd /usr/local/hbase
bin/start-hbase.sh 
# start-hbase.sh 不会启动 Thrift 服务,需要单独启动(默认监听 9090 端口)
bin/hbase-daemon.sh start thrift
    1. 检查

检查hbase和thrift

[root@master bin]# hbase shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hbase-1.3.1/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop-2.6.5/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.3.1, r930b9a55528fe45d8edce7af42fef2d35e77677a, Thu Apr  6 19:36:54 PDT 2017

hbase(main):001:0> list
TABLE                                                                                                                                                                                        
cmz_music_table                                                                                                                                                                              
emp                                                                                                                                                                                          
hbase_company                                                                                                                                                                                
student                                                                                                                                                                                      
test                                                                                                                                                                                         
user                                                                                                                                                                                         
6 row(s) in 0.1730 seconds

=> ["cmz_music_table", "emp", "hbase_company", "student", "test", "user"]
hbase(main):002:0> put 'user', '524382618264914241', 'info:name', 'zhangsan'
0 row(s) in 0.4270 seconds

hbase(main):003:0> list
TABLE                                                                                                                                                                                        
cmz_music_table                                                                                                                                                                              
emp                                                                                                                                                                                          
hbase_company                                                                                                                                                                                
student                                                                                                                                                                                      
test                                                                                                                                                                                         
user                                                                                                                                                                                         
6 row(s) in 0.0040 seconds

=> ["cmz_music_table", "emp", "hbase_company", "student", "test", "user"]
hbase(main):004:0> scan 'student'
ROW                                              COLUMN+CELL                                                                                                                                 
0 row(s) in 0.0330 seconds

hbase(main):005:0> scan 'user'
ROW                                              COLUMN+CELL                                                                                                                                 
 524382618264914241                              column=info:name, timestamp=1566215270939, value=zhangsan                                                                                   
1 row(s) in 0.0460 seconds

hbase(main):006:0> disable student
NameError: undefined local variable or method `student' for #<Object:0x611e5819>

hbase(main):007:0> disable 'student'
0 row(s) in 2.3160 seconds

hbase(main):008:0> drop 'student'
0 row(s) in 1.2510 seconds

hbase(main):009:0> list
TABLE                                                                                                                                                                                        
cmz_music_table                                                                                                                                                                              
emp                                                                                                                                                                                          
hbase_company                                                                                                                                                                                
test                                                                                                                                                                                         
user                                                                                                                                                                                         
5 row(s) in 0.0130 seconds

=> ["cmz_music_table", "emp", "hbase_company", "test", "user"]

检查thrift
[root@master hbase]# netstat -lnp|grep 9090
tcp6       0      0 :::9090                 :::*                    LISTEN      8969/java 

thrift默认端口是9090

检查的目的是确保 HBase 和 Thrift 服务都正常运行

2. 代码

code
# coding=utf-8
# author: caimengzhi at 2019-08-19
from thrift.transport import TSocket
from thrift.transport.TTransport import TBufferedTransport
from thrift.protocol import TBinaryProtocol

from hbase import Hbase
from hbase.ttypes import ColumnDescriptor
from hbase.ttypes import Mutation
import sys

class HBaseClient(object):
    """Small convenience wrapper around the HBase Thrift client (`Hbase.Client`).

    The connection is opened in the constructor and closed when the object
    is garbage-collected. The code avoids Python-2-only syntax so the same
    source parses under both Python 2 and Python 3.
    """

    def __init__(self, ip, port=9090):
        """
        Open a connection to the Thrift server.

        :param ip: host name or address of the Thrift server
        :param port: port of the Thrift server (defaults to 9090)
        """
        # Buffered transport on top of a plain socket to the server.
        self.__transport = TBufferedTransport(TSocket.TSocket(ip, port))
        # Wire protocol used on that transport.
        protocol = TBinaryProtocol.TBinaryProtocol(self.__transport)
        # Generated Thrift client that issues the actual calls.
        self.__client = Hbase.Client(protocol)
        # Open the connection.
        self.__transport.open()

    def __del__(self):
        """Close the transport, if one was ever created."""
        # The attribute is name-mangled; look it up defensively so that a
        # constructor that failed before assigning it does not raise a
        # secondary AttributeError from the finalizer.
        transport = getattr(self, '_HBaseClient__transport', None)
        if transport is not None:
            transport.close()

    def get_tables(self):
        """
        List all tables.

        :return: whatever `getTableNames()` returns (the table names)
        """
        return self.__client.getTableNames()

    def create_table(self, table, *columns):
        """
        Create a table.

        :param table: table name
        :param columns: column family names, one positional argument each
        """
        # Build a real list: on Python 3 `map` is a lazy iterator, which is
        # not a valid Thrift list value.
        column_families = [ColumnDescriptor(col) for col in columns]
        self.__client.createTable(table, column_families)

    def put(self, table, row, columns):
        """
        Write one row.

        :param table: table name
        :param row: row key
        :param columns: dict mapping column name ("family:qualifier") to value
        :return: None
        """
        # Tuple-parameter lambdas were removed in Python 3; unpack in the
        # comprehension instead.
        mutations = [Mutation(column=name, value=value)
                     for name, value in columns.items()]
        self.__client.mutateRow(table, row, mutations)

    def delete(self, table, row, column):
        """
        Delete the cells of one column in a row.

        :param table: table name
        :param row: row key
        :param column: column name passed through to `deleteAll`
        """
        self.__client.deleteAll(table, row, column)

    def scan(self, table, start_row="", columns=None):
        """
        Iterate over rows of a table.

        This is a generator: the scanner is opened on the first iteration
        and one dict ``{column name: cell value}`` is yielded per row.

        :param table: table name
        :param start_row: row key to start scanning from
        :param columns: columns / column families to fetch, passed through
                        to `scannerOpen` unchanged
        """
        scanner = self.__client.scannerOpen(table, start_row, columns)
        try:
            while True:
                rows = self.__client.scannerGet(scanner)
                # An empty result means the scanner is exhausted.
                if not rows:
                    break
                yield dict((name, cell.value)
                           for name, cell in rows[0].columns.items())
        finally:
            # Release the server-side scanner once iteration finishes or
            # the generator is closed early.
            self.__client.scannerClose(scanner)


if __name__ == '__main__':
    # Demo script: connect, optionally create the table, insert three rows,
    # delete one cell, then print every row of the table.
    # Usage: python <script> [init]   -- pass "init" to create the table first.
    # Single-argument print(...) produces the same output on Python 2 and 3.
    print('1. 链接 hbase')
    client = HBaseClient('master')

    print('2. 创建表')
    # Check the argument count first so that running the script without
    # arguments skips table creation instead of raising IndexError.
    if len(sys.argv) > 1 and sys.argv[1] == "init":
        client.create_table('student', 'name', 'course')

    print('3. 插入数据')
    client.put("student", "1",
               {"name:": "Jack",
                "course:art": "88",
                "course:math": "100"})

    print('4. 再次插入数据')
    client.put("student", "2",
               {"name:": "Tom", "course:art": "90",
                "course:math": "100"})

    print('5. 再次插入数据')
    client.put("student", "3",
               {"name:": "Jerry"})

    print('6. 删除数据')
    client.delete('student', '1', 'course:math')

    print('7. 打印数据')
    for v in client.scan('student'):
        print(v)

运行

[root@master hbase]# python opt_hbase.py init 
1. 链接 hbase
2. 创建表
3. 插入数据
4. 再次插入数据
5. 再次插入数据
6. 删除数据
7. 打印数据
{'course:art': '88', 'name:': 'Jack'}
{'course:art': '90', 'name:': 'Tom', 'course:math': '100'}
{'name:': 'Jerry'}
检查
hbase(main):011:0* list
TABLE                                                                                                                                                                                        
cmz_music_table                                                                                                                                                                              
emp                                                                                                                                                                                          
hbase_company                                                                                                                                                                                
student                                                                                                                                                                                      
test                                                                                                                                                                                         
user                                                                                                                                                                                         
6 row(s) in 0.0130 seconds

=> ["cmz_music_table", "emp", "hbase_company", "student", "test", "user"]
hbase(main):012:0> scan 'student'
ROW                                              COLUMN+CELL                                                                                                                                 
 1             column=course:art, timestamp=1566216127526, value=88                                                                                        
 1             column=name:, timestamp=1566216127526, value=Jack                                                                                           
 2             column=course:art, timestamp=1566216127547, value=90                                                                                        
 2             column=course:math, timestamp=1566216127547, value=100                                                                                      
 2             column=name:, timestamp=1566216127547, value=Tom                                                                                            
 3             column=name:, timestamp=1566216127556, value=Jerry                                                                                          
3 row(s) in 0.0470 seconds

hbase(main):013:0>