最近在使用redis时,就想能不能用其实现消息队列?也在网上看了下其他小伙伴写的实现,结合自身业务实现了如下消息队列,希望对大家有用。
废话不多说,直接开撸。
1、为什么zset可以做消息队列?
首先我们来看下,设计消息队列需要考虑的需求:有序性,消息重复性,可靠性。
- 有序性:zset所有元素可以根据成员关联的score来进行从低到高的排序,例如,我们可以利用时间戳来进行排序
- 消息重复性:在zset中每个元素都是唯一的,这也保证了消息的唯一性
- 可靠性:zset会自动维护元素之间的顺序,在添加或删除元素时无需手动排序,提升操作速度。
2、使用的zset命令
命令 | 描述 |
---|---|
zadd | 将一个给定score的成员添加到有序集合中,返回添加元素的个数 |
zrange | 根据元素在有序排序中的位置,从有序集合中获取多个元素 |
rank(K key, Object o) | 获取指定元素在集合中的索引,索引从0开始 |
3、代码实现
使用zset实现消息队列时,具体的流程,如下:
生产者流程:
- 用户获取消息Id,并封装消息体
- 用户发送数据到生产者,先获取锁
- 如果获取到锁,则校验该消息体是否已添加到队列中,已添加则直接返回提醒。
- 若未添加则调用方法将数据保存到zset集合中,否则等到指定时间后再获取锁。
- 推送数据后,释放锁
消费者流程:
- 调用方法获取数据
- 获取到数据,则直接返回,否则到指定时间后再次获取数据,直到获取到数据并返回。
统一返回类:
/**
* @Author: jiangjs
* @Description:
* @Date: 2021/11/12 15:46
**/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ResultUtil<T> implements Serializable {
private int code;
private String msg;
private T data;
public static <T> ResultUtil<T> success(){
return ResultUtil.<T>builder().code(1000).msg("成功").build();
}
public static <T> ResultUtil<T> success(T data){
return ResultUtil.<T>builder().code(1000).msg("成功").data(data).build();
}
public static <T> ResultUtil<T> error(String msg){
return ResultUtil.<T>builder().code(5000).msg(msg).data(null).build();
}
public static <T> ResultUtil<T> error(int code,String msg){
return ResultUtil.<T>builder().code(code).msg(msg).build();
}
}
3.1 消息实体
需添加消息Id,主要防止消息重复提交。
/**
* @author: jiangjs
* @description: 消息实体
* @date: 2023/5/30 11:11
**/
@Data
@Accessors(chain = true)
public class QueueTask<T> {
/**
* 消息Id
*/
private String taskId;
/**
* 任务
*/
private T task;
}
3.2 队列类型
队列类型可以理解为队列的名称,通过枚举,可以随意添加队列名称。
/**
* @author: jiangjs
* @description: 队列类型
* @date: 2023/5/30 10:53
**/
public enum QueueTypeEnum {
/**
* 订单
*/
ORDER("order");
private final String type;
QueueTypeEnum(String type){
this.type = type;
}
public String getType(){
return type;
}
}
3.3 创建消息工具
package com.jiashn.springbootproject.redis.utils;
import com.jiashn.springbootproject.redis.domain.QueueTask;
import com.jiashn.springbootproject.utils.ResultUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* @author: jiangjs
* @description: redis实现消息队列
* @date: 2023/5/30 10:51
**/
public class RedisQueueUtil<T> {
private static final Logger log = LoggerFactory.getLogger(RedisQueueUtil.class);
private RedisTemplate<String,QueueTask<T>> redisTemplate;
/**
* 队列类型,即名称
*/
private final QueueTypeEnum typeEnum;
public RedisQueueUtil(QueueTypeEnum typeEnum,RedisTemplate<String,QueueTask<T>> redisTemplate){
this.typeEnum = typeEnum;
this.redisTemplate = redisTemplate;
}
/**
* 添加消息数据
* @param queueTask 消息
* @param time 延迟时间,单位s
*/
public ResultUtil<String> sendQueueTask(QueueTask<T> queueTask, long time){
//加锁
if (getLock()){
try {
Long rank = redisTemplate.opsForZSet().rank(typeEnum.getType(), queueTask);
if (Objects.nonNull(rank)){
return ResultUtil.error(6000,"消息数据已经存在,不予添加......");
}
Boolean result = redisTemplate.opsForZSet().add(typeEnum.getType(), queueTask, System.currentTimeMillis() + time*1000);
if (Objects.nonNull(result) && result){
log.info("添加消息数据成功:" + queueTask + ",添加时间:" + LocalDateTime.now());
return ResultUtil.success("添加消息数据成功");
}
return ResultUtil.error("添加消息数据失败");
}finally {
//释放锁
releaseLock();
}
} else {
log.info("未获取到锁,稍后再试");
return ResultUtil.error("未获取到锁,稍后再试");
}
}
/**
* 获取zset前count数据
* @param count 数据数
* @return 返回获取到数据
*/
public Set<QueueTask<T>> loopGetTask(int count) {
//rangeByScore,根据score顺序获取zset数据的值
return redisTemplate.opsForZSet().rangeByScore(typeEnum.getType(), 0, System.currentTimeMillis(), 0, count-1);
}
/**
* 注销消息队列
* @param typeEnum 消息队列名称
*/
public void destroy(QueueTypeEnum typeEnum){
redisTemplate.opsForZSet().remove(typeEnum.getType());
}
/**
* 获取任务Id
* @return 返回消息Id
*/
public String getTaskId(){
return typeEnum.getType() + "_" + UUID.randomUUID().toString().replace("-","");
}
/**
* 获取锁
* @return 返回加锁状态
*/
private boolean getLock(){
Boolean absent = redisTemplate.opsForValue().setIfAbsent(typeEnum.getType() + "_Locked", null, 30L, TimeUnit.MINUTES);
return Objects.nonNull(absent) ? absent : false;
}
/**
* 释放锁
*/
public void releaseLock(){
redisTemplate.delete(typeEnum.getType() + "_Locked");
}
}
在消息工具类中,创建消息任务时添加了锁,只有在获取锁的前提下才能添加消息任务。
提供获取消息Id的方法是为了让提交消息任务前,先获取Id,即使在提交时网络发生问题,提交的Id还是同一个,再进行消息消费时,可以根据这个Id来进行判断该消息任务是否已被消费,被消费则直接丢弃。
3.4 消费消息
/**
* @author: jiangjs
* @description: 启动消费
* @date: 2023/5/30 14:27
**/
@Component
public class CustomerTaskLineRunner implements CommandLineRunner {
@Resource
private RedisTemplate<String,QueueTask<String>> redisTemplate;
private final static String QUEUE_TYPE = QueueTypeEnum.ORDER.getType();
private final static Logger log = LoggerFactory.getLogger(CustomerTaskLineRunner.class);
@Override
public void run(String... args) throws Exception {
RedisQueueUtil<String> queueUtil = new RedisQueueUtil<>(QueueTypeEnum.ORDER,redisTemplate);
while (true){
Set<QueueTask<String>> queueTasks = queueUtil.loopGetTask(10);
if (CollectionUtils.isNotEmpty(queueTasks)){
for (QueueTask<String> queueTask : queueTasks) {
//校验当前消息是否已消费,主要防止网络延时,导致多次提交同一任务 存在
QueueTask<String> stringQueueTask = redisTemplate.opsForValue().get(QUEUE_TYPE + "_" + queueTask.getTaskId());
if (Objects.nonNull(stringQueueTask)){
log.info("该任务已经消费,不能重复消费");
redisTemplate.opsForZSet().remove(QUEUE_TYPE,queueTask);
continue;
}
Long removeNum = redisTemplate.opsForZSet().remove(QUEUE_TYPE,queueTask);
if (Objects.nonNull(removeNum) && removeNum > 0){
String task = queueTask.getTask();
log.info("消费任务数据:" + task);
//设置过期时间,10分钟内则默认是重复提交
redisTemplate.opsForValue().set(QUEUE_TYPE + "_" + queueTask.getTaskId(),queueTask,10L, TimeUnit.MINUTES);
}
}
}
log.info("------1分钟后再次获取------");
Thread.sleep(60000);
}
}
}
校验重复消息,若消息重复且在10分钟内未被消费,则直接将该消息从队列中删除。在消息任务被消费后,将数据从队列中移除。
执行结果:
到此这篇关于redis使用zset实现延时队列的示例代码的文章就介绍到这了,更多相关redis zset延时队列内容请搜索萤火虫技术以前的文章或继续浏览下面的相关文章希望大家以后多多支持萤火虫技术!
发表评论 取消回复