首页 AI科技文章正文

Springboot消息堆积如何破局,如何实现高吞吐零失误

AI科技 2025年08月02日 02:32 0 aa

Springboot消息堆积如何破局,如何实现高吞吐零失误

在 Spring Boot 应用中,消息堆积是一个常见的问题,它可能会导致系统性能下降、响应时间变长甚至出现故障。以下是几种解决消息堆积问题以实现高吞吐零失误的方案,并结合代码示例进行说明。

高吞吐

方案一:增加消费者并发度

增加消费者的并发度可以提高消息处理的速度,从而减少消息堆积。在 Spring Boot 中,对于不同的消息中间件,实现方式略有不同,下面以 RabbitMQ 为例。

代码实现

import org.springframework.amqp.rabbit.annotation.EnableRabbit;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.Bean;import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;@SpringBootApplication@EnableRabbitpublic class RabbitMQConsumerApplication {    public static void main(String[] args) {        SpringApplication.run(RabbitMQConsumerApplication.class, args);    }    // 配置并发消费者    @Bean    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();        factory.setConnectionFactory(connectionFactory);        factory.setConcurrentConsumers(5); // 设置并发消费者数量        factory.setMaxConcurrentConsumers(10); // 设置最大并发消费者数量        return factory;    }    @RabbitListener(queues = "testQueue")    public void listen(String message) {        System.out.println("Received message: " + message);        // 处理消息的逻辑    }}

方案二:批量消费消息

批量消费可以减少与消息中间件的交互次数,提高处理效率。以 Kafka 为例。

代码实现

import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.context.annotation.Bean;import org.springframework.kafka.listener.AbstractMessageListenerContainer;import java.util.List;@SpringBootApplicationpublic class KafkaConsumerApplication {    public static void main(String[] args) {        SpringApplication.run(KafkaConsumerApplication.class, args);    }    // 配置批量消费    @Bean    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(            ConsumerFactory<String, String> consumerFactory) {        ConcurrentKafkaListenerContainerFactory<String, String> factory =                new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory);        factory.setBatchListener(true); // 开启批量消费        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);        return factory;    }    @KafkaListener(topics = "testTopic", containerFactory = "kafkaListenerContainerFactory")    public void listen(List<ConsumerRecord<String, String>> records) {        for (ConsumerRecord<String, String> record : records) {            System.out.println("Received message: " + record.value());            // 处理消息的逻辑        }    }}

方案三:异步处理消息

将消息处理逻辑异步化可以避免阻塞主线程,提高系统的吞吐量。以下是一个简单的示例。

代码实现

import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.scheduling.annotation.Async;import org.springframework.scheduling.annotation.EnableAsync;import org.springframework.stereotype.Service;@SpringBootApplication@EnableAsyncpublic class AsyncProcessingApplication {    public static void main(String[] args) {        SpringApplication.run(AsyncProcessingApplication.class, args);    }}@Serviceclass MessageProcessor {    @Async    public void processMessage(String message) {        try {            // 模拟耗时操作            Thread.sleep(1000);            System.out.println("Processed message: " + message);        } catch (InterruptedException e) {            Thread.currentThread().interrupt();        }    }}

方案四Java 的BlockingQueue

#当消息阻塞队列满了时可以暂停一段时间,停止拉取等待消费者消费完数据

if (blockingQueue.remainingCapacity() == 1000) {                System.out.println("可以睡眠一段时间");            }

在 Spring Boot 中利用 Java 的BlockingQueue阻塞队列来实现 MQ(消息队列)消息的高吞吐,核心思路是将从 MQ 接收到的消息暂存到BlockingQueue中,然后使用多个消费者线程从队列中取出消息进行处理,以此提高消息处理的并发度和吞吐量

import org.springframework.amqp.rabbit.annotation.EnableRabbit;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.CommandLineRunner;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.Bean;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;@SpringBootApplication@EnableRabbitpublic class MqBlockingQueueApplication implements CommandLineRunner {    // 定义阻塞队列,这里使用 LinkedBlockingQueue    private static final BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(1000);    // 定义线程池    private static final ExecutorService executorService = Executors.newFixedThreadPool(10);    public static void main(String[] args) {        SpringApplication.run(MqBlockingQueueApplication.class, args);    }    @Override    public void run(String... args) throws Exception {        // 启动多个消费者线程        for (int i = 0; i < 5; i++) {            executorService.submit(new MessageConsumer(blockingQueue));        }    }    // 消息生产者,从MQ接收消息并放入阻塞队列    @RabbitListener(queues = "testQueue")    public void receiveMessage(String message) {        try {            blockingQueue.put(message);            System.out.println("Received message from MQ and put into queue: " + message);        } catch (InterruptedException e) {            Thread.currentThread().interrupt();        }    }    // 消息消费者类    static class MessageConsumer implements Runnable {        private final BlockingQueue<String> queue;        public MessageConsumer(BlockingQueue<String> queue) {            this.queue = queue;        }        @Override        public void run() {            try {                while (true) {                    String message = queue.take();                    System.out.println(Thread.currentThread().getName() + " consumed message: " + message);                    // 处理消息的逻辑                    Thread.sleep(100); // 模拟处理耗时                }            } catch (InterruptedException e) {                Thread.currentThread().interrupt();            }        }    }}

代码解释

  1. BlockingQueue的使用:使用LinkedBlockingQueue作为消息的缓冲队列,容量为 1000。当队列满时,生产者线程会被阻塞;当队列为空时,消费者线程会被阻塞。
  2. 消息生产者:使用@RabbitListener注解监听testQueue队列,当接收到消息时,将其放入BlockingQueue中。
  3. 消息消费者:MessageConsumer类实现了Runnable接口,从BlockingQueue中取出消息进行处理。使用take方法,如果队列为空则会阻塞。
  4. 线程池管理:使用Executors.newFixedThreadPool(10)创建一个固定大小为 10 的线程池,管理多个消费者线程,提高并发处理能力。

注意事项

  • 队列容量:需要根据实际情况合理设置BlockingQueue的容量,避免队列过小导致生产者频繁阻塞,或者队列过大导致内存占用过高。
  • 线程池配置:根据系统的硬件资源和任务特点,合理配置线程池的大小。
  • 异常处理:在生产者和消费者的代码中,需要捕获InterruptedException异常并正确处理,避免线程被意外中断。

方案五 利用redis做缓存实现高吞吐

在 Spring Boot 中利用 Redis 实现 MQ 消息的高吞吐可以借助 Redis 的 List、Pub/Sub、Stream 等数据结构

基于 Redis List 实现消息队列

原理

Redis 的 List 是一个双向链表,支持从列表两端进行元素的插入和删除操作。生产者将消息从列表一端插入,消费者从另一端取出消息进行处理,从而实现消息队列的功能。

3. 编写生产者服务

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Service;@Servicepublic class RedisListMessageProducer {    private static final String QUEUE_KEY = "redis_list_queue";    @Autowired    private RedisTemplate<String, String> redisTemplate;    public void sendMessage(String message) {        redisTemplate.opsForList().rightPush(QUEUE_KEY, message);    }}

4. 编写消费者服务

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Service;@Servicepublic class RedisListMessageConsumer {    private static final String QUEUE_KEY = "redis_list_queue";    @Autowired    private RedisTemplate<String, String> redisTemplate;    @Scheduled(fixedRate = 100) // 每100毫秒执行一次    public void consumeMessage() {        String message = redisTemplate.opsForList().leftPop(QUEUE_KEY);        if (message != null) {            System.out.println("Consumed message: " + message);            // 处理消息的逻辑        }    }}

基于 Redis Stream 实现消息队列

原理

Redis Stream 是 Redis 5.0 引入的一种新的数据结构,专门用于消息队列。它支持消息的持久化、多消费者组、消息确认等功能,能够更好地满足高并发、高吞吐的消息队列场景。

3. 编写生产者服务

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.stream.MapRecord;import org.springframework.data.redis.connection.stream.RecordId;import org.springframework.data.redis.connection.stream.StreamRecords;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Service;import java.util.HashMap;import java.util.Map;@Servicepublic class RedisStreamMessageProducer {    private static final String STREAM_KEY = "redis_stream_queue";    @Autowired    private RedisTemplate<String, String> redisTemplate;    public void sendMessage(String message) {        Map<String, String> messageData = new HashMap<>();        messageData.put("message", message);        MapRecord<String, String, String> record = StreamRecords.newRecord()               .in(STREAM_KEY)               .ofMap(messageData);        RecordId recordId = redisTemplate.opsForStream().add(record);        System.out.println("Sent message with ID: " + recordId);    }}

4. 编写消费者服务

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.stream.MapRecord;import org.springframework.data.redis.connection.stream.ReadOffset;import org.springframework.data.redis.connection.stream.StreamOffset;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Service;import java.util.List;@Servicepublic class RedisStreamMessageConsumer {    private static final String STREAM_KEY = "redis_stream_queue";    @Autowired    private RedisTemplate<String, String> redisTemplate;    @Scheduled(fixedRate = 100) // 每100毫秒执行一次    public void consumeMessage() {        StreamOffset<String> streamOffset = StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed());        List<MapRecord<String, String, String>> records = redisTemplate.opsForStream().read(streamOffset);        if (records != null) {            for (MapRecord<String, String, String> record : records) {                String message = record.getValue().get("message");                System.out.println("Consumed message: " + message);                // 处理消息的逻辑                redisTemplate.opsForStream().acknowledge(STREAM_KEY, "consumer_group", record.getId());            }        }    }}

总结

  • Redis List:实现简单,适合对消息处理顺序有严格要求、不需要复杂消息确认机制的场景。
  • Redis Stream:功能更强大,支持持久化、多消费者组、消息确认等,适合高并发、高吞吐、需要消息可靠处理的场景。

零失误方案

方案一:消息重试和死信队列

对于处理失败的消息,进行重试可以避免消息丢失,同时使用死信队列可以隔离无法处理的消息。以 RabbitMQ 为例。

代码实现

import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.annotation.EnableRabbit;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.Bean;@SpringBootApplication@EnableRabbitpublic class RabbitMQRetryApplication {    public static void main(String[] args) {        SpringApplication.run(RabbitMQRetryApplication.class, args);    }    // 定义队列和交换器    @Bean    public Queue testQueue() {        return QueueBuilder.durable("testQueue")                .withArgument("x-dead-letter-exchange", "dlxExchange")                .withArgument("x-dead-letter-routing-key", "dlxKey")                .build();    }    @Bean    public DirectExchange testExchange() {        return new DirectExchange("testExchange");    }    @Bean    public Binding testBinding() {        return BindingBuilder.bind(testQueue()).to(testExchange()).with("testKey");    }    @Bean    public Queue dlxQueue() {        return new Queue("dlxQueue", true);    }    @Bean    public DirectExchange dlxExchange() {        return new DirectExchange("dlxExchange");    }    @Bean    public Binding dlxBinding() {        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlxKey");    }    // 配置重试和死信队列    @Bean    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();        factory.setConnectionFactory(connectionFactory);        factory.setRecoveryCallback(retryContext -> {            // 重试逻辑            return null;        });        factory.setErrorHandler(new RejectAndDontRequeueRecoverer());        return factory;    }    @RabbitListener(queues = "testQueue")    public void listen(String message) {        try {            // 处理消息的逻辑            if (Math.random() < 0.2) {                throw new RuntimeException("Simulated error");            }            System.out.println("Received message: " + message);        } catch (Exception e) {            throw e;        }    }    @RabbitListener(queues = "dlxQueue")    public void listenDlx(String message) {        System.out.println("Received dead letter message: " + message);    }}

方案而二:消费不了之后直接写入磁盘

方案三:消费不了直接放入到新的mq消息队列中

发表评论

长征号 Copyright © 2013-2024 长征号. All Rights Reserved.  sitemap