HBase学习笔记

Hbase架构,原理,搭建,练习,表设计,写表,读表的优化

Hadoop生态系统

HBase简介

HBase -Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩、可实时读写的分布式数据库,利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为其分布式协同服务,主要用来存储非结构化和半结构化的松散数据(列存 NoSQL 数据库)。

列式数据库

列式数据库是以列相关存储架构进行数据存储的数据库,主要适合与批量数据处理和即席查询。相对应的是行式数据库,数据以行相关的存储体系架构进行空间分配,主要适合与小批量的数据处理,常用于联机事务型数据处理。
列式数据库以行、列的二维表的形式存储数据,但是却以一维字符串的方式存储,例如以下的一个表:

这个简单的表包括员工代码(EmpId), 姓名字段(Lastname and Firstname)及工资(Salary).这个表存储在电脑的内存(RAM)和存储(硬盘)中。虽然内存和硬盘在机制上不同,电脑的操作系统是以同样的方式存储的。数据库必须把这个二维表存储在一系列一维的“字节”中,操作系统把它们写到内存或硬盘中。

行式数据库把一行中的数据值串在一起存储起来,然后再存储下一行的数据,以此类推。
1,Smith,Joe,40000;2,Jones,Mary,50000;3,Johnson,Cathy,44000;
列式数据库把一列中的数据值串在一起存储起来,然后再存储下一列的数据,以此类推。
1,2,3;Smith,Jones,Johnson;Joe,Mary,Cathy;40000,50000,44000;

HBase数据模型

在HBase中,数据是存储在有行有列的表格中。这是与关系型数据库重复的术语,但不能做类比。相反,HBase可以被认为是一个多维度的映射。

HBase数据模型术语

Table(表格)

一个HBase表格由多行组成。

Row(行)

HBase中的行里面包含一个key和一个或者多个包含值的列。行按照行的key字母顺序存储在表格中。Row key只能存储64k的字节数据。因为这个原因,行的key的设计就显得非常重要。数据的存储目标是相近的数据存储到一起。一个常用的行的key的格式是网站域名。如果你的行的key是域名,你应该将域名进行反转(org.apache.www, org.apache.mail, org.apache.jira)再存储。这样的话,所有Apache域名将会存储在一起,好过基于子域名的首字母分散在各处。

Column(列)

HBase中的列包含用:分隔开的列族和列的限定符。

Column Family(列族)

因为性能的原因,列族物理上包含一组列和它们的值。每一个列族拥有一系列的存储属性,例如值是否缓存在内存中,数据是否要压缩或者他的行key是否要加密等等。表格中的每一行拥有相同的列族,尽管一个给定的行可能没有存储任何数据在一个给定的列族中。

Column Qualifier(列的限定符)

列的限定符是列族中数据的索引。例如给定了一个列族content,那么限定符可能是content:html,也可以是content:pdf。列族在创建表格时是确定的了,但是列的限定符是动态地并且行与行之间的差别也可能是非常大的。
HBase表中的每个列都归属于某个列族,列族必须作为表模式(schema)定义的一部分预先给出。如 create ‘test’, ‘course’;
列名以列族作为前缀,每个“列族”都可以有多个列成员(column);如course:math, course:english,新的列族成员(列)可以随后按需、动态加入;
权限控制、存储以及调优都是在列族层面进行的;
HBase把同一列族里面的数据存储在同一目录下,由几个文件保存。

Cell(单元)

由行和列的坐标交叉决定;
单元格是有版本的;
单元格的内容是未解析的字节数组;
单元格是由行、列族、列限定符、值和代表值版本的时间戳组成的({row key, column( = +), version} )唯一确定单元格。cell中的数据是没有类型的,全部是字节码形式存储。

Timestamp(时间戳)

时间戳是写在值旁边的一个用于区分值的版本的数据。默认情况下,时间戳表示的是当数据写入时RegionSever的时间点,但你也可以在写入数据时指定一个不同的时间戳。
在HBase每个cell存储单元对同一份数据有多个版本,根据唯一的时间戳来区分每个版本之间的差异,不同版本的数据按照时间倒序排序,最新的数据版本排在最前面。
时间戳的类型是64位整型。时间戳可以由HBase(在数据写入时自动)赋值,此时时间戳是精确到毫秒的当前系统时间。时间戳也可以由客户显式赋值,如果应用程序要避免数据版本冲突,就必须自己生成具有唯一性的时间戳。

概念视图

例子:
一个名为webable的表格,表格中有两行(com.cnn.www 和 com.example.www)和三个列族(contents, anchor和people)。在这个例子当中,第一行(com.cnn.www)中anchor包含两列(anchor:cssnsi.com, anchor:my.look.ca)和content包含一列(contents:html)。这个例子中com.cnn.www拥有5个版本而com.example.www有一个版本。contents:html列中包含给定网页的整个HTML。anchor限定符包含能够表示行的站点以及链接中文本。People列族表示跟站点有关的人。
Table webtable

在HBase中,表格中的单元如果是空将不占用空间或者事实上不存在。这就使得HBase看起来“稀疏”。表格视图不是唯一方式来查看HBase中数据,甚至不是最精确的。下面的方式以多维度映射的方式来表达相同的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
"com.cnn.www": {
contents: {
t6: contents:html: "<html>..."
t5: contents:html: "<html>..."
t3: contents:html: "<html>..."
}
anchor: {
t9: anchor:cnnsi.com = "CNN"
t8: anchor:my.look.ca = "CNN.com"
}
people: {}
}
"com.example.www": {
contents: {
t5: contents:html: "<html>..."
}
anchor: {}
people: {
t5: people:author: "John Doe"
}
}
}

物理视图

尽管一个概念层次的表格可能看起来是由一些列稀疏的行组成,但他们是通过列族来存储的。一个新建的限定符(column_family:column_qualifier)可以随时地添加到已存在的列族中。
ColumnFamily anchor
ColumnFamily contents

概念视图中的空单元实际上是没有进行存储的。因此对于返回时间戳为t8的contents:html的值的请求,结果为空。同样的,一个返回时间戳为t9的anchor:my.look.ca的值的请求,结果也为空。然而,如果没有指定时间戳的话,那么会返回特定列的最新值。对有多个版本的列,优先返回最新的值,因为时间戳是按照递减顺序存储的。因此对于一个返回com.cnn.www里面所有的列的值并且没有指定时间戳的请求,返回的结果会是时间戳为t6的contents:html 的值、时间戳 t9的anchor:cnnsi.com f的值和时间戳t8的anchor:my.look.ca。

HBase体系架构

Client

包含访问HBase的接口并维护cache来加快对HBase的访问

Zookeeper

  • 保证任何时候,集群中只有一个工作状态和master。
  • 存储所有Region的寻址入口。
  • 实时监控Region server的上线和下线信息。并实时通知Master。
  • 存储HBase的schema和table元数据。

Master

  • 为Region server分配region
  • 负责Region server的负载均衡
  • 发现失效的Region server并重新分配其上的region
  • 管理用户对table的增删改操作

RegionServer

Region server维护region,处理对这些region的IO请求。
Region server负责切分在运行过程中变得过大的region。

存储模型

region

HBase自动把表水平划分成多个区域(region),每个region会保存一个表里面某段连续的数据;每个表一开始只有一个region,随着数据不断插入表,region不断增大,当增大到一个阀值(hbase.hregion.max.filesize默认为256M)的时候,region就会等分会两个新的region(裂变);当table中的行不断增多,就会有越来越多的region。这样一张完整的表被保存在多个Regionserver 上。

