Python 操作 HBase
1. 介绍
我们使用 Python 操作 HBase。Apache HBase 是 Hadoop 生态环境中的键值存储系统(Key-Value Store),它构建在 HDFS 之上,可以对大规模数据进行高速读写。HBase 使用 Java 开发,因此原生提供 Java 客户端。不过,借助 Thrift 及其丰富的多语言支持,我们可以十分便捷地从各种语言调用 HBase 服务。本文将讲述如何使用 Thrift 和 Python 来读写 HBase。
-
- 安装库
# Install the Thrift runtime and the generated HBase Thrift (v1) bindings
# used by the Python script below. These are two separate commands; on one
# line, pip would also try to install packages literally named "pip" and "install".
# NOTE(review): hbase-thrift is an old generated package; confirm it imports
# under your Python interpreter.
pip install thrift
pip install hbase-thrift
-
- 启动
cd /usr/local/hbase
# Start HBase itself.
bin/start-hbase.sh
# Start the Thrift gateway the Python client connects to (port 9090 by default).
bin/hbase-daemon.sh start thrift
-
- 检查
检查 HBase 和 Thrift 服务是否正常
[root@master bin]# hbase shell SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/usr/local/hbase-1.3.1/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/usr/local/hadoop-2.6.5/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] HBase Shell; enter 'help<RETURN>' for list of supported commands. Type "exit<RETURN>" to leave the HBase Shell Version 1.3.1, r930b9a55528fe45d8edce7af42fef2d35e77677a, Thu Apr 6 19:36:54 PDT 2017 hbase(main):001:0> list TABLE cmz_music_table emp hbase_company student test user 6 row(s) in 0.1730 seconds => ["cmz_music_table", "emp", "hbase_company", "student", "test", "user"] hbase(main):002:0> put 'user', '524382618264914241', 'info:name', 'zhangsan' 0 row(s) in 0.4270 seconds hbase(main):003:0> list TABLE cmz_music_table emp hbase_company student test user 6 row(s) in 0.0040 seconds => ["cmz_music_table", "emp", "hbase_company", "student", "test", "user"] hbase(main):004:0> scan 'student' ROW COLUMN+CELL 0 row(s) in 0.0330 seconds hbase(main):005:0> scan 'user' ROW COLUMN+CELL 524382618264914241 column=info:name, timestamp=1566215270939, value=zhangsan 1 row(s) in 0.0460 seconds hbase(main):006:0> disable student NameError: undefined local variable or method `student' for #<Object:0x611e5819> hbase(main):007:0> disable 'student' 0 row(s) in 2.3160 seconds hbase(main):008:0> drop 'student' 0 row(s) in 1.2510 seconds hbase(main):009:0> list TABLE cmz_music_table emp hbase_company test user 5 row(s) in 0.0130 seconds => ["cmz_music_table", "emp", "hbase_company", "test", "user"] 检查thrift [root@master hbase]# netstat -lnp|grep 9090 tcp6 0 0 :::9090 :::* LISTEN 8969/java thrift默认端口是9090
检查的目的是确保 HBase 和 Thrift 服务都已正常运行。
2. 代码
code
# coding=utf-8
# author: caimengzhi at 2019-08-19
"""Small HBase client built on the HBase Thrift (v1) gateway.

Usage: python opt_hbase.py [init]
    Passing ``init`` as the first argument creates the ``student`` table
    before writing to it; otherwise the table is assumed to exist already.

All ``print(...)`` calls take a single argument, and no tuple-parameter
lambdas are used, so this file parses under both Python 2 and Python 3.
"""
from thrift.transport import TSocket
from thrift.transport.TTransport import TBufferedTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
from hbase.ttypes import ColumnDescriptor
from hbase.ttypes import Mutation
import sys


