This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git

The following commit(s) were added to refs/heads/camel-3.21.x by this push:
     new b494de6956c CAMEL-20044: fix to handle commit on breakOnFirstError (#11959)
b494de6956c is described below

commit b494de6956ca31121531759b9fca4b1c76efdc26
Author: Mike Barlotta <codesm...@users.noreply.github.com>
AuthorDate: Wed Nov 15 12:08:28 2023 -0500

    CAMEL-20044: fix to handle commit on breakOnFirstError (#11959)

    * fix to handle commit on breakOnFirstError
    * add asynch commit manager test
    * update breakOnFirstError JavaDoc in KafkaConfig
    * update Kafka Component doc
    * fixed checkstyle issues
    * added support for creating topics with multiple partitions
    * add 2 unit tests needing partitions
    * fix comment in KafkaRecordProcessor
    * change topic names
    * force partitions in test
    * fix var names
    * test for breakOnFirstError false and NOOP

    ---------

    Co-authored-by: Otavio Rodolfo Piske <angusyo...@gmail.com>
---
 .../camel-kafka/src/main/docs/kafka-component.adoc | 46 ++++-
 .../camel/component/kafka/KafkaConfiguration.java | 13 +-
 .../camel/component/kafka/KafkaFetchRecords.java | 6 +-
 .../consumer/support/KafkaRecordProcessor.java | 15 +-
 .../support/KafkaRecordProcessorFacade.java | 9 +-
 ...eakOnFirstErrorOffUsingKafkaManualCommitIT.java | 162 ++++++++++++++++++
 .../KafkaBreakOnFirstErrorReplayOldMessagesIT.java | 190 +++++++++++++++++++++
 .../KafkaBreakOnFirstErrorSeekIssueIT.java | 171 +++++++++++++++++++
 ...stErrorWithBatchUsingAsynchCommitManagerIT.java | 155 +++++++++++++++++
 ...irstErrorWithBatchUsingKafkaManualCommitIT.java | 172 +++++++++++++++++++
 ...rrorWithBatchUsingKafkaManualCommitRetryIT.java | 176 +++++++++++++++++++
 ...rstErrorWithBatchUsingSynchCommitManagerIT.java | 155 +++++++++++++++++
 .../component/kafka/testutil/CamelKafkaUtil.java | 57 +++++++
 .../ROOT/pages/camel-3x-upgrade-guide-3_21.adoc | 16 ++
 14 files changed, 1326 insertions(+), 17 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc
b/components/camel-kafka/src/main/docs/kafka-component.adoc index f1ff83cf551..0c6423c810a 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -85,6 +85,24 @@ Camel components have. For advanced control a custom implementation of `org.apache.camel.component.kafka.PollExceptionStrategy` can be configured on the component level, which allows to control which exceptions causes which of the strategies above. +== Consumer error handling (advanced) + +By default Camel will poll using the *ERROR_HANDLER* to process exceptions. +How Camel handles a message that results in an exception can be altered using the `breakOnFirstError` attribute in the configuration. +Instead of continuing to poll next message, Camel will instead commit the offset so that the message that caused the exception will be retried. +This is similar to the *RETRY* polling strategy above. + +[source,java] +---- +KafkaComponent kafka = new KafkaComponent(); +kafka.setBreakOnFirstError(true); +... +camelContext.addComponent("kafka", kafka); +---- + +It is recommended that you read the section below "Using manual commit with Kafka consumer" to understand how `breakOnFirstError` +will work based on the `CommitManager` that is configured. + == Samples === Consuming messages from Kafka @@ -351,12 +369,15 @@ or on the endpoint, for example: [source,java] ---- KafkaComponent kafka = new KafkaComponent(); +kafka.setAutoCommitEnable(false); kafka.setAllowManualCommit(true); ... camelContext.addComponent("kafka", kafka); ---- -Then you can use the `KafkaManualCommit` from Java code such as a Camel `Processor`: +By default this will use the `NoopCommitManager` behind the scenes. 
In order to commit an offset you will +need to use the `KafkaManualCommit` from Java code such as a Camel `Processor`: + [source,java] ---- public void process(Exchange exchange) { @@ -366,7 +387,7 @@ public void process(Exchange exchange) { } ---- -This will force a synchronous commit which will block until the commit is acknowledged on Kafka, or if it fails an exception is thrown. +The `KafkaManualCommit` will force a synchronous commit which will block until the commit is acknowledged on Kafka, or if it fails an exception is thrown. You can use an asynchronous commit as well by configuring the `KafkaManualCommitFactory` with the `DefaultKafkaManualAsyncCommitFactory` implementation. Then the commit will be done in the next consumer loop using the kafka asynchronous commit api. @@ -374,6 +395,27 @@ Then the commit will be done in the next consumer loop using the kafka asynchron If you want to use a custom implementation of `KafkaManualCommit` then you can configure a custom `KafkaManualCommitFactory` on the `KafkaComponent` that creates instances of your custom implementation. +When configuring a consumer to use manual commit and a specific `CommitManager` it is important to understand how these influence the behavior +of `breakOnFirstError`. + +[source,java] +---- +KafkaComponent kafka = new KafkaComponent(); +kafka.setAutoCommitEnable(false); +kafka.setAllowManualCommit(true); +kafka.setBreakOnFirstError(true); +kafka.setKafkaManualCommitFactory(new DefaultKafkaManualCommitFactory()); +... +camelContext.addComponent("kafka", kafka); +---- + +When the `CommitManager` is left to the default `NoopCommitManager` then `breakOnFirstError` will not automatically commit the offset so that the +message with an error is retried. The consumer must manage that in the route using `KafkaManualCommit`. 
+ + +When the `CommitManager` is changed to either the synch or asynch manager then `breakOnFirstError` will automatically commit the offset so that the +message with an error is retried. This message will be continually retried until it can be processed without an error. + *Note 1*: records from a partition must be processed and committed by the same thread as the consumer. This means that certain EIPs, async or concurrent operations in the DSL, may cause the commit to fail. In such circumstances, tyring to commit the transaction will cause the Kafka client to throw a `java.util.ConcurrentModificationException` exception with the message `KafkaConsumer is not safe for multi-threaded access`. To prevent this from happening, redesign your route to avoid those operations. diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 38ee58318a8..cd279679980 100755 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -819,10 +819,15 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware /** * This options controls what happens when a consumer is processing an exchange and it fails. If the option is * <tt>false</tt> then the consumer continues to the next message and processes it. If the option is <tt>true</tt> - * then the consumer breaks out, and will seek back to offset of the message that caused a failure, and then - * re-attempt to process this message. However this can lead to endless processing of the same message if its bound - * to fail every time, eg a poison message. Therefore its recommended to deal with that for example by using Camel's - * error handler. + * then the consumer breaks out. 
+ * + * Using the default NoopCommitManager will cause the consumer to not commit the offset so that the message is + * re-attempted. The consumer should use the KafkaManualCommit to determine the best way to handle the message. + * + * Using either the SynchCommitManager or the AsynchCommitManager the consumer will seek back to the offset of the + * message that caused a failure, and then re-attempt to process this message. However this can lead to endless + * processing of the same message if it's bound to fail every time, e.g. a poison message. Therefore it's recommended to + * deal with that for example by using Camel's error handler. */ public void setBreakOnFirstError(boolean breakOnFirstError) { this.breakOnFirstError = breakOnFirstError; diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index a213dbc1fe8..e13f7d86e99 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -332,7 +332,6 @@ public class KafkaFetchRecords implements Runnable { LOG.trace("This polling iteration is using lastresult on partition {} and offset {}", lastResult.getPartition(), lastResult.getPartitionLastOffset()); } - } else { if (LOG.isTraceEnabled()) { LOG.trace("This polling iteration is using lastresult of null"); @@ -346,7 +345,6 @@ public class KafkaFetchRecords implements Runnable { LOG.trace("This polling iteration had a result returned for partition {} and offset {}", result.getPartition(), result.getPartitionLastOffset()); } - } else { if (LOG.isTraceEnabled()) { LOG.trace("This polling iteration had a result returned as null"); @@ -354,7 +352,7 @@ public class KafkaFetchRecords implements Runnable { } updateTaskState(); - if (result.isBreakOnErrorHit() && 
!this.state.equals(State.PAUSED)) { + if (result != null && result.isBreakOnErrorHit() && !this.state.equals(State.PAUSED)) { LOG.debug("We hit an error ... setting flags to force reconnect"); // force re-connect setReconnect(true); @@ -363,7 +361,7 @@ public class KafkaFetchRecords implements Runnable { lastResult = result; if (LOG.isTraceEnabled()) { - LOG.trace("setting lastresult to partition {} and offset {}", + LOG.trace("Setting lastresult to partition {} and offset {}", lastResult.getPartition(), lastResult.getPartitionLastOffset()); } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java index 97875b097f1..e731e4c4763 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java @@ -63,7 +63,7 @@ public class KafkaRecordProcessor { message.setHeader(KafkaConstants.KEY, record.key()); } - LOG.debug("setting up the exchange for message from partition {} and offset {}", + LOG.debug("Setting up the exchange for message from partition {} and offset {}", record.partition(), record.offset()); message.setBody(record.value()); @@ -115,13 +115,11 @@ public class KafkaRecordProcessor { exchange.setException(e); } if (exchange.getException() != null) { - LOG.debug("An exception was thrown for record at partition {} and offset {}", record.partition(), record.offset()); boolean breakOnErrorExit = processException(exchange, topicPartition, record, lastResult, exceptionHandler); - return new ProcessingResult(breakOnErrorExit, lastResult.getPartition(), lastResult.getPartitionLastOffset(), true); } else { return new ProcessingResult(false, record.partition(), record.offset(), exchange.getException() != 
null); @@ -135,7 +133,6 @@ public class KafkaRecordProcessor { // processing failed due to an unhandled exception, what should we do if (configuration.isBreakOnFirstError()) { - if (lastResult.getPartition() != -1 && lastResult.getPartition() != record.partition()) { LOG.error("About to process an exception with UNEXPECTED partition & offset. Got topic partition {}. " + @@ -150,13 +147,19 @@ public class KafkaRecordProcessor { LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(), exc.getMessage()); LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.", - lastResult.getPartitionLastOffset(), lastResult.getPartition()); + record.offset(), record.partition()); } // force commit, so we resume on next poll where we failed // except when the failure happened at the first message in a poll if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) { - commitManager.forceCommit(topicPartition, lastResult.getPartitionLastOffset()); + // we should just do a commit (vs the original forceCommit) + // when route uses NOOP Commit Manager it will rely + // on the route implementation to explicitly commit offset + // when route uses Synch/Asynch Commit Manager it will + // ALWAYS commit the offset for the failing record + // and will ALWAYS retry it + commitManager.commit(topicPartition); } // continue to next partition diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java index 134246891fb..44573daa60d 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java @@ -61,12 +61,16 @@ public class 
KafkaRecordProcessorFacade { Set<TopicPartition> partitions = allRecords.partitions(); Iterator<TopicPartition> partitionIterator = partitions.iterator(); + LOG.debug("Poll received records on {} partitions", partitions.size()); + ProcessingResult lastResult = resultFromPreviousPoll == null ? ProcessingResult.newUnprocessed() : resultFromPreviousPoll; while (partitionIterator.hasNext() && !isStopping()) { TopicPartition partition = partitionIterator.next(); + LOG.debug("Processing records on partition {}", partition.partition()); + List<ConsumerRecord<Object, Object>> partitionRecords = allRecords.records(partition); Iterator<ConsumerRecord<Object, Object>> recordIterator = partitionRecords.iterator(); @@ -75,10 +79,13 @@ public class KafkaRecordProcessorFacade { while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) { ConsumerRecord<Object, Object> record = recordIterator.next(); + LOG.debug("Processing record on partition {} with offset {}", record.partition(), record.offset()); + lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult, kafkaRecordProcessor, record); - LOG.debug("Processed record on partition {} and offset {} and got result for partition {} and offset {}", + LOG.debug( + "Processed record on partition {} with offset {} and got ProcessingResult for partition {} and offset {}", record.partition(), record.offset(), lastResult.getPartition(), lastResult.getPartitionLastOffset()); if (consumerListener != null) { diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT.java new file mode 100644 index 00000000000..cd0d48e2394 --- /dev/null +++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.integration; + +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.component.kafka.MockConsumerInterceptor; +import org.apache.camel.component.kafka.consumer.KafkaManualCommit; +import org.apache.camel.component.kafka.testutil.CamelKafkaUtil; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** + * this will test basic breakOnFirstError functionality 
when it is turned off and consumer uses allowManualCommit and + * KafkaManualCommit and NOOP Commit Manager + */ +class KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT extends BaseEmbeddedKafkaTestSupport { + public static final String ROUTE_ID = "breakOnFirstErrorOff"; + public static final String TOPIC = "breakOnFirstErrorOff"; + + private static final Logger LOG = LoggerFactory.getLogger(KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT.class); + + @EndpointInject("kafka:" + TOPIC + + "?groupId=breakOnFirstErrorOff" + + "&autoOffsetReset=earliest" + + "&autoCommitEnable=false" + + "&allowManualCommit=true" + // set BOFE to false + + "&breakOnFirstError=false" + + "&maxPollRecords=1" + + "&pollTimeoutMs=1000" + + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + private Endpoint from; + + @EndpointInject("mock:result") + private MockEndpoint to; + + private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; + + @BeforeEach + public void before() { + Properties props = getDefaultProperties(); + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + MockConsumerInterceptor.recordsCaptured.clear(); + } + + @AfterEach + public void after() { + if (producer != null) { + producer.close(); + } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all(); + } + + /** + * will continue to retry the message that is in error + */ + @Test + public void kafkaBreakOnFirstErrorBasicCapability() throws Exception { + to.reset(); + to.expectedMessageCount(4); + + // message-3 and message-4 were never committed + // by the consumer route + // but we moved past them anyway + // because breakOnFirstError was false + // then when we encounter a successful message + // we commit that one and keep going + 
to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-5"); + + context.getRouteController().stopRoute(ROUTE_ID); + + this.publishMessagesToKafka(); + + context.getRouteController().startRoute(ROUTE_ID); + + Awaitility.await() + .atMost(3, TimeUnit.SECONDS) + .until(() -> to.getExchanges().size() > 3); + + to.assertIsSatisfied(3000); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + @Override + public void configure() { + + from(from) + .routeId(ROUTE_ID) + .process(exchange -> { + LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true)); + }) + .process(exchange -> { + ifIsPayloadWithErrorThrowException(exchange); + }) + .to(to) + .process(exchange -> { + doCommitOffset(exchange); + }); + } + }; + } + + private void publishMessagesToKafka() { + for (int i = 0; i < 6; i++) { + String msg = "message-" + i; + ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, null, msg); + producer.send(data); + } + } + + private void doCommitOffset(Exchange exchange) { + LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Committing", exchange, true)); + KafkaManualCommit manual = exchange.getMessage() + .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + assertNotNull(manual); + manual.commit(); + } + + private void ifIsPayloadWithErrorThrowException(Exchange exchange) { + String payload = exchange.getMessage().getBody(String.class); + if (payload.equals("message-3") || payload.equals("message-4")) { + throw new RuntimeException("ERROR TRIGGERED BY TEST"); + } + } + +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java new file mode 100644 index 00000000000..e8ae0f732cc --- /dev/null +++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.integration; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.component.kafka.MockConsumerInterceptor; +import org.apache.camel.component.kafka.consumer.KafkaManualCommit; +import org.apache.camel.component.kafka.testutil.CamelKafkaUtil; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertTrue; + +/** + * this will test breakOnFirstError functionality and the issue that was surfaced in CAMEL-20044 regarding incorrectly + * handling the offset commit resulting in replaying messages + * + * mimics the reproduction of the problem in https://github.com/CodeSmell/CamelKafkaOffset + */ +class KafkaBreakOnFirstErrorReplayOldMessagesIT extends BaseEmbeddedKafkaTestSupport { + + public static final String ROUTE_ID = "breakOnFirstError-20044"; + public static final String TOPIC = "breakOnFirstError-20044"; + + private static final Logger LOG = LoggerFactory.getLogger(KafkaBreakOnFirstErrorReplayOldMessagesIT.class); + + @EndpointInject("kafka:" + TOPIC + + "?groupId=KafkaBreakOnFirstErrorIT" + + "&autoOffsetReset=earliest" + + "&autoCommitEnable=false" + + "&allowManualCommit=true" + + "&breakOnFirstError=true" + + "&maxPollRecords=1" + // here multiple threads was an issue + + "&consumersCount=3" + + "&pollTimeoutMs=1000" + + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + private Endpoint from; + + @EndpointInject("mock:result") + private MockEndpoint to; + + private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; + + @BeforeAll + public static void setupTopic() { + AdminClient kafkaAdminClient = createAdminClient(service); + + // create the topic w/ 3 partitions + final NewTopic mytopic = new NewTopic(TOPIC, 3, (short) 1); + kafkaAdminClient.createTopics(Collections.singleton(mytopic)); + } + + @BeforeEach + public void init() { + + // setup the producer + Properties props = getDefaultProperties(); + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + MockConsumerInterceptor.recordsCaptured.clear(); + } + + @AfterEach 
+ public void after() { + if (producer != null) { + producer.close(); + } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all(); + } + + @Test + void testCamel20044TestFix() throws Exception { + to.reset(); + to.expectedMessageCount(13); + to.expectedBodiesReceivedInAnyOrder("1", "2", "3", "4", "5", "ERROR", + "6", "7", "ERROR", "8", "9", "10", "11"); + + context.getRouteController().stopRoute(ROUTE_ID); + + this.publishMessagesToKafka(); + + context.getRouteController().startRoute(ROUTE_ID); + + // let test run for awhile + Awaitility.await() + .timeout(10, TimeUnit.SECONDS) + .pollDelay(8, TimeUnit.SECONDS) + .untilAsserted(() -> assertTrue(true)); + + to.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + @Override + public void configure() { + onException(RuntimeException.class) + .handled(false) + .process(exchange -> { + doCommitOffset(exchange); + }) + .end(); + + from(from) + .routeId(ROUTE_ID) + .autoStartup(false) + .process(exchange -> { + LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true)); + }) + // capturing all of the payloads + .to(to) + .process(exchange -> { + ifIsPayloadWithErrorThrowException(exchange); + }) + .process(exchange -> { + doCommitOffset(exchange); + }) + .end(); + } + }; + } + + private void ifIsPayloadWithErrorThrowException(Exchange exchange) { + String payload = exchange.getMessage().getBody(String.class); + if (payload.equals("ERROR")) { + throw new RuntimeException("NON RETRY ERROR TRIGGERED BY TEST"); + } + } + + private void publishMessagesToKafka() { + final List<String> producedRecords = List.of("1", "2", "3", "4", "5", "ERROR", + "6", "7", "ERROR", "8", "9", "10", "11"); + + producedRecords.forEach(v -> { + ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, null, v); + producer.send(data); + }); + + } + + private void doCommitOffset(Exchange exchange) { + 
LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Committing", exchange, true)); + KafkaManualCommit manual = exchange.getMessage() + .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + if (Objects.nonNull(manual)) { + manual.commit(); + } else { + LOG.error("KafkaManualCommit is MISSING"); + } + } + +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java new file mode 100644 index 00000000000..b1ebc51a8a6 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kafka.integration; + +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.MockConsumerInterceptor; +import org.apache.camel.component.kafka.testutil.CamelKafkaUtil; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertTrue; + +/** + * this will test breakOnFirstError functionality and the issue that was surfaced in CAMEL-19894 regarding failure to + * correctly commit the offset in a batch using the Synch Commit Manager + * + * mimics the reproduction of the problem in https://github.com/Krivda/camel-bug-reproduction + */ +class KafkaBreakOnFirstErrorSeekIssueIT extends BaseEmbeddedKafkaTestSupport { + + public static final String ROUTE_ID = "breakOnFirstError-19894"; + public static final String TOPIC = "breakOnFirstError-19894"; + + private static final Logger LOG = LoggerFactory.getLogger(KafkaBreakOnFirstErrorSeekIssueIT.class); + + @EndpointInject("kafka:" + TOPIC + + "?groupId=KafkaBreakOnFirstErrorIT" + + "&autoOffsetReset=earliest" + + "&autoCommitEnable=false" + + "&allowManualCommit=true" + + "&breakOnFirstError=true" + + "&maxPollRecords=8" + + "&pollTimeoutMs=1000" + + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + 
"&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory" + + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + private Endpoint from; + + @EndpointInject("mock:result") + private MockEndpoint to; + + private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; + + @BeforeAll + public static void setupTopic() { + AdminClient kafkaAdminClient = createAdminClient(service); + + // create the topic w/ 2 partitions + final NewTopic mytopic = new NewTopic(TOPIC, 2, (short) 1); + kafkaAdminClient.createTopics(Collections.singleton(mytopic)); + } + + @BeforeEach + public void init() { + + // setup the producer + Properties props = getDefaultProperties(); + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + MockConsumerInterceptor.recordsCaptured.clear(); + } + + @AfterEach + public void after() { + if (producer != null) { + producer.close(); + } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all(); + } + + @Test + void testCamel19894TestFix() throws Exception { + to.reset(); + // will consume the payloads from partition 0 + // and will continually retry the payload with "5" + to.expectedMessageCount(4); + to.expectedBodiesReceived("1", "2", "3", "4"); + + context.getRouteController().stopRoute(ROUTE_ID); + + this.publishMessagesToKafka(); + + context.getRouteController().startRoute(ROUTE_ID); + + // let test run for awhile + Awaitility.await() + .timeout(10, TimeUnit.SECONDS) + .pollDelay(8, TimeUnit.SECONDS) + .untilAsserted(() -> assertTrue(true)); + + // the replaying of the message with an error + // will prevent other paylods from being + // processed + to.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + @Override + public void configure() { + 
from(from) + .routeId(ROUTE_ID) + .autoStartup(false) + .process(exchange -> { + LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true)); + }) + .process(exchange -> { + ifIsFifthRecordThrowException(exchange); + }) + .to(to) + .end(); + } + }; + } + + private void ifIsFifthRecordThrowException(Exchange e) { + if (e.getMessage().getBody().equals("5")) { + throw new RuntimeException("ERROR_TRIGGERED_BY_TEST"); + } + } + + private void publishMessagesToKafka() { + final List<String> producedRecordsPartition1 = List.of("5", "6", "7", "8", "9", "10", "11"); + final List<String> producedRecordsPartition0 = List.of("1", "2", "3", "4"); + + producedRecordsPartition1.forEach(v -> { + ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, 1, null, null, v); + producer.send(data); + }); + + producedRecordsPartition0.forEach(v -> { + ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, 0, null, null, v); + producer.send(data); + }); + } + +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.java new file mode 100644 index 00000000000..6ea0a467902 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.integration; + +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.MockConsumerInterceptor; +import org.apache.camel.component.kafka.testutil.CamelKafkaUtil; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; + +/** + * this will test basic breakOnFirstError functionality; it uses allowManualCommit and sets the Asynch Commit Manager, + * which allows Camel to handle when to commit an offset + */ +class KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT extends BaseEmbeddedKafkaTestSupport { + public static final String ROUTE_ID = "breakOnFirstErrorBatchIT"; + public static final String TOPIC = "breakOnFirstErrorBatchIT"; + + private static final Logger LOG = LoggerFactory.getLogger(KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.class); + + private final List<String> errorPayloads = new CopyOnWriteArrayList<>(); + + @EndpointInject("kafka:" + TOPIC + +
"?groupId=KafkaBreakOnFirstErrorIT" + + "&autoOffsetReset=earliest" + + "&autoCommitEnable=false" + + "&allowManualCommit=true" + + "&breakOnFirstError=true" + + "&maxPollRecords=3" + + "&pollTimeoutMs=1000" + + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + // asynch commit factory + + "&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory" + + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + private Endpoint from; + + @EndpointInject("mock:result") + private MockEndpoint to; + + private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; + + @BeforeEach + public void before() { + Properties props = getDefaultProperties(); + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + MockConsumerInterceptor.recordsCaptured.clear(); + } + + @AfterEach + public void after() { + if (producer != null) { + producer.close(); + } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all(); + } + + /** + * will continue to retry the message that is in error + */ + @Test + public void kafkaBreakOnFirstErrorBasicCapability() throws Exception { + to.reset(); + to.expectedMessageCount(3); + // message-3 causes an error + // and breakOnFirstError will cause it to be retried forever + // we will never get to message-4 + to.expectedBodiesReceived("message-0", "message-1", "message-2"); + + context.getRouteController().stopRoute(ROUTE_ID); + + this.publishMessagesToKafka(); + + context.getRouteController().startRoute(ROUTE_ID); + + Awaitility.await() + .atMost(3, TimeUnit.SECONDS) + .until(() -> errorPayloads.size() > 3); + + to.assertIsSatisfied(); + + for (String payload : errorPayloads) { + assertEquals("message-3", payload); + } + } + + @Override + protected RouteBuilder createRouteBuilder() { + 
return new RouteBuilder() { + + @Override + public void configure() { + from(from) + .routeId(ROUTE_ID) + .process(exchange -> { + LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true)); + }) + .process(exchange -> { + ifIsPayloadWithErrorThrowException(exchange); + }) + .to(to) + .end(); + } + }; + } + + private void publishMessagesToKafka() { + for (int i = 0; i < 5; i++) { + String msg = "message-" + i; + ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, null, msg); + producer.send(data); + } + } + + private void ifIsPayloadWithErrorThrowException(Exchange exchange) { + String payload = exchange.getMessage().getBody(String.class); + if (payload.equals("message-3")) { + errorPayloads.add(payload); + throw new RuntimeException("ERROR TRIGGERED BY TEST"); + } + } + +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.java new file mode 100644 index 00000000000..61b2b894682 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.integration; + +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.component.kafka.MockConsumerInterceptor; +import org.apache.camel.component.kafka.consumer.KafkaManualCommit; +import org.apache.camel.component.kafka.testutil.CamelKafkaUtil; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** + * this will test basic breakOnFirstError functionality uses allowManualCommit and KafkaManualCommit because relying on + * Camel default to use NOOP Commit Manager this means the route implementation MUST manage all offset commits + */ +class KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT extends BaseEmbeddedKafkaTestSupport { + public static final String ROUTE_ID = "breakOnFirstErrorBatchOnExceptionIT"; + public static final String TOPIC = "breakOnFirstErrorBatchOnExceptionIT"; + + private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.class); + + @EndpointInject("kafka:" + TOPIC + + "?groupId=KafkaBreakOnFirstErrorIT" + + "&autoOffsetReset=earliest" + + "&autoCommitEnable=false" + + "&allowManualCommit=true" + + "&breakOnFirstError=true" + + "&maxPollRecords=3" + + "&pollTimeoutMs=1000" + + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + private Endpoint from; + + @EndpointInject("mock:result") + private MockEndpoint to; + + private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; + + @BeforeEach + public void before() { + Properties props = getDefaultProperties(); + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + MockConsumerInterceptor.recordsCaptured.clear(); + } + + @AfterEach + public void after() { + if (producer != null) { + producer.close(); + } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all(); + } + + /** + * will continue to retry the message that is in error + */ + @Test + public void kafkaBreakOnFirstErrorBasicCapability() throws Exception { + to.reset(); + to.expectedMessageCount(7); + + // old behavior before the fix in CAMEL-20044 + // message-3 causes an error + // and breakOnFirstError will cause it to be retried 1x + // then we move on + //to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-3", "message-4", "message-5"); + + // new behavior w/ NOOP Commit Manager + to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4", "message-5"); + + this.publishMessagesToKafka(); + + context.getRouteController().stopRoute(ROUTE_ID); + context.getRouteController().startRoute(ROUTE_ID); + + Awaitility.await() + .atMost(3, 
TimeUnit.SECONDS) + .until(() -> to.getExchanges().size() > 5); + + to.assertIsSatisfied(3000); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + @Override + public void configure() { + onException(Exception.class) + .handled(false) + // adding error message to end + // so we can account for it + .to(to) + .process(exchange -> { + // if we don't commit + // camel will continuously + // retry the message with an error + doCommitOffset(exchange); + }); + + from(from) + .routeId(ROUTE_ID) + .process(exchange -> { + LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true)); + }) + .process(exchange -> { + ifIsPayloadWithErrorThrowException(exchange); + }) + .to(to) + .process(exchange -> { + doCommitOffset(exchange); + }); + } + }; + } + + private void publishMessagesToKafka() { + for (int i = 0; i < 6; i++) { + String msg = "message-" + i; + ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, null, msg); + producer.send(data); + } + } + + private void doCommitOffset(Exchange exchange) { + LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Committing", exchange, true)); + KafkaManualCommit manual = exchange.getMessage() + .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + assertNotNull(manual); + manual.commit(); + } + + private void ifIsPayloadWithErrorThrowException(Exchange exchange) { + String payload = exchange.getMessage().getBody(String.class); + if (payload.equals("message-3")) { + throw new RuntimeException("ERROR TRIGGERED BY TEST"); + } + } + +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.java new file mode 100644 index 00000000000..39c1fa1c511 --- /dev/null +++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.integration; + +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.component.kafka.MockConsumerInterceptor; +import org.apache.camel.component.kafka.consumer.KafkaManualCommit; +import org.apache.camel.component.kafka.testutil.CamelKafkaUtil; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static 
org.junit.jupiter.api.Assertions.assertNotNull; + +/** + * this will test basic breakOnFirstError functionality uses allowManualCommit and KafkaManualCommit because relying on + * Camel default to use NOOP Commit Manager this means the route implementation MUST manage all offset commits + * + * will demonstrate how to retry + */ +class KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT extends BaseEmbeddedKafkaTestSupport { + + public static final String ROUTE_ID = "breakOnFirstErrorBatchRetryIT"; + public static final String TOPIC = "breakOnFirstErrorBatchRetryIT"; + + private static final Logger LOG + = LoggerFactory.getLogger(KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.class); + + @EndpointInject("kafka:" + TOPIC + + "?groupId=KafkaBreakOnFirstErrorIT" + + "&autoOffsetReset=earliest" + + "&autoCommitEnable=false" + + "&allowManualCommit=true" + + "&breakOnFirstError=true" + + "&maxPollRecords=3" + + "&pollTimeoutMs=1000" + + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + private Endpoint from; + + @EndpointInject("mock:result") + private MockEndpoint to; + + private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; + + @BeforeEach + public void before() { + Properties props = getDefaultProperties(); + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + MockConsumerInterceptor.recordsCaptured.clear(); + } + + @AfterEach + public void after() { + if (producer != null) { + producer.close(); + } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all(); + } + + /** + * will continue to retry the message that is in error + */ + @Test + public void kafkaBreakOnFirstErrorBasicCapabilityRetry() throws Exception { + to.reset(); + + this.publishMessagesToKafka(); + 
+ context.getRouteController().stopRoute(ROUTE_ID); + context.getRouteController().startRoute(ROUTE_ID); + + Awaitility.await() + .atMost(3, TimeUnit.SECONDS) + .until(() -> to.getExchanges().size() > 7); + + assertFalse(to.getExchanges().stream() + .anyMatch(exc -> "message-4".equals(exc.getMessage().getBody(String.class)))); + + assertFalse(to.getExchanges().stream() + .anyMatch(exc -> "message-5".equals(exc.getMessage().getBody(String.class)))); + + assertTrue(to.getExchanges().stream() + .filter(exc -> "message-3".equals(exc.getMessage().getBody(String.class))) + .count() > 1); + + to.assertIsSatisfied(3000); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + @Override + public void configure() { + onException(Exception.class) + .handled(false) + // adding error message to end + // so we can account for it + .to(to) + // we are not + // going to commit offset + // so will retry + .end(); + + from(from) + .routeId(ROUTE_ID) + .process(exchange -> { + LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true)); + }) + .process(exchange -> { + ifIsPayloadWithErrorThrowException(exchange); + }) + .to(to) + .process(exchange -> { + doCommitOffset(exchange); + }); + } + }; + } + + private void publishMessagesToKafka() { + for (int i = 0; i < 6; i++) { + String msg = "message-" + i; + ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, null, msg); + producer.send(data); + } + } + + private void doCommitOffset(Exchange exchange) { + LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Committing", exchange, true)); + KafkaManualCommit manual = exchange.getMessage() + .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + assertNotNull(manual); + manual.commit(); + } + + private void ifIsPayloadWithErrorThrowException(Exchange exchange) { + String payload = exchange.getMessage().getBody(String.class); + if (payload.equals("message-3")) { + throw new RuntimeException("ERROR TRIGGERED 
BY TEST"); + } + } + +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT.java new file mode 100644 index 00000000000..0fabfe01587 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kafka.integration; + +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.MockConsumerInterceptor; +import org.apache.camel.component.kafka.testutil.CamelKafkaUtil; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; + +/** + * this will test basic breakOnFirstError functionality uses allowManualCommit and set Synch Commit Manager this allows + * Camel to handle when to commit an offset + */ +class KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT extends BaseEmbeddedKafkaTestSupport { + public static final String ROUTE_ID = "breakOnFirstErrorBatchIT"; + public static final String TOPIC = "breakOnFirstErrorBatchIT"; + + private static final Logger LOG = LoggerFactory.getLogger(KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT.class); + + private final List<String> errorPayloads = new CopyOnWriteArrayList<>(); + + @EndpointInject("kafka:" + TOPIC + + "?groupId=KafkaBreakOnFirstErrorIT" + + "&autoOffsetReset=earliest" + + "&autoCommitEnable=false" + + "&allowManualCommit=true" + + "&breakOnFirstError=true" + + "&maxPollRecords=3" + + "&pollTimeoutMs=1000" + + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + // synch commit factory + + 
"&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory" + + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + private Endpoint from; + + @EndpointInject("mock:result") + private MockEndpoint to; + + private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer; + + @BeforeEach + public void before() { + Properties props = getDefaultProperties(); + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + MockConsumerInterceptor.recordsCaptured.clear(); + } + + @AfterEach + public void after() { + if (producer != null) { + producer.close(); + } + // clean all test topics + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all(); + } + + /** + * will continue to retry the message that is in error + */ + @Test + public void kafkaBreakOnFirstErrorBasicCapability() throws Exception { + to.reset(); + to.expectedMessageCount(3); + // message-3 causes an error + // and breakOnFirstError will cause it to be retried forever + // we will never get to message-4 + to.expectedBodiesReceived("message-0", "message-1", "message-2"); + + context.getRouteController().stopRoute(ROUTE_ID); + + this.publishMessagesToKafka(); + + context.getRouteController().startRoute(ROUTE_ID); + + Awaitility.await() + .atMost(3, TimeUnit.SECONDS) + .until(() -> errorPayloads.size() > 3); + + to.assertIsSatisfied(); + + for (String payload : errorPayloads) { + assertEquals("message-3", payload); + } + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + @Override + public void configure() { + from(from) + .routeId(ROUTE_ID) + .process(exchange -> { + LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true)); + }) + .process(exchange -> { + ifIsPayloadWithErrorThrowException(exchange); + }) + .to(to) + .end(); + } + }; + } + + private void publishMessagesToKafka() { + for (int i = 0; i < 5; i++) { + String 
msg = "message-" + i; + ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, null, msg); + producer.send(data); + } + } + + private void ifIsPayloadWithErrorThrowException(Exchange exchange) { + String payload = exchange.getMessage().getBody(String.class); + if (payload.equals("message-3")) { + errorPayloads.add(payload); + throw new RuntimeException("ERROR TRIGGERED BY TEST"); + } + } + +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/testutil/CamelKafkaUtil.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/testutil/CamelKafkaUtil.java new file mode 100644 index 00000000000..2acdb9746ab --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/testutil/CamelKafkaUtil.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kafka.testutil; + +import java.util.Objects; + +import org.apache.camel.Exchange; +import org.apache.camel.component.kafka.KafkaConstants; + +public final class CamelKafkaUtil { + + private CamelKafkaUtil() { + } + + public static String buildKafkaLogMessage(String msg, Exchange exchange, boolean includeBody) { + String eol = "\n"; + + StringBuilder sb = new StringBuilder(); + if (Objects.nonNull(msg)) { + sb.append(msg); + sb.append(eol); + } + + sb.append("Message consumed from "); + sb.append(exchange.getMessage().getHeader(KafkaConstants.TOPIC, String.class)); + sb.append(eol); + sb.append("The Partition:Offset is "); + sb.append(exchange.getMessage().getHeader(KafkaConstants.PARTITION, String.class)); + sb.append(":"); + sb.append(exchange.getMessage().getHeader(KafkaConstants.OFFSET, String.class)); + sb.append(eol); + sb.append("The Key is "); + sb.append(exchange.getMessage().getHeader(KafkaConstants.KEY, String.class)); + + if (includeBody) { + sb.append(eol); + sb.append(exchange.getMessage().getBody(String.class)); + } + + return sb.toString(); + } + +} diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_21.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_21.adoc index 4e6f65b78b8..4fc6dacf057 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_21.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_21.adoc @@ -4,6 +4,22 @@ This document is for helping you upgrade your Apache Camel application from Camel 3.x to 3.y. For example if you are upgrading Camel 3.0 to 3.2, then you should follow the guides from both 3.0 to 3.1 and 3.1 to 3.2. +== Upgrading Camel 3.21 to 3.21.3 + +=== camel-kafka + +The behavior for `breakOnFirstError` was altered as numerous issues were fixed. The behavior related to committing +the offset is now determined by the `CommitManager` that is configured. 
+
+When the default `CommitManager` is used (`NoopCommitManager`) then no commit is performed. The route implementation will
+be responsible for managing the offset using `KafkaManualCommit` to manage the retrying of the payload.
+
+When using the `SyncCommitManager` then the offset will be committed so that the payload is continually retried. This was
+the behavior described in the documentation.
+
+When using the `AsyncCommitManager` then the offset will be committed so that the payload is continually retried. This was
+the behavior described in the documentation.
+
 == Upgrading Camel 3.20 to 3.21
 
 === camel-core