Redis实现分布式锁与任务队列的思路,附上源代码
场景:
电商网站上有很多秒杀活动,会迎来一个用户请求的高峰期,可能会有几十万几百万的并发量,来抢这个手机,在高并发的情形下会对数据库服务器或者是文件服务器应用服务器造成巨大的压力,严重时说不定就宕机了;
另一个问题是,秒杀的东西都是有量的,一款手机只有10台的量秒杀,在高并发的情况下,成千上万条数据更新数据库(例如10台的量被人抢一台就会在数据集某些记录下 减1),那次这个时候的先后顺序是很乱的,很容易出现10台的量,抢到的人就不止10个这种严重的问题。
对于redis的并发的处理:
a)Redis为单进程单线程模式,Redis本身没有锁的概念,Redis对于多个客户端连接并不存在竞争,
但是在Jedis客户端对Redis进行并发访问时会发生 连接超时、连接阻塞、客户端关闭连接等问题,对于这些问题对此有2种解决方法:
1.客户端角度,为保证每个客户端间正常有序与Redis进行通信,对连接进行池化,同时对客户端读写Redis操作采用内部锁synchronized。
2..服务器角度,利用setnx实现锁。
Redis分布式锁的实现:
思路很简单,主要用到的redis函数是setnx(),首先是将某一任务标识名(这里用Lock:order作为标识名的例子)作为键存到redis里,并为其设个过期时间。
对于再次Lock:order请求过来,先是通过setnx()看看是否能将Lock:order插入到redis里,可以的话就返回true,不可以就返回false。当然,在我的代码里会比这个思路复杂一些,我会在分析代码时进一步说明。
Redis实现任务队列:
实现会用到上面的Redis分布式的锁机制,主要是用到了Redis里的有序集合这一数据结构,例如入队时,通过zset的add()函数进行入队,而出对时,可以用到zset的getScore()函数。另外还可以弹出顶部的几个任务。
Redis实现分布式锁代码:需要注意的问题
1:为避免特殊原因导致锁无法释放,在加锁成功后,锁会被赋予一个生存时间(通过lock方法的参数设置或者使用默认值),超出生存时间锁会被自动释放锁;如果需要长时间加锁,可以通过expire方法延长锁的生存时间。
2:系统级的锁在进程无论何种原因时出现崩溃时,操作系统会自己回收锁,所以不会出现资源丢失;但是分布式锁则不同,如果设置的锁生成时间过长,一旦由于某个原因出现系统崩溃的时候,其他进程就会获取不到锁, 这个锁就会变成垃圾锁,其他进程也用不到这个锁,进不到加锁区。
3:加锁代码中主要的两个参数,一个是timeout,这个是循环获取锁的等待时间,在这个时间内会一直尝试获取锁知道超时,如果为0,则表示获取锁失败后直接返回而不再等待;另一个重要参数的expire,这个参数指当前锁的最大生存时间,以秒为单位的,它必须大于0,如果超过生存时间锁仍未被释放,则系统会自动强制释放
代码实现过程:先取得当前时间,然后再获取到锁失败时的等待超时的时刻(是个时间戳),再获取到锁的最大生存时间。
key用这种格式:”Lock:锁的标识名”,进入循环了,先是插入数据到redis里,使用setnx()函数,key键不存在则插入数据,如果插入成功,则对该键进行失效时间的设置,并将该key键放在$lockedName数组里,返回true,也就是上锁成功 如果该key键存在,则不会插入操作了。
public class RedisBillLockHandler implements IBatchBillLockHandler { private static final Logger LOGGER = LoggerFactory.getLogger(RedisBillLockHandler.class); private static final int DEFAULT_SINGLE_EXPIRE_TIME = 3; private static final int DEFAULT_BATCH_EXPIRE_TIME = 6; private final JedisPool jedisPool; public RedisBillLockHandler(JedisPool jedisPool) { this.jedisPool = jedisPool; } /** * 获取锁 如果锁可用 立即返回true, 否则返回false * @see com.fx.platform.components.lock.IBillLockHandler#tryLock(com.fx.platform.components.lock.IBillIdentify) * @param billIdentify * @return */ public boolean tryLock(IBillIdentify billIdentify) { return tryLock(billIdentify, 0L, null); } /** * 锁在给定的等待时间内空闲,则获取锁成功 返回true, 否则返回false * @param billIdentify * @param timeout * @param unit * @return */ public boolean tryLock(IBillIdentify billIdentify, long timeout, TimeUnit unit) { String key = (String) billIdentify.uniqueIdentify(); Jedis jedis = null; try { jedis = getResource(); long nano = System.nanoTime(); do { LOGGER.debug("try lock key: " + key); Long i = jedis.setnx(key, key); if (i == 1) { jedis.expire(key, DEFAULT_SINGLE_EXPIRE_TIME); LOGGER.debug("get lock, key: " + key + " , expire in " + DEFAULT_SINGLE_EXPIRE_TIME + " seconds."); return Boolean.TRUE; } else { // 存在锁 if (LOGGER.isDebugEnabled()) { String desc = jedis.get(key); LOGGER.debug("key: " + key + " locked by another business:" + desc); } } if (timeout == 0) { break; } Thread.sleep(300); } while ((System.nanoTime() - nano) < unit.toNanos(timeout)); return Boolean.FALSE; } catch (JedisConnectionException je) { LOGGER.error(je.getMessage(), je); returnBrokenResource(jedis); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } finally { returnResource(jedis); } return Boolean.FALSE; } /** * 如果锁空闲立即返回 获取失败 一直等待 * @param billIdentify */ public void lock(IBillIdentify billIdentify) { String key = (String) billIdentify.uniqueIdentify(); Jedis jedis = null; try { jedis = getResource(); do { LOGGER.debug("lock key: " + key); Long i = jedis.setnx(key, key); if (i == 1) { jedis.expire(key, DEFAULT_SINGLE_EXPIRE_TIME); LOGGER.debug("get lock, key: " + key + " , expire in " + DEFAULT_SINGLE_EXPIRE_TIME + " seconds."); return; } else { if (LOGGER.isDebugEnabled()) { String desc = jedis.get(key); LOGGER.debug("key: " + key + " locked by another business:" + desc); } } Thread.sleep(300); } while (true); } catch (JedisConnectionException je) { LOGGER.error(je.getMessage(), je); returnBrokenResource(jedis); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } finally { returnResource(jedis); } } /** * 释放锁 * @param billIdentify */ public void unLock(IBillIdentify billIdentify) { List list = new ArrayList(); list.add(billIdentify); unLock(list); } /** * 批量获取锁 如果全部获取 立即返回true, 部分获取失败 返回false * @param billIdentifyList * @return */ public boolean tryLock(List billIdentifyList) { return tryLock(billIdentifyList, 0L, null); } /** * 锁在给定的等待时间内空闲,则获取锁成功 返回true, 否则返回false * @param billIdentifyList * @param timeout * @param unit * @return */ public boolean tryLock(List billIdentifyList, long timeout, TimeUnit unit) { Jedis jedis = null; try { List needLocking = new CopyOnWriteArrayList(); List locked = new CopyOnWriteArrayList(); jedis = getResource(); long nano = System.nanoTime(); do { // 构建pipeline,批量提交 Pipeline pipeline = jedis.pipelined(); for (IBillIdentify identify : billIdentifyList) { String key = (String) identify.uniqueIdentify(); needLocking.add(key); pipeline.setnx(key, key); } LOGGER.debug("try lock keys: " + needLocking); // 提交redis执行计数 List results = pipeline.syncAndReturnAll(); for (int i = 0; i < results.size(); ++i) { Long result = (Long) results.get(i); String key = needLocking.get(i); if (result == 1) { // setnx成功,获得锁 jedis.expire(key, DEFAULT_BATCH_EXPIRE_TIME); locked.add(key); } } needLocking.removeAll(locked); // 已锁定资源去除 if (CollectionUtils.isEmpty(needLocking)) { return true; } else { // 部分资源未能锁住 LOGGER.debug("keys: " + needLocking + " locked by another business:"); } if (timeout == 0) { break; } Thread.sleep(500); } while ((System.nanoTime() - nano) < unit.toNanos(timeout)); // 得不到锁,释放锁定的部分对象,并返回失败 if (!CollectionUtils.isEmpty(locked)) { jedis.del(locked.toArray(new String[0])); } return false; } catch (JedisConnectionException je) { LOGGER.error(je.getMessage(), je); returnBrokenResource(jedis); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } finally { returnResource(jedis); } return true; } /** * 批量释放锁 * @param billIdentifyList */ public void unLock(List billIdentifyList) { List keys = new CopyOnWriteArrayList(); for (IBillIdentify identify : billIdentifyList) { String key = (String) identify.uniqueIdentify(); keys.add(key); } Jedis jedis = null; try { jedis = getResource(); jedis.del(keys.toArray(new String[0])); LOGGER.debug("release lock, keys :" + keys); } catch (JedisConnectionException je) { LOGGER.error(je.getMessage(), je); returnBrokenResource(jedis); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } finally { returnResource(jedis); } } /** * @return */ private Jedis getResource() { return jedisPool.getResource(); } /** * 销毁连接 * @author http://blog.csdn.net/java2000_wl * @param jedis */ private void returnBrokenResource(Jedis jedis) { if (jedis == null) { return; } try { //容错 jedisPool.returnBrokenResource(jedis); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } } /** * @param jedis */ private void returnResource(Jedis jedis) { if (jedis == null) { return; } try { jedisPool.returnResource(jedis); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } }
评论 (0)