HLog(WAL log)

HLog文件就是一个普通的Hadoop Sequence File,Sequence File的Key是HLogKey对象,HLogKey中记录了写入数据的归属信息,除了table和region名字外,同时还包括 sequence number和timestamp,timestamp是“写入时间”,sequence number的起始值为0,或者是最近一次存入文件系统中的sequence number。
HLog SequeceFile的Value是HBase的KeyValue对象,即对应HFile中的KeyValue。

数据模型:Memstore 与 storefile

一个region由多个store组成,一个store对应一个CF(列族)。store包括位于内存中的memstore和位于磁盘的storefile,写操作先写入memstore,当memstore中的数据达到某个阈值,hregionserver会启动flashcache进程写入storefile,每次写入形成单独的一个storefile,当storefile文件的数量增长到一定阈值后,系统会进行合并(minor、major compaction),在合并过程中会进行版本合并和删除工作(majar),形成更大的storefile
当一个region所有storefile的大小和数量超过一定阈值后,会把当前的region分割为两个,并由hmaster分配到相应的regionserver服务器,实现负载均衡,客户端检索数据,先在memstore找,找不到再找storefile。

HRegion是HBase中分布式存储和负载均衡的最小单元。最小单元就表示不同的HRegion可以分布在不同的 HRegion server上。HRegion由一个或者多个Store组成,每个store保存一个columns family。每个Strore又由一个memStore和0至多个StoreFile组成。如图:StoreFile以HFile格式保存在HDFS上。

HBase伪分布式搭建(node01)

0.启动zookeeper集群和hadoop集群

zkServer.sh start
start-dfs.sh
start-yarn.sh
yarn-daemon.sh start resourcemanager
JDK安装好,环境变量配置好

1.解压hbase安装包

[root@node01 sxt]# tar xf hbase-0.98.12.1-hadoop2-bin.tar.gz

2.配置环境变量
1
2
3
4
5
6
7
8
9
10
[root@node01 sxt]# vi + /etc/profile
export JAVA_HOME=/usr/java/jdk1.7.0_67
export PATH=$PATH:$JAVA_HOME/bin
export HADOOP_PREFIX=/opt/sxt/hadoop-2.6.5
export PATH=$PATH:$HADOOP_PREFIX/bin:$HADOOP_PREFIX/sbin
export ZOOKEEPER_PREFIX=/opt/sxt/zookeeper-3.4.6
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export HBASE_HOME=/opt/sxt/hbase-0.98.12.1
export PATH=$PATH:$HBASE_HOME/bin
[root@node01 sxt]# . /etc/profile
3.hbase-env.sh中配置JAVA_HOME
1
2
3
[root@node01 sxt]# cd hbase-0.98.12.1/conf/
[root@node01 conf]# vi hbase-env.sh
export JAVA_HOME=/usr/java/jdk1.7.0_67
4.修改hbase-site.xml配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
[root@node01 conf]# vi hbase-site.xml
<configuration>
<property>
<name>hbase.rootdir</name>
<value>file:///var/hbase/local</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/var/hbase/local/zookeeper</value>
</property>
</configuration>
5.启动hbase

[root@node01 conf]# start-hbase.sh

6.进入hbase命令行

[root@node01 conf]# hbase shell

