/**
* 是否监听,默认不监听
*/
@Value("${spring.kafka.kafkaListenerSwitch:false}")
private Boolean kafkaListenerSwitch;
/**
* 消费者批量工程
*/
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
factory.setConcurrency(1);
factory.setAutoStartup(kafkaListenerSwitch);
logger.info("kafka监听开启状态:"+kafkaListenerSwitch);
return factory;
}本文为胖虎原创文章,转载无需和我联系,但请注明来自胖虎博客panghucat.cn