This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 616b50d4851e712ae563e337e7f12c155697b1c9 Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Mon Feb 28 13:51:57 2022 +0100 CAMEL-15562: minor cleanups for the Kafka strategy --- .../resume/kafka/AbstractKafkaResumeStrategy.java | 37 ++++++++++++++-------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java index a779c00..079d936 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java @@ -87,7 +87,13 @@ public abstract class AbstractKafkaResumeStrategy<K, V> init(); } - private Properties createProducer(String bootstrapServers) { + /** + * Creates a basic string-based producer + * + * @param bootstrapServers the Kafka host + * @return A set of default properties for producing string-based key/pair records from Kafka + */ + public static Properties createProducer(String bootstrapServers) { Properties config = new Properties(); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); @@ -96,15 +102,16 @@ public abstract class AbstractKafkaResumeStrategy<K, V> StringHelper.notEmpty(bootstrapServers, "bootstrapServers"); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - // set up the producer to remove all batching on send, we want all sends - // to be fully synchronous - config.putIfAbsent(ProducerConfig.ACKS_CONFIG, "1"); - config.putIfAbsent(ProducerConfig.BATCH_SIZE_CONFIG, "0"); - return config; } - private Properties createConsumer(String bootstrapServers) { + /** + * Creates a basic string-based consumer + * + * @param bootstrapServers the Kafka host + * @return A set of default properties for consuming string-based key/pair records from Kafka + */ + public static Properties createConsumer(String bootstrapServers) { Properties config = new Properties(); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); @@ -124,13 +131,12 @@ public abstract class AbstractKafkaResumeStrategy<K, V> /** * Sends data to a topic - * - * @param topic the topic to send data to + * * @param message the message to send * @throws ExecutionException * @throws InterruptedException */ - public void produce(String topic, K key, V message) throws ExecutionException, InterruptedException { + public void produce(K key, V message) throws ExecutionException, InterruptedException { ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, message); errorCount = 0; @@ -151,7 +157,7 @@ public abstract class AbstractKafkaResumeStrategy<K, V> LOG.debug("Updating offset on Kafka with key {} to {}", key, offsetValue); - produce(topic, key, offsetValue); + produce(key, offsetValue); resumeCache.add(key, offsetValue); } @@ -231,9 +237,14 @@ public abstract class AbstractKafkaResumeStrategy<K, V> public abstract void subscribe() throws Exception; - // TODO: bad method public void unsubscribe() { - consumer.unsubscribe(); + try { + consumer.unsubscribe(); + } catch (IllegalStateException e) { + LOG.warn("The consumer is likely already closed. Skipping unsubscribing from {}", topic); + } catch (Exception e) { + LOG.error("Error unsubscribing from the Kafka topic {}: {}", topic, e.getMessage(), e); + } } /**