基础命令练习

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
hbase删除已经打好的命令:ctrl+backspace
hbase(main):002:0> version
0.98.12.1-hadoop2, rb00ec5da604d64a0bdc7d92452b1e0559f0f5d73, Sun May 17 12:55:03 PDT 2015
hbase(main):003:0> whoami
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/sxt/hbase-0.98.12.1/lib/slf4j-log4j12-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/sxt/hadoop-2.6.5/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBi
nder.class]SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
root (auth:SIMPLE)
groups: root
建表
表名 列族 列族
hbase(main):004:0>hbase(main):006:0> create 'person','name', 'age'
查看有哪些表
hbase(main):007:0> list
TABLE
person
查看表描述
hbase(main):010:0> describe 'person'
Table person is ENABLED
person
COLUMN FAMILIES DESCRIPTION
{NAME => 'age', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE',
MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE',
BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
{NAME => 'name', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE',
MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE',
BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
NAME:列族名
VERSIONS:最大版本号
MIN_VERSIONS:最小版本号
TTL(Time To Live):存活时间
IN_MEMORY:是否开启缓存,默认false,应该开启,否则与BLOCKCACHE冲突
BLOCKCACHE:读缓存是否开启,默认开启,64M
插入数据
hbase(main):012:0> put 'person','0001','name:firstname','Jed'
hbase(main):013:0> put 'person','0001','age:zhousui','20'
查看全部数据
hbase(main):014:0> scan 'person'
ROW COLUMN+CELL
0001 column=age:zhousui, timestamp=1499929503879, value=20
0001 column=name:firstname, timestamp=1499929407656, value=Jed
查看个别数据
hbase(main):015:0> get 'person','0001','name:firstname'
COLUMN CELL
name:firstname timestamp=1499929407656, value=Jed
修改数据
hbase(main):016:0> put 'person','0001','name:firstname','Tom'
hbase(main):017:0> get 'person','0001','name:firstname'
COLUMN CELL
name:firstname timestamp=1499929924936, value=Tom
查看表空间
hbase(main):018:0> list_namespace
NAMESPACE
default #用户创建的表放在这里
hbase #系统表空间
进入存放数据的目录/var/hbase/local(在hbase-site.xml中配置过)
[root@node01 ~]# cd /var/hbase/local/
[root@node01 local]# ls
archive data hbase.id hbase.version oldWALs WALs zookeeper
data是存放数据的目录,oldWAL和WALs是Hlog
[root@node01 local]# cd data
[root@node01 data]# ls
default hbase
[root@node01 data]# cd default/
[root@node01 default]# ls
person
[root@node01 default]# cd person/
[root@node01 person]# ls
b14e1200e562fb736ce81df88d712823
[root@node01 person]# cd b14e1200e562fb736ce81df88d712823/
[root@node01 b14e1200e562fb736ce81df88d712823]# ls
age name
[root@node01 b14e1200e562fb736ce81df88d712823]# cd age
[root@node01 age]# cd ls
总用量 0
age和name下没有数据,因为数据还在内存中,我们设置强制溢写
hbase(main):028:0> flush 'person'
root@node01 b14e1200e562fb736ce81df88d712823]# cd age/
[root@node01 age]# ls
06c01947d23e4fafa3a95bd407cc8c94
[root@node01 age]# hbase hfile -p -f 06c01947d23e4fafa3a95bd407cc8c94
K: 0001/age:zhousui/1499931020776/Put/vlen=2/mvcc=0 V: 20
删除表
hbase(main):021:0> disable 'person' #先让表禁用
hbase(main):022:0> drop 'person' #再删除表

HBase完全分布式搭建

角色分布

在生产中,不要把namenode和HbaseMaster配置到一台机器上。

0. 再准备一台机器node05
1
2
3
4
5
6
7
8
vi /etc/sysconfig/network-scripts/ifcfg-eth0
vi /etc/sysconfig/network
vi /etc/hosts
编辑C:\Windows\System32\drivers\etc\hosts文件
node05免秘钥
[root@node01 .ssh]# vi authorized_keys 复制一行,末尾改为node05
[root@node05 ~]# ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
[root@node01 .ssh]# scp ./* node04:`pwd`

安装jdk
一定要保证集群中所有机器的系统时间相同!

1.从node01上分发hbase目录到其他机器
1
2
3
4
[root@node01 sxt]# scp -r hbase-0.98.12.1 node02:`pwd`
[root@node01 sxt]# scp -r hbase-0.98.12.1 node03:`pwd`
[root@node01 sxt]# scp -r hbase-0.98.12.1 node04:`pwd`
[root@node01 sxt]# scp -r hbase-0.98.12.1 node05:`pwd`
2.修改node01中hbase的配置文件hbase-env.sh
1
2
3
[root@node01 conf]# vi hbase-env.sh
#不使用自带的zookeeper
export HBASE_MANAGES_ZK=false
3.修改node01中hbase的配置文件hbase-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[root@node01 conf]# vi hbase-site.xml
<configuration>
<!--
可以不配置,如果要配置,需要和zookeeper配置文件中的路径相同
$ZOOKEEPER_HOME/conf/zoo.cfg中dataDir=/var/sxt/zookeeper
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/var/sxt/zookeeper </value>
</property>
-->
<property>
<name>hbase.rootdir</name>
<value>hdfs://mycluster/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>node02,node03,node04</value>
</property>
</configuration>
4.修改node01中hbase的配置文件regionservers

[root@node01 conf]# vi regionservers
node02
node03
node04

5.新建backup-masters文件,并做修改

[root@node01 conf]# vi backup-masters
node05

6.把hdfs-site.xml copy到hbase的配置目录下

[root@node01 conf]# cp /opt/sxt/hadoop-2.6.5/etc/hadoop/hdfs-site.xml ./

7.同步配置文件
1
2
3
4
[root@node01 conf]# scp ./* node02:`pwd`
[root@node01 conf]# scp ./* node03:`pwd`
[root@node01 conf]# scp ./* node04:`pwd`
[root@node01 conf]# scp ./* node05:`pwd`
8.启动集群(先启动ZK和Hadoop集群)

[root@node01 conf]# start-hbase.sh

9.验证
1
2
3
4
hbase(main):004:0> create 'person','0001','col1','col2'
hbase(main):005:0> list
TABLE
person


配置成功!

验证集群的高可用

在node01的hbase中创建了一个表person
杀死node01的HMaster

1
2
3
4
5
6
7
8
9
10
11
12
[root@node01 ~]# jps
3220 DFSZKFailoverController
6210 Jps
5740 HMaster
2876 NameNode
3062 JournalNode
[root@node01 ~]# kill -9 5740
[root@node01 ~]# jps
3220 DFSZKFailoverController
2876 NameNode
6295 Jps
3062 JournalNode

此时node01的hbase已经不能访问了

node05变成了Master:

在node05上进入hbase客户端,查看表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
hbase(main):001:0> list
TABLE
person
```
#### Bug解决1
配置好后启动hbase,发现node05的jps中有HMaster,而node01没有,访问网页,node01无法连接,node05无法获取master
Hadoop集群和Zookeeper没有任何问题,HBASE_MANAGES_ZK=false也设置了,查看日志,报错:
Caused by: org.apache.hadoop.hbase.MasterNotRunningException: java.io.IOException: Can't get master address from ZooKeeper; znode data == null
百思不得其解,最后发现,配置文件有问题:
```xml
<property>
<name>hbase.rootdir</name>
<value>hdfs://mycluster</value>
</property>
这里正确应该是:
<property>
<name>hbase.rootdir</name>
<value>hdfs://mycluster/hbase</value>
</property>

Bug解决2

验证hbase高可用,down掉主机后,备机并没有变成主机,两台机器的HMaster服务都关闭了,原因:解决上个bug修改配置文件后,没有同步!
解决办法:
(一) 停止hbase进程
主机上执行stop-hbase.sh
(二) 如果HregionServer进程还在,手动停止node02、node03、node03上的HregionServer
kill -9 HregionServerId
(三) 清除脏数据
删除HDFS中的hbase目录:hdfs dfs -rm -r /hbase
删除Zookeeper中的 hbase目录:

1
2
3
4
5
6
7
[root@node02 conf]# zkCli.sh
Welcome to ZooKeeper!
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, hadoop-ha, yarn-leader-election, zookeeper]
[zk: localhost:2181(CONNECTED) 1] rmr /hbase
[zk: localhost:2181(CONNECTED) 2] ls /
[hadoop-ha, yarn-leader-election, zookeeper]

(四) 修改之前出错的配置文件
同步node01和node05的配置文件
(五) 重启hbase,问题解决
一定要同步配置文件!

API

HBase依赖jar包为安装包lib目录下的jar包,可以全部引入,也可以只引入最少依赖包,再引入之前的hadoop依赖包,最少依赖包是hbase开头的包和high-scale-lib-1.1.1.jar、htrace-core-2.04.jar、netty-3.6.6.Final.jar三个包。
项目需要添加hbase-site.xml配置文件,与MapReduce整合时需要hadoop的相关配置文件。

需求:通话记录查询

表:本机号码 主叫/被叫(0/1) 通话时间 对方号码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
HBaseAdmin hbaseAdmin;
HTable hTable;
String tableName = "call";
     
@Before
public void begin() throws Exception {
Configuration conf = new Configuration();
//指定hbase的zk集群
//一定要手动设置
//如果是伪分布式,指定伪分布式那台服务器
       conf.set("hbase.zookeeper.quorum","node02,node03,node04");    
hbaseAdmin = new HBaseAdmin(conf);
hTable = new HTable(conf, tableName);
}
     
@After
public void end() throws Exception {
      if(hbaseAdmin != null){
        hbaseAdmin.close();
       }
if(hTable != null){
     hTable.close();
}
}

建表:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Test
public void createTable() throws Exception {
         
    if(hbaseAdmin.tableExists(tableName)) {
        hbaseAdmin.disableTable(tableName);
        hbaseAdmin.deleteTable(tableName);
    }
         
    //表名
    HTableDescriptor desc = new
        HTableDescriptor(TableName.valueOf(tableName));
         
    /*
     * 列族的数目
     * HBase currently does not do well with anything 
     * above two or three column families 
     * so keep the number of column families in your schema low.
     * hbase对超过3个的列族,支持不太好
     */
     HColumnDescriptor family = new HColumnDescriptor("cf1");
         
     family.setBlockCacheEnabled(true);//开启读缓存,默认就为true
     family.setInMemory(true);//开启缓存,默认false
     family.setMaxVersions(1);//最大版本数,默认就是1
         
     desc.addFamily(family);
         
     hbaseAdmin.createTable(desc);  
}
插入数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void insert() throws Exception {
         
    //rowkey设置为:手机号_时间戳
    String rowkey = "18734590000_20170714104600";
    Put put = new Put(rowkey.getBytes());
    //通话类型:主叫/被叫(1/0)
    put.add("cf1".getBytes(), "type".getBytes(), "1".getBytes());
    //通话时间
    put.add("cf1".getBytes(), "time".getBytes(), 
        "2017-07-14 10:46:00".getBytes());
    //对方号码
    put.add("cf1".getBytes(), "oppoPhoneNum".getBytes(),       
        "13876580000".getBytes());
         
    hTable.put(put);
}
查询数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void get() throws Exception {
         
    String rowkey = "18734590000_20170714104600";
         
    Get get = new Get(rowkey.getBytes());
    get.addColumn("cf1".getBytes(), "time".getBytes());
    get.addColumn("cf1".getBytes(), "oppoPhoneNum".getBytes());
    
    Result result = hTable.get(get);
         
    Cell cell1 = result.getColumnLatestCell("cf1".getBytes(),              "time".getBytes());
    Cell cell2 = result.getColumnLatestCell("cf1".getBytes(), 
        "oppoPhoneNum".getBytes());
   
    System.out.println(new String(CellUtil.cloneValue(cell1)));
    System.out.println(new String(CellUtil.cloneValue(cell2)));
         
}
插入10个手机号,每个手机号有100条通话记录, 时间按降序排列:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Test
public void insertDB() throws Exception {
         
    List<Put> puts = new ArrayList<>();
         
    for(int i=0; i<10; i++){
        String rowkey;
        String phoneNum = getPhoneNum("186");
        for(int j=0; j<100; j++){
            String date = getDate("2017");
            SimpleDateFormat sdf =
new SimpleDateFormat("yyyyMMddHHmmss");
            long time = sdf.parse(date).getTime();
            rowkey = phoneNum + (Long.MAX_VALUE - time);
            System.out.println(rowkey);
                  
            Put put = new Put(rowkey.getBytes());
            put.add("cf1".getBytes(),"type".getBytes(),
(random.nextInt(2)+"").getBytes());
            put.add("cf1".getBytes(),"time".getBytes(),
(date).getBytes());
            put.add("cf1".getBytes(),"oppoPhoneNum".getBytes(),
getPhoneNum("137").getBytes());
                  
            puts.add(put);
        }
    }
         
    hTable.put(puts);
}
public Random random = new Random();
     
