【zookeeper】Curator实现基于zookeeper的分布式锁

分布式锁理论

       分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。
       由于在平时的工作中,线上服务器是分布式多台部署的,经常会面临解决分布式场景下数据一致性的问题,在集群环境中都是多个JVM协同工作,需要一些全局锁,那么就要利用分布式锁来解决这些问题。在单机环境中,Java中其实提供了很多并发处理相关的API,但是这些API在分布式场景中就无能为力了。也就是说单纯的Java Api并不能提供分布式锁的能力

实现方案

  • 基于数据库实现分布式锁
  • 基于缓存(redis,memcached,tair)实现分布式锁
  • 基于Zookeeper实现分布式锁

分布式锁是什么?

  • 保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。
  • 锁是可重入锁,避免死锁
  • 锁是阻塞锁,根据业务需求考虑是否需要
  • 高可用和高性能的获取锁和释放锁

归根结底:利用一个互斥方能够访问的公共资源来实现分布式锁,具体这个公共资源是redis来setnx,还是zookeeper,相反没有这么重要。

Maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.0.0</version>
</dependency>

Zookeeper锁原理

  1. 首先要创建一个锁的根节点,比如/mylock。
  2. 想要获取锁的客户端在锁的根节点下面创建znode,作为/mylock的子节点,节点的类型要选择CreateMode.PERSISTENT_SEQUENTIAL(默认创建为CreateMode.EPHEMERAL_SEQUENTIAL),节点的名字最好用uuid(如果不这么做在某种情况下会发生死锁),假设目前同时有3个客户端想要获得锁,那么/mylock下的目录应该是这个样子的。xxx为uuid , 0000000001,0000000002,0000000003 是zook服务端自动生成的自增数字。

    1
    xxx-lock-0000000001,xxx-lock-0000000002,xxx-lock-0000000003
  3. 当前客户端通过getChildren(/mylock)获取所有子节点列表并根据自增数字排序,然后判断一下自己创建的节点的顺序是不是在列表当中最小的,如果是 那么获取到锁,如果不是,那么获取自己的前一个节点,并设置监听这个节点的变化,当节点变化时重新执行步骤3 直到自己是编号最小的一个为止举例:假设当前客户端创建的节点是0000000002,因为它的编号不是最小的,所以获取不到锁,那么它就找到它前面的一个节点0000000001 并对它设置监听。

  4. 释放锁,当前获得锁的客户端在操作完成后删除自己创建的节点,这样会激发zookeeper的事件给其它客户端知道,这样其它客户端会重新执行(步骤3)。举例:加入客户端0000000001获取到锁,然后客户端0000000002加入进来获取锁,发现自己不是编号最小的,那么它会监听它前面节点的事件(0000000001的事件)然后执行步骤(3),当客户端0000000001操作完成后删除自己的节点,这时zook服务端会发送事件,这时客户端0000000002会接收到该事件,然后重复步骤3直到获取到锁)

上面的步骤实现了一个有序锁,也就是先进入等待锁的客户端在锁可用时先获得锁。如果想要实现一个随机锁,那么只需要把子节点名称后的自增数字换成一个随机数即可。

代码实现

简单版

具体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CuratorLock {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.40.40:2181,192.168.40.41:2181,192.168.40.42:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;//ms
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
.build();
cf.start();
InterProcessMutex lock = new InterProcessMutex(cf, "/mylock");
try {
lock.acquire();
System.out.println("已经获取到锁");
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.release();
}
Thread.sleep(10000);
cf.close();
}
}

