在 Spring Boot 应用中,消息堆积是一个常见的问题,它可能会导致系统性能下降、响应时间变长甚至出现故障。以下是几种解决消息堆积问题以实现高吞...
2025-08-02 0
在 Spring Boot 应用中,消息堆积是一个常见的问题,它可能会导致系统性能下降、响应时间变长甚至出现故障。以下是几种解决消息堆积问题以实现高吞吐零失误的方案,并结合代码示例进行说明。
增加消费者的并发度可以提高消息处理的速度,从而减少消息堆积。在 Spring Boot 中,对于不同的消息中间件,实现方式略有不同,下面以 RabbitMQ 为例。
import org.springframework.amqp.rabbit.annotation.EnableRabbit;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.Bean;import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;@SpringBootApplication@EnableRabbitpublic class RabbitMQConsumerApplication { public static void main(String[] args) { SpringApplication.run(RabbitMQConsumerApplication.class, args); } // 配置并发消费者 @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrentConsumers(5); // 设置并发消费者数量 factory.setMaxConcurrentConsumers(10); // 设置最大并发消费者数量 return factory; } @RabbitListener(queues = "testQueue") public void listen(String message) { System.out.println("Received message: " + message); // 处理消息的逻辑 }}
批量消费可以减少与消息中间件的交互次数,提高处理效率。以 Kafka 为例。
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.context.annotation.Bean;import org.springframework.kafka.listener.AbstractMessageListenerContainer;import java.util.List;@SpringBootApplicationpublic class KafkaConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaConsumerApplication.class, args); } // 配置批量消费 @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory( ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setBatchListener(true); // 开启批量消费 factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); return factory; } @KafkaListener(topics = "testTopic", containerFactory = "kafkaListenerContainerFactory") public void listen(List<ConsumerRecord<String, String>> records) { for (ConsumerRecord<String, String> record : records) { System.out.println("Received message: " + record.value()); // 处理消息的逻辑 } }}
将消息处理逻辑异步化可以避免阻塞主线程,提高系统的吞吐量。以下是一个简单的示例。
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.scheduling.annotation.Async;import org.springframework.scheduling.annotation.EnableAsync;import org.springframework.stereotype.Service;@SpringBootApplication@EnableAsyncpublic class AsyncProcessingApplication { public static void main(String[] args) { SpringApplication.run(AsyncProcessingApplication.class, args); }}@Serviceclass MessageProcessor { @Async public void processMessage(String message) { try { // 模拟耗时操作 Thread.sleep(1000); System.out.println("Processed message: " + message); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }}
if (blockingQueue.remainingCapacity() == 1000) { System.out.println("可以睡眠一段时间"); }
在 Spring Boot 中利用 Java 的BlockingQueue阻塞队列来实现 MQ(消息队列)消息的高吞吐,核心思路是将从 MQ 接收到的消息暂存到BlockingQueue中,然后使用多个消费者线程从队列中取出消息进行处理,以此提高消息处理的并发度和吞吐量
import org.springframework.amqp.rabbit.annotation.EnableRabbit;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.CommandLineRunner;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.Bean;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;@SpringBootApplication@EnableRabbitpublic class MqBlockingQueueApplication implements CommandLineRunner { // 定义阻塞队列,这里使用 LinkedBlockingQueue private static final BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(1000); // 定义线程池 private static final ExecutorService executorService = Executors.newFixedThreadPool(10); public static void main(String[] args) { SpringApplication.run(MqBlockingQueueApplication.class, args); } @Override public void run(String... args) throws Exception { // 启动多个消费者线程 for (int i = 0; i < 5; i++) { executorService.submit(new MessageConsumer(blockingQueue)); } } // 消息生产者,从MQ接收消息并放入阻塞队列 @RabbitListener(queues = "testQueue") public void receiveMessage(String message) { try { blockingQueue.put(message); System.out.println("Received message from MQ and put into queue: " + message); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } // 消息消费者类 static class MessageConsumer implements Runnable { private final BlockingQueue<String> queue; public MessageConsumer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { try { while (true) { String message = queue.take(); System.out.println(Thread.currentThread().getName() + " consumed message: " + message); // 处理消息的逻辑 Thread.sleep(100); // 模拟处理耗时 } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }}
在 Spring Boot 中利用 Redis 实现 MQ 消息的高吞吐可以借助 Redis 的 List、Pub/Sub、Stream 等数据结构
Redis 的 List 是一个双向链表,支持从列表两端进行元素的插入和删除操作。生产者将消息从列表一端插入,消费者从另一端取出消息进行处理,从而实现消息队列的功能。
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Service;@Servicepublic class RedisListMessageProducer { private static final String QUEUE_KEY = "redis_list_queue"; @Autowired private RedisTemplate<String, String> redisTemplate; public void sendMessage(String message) { redisTemplate.opsForList().rightPush(QUEUE_KEY, message); }}
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Service;@Servicepublic class RedisListMessageConsumer { private static final String QUEUE_KEY = "redis_list_queue"; @Autowired private RedisTemplate<String, String> redisTemplate; @Scheduled(fixedRate = 100) // 每100毫秒执行一次 public void consumeMessage() { String message = redisTemplate.opsForList().leftPop(QUEUE_KEY); if (message != null) { System.out.println("Consumed message: " + message); // 处理消息的逻辑 } }}
Redis Stream 是 Redis 5.0 引入的一种新的数据结构,专门用于消息队列。它支持消息的持久化、多消费者组、消息确认等功能,能够更好地满足高并发、高吞吐的消息队列场景。
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.stream.MapRecord;import org.springframework.data.redis.connection.stream.RecordId;import org.springframework.data.redis.connection.stream.StreamRecords;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Service;import java.util.HashMap;import java.util.Map;@Servicepublic class RedisStreamMessageProducer { private static final String STREAM_KEY = "redis_stream_queue"; @Autowired private RedisTemplate<String, String> redisTemplate; public void sendMessage(String message) { Map<String, String> messageData = new HashMap<>(); messageData.put("message", message); MapRecord<String, String, String> record = StreamRecords.newRecord() .in(STREAM_KEY) .ofMap(messageData); RecordId recordId = redisTemplate.opsForStream().add(record); System.out.println("Sent message with ID: " + recordId); }}
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.stream.MapRecord;import org.springframework.data.redis.connection.stream.ReadOffset;import org.springframework.data.redis.connection.stream.StreamOffset;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Service;import java.util.List;@Servicepublic class RedisStreamMessageConsumer { private static final String STREAM_KEY = "redis_stream_queue"; @Autowired private RedisTemplate<String, String> redisTemplate; @Scheduled(fixedRate = 100) // 每100毫秒执行一次 public void consumeMessage() { StreamOffset<String> streamOffset = StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()); List<MapRecord<String, String, String>> records = redisTemplate.opsForStream().read(streamOffset); if (records != null) { for (MapRecord<String, String, String> record : records) { String message = record.getValue().get("message"); System.out.println("Consumed message: " + message); // 处理消息的逻辑 redisTemplate.opsForStream().acknowledge(STREAM_KEY, "consumer_group", record.getId()); } } }}
对于处理失败的消息,进行重试可以避免消息丢失,同时使用死信队列可以隔离无法处理的消息。以 RabbitMQ 为例。
import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.annotation.EnableRabbit;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.Bean;@SpringBootApplication@EnableRabbitpublic class RabbitMQRetryApplication { public static void main(String[] args) { SpringApplication.run(RabbitMQRetryApplication.class, args); } // 定义队列和交换器 @Bean public Queue testQueue() { return QueueBuilder.durable("testQueue") .withArgument("x-dead-letter-exchange", "dlxExchange") .withArgument("x-dead-letter-routing-key", "dlxKey") .build(); } @Bean public DirectExchange testExchange() { return new DirectExchange("testExchange"); } @Bean public Binding testBinding() { return BindingBuilder.bind(testQueue()).to(testExchange()).with("testKey"); } @Bean public Queue dlxQueue() { return new Queue("dlxQueue", true); } @Bean public DirectExchange dlxExchange() { return new DirectExchange("dlxExchange"); } @Bean public Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlxKey"); } // 配置重试和死信队列 @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setRecoveryCallback(retryContext -> { // 重试逻辑 return null; }); factory.setErrorHandler(new RejectAndDontRequeueRecoverer()); return factory; } @RabbitListener(queues = "testQueue") public void listen(String message) { try { // 处理消息的逻辑 if (Math.random() < 0.2) { throw new RuntimeException("Simulated error"); } System.out.println("Received message: " + message); } catch (Exception e) { throw e; } } @RabbitListener(queues = "dlxQueue") public void listenDlx(String message) { System.out.println("Received dead letter message: " + message); }}
相关文章
在 Spring Boot 应用中,消息堆积是一个常见的问题,它可能会导致系统性能下降、响应时间变长甚至出现故障。以下是几种解决消息堆积问题以实现高吞...
2025-08-02 0
说起买耳机这事儿,我可是从大学就开始折腾了。最早买过9块9的地摊货,后来也剁手过近万的旗舰。现在耳朵越来越挑,虽说现在无线耳机确实方便,但真要享受音乐...
2025-08-02 0
“信任电商”风头正盛,药店却还在卖产品?原创 谢小童 中国药店 随着数字经济的发展,医药零售行业普遍面临传统渠道增长乏力、客流量下降、获客成本上升等挑...
2025-08-02 0
电脑用久了,总会出现一些奇奇怪怪的小毛病,比如电脑开始菜单不好用、桌面的回收站图标没了、资源管理器偶尔会启动不了、无法上网等等。这类的毛病你说大不大,...
2025-08-02 0
在竞争激烈的 7 月手机市场,谁能想到最大的黑马竟是 Hi 畅享 80 Pro。根据研究机构W29最新数据,Hi畅享 80 Pro上市当日热销2.6万...
2025-08-02 0
今天给各位分享网上金花开挂是真的吗的知识,其中也会对那个金花开挂怎么买进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!赛金花一生...
2025-08-02 0
今天给各位分享JJ斗地主哪里买记牌器的知识,其中也会对进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!JJ斗地主记牌器 1、打开...
2025-08-02 1
金融界2025年8月1日消息,国家知识产权局信息显示,陕西星环聚能科技有限公司取得一项名为“用于静电探针系统的电源电路和静电探针系统”的专利,授权公告...
2025-08-02 0
发表评论