/**
 * 随机生成手机号
 * @param prefix 手机号前三位
 */
public String getPhoneNum(String prefix) {
//生成的数字如果不够8位,用0补充
    return prefix + String.format("%08d", random.nextInt(99999999));
}
     
/**
 * 随机生成时间
 * @param year 年
 * @return 时间  格式:yyyyMMddHHmmss
 */
public String getDate(String year) {
    return year + String.format("%02d%02d%02d%02d%02d",
        new Object[] {random.nextInt(12)+1,random.nextInt(29)+1,
                      random.nextInt(24),random.nextInt(60),
                      random.nextInt(60)});
}
查询某个手机号某个月的通话记录:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * 手机号:18698423056
 * 时间:2017年1月
 */
@Test
public void scanDB() throws Exception {
    Scan scan = new Scan();
    SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
String startRowKey = "18698423056" +
(Long.MAX_VALUE - sdf.parse("20170201000000").getTime());
String stopRowKey = "18698423056" +
(Long.MAX_VALUE - sdf.parse("20170101000000").getTime());
         
    scan.setStartRow(startRowKey.getBytes());
    scan.setStopRow(stopRowKey.getBytes());
         
    ResultScanner rss = hTable.getScanner(scan);
    for(Result rs : rss){
        Cell cell1 = rs.getColumnLatestCell(
     "cf1".getBytes(), "type".getBytes());
Cell cell2 = rs.getColumnLatestCell(
     "cf1".getBytes(), "time".getBytes());
Cell cell3 = rs.getColumnLatestCell(
     "cf1".getBytes(), "oppoPhoneNum".getBytes());
System.out.println(
     new String(CellUtil.cloneValue(cell1)) + "-" +
     new String(CellUtil.cloneValue(cell2)) + "-" +
     new String(CellUtil.cloneValue(cell3))
);
    }       
}
查询某个手机号所有的主叫(type=0)的通话记录:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * 手机号:18696384891
 */
 @Test
 public void scanDB2() throws Exception {
     FilterList list =
new FilterList(FilterList.Operator.MUST_PASS_ALL);
         
     PrefixFilter filter1 =
new PrefixFilter("18696384891".getBytes());
          
     SingleColumnValueFilter filter2 =
new SingleColumnValueFilter(
"cf1".getBytes(), "type".getBytes(),
      CompareOp.EQUAL, "0".getBytes());
         
     list.addFilter(filter1);
     list.addFilter(filter2);
         
     Scan scan = new Scan();
     scan.setFilter(list);
         
     ResultScanner rss = hTable.getScanner(scan);
     for (Result rs : rss) {
         Cell cell1 = rs.getColumnLatestCell(
"cf1".getBytes(), "type".getBytes());
         Cell cell2 = rs.getColumnLatestCell(
"cf1".getBytes(), "time".getBytes());
         Cell cell3 = rs.getColumnLatestCell(
"cf1".getBytes(), "oppoPhoneNum".getBytes());
         System.out.println(
             new String(CellUtil.cloneValue(cell1)) + "-" +
             new String(CellUtil.cloneValue(cell2)) + "-" +
             new String(CellUtil.cloneValue(cell3))
         );
     }
}

设计表

人员-角色

人员有多个角色,角色考虑优先级,角色有多个人员
人员可以添加删除角色
角色可以添加删除人员
人员和角色可以添加删除

组织架构组:部门-子部门

微博表设计

关注列表:添加、取消
粉丝列表
查看首页:所有关注过的好友微博,时间降序排序
查看某个用户的微博,时间降序排序
发布微博


Protobuf

Protocol Buffers是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或 RPC 数据交换格式。可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。目前提供了 C++、Java、Python 三种语言的 API。

安装protobuf(node05)

1
2
3
4
5
[root@node05 sxt]# tar zxvf protobuf-2.5.0.tar.gz
# 安装一些常用软件需要依赖的包
[root@node05 protobuf-2.5.0]# yum groupinstall Development tools -y
[root@node05 protobuf-2.5.0]# ./configure --prefix=/opt/sxt/protobuf
[root@node05 protobuf-2.5.0]# make && make install

对通话记录案例中的列和行进行整合优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[root@node05 ~]# vi call.proto
package com.sxt.hbase;
#通话记录详情
message Record {
required string type = 1;
required string time = 2;
required string oppoPhoneNum = 3;
}
#一天的通话记录
message RecordList {
repeated Record rlist = 1;
}
[root@node05 ~]# /opt/sxt/protobuf/bin/protoc --java_out=./ call.proto
[root@node05 ~]# ls
anaconda-ks.cfg call.proto com install.log install.log.syslog
[root@node05 ~]# cd com
[root@node05 com]# ls
sxt
[root@node05 com]# cd sxt
[root@node05 sxt]# ls
hbase
[root@node05 sxt]# cd hbase
[root@node05 hbase]# ls
Call.java
# Call.java是对call.proto的封装

把Call.java放到查询通话记录的项目中,Call.java对列和行分别进行了封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
 * 随机生成时间
 * @param time yyyyMMdd
 * @return 时间  格式:yyyyMMddHHmmss
 */
public String getDate2(String time) {
    return time + String.format("%02d%02d%02d",
                                new Object[] {
                                    random.nextInt(24),
                                    random.nextInt(60),
                                    random.nextInt(60)
                                });
}
/**
 * 插入10个手机号,每个手机号一天产生100条通话记录
 * 一天当中所有的通话记录封装到一起
 * 时间按降序排列
 */