class HBaseClient(object):
    """Thin wrapper around the generated ``Hbase.Client`` Thrift stub."""

    def __init__(self, ip, port=9090):
        """Open a connection to the HBase Thrift server.

        :param ip: host name or IP address of the Thrift server
        :param port: Thrift server port (9090 by default)
        """
        # Pre-set so close()/__del__ stay safe if the socket setup below raises.
        self.__transport = None
        # Buffered transport over a TCP socket to the server.
        self.__transport = TBufferedTransport(TSocket.TSocket(ip, port))
        # Binary wire protocol on top of the transport.
        protocol = TBinaryProtocol.TBinaryProtocol(self.__transport)
        self.__client = Hbase.Client(protocol)
        self.__transport.open()

    def close(self):
        """Close the connection to the Thrift server; safe to call repeatedly."""
        if self.__transport is not None:
            self.__transport.close()
            self.__transport = None

    def __del__(self):
        self.close()

    def get_tables(self):
        """Return the names of all tables.

        :return: list of table names
        """
        return self.__client.getTableNames()

    def create_table(self, table, *columns):
        """Create a table.

        :param table: table name
        :param columns: one or more column-family names
        """
        # Build a real list: on Python 3 map() is a lazy iterator, while the
        # Thrift-generated serializer presumably needs a sized sequence.
        column_families = [ColumnDescriptor(col) for col in columns]
        self.__client.createTable(table, column_families)

    def put(self, table, row, columns):
        """Insert or update cells of a single row.

        :param table: table name
        :param row: row key
        :param columns: dict mapping "family:qualifier" to the value to store
        """
        mutations = [Mutation(column=k, value=v) for k, v in columns.items()]
        self.__client.mutateRow(table, row, mutations)

    def delete(self, table, row, column):
        """Delete the cells of one column in a row (via ``deleteAll``).

        :param table: table name
        :param row: row key
        :param column: "family:qualifier" of the cells to delete
        """
        self.__client.deleteAll(table, row, column)

    def scan(self, table, start_row="", columns=None):
        """Iterate over the rows of a table, starting at ``start_row``.

        Yields one dict per row mapping "family:qualifier" to the cell value.
        The row key itself is not included.

        :param table: table name
        :param start_row: first row key to return ("" starts at the beginning)
        :param columns: list of column names to fetch, or None
        """
        scanner = self.__client.scannerOpen(table, start_row, columns)
        try:
            while True:
                rows = self.__client.scannerGet(scanner)
                if not rows:
                    break
                yield dict((k, v.value) for k, v in rows[0].columns.items())
        finally:
            # Release the server-side scanner, including when the caller
            # stops iterating early.
            self.__client.scannerClose(scanner)


if __name__ == '__main__':
    print('1. 链接 hbase')
    client = HBaseClient('master')
    # Only create the table when run as "opt_hbase.py init". Checking len()
    # first avoids an IndexError when no argument is given.
    if len(sys.argv) > 1 and sys.argv[1] == "init":
        print('2. 创建表')
        client.create_table('student', 'name', 'course')
    print('3. 插入数据')
    client.put("student", "1", {"name:": "Jack", "course:art": "88", "course:math": "100"})
    print('4. 再次插入数据')
    client.put("student", "2", {"name:": "Tom", "course:art": "90", "course:math": "100"})
    print('5. 再次插入数据')
    client.put("student", "3", {"name:": "Jerry"})
    print('6. 删除数据')
    client.delete('student', '1', 'course:math')
    print('7. 打印数据')
    for v in client.scan('student'):
        print(v)
    client.close()
运行
[root@master hbase]# python opt_hbase.py init 1. 链接 hbase 2. 创建表 3. 插入数据 4. 再次插入数据 5. 再次插入数据 6. 删除数据 7. 打印数据 {'course:art': '88', 'name:': 'Jack'} {'course:art': '90', 'name:': 'Tom', 'course:math': '100'} {'name:': 'Jerry'}
hbase(main):011:0* list TABLE cmz_music_table emp hbase_company student test user 6 row(s) in 0.0130 seconds => ["cmz_music_table", "emp", "hbase_company", "student", "test", "user"] hbase(main):012:0> scan 'student' ROW COLUMN+CELL 1 column=course:art, timestamp=1566216127526, value=88 1 column=name:, timestamp=1566216127526, value=Jack 2 column=course:art, timestamp=1566216127547, value=90 2 column=course:math, timestamp=1566216127547, value=100 2 column=name:, timestamp=1566216127547, value=Tom 3 column=name:, timestamp=1566216127556, value=Jerry 3 row(s) in 0.0470 seconds hbase(main):013:0>