/** * 是否监听,默认不监听 */ @Value("${spring.kafka.kafkaListenerSwitch:false}") private Boolean kafkaListenerSwitch; /** * 消费者批量工程 */ @Bean public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL); factory.setConcurrency(1); factory.setAutoStartup(kafkaListenerSwitch); logger.info("kafka监听开启状态:"+kafkaListenerSwitch); return factory; }
本文为胖虎原创文章,转载无需和我联系,但请注明来自胖虎博客panghucat.cn