@Test
public void insertDB2() throws Exception {
         
    List<Put> puts = new ArrayList<>();
         
    for(int i=0; i<10; i++){
        String rowkey;
        String phoneNum = getPhoneNum("186");
        String date = "20170714000000";
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
        rowkey = phoneNum + "_" +
(Long.MAX_VALUE-(sdf.parse(date).getTime()));
              
        Call.RecordList.Builder recordList = Call.RecordList.newBuilder();
              
        for(int j=0; j<100; j++){
            String dnum = getPhoneNum("138");
            String dateStr = getDate2("20170714");
            String type = random.nextInt(2) + "";
                  
            Call.Record.Builder record = Call.Record.newBuilder();
            record.setType(type);
            record.setTime(dateStr);
            record.setOppoPhoneNum(dnum);
                  
            recordList.addRlist(record);
        }
              
        Put put = new Put(rowkey.getBytes());
        //这一个手机号一天的记录放到cf1中,名称为records,可以任意命名
        put.add("cf1".getBytes(), "records".getBytes(), recordList.build().toByteArray());
puts.add(put);
    }
         
    hTable.put(puts);
}
/**
 * 查询某个手机号1天的通话记录
 * @throws Exception
 */
@Test
public void getDB2() throws Exception {
    String rowkey = "186970848059223370536893175807";
    Get get = new Get(rowkey.getBytes());
         
    get.addColumn("cf1".getBytes(), "records".getBytes());
         
    Result rs = hTable.get(get);
    Cell cell = rs.getColumnLatestCell("cf1".getBytes(),     
        "records".getBytes());
         
    Call.RecordList recordList = Call.RecordList.parseFrom(
        CellUtil.cloneValue(cell));
         
    for(Call.Record record : recordList.getRlistList()) {
              System.out.println(record.getType() + " - "
                  record.getTime() + " - "
                  record.getOppoPhoneNum());
    }
         
}

HBase优化

设计表的优化

1.Pre-Creating Regions(预分区)

默认情况下,在创建HBase表的时候会自动创建一个region分区,当导入数据的时候,所有的HBase客户端都向这一个region写数据,直到这个region足够大了才进行切分。一种可以加快批量写入速度的方法是通过预先创建一些空的regions,这样当数据写入HBase时,会按照region分区情况,在集群内做数据的负载均衡。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static boolean createTable(HBaseAdmin admin, 
    HTableDescriptor table, byte[][] splits) 
    throws IOException {
    try {
        admin.createTable(table, splits);
        return true;
    } catch (TableExistsException e) {
        logger.info("table " + table.getNameAsString() +
" already exists");
        // the table already exists...
        return false;
    }
}
/**
 * 当rowkey是数字类型时,使用以下规则分区
 * start:001
 * endkey:100
 * region:10个,[001,010][011,020]...
 */
public static byte[][] getHexSplits(String startKey, String endKey, int numRegions) {
    byte[][] splits = new byte[numRegions-1][];
    BigInteger lowestKey = new BigInteger(startKey, 16);
    BigInteger highestKey = new BigInteger(endKey, 16);
    BigInteger range = highestKey.subtract(lowestKey);
    BigInteger regionIncrement = range.divide(BigInteger.valueOf(numRegions));
    lowestKey = lowestKey.add(regionIncrement);
    for(int i=0; i < numRegions-1;i++) {
     BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));
     byte[] b = String.format("%016x", key).getBytes();
     splits[i] = b;
    }
    return splits;
}

2.rowkey的设计

HBase中row key用来检索表中的记录,支持以下三种方式:
• 通过单个row key访问:即按照某个row key键值进行get操作;
• 通过row key的range进行scan:即通过设置startRowKey和endRowKey,在这个范围内进行扫描;
• 全表扫描:即直接扫描整张表中所有行记录。
在HBase中,row key可以是任意字符串,最大长度64KB,实际应用中一般为10~100 bytes,存为byte[]字节数组,一般设计成定长的。
row key是按照字典序存储,因此,设计row key时,要充分利用这个排序特点,将经常一起读取的数据存储到一块,将最近可能会被访问的数据放在一块。
举个例子:如果最近写入HBase表中的数据是最可能被访问的,可以考虑将时间戳作为row key的一部分,由于是字典序排序,所以可以使用Long.MAX_VALUE - timestamp作为row key,这样能保证新写入的数据在读取时可以被快速命中。
Rowkey规则:
1) 越小越好
2) Rowkey的设计是要根据实际业务来
3) 散列性
a) 取反 001 002 :100 200 取反后,rowkey可能落在不同的region上
b) Hash rowkey取hash值后,可能会均匀分布在不同的region上
散列的弊端:降低了范围查找的效率

3.Column Family

不要在一张表里定义太多的column family。目前Hbase并不能很好的处理超过2~3个column family的表。因为某个column family在flush的时候,它邻近的column family也会因关联效应被触发flush,最终导致系统产生更多的I/O。

4.In Memory

创建表的时候,可以通过HColumnDescriptor.setInMemory(true)将表放到RegionServer的缓存中,保证在读取的时候被cache命中。

  1. 标记IN_MEMORY=>’true’的column family的总体积最好不要超过in-memory cache的大小(in-memory cache = heap size hfile.block.cache.size 0.85 * 0.25),特别是当总体积远远大于了in-memory cache时,会在in-memory cache上发生严重的颠簸。

  2. 换个角度再看,普遍提到的使用in-memory cache的场景是把元数据表的column family声明为IN_MEMORY=>’true’。实际上这里的潜台词是:元数据表都很小。其时我们也可以大胆地把一些需要经常访问的,总体积不会超过in-memory cache的column family都设为IN_MEMORY=>’true’从而更加充分地利用cache空间。就像前面提到的,普通的block永远是不会被放入in-memory cache的,只存放少量metadata是对in-memory cache资源的浪费(未来的版本应该提供三种区段的比例配置功能)

5.Max Version

创建表的时候,可以通过HColumnDescriptor.setMaxVersions(int maxVersions)设置表中数据的最大版本,如果只需要保存最新版本的数据,那么可以设置setMaxVersions(1)。

6.Time To Live

创建表的时候,可以通过HColumnDescriptor.setTimeToLive(int timeToLive)设置表中数据的存储生命期,过期数据将自动被删除,例如如果只需要存储最近两天的数据,那么可以设置setTimeToLive(2 24 60 * 60)。

7.Compact & Split

在HBase中,数据在更新时首先写入WAL 日志(HLog)和内存(MemStore)中,MemStore中的数据是排序的,当MemStore累计到一定阈值(64M)时,就会创建一个新的MemStore,并且将老的MemStore添加到flush队列,由单独的线程flush到磁盘上,成为一个StoreFile。当Memstore的总大小超过限制时(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),会强行启动flush进程,从最大的Memstore开始flush直到低于限制。于此同时,系统会在zookeeper中记录一个redo point,表示这个时刻之前的变更已经持久化了(minor compact)。
StoreFile是只读的,一旦创建后就不可以再修改。因此Hbase的更新其实是不断追加的操作。当一个Store中的StoreFile达到一定的阈值后,就会进行一次合并(major compact),将对同一个key的修改合并到一起,形成一个大的StoreFile,当StoreFile的大小达到一定阈值后,又会对 StoreFile进行分割(split),等分为两个StoreFile,这里相当于把一个大的region分割成两个region。
由于对表的更新是不断追加的,处理读请求时,需要访问Store中全部的StoreFile和MemStore,将它们按照row key进行合并,由于StoreFile和MemStore都是经过排序的,并且StoreFile带有内存中索引,通常合并过程还是比较快的。
实际应用中,可以考虑必要时手动进行major compact,将同一个row key的修改进行合并形成一个大的StoreFile。同时,可以将StoreFile设置大些,减少split的发生。
hbase为了防止小文件(被刷到磁盘的menstore)过多,以保证保证查询效率,hbase需要在必要的时候将这些小的store file合并成相对较大的store file,这个过程就称之为compaction。在hbase中,主要存在两种类型的compaction:minor compaction和major compaction。
minor compaction是较小、很少文件的合并。
major compaction是将所有的store file合并成一个,触发major compaction的可能条件有:major_compact 命令、majorCompact() API、region server自动运行(相关参数:hbase.hregion.majoucompaction 默认为24 小时、hbase.hregion.majorcompaction.jetter 默认值为0.2 ,作用是防止region server在同一时间进行major compaction)。
hbase.hregion.majorcompaction.jetter参数的作用是:
对参数hbase.hregion.majoucompaction 规定的值起到浮动的作用,假如两个参数都为默认值24和0.2,那么major compact最终使用的数值为:19.2~28.8 这个范围。
major compaction执行时,就无法对HBase进行访问,所以通常是关闭自动方式,根据业务,手动编程来控制其操作时间。
1.关闭自动major compaction,使用手动方式。
2.手动编程major compaction:Timer类(java),contab(shell)

