【zookeeper】Curator基础

简介

       Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端、zkClient相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator目前是Apache的顶级项目,里面提供了更多丰富的操作,例如session超时重连、主从选举、分布式计数器、分布式锁等等适用于各种复杂的zookeeper场景的API封装。

基础API

创建和开启连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.40.40:2181,192.168.40.41:2181,192.168.40.42:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;//ms
//1 重试策略:初试时间为1S,重试10次
RetryPolicy retry = new ExponentialBackoffRetry(1000, 10);
//2 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retry)
.build();
//3 开启连接
cf.start();

参数解释:

  • connectString 连接串
  • sessionTimeoutMs 会话超时时间,默认为60 000 ms
  • retryPolicy 重试连接策略。有四种实现分别为ExponentialBackoffRetry、RetryNTimes、RetryOneTimes、RetryUntilElapsed
  • connectionTimeoutMs 连接超时时间,默认为15 000ms

注意:对于retryPolicy策略通过一个接口来让用户自定义实现。

新加和删除节点

创建节点create方法:可选链式项:createParentsIfNeeded、withMode、forPath、withACL等
删除节点delete方法,可选链式项:deletingChildrenIfNeeded、guaranteed、withVersion、forPath等

1
2
3
4
5
6
//4 建立节点 指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容
cf.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/super/c1","c1内容".getBytes());
//5 删除节点
cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");

读取和修改节点

1
2
3
4
5
6
7
8
9
10
11
//读取、修改
// 创建节点
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1内容".getBytes());
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2","c2内容".getBytes());
//读取节点
String ret1 = new String(cf.getData().forPath("/super/c2"));
System.out.println(ret1);
//修改节点
cf.setData().forPath("/super/c2","c2更新内容".getBytes());
String ret2 = new String(cf.getData().forPath("/super/c2"));
System.out.println(ret2);

绑定回调函数

异步绑定回调方法。比如创建节点时绑定一个回调函数,该回调函数可以输出服务器的状态码以及服务器事件类型。还可以加入一个线程池进行优化操作。

1
2
3
4
5
6
7
8
9
10
11
ExecutorService pool = Executors.newCachedThreadPool();
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("code:"+ curatorEvent.getResultCode());
System.out.println("type:"+ curatorEvent.getType());
System.out.println("当前线程:"+ Thread.currentThread().getName());
}
},pool)
.forPath("/super/c3","c3内容".getBytes());

读取子节点

1
2
3
4
5
//读取子节点getChildren方法和判断节点是否存在checkExists方法
List<String> list = cf.getChildren().forPath("/super");
for(String s : list) {
System.out.println(s);
}

判断节点是否存在

1
2
3
//checkExists() 不存在节点,返回的stat为null。存在则返回的stat结构体信息
Stat stat = cf.checkExists().forPath("/super");
System.out.println(stat);

Watch

如果要使用类似Watch的监听功能,Curator必须依赖一个jar包,maven依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>

有了这个依赖包,我们使用NodeCache的方式去客户端实例中注册一个监听缓存,然后实现对应的监听方法即可。这里我们主要有两种监听方式:

  • NodeCacheListener:监听节点的新增、修改操作
  • PathChildrenCacheListener:监听子节点的新增、修改、删除操作

NodeCacheListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//4 建立一个cache缓存
final NodeCache cache = new NodeCache(cf, "/super", false);
//boolean buildInitial:是否构建的时候初始化
cache.start(true);
cache.getListenable().addListener(new NodeCacheListener() {
/**
* 触发事件为创建节点和更新节点,在删除节点的时候并不触发此操作。
*/
@Override
public void nodeChanged() throws Exception {
System.out.println("路径为:" + cache.getCurrentData().getPath());
System.out.println("数据为:" + new String(cache.getCurrentData().getData()));
System.out.println("状态为:" + cache.getCurrentData().getStat());
System.out.println("---------------------------------------");
}
});

代码解释:
1、NodeCache

1
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed){...}

  • CuratorFramework client:连接
  • String path : 节点的路径
  • boolean dataIsCompressed: 是否压缩数据,不写底层默认为false

2、cache.start(true)

1
public void start(boolean buildInitial) throws Exception{...}

  • boolean buildInitial:是否构建的时候初始化

PathChildren…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//4 建立一个PathChildrenCache缓存,第三个参数是否为接收节点数据内容
PathChildrenCache cache = new PathChildrenCache(cf, "/super", false);
//5 在初始化的时候就进行缓存监听
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
/**
* 新建、修改、删除
*/
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED :" + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED :" + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED :" + event.getData().getPath());
break;
default:
break;
}
}
});

代码解释:
1、PathChildrenCache

1
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData){...}

  • CuratorFramework client:连接
  • String path : 节点的路径
  • boolean cacheData: 是否缓存数据,这里一般是为true

2、cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT)

1
public void start(boolean buildInitial) throws Exception{...}

  • PathChildrenCache.StartMode.POST_INITIALIZED_EVENT:在初始化的时候就进行缓存监听
文章目录
  1. 1. 简介
  2. 2. 基础API
    1. 2.1. 创建和开启连接
    2. 2.2. 新加和删除节点
    3. 2.3. 读取和修改节点
    4. 2.4. 绑定回调函数
    5. 2.5. 读取子节点
    6. 2.6. 判断节点是否存在
  3. 3. Watch
    1. 3.1. NodeCacheListener
    2. 3.2. PathChildren…
|