#ZooKeeper.分布式锁
- 看了好多版本的分布式锁,大同小异,其中有几个自旋exists监听节点,觉得很笨,那就再造个轮子吧
- 思路无非都是相同的,利用ZooKeeper强一致特性
- 进程并发在固定父节点下创建不同的临时节点
- 查看创建的子节点,排序,取第一个,等于自己的节点,算获取到锁,执行操作,其他监听第一个节点,进入等待
- 第一个节点变动,所有机器收到事件,唤醒线程,删除自己的节点,进入步骤1
- CountDownLatch 比较简洁,但是还是lock更直白,更明确。
tips
- 这个轮子仅仅是个toy,真正生产环境考虑更多,如释放锁失败异常各种情况
public class ZKLock implements Lock{ private static Logger log = Logger.getLogger(ZKLock.class.getSimpleName()); private ZooKeeper zookpeer; private Listacl = ZooDefs.Ids.OPEN_ACL_UNSAFE; private static final String ROOT = "/lock"; private byte[] b = new byte[]{1}; private String lock; private ReentrantLock reenLock = new ReentrantLock(); private Condition condition = reenLock.newCondition(); //private CountDownLatch c; private int i; public ZKLock(String zookpeer,int i) { try { this.i = i; this.zookpeer = new ZooKeeper(zookpeer,3000,new Watcher(){ @Override public void process(WatchedEvent event) { System.out.println("==启动回调=="); }}); init(); } catch (IOException e) { } } @Override public boolean tryLock() { reenLock.lock(); try { System.out.println("==================="); System.out.println("开始抢锁 机器" + i); //c = new CountDownLatch(1); String myLock = zookpeer.create(getLockName(), b, acl, CreateMode.EPHEMERAL); this.lock = getFirstNode(); if(myLock.equals(this.lock)) { //获取到锁 System.out.println("我获得锁,我是 机器" + i + ":" + this.lock + " !!!!!!!!!!!!"); } else { System.out.println("我没抢到锁 机器" + i); } reg(zookpeer); condition.await(); //c.await(); System.out.println("释放锁 机器:" + i); if(zookpeer.exists(myLock, false) != null) { zookpeer.delete(myLock, -1); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ reenLock.unlock(); } return false; } //初始化锁目录 public void init(){ try { if(zookpeer.exists(ROOT, false) == null) { zookpeer.create(ROOT,b, acl, CreateMode.PERSISTENT); } } catch (KeeperException e) { log.log(Level.WARNING,"root already exist",e); } catch (InterruptedException e) { log.log(Level.SEVERE,"connection fail"); } } public void reg(ZooKeeper zk) { try { zk.exists(lock, new M()); } catch (KeeperException e) { } catch (InterruptedException e) { } } class M implements Watcher{ @Override public void process(WatchedEvent event) { reenLock.lock(); try { System.out.println("节点变更 机器i" + i); //c.countDown(); condition.signal(); reg(zookpeer); } finally{ reenLock.unlock(); } } } @Override public void lock() { while(true){ tryLock(); } } private String getFirstNode() throws KeeperException, InterruptedException { List children = zookpeer.getChildren(ROOT, false); Collections.sort(children); System.out.println("当前节点:" + Arrays.toString(children.toArray())); return ROOT + "/" + children.get(0); } private String getLockName() { String name = UUID.randomUUID().toString(); return ROOT + "/" + name.substring(0,name.indexOf("-")); } .....}
//3线程模拟争夺 手动删除节点,可看到重新争锁public class LockWatch { public static void main(String[] args) throws Exception { ExecutorService p = Executors.newCachedThreadPool(); for(int i = 0; i < 3; i ++) { p.submit(new T(i)); } }}class T implements Runnable{ private int i; public T(int i){ this.i = i; } @Override public void run() { final ZKLock z = new ZKLock("127.0.0.1",i); z.lock(); }}
==启动回调====启动回调====启动回调===========================================================开始抢锁 机器2开始抢锁 机器0开始抢锁 机器1当前节点:[a63a788c, c61a117b, decc9d68]当前节点:[a63a788c, c61a117b, decc9d68]我没抢到锁 机器1当前节点:[a63a788c, c61a117b, decc9d68]我没抢到锁 机器0我获得锁,我是 机器2:/lock/a63a788c !!!!!!!!!!!!节点变更 机器i2节点变更 机器i1节点变更 机器i0释放锁 机器:1释放锁 机器:0释放锁 机器:2