minor compaction的运行机制要复杂一些,我们一把也不会来对它进行控制, 它由一下几个参数共同决定:
hbase.hstore.compaction.min
默认值为 3,表示至少需要三个满足条件的store file时,minor compaction才会启动
hbase.hstore.compaction.max
默认值为10,表示一次minor compaction中最多选取10个store file
hbase.hstore.compaction.min.size
表示文件大小小于该值的store file 一定会加入到minor compaction的store file中
hbase.hstore.compaction.max.size
表示文件大小大于该值的store file一定会被minor compaction排除
hbase.hstore.compaction.ratio
将store file 按照文件年龄排序(older to younger),minor compaction总是从older store file开始选择

写表的优化

1.多HTable并发写

创建多个HTable客户端用于写操作,提高写数据的吞吐量,一个例子:

1
2
3
4
5
6
7
8
static final Configuration conf = HBaseConfiguration.create();
static final String table_log_name = "user_log";
wTableLog = new HTable[tableN];
for (int i = 0; i < tableN; i++) {
    wTableLog[i] = new HTable(conf, table_log_name);
    wTableLog[i].setWriteBufferSize(5 * 1024 * 1024); //5MB
    wTableLog[i].setAutoFlush(false);
}

2.HTable参数设置
Auto Flush

通过调用HTable.setAutoFlush(false)方法可以将HTable写客户端的自动flush关闭,这样可以批量写入数据到HBase,而不是有一条put就执行一次更新,只有当put填满客户端写缓存时,才实际向HBase服务端发起写请求。默认情况下auto flush是开启的。

Write Buffer

通过调用HTable.setWriteBufferSize(writeBufferSize)方法可以设置HTable客户端的写buffer大小,如果新设置的buffer小于当前写buffer中的数据时,buffer将会被flush到服务端。其中,writeBufferSize的单位是byte字节数,可以根据实际写入数据量的多少来设置该值。

WAL Flag

在HBae中,客户端向集群中的RegionServer提交数据时(Put/Delete操作),首先会先写WAL(Write Ahead Log)日志(即HLog,一个RegionServer上的所有Region共享一个HLog),只有当WAL日志写成功后,再接着写MemStore,然后客户端被通知提交数据成功;如果写WAL日志失败,客户端则被通知提交失败。这样做的好处是可以做到RegionServer宕机后的数据恢复。
因此,对于相对不太重要的数据,可以在Put/Delete操作时,通过调用Put.setWriteToWAL(false)或Delete.setWriteToWAL(false)函数,放弃写WAL日志,从而提高数据写入的性能。
值得注意的是:谨慎选择关闭WAL日志,因为这样的话,一旦RegionServer宕机,Put/Delete的数据将会无法根据WAL日志进行恢复。

3.批量写

通过调用HTable.put(Put)方法可以将一个指定的row key记录写入HBase,同样HBase提供了另一个方法:通过调用HTable.put(List)方法可以将指定的row key列表,批量写入多行记录,这样做的好处是批量执行,只需要一次网络I/O开销,这对于对数据实时性要求高,网络传输RTT高的情景下可能带来明显的性能提升。

4.多线程并发写

