分布式锁实现方式
分布式锁使用背景:
在集群环境中,一个应用需要部署到多台电脑上然后做负载均衡,
分布式系统多线程、多进程分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要跨JVM的互斥机制来控制共享资源的访问,这时候需要使用到分布式
一 基于数据库
在数据库中创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引,想要执行某个方法,必须使用这个方法名向表中插入数据,成功插入则获取锁,执行完成后删除对应的行数据释放锁
INSERT INTO method_lock_table (method_name, desc) VALUES ('xx类.xx方法.xx参数', '方法的描述');
因为method_name做了唯一性约束,如果多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的按个线程或得了该方法得锁,可以执行方法
执行方法完成之后需要删除
delete from method_lock_table where method_name ='xx类.xx方法.xx参数'
数据库实现的一些缺点:
1、因为是基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,所以,数据库需要双机部署、数据同步、主备切换;
2、不具备可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据,所以,需要在表中新增一列,用于记录当前获取到锁的机器和线程信息,在再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁;
3、没有锁失效机制,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁,所以,需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据;
4、不具备阻塞锁特性,获取不到锁直接返回失败,所以需要优化获取逻辑,循环多次去获取。
二 基于Redis
选择redis又很高的性能,而且redis命令对此支持比较好,实现起来比较方便
使用的命令:
1) setnx(key,value) 配合 expire(key,秒数)
SETNX key val:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0
expire key timeout:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁
2)jedis.set(key, value,“NX”, “PX”, expireTime)
第一个为key,我们使用key来当锁,因为key是唯一的。
第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。
第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。
第五个为time,与第四个参数相呼应,代表key的过期时间
三 基于Zookeeper
具体步骤
- 创建一个永久性节点,作锁的根目录;2、当要获取一个锁时,在锁目录下创建一个临时有序列的节点;3、检查锁目录的子节点是否有序列比它小,若有则监听比它小的上一个节点,当前锁处于等待状态;4、当等待时间超过Zookeeper session的连接时间(sessionTimeout)时,当前session过期,Zookeeper自动删除此session创建的临时节点,等待状态结束,获取锁失败;5、当监听器触发时,等待状态结束,获得锁
-
package org.massive.lock;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import java.io.IOException;import java.util.concurrent.CountDownLatch;public class ZookeeperClient { private static String connectionString = "localhost:2181"; private static int sessionTimeout = 10000; public static ZooKeeper getInstance() throws IOException, InterruptedException { //-------------------------------------------------------------- // 为避免连接还未完成就执行zookeeper的get/create/exists操作引起的(KeeperErrorCode = ConnectionLoss) // 这里等Zookeeper的连接完成才返回实例 //-------------------------------------------------------------- final CountDownLatch connectedSignal = new CountDownLatch(1); ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { connectedSignal.countDown(); } } }); connectedSignal.await(sessionTimeout,TimeUnit.MILLISECONDS); return zk; } public static String getConnectionString() { return connectionString; } public static void setConnectionString(String connectionString) { ZookeeperClient.connectionString = connectionString; } public static int getSessionTimeout() { return sessionTimeout; } public static void setSessionTimeout(int sessionTimeout) { ZookeeperClient.sessionTimeout = sessionTimeout; }}package org.massive.lock;import org.apache.commons.lang3.RandomUtils;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import java.io.IOException;import java.util.List;import java.util.SortedSet;import java.util.TreeSet;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/** * Created by massive on 2016-12-15. */public class DistributedLock { private String lockId; private static final String LOCK_ROOT = "/LOCKS"; //-------------------------------------------------------------- // data为存储的节点数据内容 // 由于锁机制用的是序列功能的特性,data的值不重要,只要利于网络传输即可 //-------------------------------------------------------------- private final static byte[] data = {0x12, 0x34}; private final CountDownLatch latch = new CountDownLatch(1); private ZooKeeper zk; private int sessionTimeout; public DistributedLock(ZooKeeper zk,int sessionTimeout) { this.zk = zk; this.sessionTimeout = sessionTimeout; } public DistributedLock() throws IOException, KeeperException, InterruptedException { this.zk = ZookeeperClient.getInstance(); this.sessionTimeout = ZookeeperClient.getSessionTimeout(); } class LockWatcher implements Watcher { @Override public void process(WatchedEvent event) { //-------------------------------------------------------------- // 监控节点变化(本程序为序列的上一节点) // 若为节点删除,证明序列的上一节点已删除,此时释放阀门让当前的lock获得锁 //-------------------------------------------------------------- if (event.getType() == Event.EventType.NodeDeleted) latch.countDown(); } } /** * @return * @throws KeeperException * @throws InterruptedException */ public synchronized boolean lock() { //-------------------------------------------------------------- // 保证锁根节点存在,若不存在则创建它 //-------------------------------------------------------------- createLockRootIfNotExists(); try { lockId = zk.create(LOCK_ROOT + "/", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("thread " + Thread.currentThread().getName() + " create the lock node: " + lockId + ", trying to get lock now"); //-------------------------------------------------------------- // 获得锁根节点下的各锁子节点,并排序 //-------------------------------------------------------------- List
nodes = zk.getChildren(LOCK_ROOT, true); SortedSet sortedNode = new TreeSet (); for (String node : nodes) { sortedNode.add(LOCK_ROOT + "/" + node); } String first = sortedNode.first(); SortedSet lessThanMe = sortedNode.headSet(lockId); //-------------------------------------------------------------- // 检查是否有比当前锁节点lockId更小的节点,若有则监控当前节点的前一节点 //-------------------------------------------------------------- if (lockId.equals(first)) { System.out.println("thread " + Thread.currentThread().getName() + " has get the lock, lockId is " + lockId); return true; } else if (!lessThanMe.isEmpty()) { String prevLockId = lessThanMe.last(); zk.exists(prevLockId, new LockWatcher()); //-------------------------------------------------------------- // 阀门等待sessionTimeout的时间 // 当等待sessionTimeout的时间过后,上一个lock的Zookeeper连接会过期,删除所有临时节点,触发监听器 //-------------------------------------------------------------- latch.await(sessionTimeout, TimeUnit.MILLISECONDS); System.out.println("thread " + Thread.currentThread().getName() + " has get the lock, lockId is " + lockId); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return true; } public synchronized boolean unlock() { //-------------------------------------------------------------- // 删除lockId节点以释放锁 //-------------------------------------------------------------- try { System.out.println("thread " + Thread.currentThread().getName() + " unlock the lock: " + lockId + ", the node: " + lockId + " had been deleted"); zk.delete(lockId, -1); return true; } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } finally { try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } return false; } /** * 保证锁根节点存在,若不存在则创建它 */ public void createLockRootIfNotExists() { try { Stat stat = zk.exists(LOCK_ROOT, false); if (stat == null) { zk.create(LOCK_ROOT, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { final CountDownLatch latch = new CountDownLatch(10); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { public void run() { DistributedLock lock = null; try { lock = new DistributedLock(); latch.countDown(); latch.await(); lock.lock(); Thread.sleep(RandomUtils.nextInt(200, 500)); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { if (lock != null) { lock.unlock(); } } } }).start(); } }}