代码解释:

  • InterProcessMutex 是线程安全的,一个JVM创建一个就好。
  • mylock为锁的根目录,我们可以针对不同业务创建不同的根目录。
  • lock.acquire()会阻塞方法,获取不到锁的线程会挂起。
  • lock.release()释放锁,必须要放到finally里面,已确保上面方法出现异常时也能够释放锁。

    节点图

           image.png

    多线程版

    具体代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    public class CuratorLockThread {
    public static void main(String[] args) throws Exception {
    for (int i = 0; i < 10; i++) {
    //启动10个线程模拟多个客户端
    JvmLock jl = new JvmLock(i);
    new Thread(jl).start();
    //这里加上300毫秒是为了让线程按顺序启动,不然有可能4号线程比3号线程先启动了,这样测试就不准了。
    Thread.sleep(100);
    }
    }
    public static class JvmLock implements Runnable {
    /** zookeeper地址 */
    static final String CONNECT_ADDR = "192.168.40.40:2181,192.168.40.41:2181,192.168.40.42:2181";
    /** session超时时间 */
    static final int SESSION_OUTTIME = 5000;//ms
    private int num;
    public JvmLock(int num) {
    this.num = num;
    }
    @Override
    public void run() {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
    CuratorFramework cf = CuratorFrameworkFactory.builder()
    .connectString(CONNECT_ADDR)
    .sessionTimeoutMs(SESSION_OUTTIME)
    .retryPolicy(retryPolicy)
    .build();
    cf.start();
    InterProcessMutex lock = new InterProcessMutex(cf, "/mylock");
    try {
    System.out.println("我是第" + num + "号线程,我开始获取锁");
    lock.acquire();
    System.out.println("我是第" + num + "号线程,我已经获取锁");
    Thread.sleep(10000);
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    try {
    lock.release();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    cf.close();
    }
    }
    }

代码解释:

  • 那如果NODE0拿到了锁,而在执行锁内业务的时候 服务器挂了之类的行为,那那个NODE不是永远不会被删除,那后面的节点不是一直阻塞在那了吗?这是个临时节点,如果服务器没有收到客户端的心跳连接,则服务器会依据sessionid,将该节点删除的。

    节点图

           image558ba.png

非公平锁版

重写创建节点方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class NoFairLockDriver extends StandardLockInternalsDriver {
/**
* 随机数的长度
*/
private int numLength;
private static int DEFAULT_LENGTH = 5;
public NoFairLockDriver() {
this(DEFAULT_LENGTH);
}
public NoFairLockDriver(int numLength) {
this.numLength = numLength;
}
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String newPath = path + getRandomSuffix();
String ourPath;
if(lockNodeBytes != null){
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL).forPath(newPath, lockNodeBytes);
//ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}else{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL).forPath(newPath);
//ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
/**
* 获得随机数字符串
*/
public String getRandomSuffix() {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < numLength; i++) {
sb.append((int) (Math.random() * 10));
}
return sb.toString();
}
}

代码解释:

  • 原来使用的是CreateMode.EPHEMERAL_SEQUENTIAL类型的节点,节点名称最终是这样的_c_c8e86826-d3dd-46cc-8432-d91aed763c2e-lock-0000000025。
  • 其中0000000025是zook服务器端资自动生成的自增序列 从0000000000开始,所以每个客户端创建节点的顺序都是按照0,1,2,3这样递增的顺序排列的,所以他们获取锁的顺序与他们进入的顺序是一致的,这也就是所谓的公平锁。
  • 现在将有序的编号换成随机的数字,这样在获取锁的时候变成非公平锁。

注册新类

1
InterProcessMutex lock = new InterProcessMutex(cf,"/mylock", new NoFairLockDriver());

节点图

       image01f2f.png

代码地址: https://github.com/liuzhongpo/af-learning/tree/master/08-zookeeper/src/main/java/com/lzp/lock

文章目录
  1. 1. 分布式锁理论
    1. 1.1. 实现方案
    2. 1.2. 分布式锁是什么?
  2. 2. Maven依赖
  3. 3. Zookeeper锁原理
  4. 4. 代码实现
    1. 4.1. 简单版
      1. 4.1.1. 具体代码
      2. 4.1.2. 节点图
    2. 4.2. 多线程版
      1. 4.2.1. 具体代码
      2. 4.2.2. 节点图
    3. 4.3. 非公平锁版
      1. 4.3.1. 重写创建节点方法
      2. 4.3.2. 注册新类
      3. 4.3.3. 节点图
|