在客户端开启多个HTable写线程,每个写线程负责一个HTable对象的flush操作,这样结合定时flush和写buffer(writeBufferSize),可以既保证在数据量小的时候,数据可以在较短时间内被flush(如1秒内),同时又保证在数据量大的时候,写buffer一满就及时进行flush。下面给个具体的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
for (int i = 0; i < threadN; i++) {
    Thread th = new Thread() {
        public void run() {
            while (true) {
                try {
                    sleep(1000); //1 second
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (wTableLog[i]) {
                    try {
                        wTableLog[i].flushCommits();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };
    th.setDaemon(true);
    th.start();
}

但实际上,自己写多线程或多Htable维护比较麻烦且不稳定,用mapreduce或spark写数据时,本身就是多线程的只需要批量写就可以了。

读表的优化

1. 多个HTable并发读

创建多个HTable客户端用于读操作,提高读数据的吞吐量,一个例子:

1
2
3
4
5
6
7
static final Configuration conf = HBaseConfiguration.create();
static final String table_log_name = "user_log";
rTableLog = new HTable[tableN];
for (int i = 0; i < tableN; i++) {
    rTableLog[i] = new HTable(conf, table_log_name);
    rTableLog[i].setScannerCaching(50);
}

2.HTable参数设置
Scanner Caching

hbase.client.scanner.caching配置项可以设置HBase scanner一次从服务端抓取的数据条数,默认情况下一次一条。通过将其设置成一个合理的值,可以减少scan过程中next()的时间开销,代价是scanner需要通过客户端的内存来维持这些被cache的行记录。
有三个地方可以进行配置:
a) 在HBase的conf配置文件中进行配置;
b) 通过调用HTable.setScannerCaching(int scannerCaching)进行配置;
c) 通过调用Scan.setCaching(int caching)进行配置。
三者的优先级越来越高。
少的RPC是提高hbase执行效率的一种方法,理论上一次性获取越多数据就会越少的RPC,也就越高效。但是内存是最大的障碍。设置这个值的时候要选择合适的大小,一面一次性获取过多数据占用过多内存,造成其他程序使用内存过少。或者造成程序超时等错误(这个超时与hbase.regionserver.lease.period相关)。
hbase.regionserver.lease.period默认值:60000
说明:客户端租用HRegion server 期限,即超时阀值。
调优:这个配合hbase.client.scanner.caching使用,如果内存够大,但是取出较多数据后计算过程较长,可能超过这个阈值,适当可设置较长的响应时间以防被认为宕机。

Scan Attribute Selection

scan时指定需要的Column Family,可以减少网络传输数据量,否则默认scan操作会返回整行所有Column Family的数据。

Close ResultScanner

通过scan取完数据后,记得要关闭ResultScanner,否则RegionServer可能会出现问题(对应的Server资源无法释放)。

3.批量读

通过调用HTable.get(Get)方法可以根据一个指定的row key获取一行记录,同样HBase提供了另一个方法:通过调用HTable.get(List)方法可以根据一个指定的row key列表,批量获取多行记录,这样做的好处是批量执行,只需要一次网络I/O开销,这对于对数据实时性要求高而且网络传输RTT高的情景下可能带来明显的性能提升。

4.多线程并发读

在客户端开启多个HTable读线程,每个读线程负责通过HTable对象进行get操作。下面是一个多线程并发读取HBase,获取店铺一天内各分钟PV值的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
public class DataReaderServer {
     // 获取店铺一天内各分钟PV值的入口函数
     public static ConcurrentHashMap<String, String> 
         getUnitMinutePV(long uid, long startStamp, long endStamp) {
         long min = startStamp;
         int count = (int) ((endStamp - startStamp) / (60 * 1000));
         List<String> lst = new ArrayList<String>();
         for (int i = 0; i <= count; i++) {
              min = startStamp + i * 60 * 1000;
              lst.add(uid + "_" + min);
         }
         return parallelBatchMinutePV(lst);
     }
     // 多线程并发查询,获取分钟PV值
     private static ConcurrentHashMap<String, String> 
         parallelBatchMinutePV(List<String> lstKeys) {
         ConcurrentHashMap<String, String> hashRet = 
             new ConcurrentHashMap<String, String>();
         int parallel = 3;
         List<List<String>> lstBatchKeys = null;
         if (lstKeys.size() < parallel) {
              lstBatchKeys = new ArrayList<List<String>>(1);
              lstBatchKeys.add(lstKeys);
         } else {
              lstBatchKeys = new ArrayList<List<String>>(parallel);
              for (int i = 0; i < parallel; i++) {
                  List<String> lst = new ArrayList<String>();
                  lstBatchKeys.add(lst);
              }
              for (int i = 0; i < lstKeys.size(); i++) {
                  lstBatchKeys.get(i % parallel).add(lstKeys.get(i));
              }
         }
         List<Future<ConcurrentHashMap<String, String>>> futures = 
             new ArrayList<Future<ConcurrentHashMap<String,
String>>>(5);
         ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
         builder.setNameFormat("ParallelBatchQuery");
         ThreadFactory factory = builder.build();
         ThreadPoolExecutor executor = 
             (ThreadPoolExecutor) Executors.
                 newFixedThreadPool(lstBatchKeys.size(), factory);
         for (List<String> keys : lstBatchKeys) {
              Callable<ConcurrentHashMap<String, String>> callable = 
                  new BatchMinutePVCallable(keys);
              FutureTask<ConcurrentHashMap<String, String>> future = 
                  (FutureTask<ConcurrentHashMap<String, String>>)
executor.submit(callable);
              futures.add(future);
         }
         executor.shutdown();
         // Wait for all the tasks to finish
         try {
              boolean stillRunning =
!executor.awaitTermination(
5000000, TimeUnit.MILLISECONDS);
              if (stillRunning) {
                  try {
                       executor.shutdownNow();
                  } catch (Exception e) {
                       e.printStackTrace();
                  }
              }
         } catch (InterruptedException e) {
              try {
                  Thread.currentThread().interrupt();
              } catch (Exception e1) {
                  // TODO Auto-generated catch block
                  e1.printStackTrace();
              }
         }
         // Look for any exception
         for (Future f : futures) {
              try {
                  if (f.get() != null) {
                       hashRet.putAll(
(ConcurrentHashMap<String, String>) f.get());
                  }
              } catch (InterruptedException e) {
                  try {
                       Thread.currentThread().interrupt();
                  } catch (Exception e1) {
                       e1.printStackTrace();
                  }
              } catch (ExecutionException e) {
                  e.printStackTrace();
              }
         }
         return hashRet;
     }
     // 一个线程批量查询,获取分钟PV值
     protected static ConcurrentHashMap<String, String> 
         getBatchMinutePV(List<String> lstKeys) {
         ConcurrentHashMap<String, String> hashRet = null;
         List<Get> lstGet = new ArrayList<Get>();
         String[] splitValue = null;
         for (String s : lstKeys) {
              splitValue = s.split("_");
              long uid = Long.parseLong(splitValue[0]);
              long min = Long.parseLong(splitValue[1]);
              byte[] key = new byte[16];
              Bytes.putLong(key, 0, uid);
              Bytes.putLong(key, 8, min);
              Get g = new Get(key);
              g.addFamily(fp);
              lstGet.add(g);
         }
         Result[] res = null;
         try {
              res = tableMinutePV[rand.nextInt(tableN)].get(lstGet);
         } catch (IOException e1) {
              logger.error("tableMinutePV exception, e=" + e1.getStackTrace());
         }
         if (res != null && res.length > 0) {
              hashRet = new ConcurrentHashMap<String, String>(res.length);
              for (Result re : res) {
                  if (re != null && !re.isEmpty()) {
                       try {
                            byte[] key = re.getRow();
                            byte[] value = re.getValue(fp, cp);
                            if (key != null && value != null) {
                                 hashRet.put(
                                     String.valueOf(
                                         Bytes.toLong(key, Bytes.SIZEOF_LONG)),
                                      String.valueOf(Bytes.toLong(
value)));
                            }
                       } catch (Exception e2) {
                            logger.error(e2.getStackTrace());
                       }
                  }
              }
         }
         return hashRet;
     }
}
// 调用接口类,实现Callable接口
class BatchMinutePVCallable implements 
    Callable<ConcurrentHashMap<String, String>> {
     private List<String> keys;
     public BatchMinutePVCallable(List<String> lstKeys) {
         this.keys = lstKeys;
     }
     public ConcurrentHashMap<String, String> call() 
         throws Exception {
         return DataReadServer.getBatchMinutePV(keys);
     }
}

5.缓存查询结果

对于频繁查询HBase的应用场景,可以考虑在应用程序中做缓存,当有新的查询请求时,首先在缓存中查找,如果存在则直接返回,不再查询HBase;否则对HBase发起读请求查询,然后在应用程序中将查询结果缓存起来。至于缓存的替换策略,可以考虑LRU等常用的策略。

6.Blockcache

HBase上Regionserver的内存分为两个部分,一部分作为Memstore,主要用来写;另外一部分作为BlockCache,主要用于读。
写请求会先写入Memstore,Regionserver会给每个region提供一个Memstore,当Memstore满64MB以后,会启动 flush刷新到磁盘。当Memstore的总大小超过限制时(heapsize hbase.regionserver.global.memstore.upperLimit 0.9),会强行启动flush进程,从最大的Memstore开始flush直到低于限制。
读请求先到Memstore中查数据,查不到就到BlockCache中查,再查不到就会到磁盘上读,并把读的结果放入BlockCache。由于BlockCache采用的是LRU策略,因此BlockCache达到上限(heapsize hfile.block.cache.size 0.85)后,会启动淘汰机制,淘汰掉最老的一批数据。
一个Regionserver上有一个BlockCache和N个Memstore,它们的大小之和不能大于等于heapsize * 0.8,否则HBase不能启动。默认BlockCache为0.2,而Memstore为0.4。对于注重读响应时间的系统,可以将 BlockCache设大些,比如设置BlockCache=0.4,Memstore=0.39,以加大缓存的命中率。

HTable和HTablePool

HTable和HTablePool都是HBase客户端API的一部分,可以使用它们对HBase表进行CRUD操作。下面结合在项目中的应用情况,对二者使用过程中的注意事项做一下概括总结。

1
2
3
4
5
6
Configuration conf = HBaseConfiguration.create();
try (Connection connection = ConnectionFactory.createConnection(conf)) {
     try (Table table = connection.getTable(TableName.valueOf(tablename)) {
    //use table as needed, the table returned is lightweight
     }
}

HTable

HTable是HBase客户端与HBase服务端通讯的Java API对象,客户端可以通过HTable对象与服务端进行CRUD操作(增删改查)。它的创建很简单:
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, “tablename”);
//TODO CRUD Operation……

HTable使用时的一些注意事项:
1) 规避HTable对象的创建开销
因为客户端创建HTable对象后,需要进行一系列的操作:检查.META.表确认指定名称的HBase表是否存在,表是否有效等等,整个时间开销比较重,可能会耗时几秒钟之长,因此最好在程序启动时一次性创建完成需要的HTable对象,如果使用Java API,一般来说是在构造函数中进行创建,程序启动后直接重用。
2) HTable对象不是线程安全的
HTable对象对于客户端读写数据来说不是线程安全的,因此多线程时,要为每个线程单独创建复用一个HTable对象,不同对象间不要共享HTable对象使用,特别是在客户端auto flash被置为false时,由于存在本地write buffer,可能导致数据不一致。
3) HTable对象之间共享Configuration
HTable对象共享Configuration对象,这样的好处在于:
• 共享ZooKeeper的连接:每个客户端需要与ZooKeeper建立连接,查询用户的table regions位置,这些信息可以在连接建立后缓存起来共享使用;
• 共享公共的资源:客户端需要通过ZooKeeper查找-ROOT-和.META.表,这个需要网络传输开销,客户端缓存这些公共资源后能够减少后续的网络传输开销,加快查找过程速度。
因此,与以下这种方式相比:

1
2
HTable table1 = new HTable("table1");
HTable table2 = new HTable("table2");

下面的方式更有效些:

1
2
3
Configuration conf = HBaseConfiguration.create();
HTable table1 = new HTable(conf, "table1");
HTable table2 = new HTable(conf, "table2");

备注:
即使是高负载的多线程程序,也并没有发现因为共享Configuration而导致的性能问题;如果你的实际情况中不是如此,那么可以尝试不共享Configuration。

HTablePool

HTablePool可以解决HTable存在的线程不安全问题,同时通过维护固定数量的HTable对象,能够在程序运行期间复用这些HTable资源对象。
Configuration conf = HBaseConfiguration.create();
HTablePool pool = new HTablePool(conf, 10);

1) HTablePool可以自动创建HTable对象,而且对客户端来说使用上是完全透明的,可以避免多线程间数据并发修改问题。
2) HTablePool中的HTable对象之间是公用Configuration连接的,能够可以减少网络开销。
HTablePool的使用很简单:每次进行操作前,通过HTablePool的getTable方法取得一个HTable对象,然后进行put/get/scan/delete等操作,最后通过HTablePool的putTable方法将HTable对象放回到HTablePool中。
下面是个使用HTablePool的简单例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void createUser(String username, String firstName, 
    String lastName, String email, 
    String password, String roles)
         throws IOException {
     HTable table = rm.getTable(UserTable.NAME);
   Put put = new Put(Bytes.toBytes(username));
   put.add(UserTable.DATA_FAMILY,             
        UserTable.FIRSTNAME,Bytes.toBytes(firstName));
   put.add(UserTable.DATA_FAMILY, 
        UserTable.LASTNAME,Bytes.toBytes(lastName));
   put.add(UserTable.DATA_FAMILY, UserTable.EMAIL, 
        Bytes.toBytes(email));
   put.add(UserTable.DATA_FAMILY,  
        UserTable.CREDENTIALS,Bytes.toBytes(password));
   put.add(UserTable.DATA_FAMILY, UserTable.ROLES, 
        Bytes.toBytes(roles));
   table.put(put);
   table.flushCommits();
   rm.putTable(table);
}

HBase和关系型数据库的区别

1.不能使用column之间过滤查询
2.不支持全文索引。需要和solr整合完成全文搜索。
a) 使用MR批量读取hbase中的数据,在solr里面建立索引(no store)之保存rowkey的值。
b) 根据关键词从索引中搜索到rowkey(分页)
c) 根据rowkey从hbase查询所有数据

HBase与MapReduce整合

使用HBase与MapReduce完成wordcount,输出的结果存入HBase的表中

创建表:
hbase(main):004:0> create 'wc','cf'
Job类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class WCJob {
     public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();
         conf.set("fs.defaultFS", "hdfs://node01:8020");
         // 指定hbase的zk集群
         // 如果是伪分布式指定伪分布式那台服务器
         conf.set("hbase.zookeeper.quorum", "node02,node03,node04");
         // 在eclipse中直接运行MR程序,加这个配置
         conf.set("mapreduce.app-submission.cross-platform", "true");
         
         Job job = Job.getInstance(conf);
         
         job.setJobName("wc job");
         
         job.setJarByClass(WCJob.class);
         // 在eclipse中直接运行MR程序,加这个配置,
         // 把项目打成jar包,并在这里说明jar包存放位置
         job.setJar("E:\\package\\HBaseWC.jar");
         
         job.setMapperClass(WCMapper.class);
         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(IntWritable.class);
         
         TableMapReduceUtil.initTableReducerJob(
                  "wc", // output table
                  WCReducer.class, // reducer class
                  job);
         
         // 参数boolean addDependencyJars 
// 当以本地方式运行MR时 ,必须设置为false
         /*TableMapReduceUtil.initTableReducerJob(
                  "wc",
                  WCReducer.class,
                  job,
                  null,
                  null,
                  null,
                  null,
                  false);*/
// 对initTableReducerJob()方法的参数的解析
         
         Path path = new Path(
"/user/root/wordcount/input/wordcount.txt");
         FileInputFormat.addInputPath(job, path);
         
         if(job.waitForCompletion(true)) {
              System.out.println("success!");
         }
     }
}

Mapper类

1
2
3
4
5
6
7
8
9
10
11
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
     @Override
     protected void map(LongWritable key, Text value, Context context)
              throws IOException, InterruptedException {
         
         String[] strs = StringUtils.split(value.toString(), ' ');
         for (String s : strs) {
              context.write(new Text(s), new IntWritable(1));
         }
     }
}

Reducer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
pulic class WCReducer extends
         TableReducer<Text, IntWritable, ImmutableBytesWritable> {
     
     public static final byte[] CF = "cf".getBytes();
       public static final byte[] COUNT = "count".getBytes();
       public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
         int i = 0;
         for (IntWritable val : values) {
           i += val.get();
         }
         Put put = new Put(Bytes.toBytes(key.toString()));
         put.add(CF, COUNT, Bytes.toBytes(i));
         context.write(null, put);
       }
}

查看结果:

1
2
3
4
5
6
7
8
9
hbase(main):007:0> scan 'wc'
ROW COLUMN+CELL
C column=cf:count, timestamp=1500100742828, value=\x00\x00\x00\x04
C++ column=cf:count, timestamp=1500100742828, value=\x00\x00\x00\x02
Hadoop column=cf:count, timestamp=1500100742828, value=\x00\x00\x00\x01
Java column=cf:count, timestamp=1500100742828, value=\x00\x00\x00\x04
MySQL column=cf:count, timestamp=1500100742828, value=\x00\x00\x00\x03
Oracle column=cf:count, timestamp=1500100742828, value=\x00\x00\x00\x01
Spring column=cf:count, timestamp=1500100742828, value=\x00\x00\x00\x01

参考资料

  1. HBase Block Cache的重要实现细节和In-Memory Cache的特点
  2. HBase的Block Cache实现机制分析