[PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
CodeSmell opened a new pull request, #11959:
URL: https://github.com/apache/camel/pull/11959

# Description
This PR fixes breakOnFirstError, where the incorrect partition:offset is committed when a forceCommit is done.

# Target
This PR is off the camel-3.21.x branch.

# Tracking
CAMEL-20044
CAMEL-19894

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1804324086

:star2: Thank you for your contribution to the Apache Camel project! :star2:

:warning: Please note that the changes on this PR may be **tested automatically**. If necessary Apache Camel Committers may access logs and test results in the job summaries!
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1388410012

## components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java: ##
@@ -149,14 +146,30 @@ private boolean processException(
             Exception exc = exchange.getException();
             LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(), exc.getMessage());
-            LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
-                    lastResult.getPartitionLastOffset(), lastResult.getPartition());
+            LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
+                    record.offset(), record.partition());
         }
         // force commit, so we resume on next poll where we failed
         // except when the failure happened at the first message in a poll
         if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) {
-            commitManager.forceCommit(topicPartition, lastResult.getPartitionLastOffset());
+            // the record we are processing had the error
+            // so we will force commit the offset prior
+            // this will enable the current desired behavior to
+            // retry the message 1 more time
+            //
+            // Note: without a more extensive look at handling of breakOnFirstError
+            // we will still need the lastResult so that we don't force
+            // retrying this message over and over
+            // commitManager.forceCommit(topicPartition, record.offset() - 1);

Review Comment:
This change, which uses record.offset instead of the lastResult offset, fixes CAMEL-19894 and CAMEL-20044 because it manages the offset correctly. However, the lastResult will still occasionally get corrupted.
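The idea in the review comment above can be illustrated with a small, dependency-free sketch. It is not the Camel implementation: `Rec`, `fromLastResult`, and `fromFailingRecord` are hypothetical names, and the sketch assumes (as the PR description suggests, but the thread does not spell out) that the shared last result may hold an offset that belongs to a different partition than the failing record.

```java
public class FailingRecordOffsetSketch {

    /** Hypothetical stand-in for a consumed record; not Kafka's ConsumerRecord. */
    static final class Rec {
        final int partition;
        final long offset;

        Rec(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Formats a commit target as "partition:offset" so both strategies can be compared. */
    static String target(int partition, long offset) {
        return partition + ":" + offset;
    }

    /** Assumed old strategy: the failing record's partition paired with the last result's offset. */
    static String fromLastResult(Rec failing, Rec lastResult) {
        return target(failing.partition, lastResult.offset);
    }

    /** Strategy from the review comment: the offset just before the failing record. */
    static String fromFailingRecord(Rec failing) {
        return target(failing.partition, failing.offset - 1);
    }

    public static void main(String[] args) {
        Rec lastResult = new Rec(0, 42); // assumption: last success was on partition 0
        Rec failing = new Rec(1, 5);     // the error happens on partition 1
        System.out.println("from lastResult:     " + fromLastResult(failing, lastResult));
        System.out.println("from failing record: " + fromFailingRecord(failing));
    }
}
```

Under that assumption, the first strategy pairs partition 1 with an offset taken from partition 0, while the second derives both values from the record that actually failed.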
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1388411473

## components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java: ##
@@ -0,0 +1,146 @@
+package org.apache.camel.component.kafka.integration;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * this will test breakOnFirstError functionality
+ * and the issue that was surfaced in CAMEL-19894
+ * regarding failure to correctly commit the offset
+ * in a batch using the Synch Commit Manager
+ *
+ * mimics the reproduction of the problem in
+ * https://github.com/Krivda/camel-bug-reproduction
+ */
+class KafkaBreakOnFirstErrorSeekIssueIT extends BaseEmbeddedKafkaTestSupport {
+

Review Comment:
I was adding integration tests for this fix. There are 3 that are basic and show how breakOnFirstError will work. I am still working on adding ITs for the 2 issues.

Note: I need to create multiple partitions in the BaseEmbeddedKafkaTestSupport. That doesn't seem to work.

The 2 basic tests and the 2 tests that were part of reproducing the issues in [CAMEL-20044](https://issues.apache.org/jira/browse/CAMEL-20044) and [CAMEL-19894](https://issues.apache.org/jira/browse/CAMEL-19894) are captured in this repo: https://github.com/CodeSmell/CamelKafkaOffset
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1388412530

## components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java: ##
@@ -149,14 +146,30 @@ private boolean processException(
             Exception exc = exchange.getException();
             LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(), exc.getMessage());
-            LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
-                    lastResult.getPartitionLastOffset(), lastResult.getPartition());
+            LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
+                    record.offset(), record.partition());
         }
         // force commit, so we resume on next poll where we failed
         // except when the failure happened at the first message in a poll
         if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) {
-            commitManager.forceCommit(topicPartition, lastResult.getPartitionLastOffset());
+            // the record we are processing had the error
+            // so we will force commit the offset prior
+            // this will enable the current desired behavior to
+            // retry the message 1 more time
+            //
+            // Note: without a more extensive look at handling of breakOnFirstError
+            // we will still need the lastResult so that we don't force
+            // retrying this message over and over
+            // commitManager.forceCommit(topicPartition, record.offset() - 1);
+
+            // we should just do a commit (vs the original forceCommit)
+            // when route uses NOOP Commit Manager it will rely
+            // on the route implementation to explicitly commit offset
+            // when route uses Synch/Asynch Commit Manager it will
+            // ALWAYS commit the offset for the failing record
+            // and will ALWAYS retry it
+            commitManager.commit(topicPartition);
         }

Review Comment:
This is the key change in behavior. It goes beyond a fix and gives route implementers the ability to choose how breakOnFirstError should work depending on the commit manager they use.
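The behavior described in the diff comments above (delegating to whichever commit manager the route uses) might be pictured roughly as follows. This is a simplified sketch under stated assumptions, not Camel's API: the `CommitManager` interface here is hypothetical and takes an explicit offset, whereas the call in the diff is `commitManager.commit(topicPartition)`; `NoopCommitManager`, `RecordingCommitManager`, and `onFirstError` are illustrative names only.

```java
import java.util.HashMap;
import java.util.Map;

public class CommitManagerChoiceSketch {

    /** Hypothetical, simplified commit manager; the real Camel interface differs. */
    interface CommitManager {
        void commit(String topicPartition, long offset);
    }

    /** Does nothing: the route itself is expected to commit explicitly. */
    static final class NoopCommitManager implements CommitManager {
        @Override
        public void commit(String topicPartition, long offset) {
            // intentionally empty
        }
    }

    /** Records the offset right away, standing in for a sync or async manager. */
    static final class RecordingCommitManager implements CommitManager {
        final Map<String, Long> committed = new HashMap<>();

        @Override
        public void commit(String topicPartition, long offset) {
            committed.put(topicPartition, offset);
        }
    }

    /** Error path after the change: always delegate to the configured manager. */
    static void onFirstError(CommitManager manager, String topicPartition, long failingOffset) {
        manager.commit(topicPartition, failingOffset);
    }

    public static void main(String[] args) {
        RecordingCommitManager syncStyle = new RecordingCommitManager();
        onFirstError(syncStyle, "orders-1", 5L);
        onFirstError(new NoopCommitManager(), "orders-1", 5L); // leaves the decision to the route
        System.out.println("sync-style manager committed: " + syncStyle.committed);
    }
}
```

The point of the design is that the error path no longer decides what gets committed; the choice of commit manager does.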
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
orpiske commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1804335053

I am marking it as a draft just to keep us from merging this inadvertently.
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1804451240

### Components test results:

| Total | Tested | Failed :x: | Passed :white_check_mark: |
| --- | --- | --- | --- |
| 1 | 1 | 1 | 0 |
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1804451295

:no_entry_sign: There are (likely) no changes in core to be tested in this PR
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
orpiske commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1389114928

## components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java: ##
@@ -149,14 +146,30 @@ private boolean processException(

Review Comment:
> This is the key change in behavior. It goes beyond a fix and gives route implementers the ability to choose how breakOnFirstError should work depending on the commit manager they use.

Thanks for the heads up. I am OK with this given the scope of the change. The important thing here is that it needs to be documented in both the component doc and the upgrade guide.
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1805385597

:no_entry_sign: There are (likely) no changes in core to be tested in this PR
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1805385569

### Components test results:

| Total | Tested | Failed :x: | Passed :white_check_mark: |
| --- | --- | --- | --- |
| 1 | 1 | 1 | 0 |
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
orpiske commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1805393172

[components-camel-kafka.log](https://github.com/apache/camel/files/13317850/components-camel-kafka.log)

Attaching test logs from GitHub.
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1389321489

## components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java: ##
@@ -149,14 +146,30 @@ private boolean processException(

Review Comment:
Happy to do that. Can you point me to where to update those?
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
orpiske commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1389342201

## components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java: ##
@@ -149,14 +146,30 @@ private boolean processException(

Review Comment:
The following docs need to be adjusted:

* [The Camel 3.21 upgrade guide](https://github.com/apache/camel/blob/main/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_21.adoc)
* The [`breakOnFirstError`](https://github.com/apache/camel/blob/main/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java#L856-L866) documentation.

In the 3.21 upgrade guide it is OK to be very brief: just say that the behavior of `breakOnFirstError` has changed and how, and then tell users to consult the component documentation. You can add the details about how it was changed, how it behaves, what to expect, etc. to the `breakOnFirstError` documentation. If necessary, you can also add a section to [the Kafka Component doc](https://github.com/apache/camel/blob/main/components/camel-kafka/src/main/docs/kafka-component.adoc).
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
CodeSmell commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1805726128

> components-camel-kafka.log

Looks like checkstyle-related issues. I also see that I need to add the Apache license to the new test classes.
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1389613976

## components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java: ##
@@ -149,14 +146,30 @@ private boolean processException(

Review Comment:
Updated docs here: https://github.com/apache/camel/pull/11970
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
orpiske commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1389620969

## components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java: ##
@@ -0,0 +1,146 @@

Review Comment:
At this point, they should still fail. Right? I noticed this one failed on my manual tests ...
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1806129992

:no_entry_sign: There are (likely) no changes in core to be tested in this PR
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1806129965

### Components test results:

| Total | Tested | Failed :x: | Passed :white_check_mark: |
| --- | --- | --- | --- |
| 1 | 1 | 1 | 0 |
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
CodeSmell commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1806288578

Per request, moved the doc changes over from this PR - https://github.com/apache/camel/pull/11970
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1806761949

:no_entry_sign: There are (likely) no changes in core to be tested in this PR
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1806761947

### Components test results:

| Total | Tested | Failed :x: | Passed :white_check_mark: |
| --- | --- | --- | --- |
| 1 | 1 | 1 | 0 |
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1390232215

## components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java: ##
@@ -0,0 +1,146 @@

Review Comment:
That's correct. This test will fail as-is. I have not had a chance to look at the infra test to get the partitions increased. The same test is in my personal repo (link above). That was passing when I ran it with my local build of the camel-kafka component with these changes.
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
orpiske commented on code in PR #11959: URL: https://github.com/apache/camel/pull/11959#discussion_r1391339271

## components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java: ##
@@ -149,14 +146,30 @@ private boolean processException(
             Exception exc = exchange.getException();
             LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(), exc.getMessage());
-            LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
-                    lastResult.getPartitionLastOffset(), lastResult.getPartition());
+            LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
+                    record.offset(), record.partition());
         }
         // force commit, so we resume on next poll where we failed
         // except when the failure happened at the first message in a poll
         if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) {
-            commitManager.forceCommit(topicPartition, lastResult.getPartitionLastOffset());
+            // the record we are processing had the error
+            // so we will force commit the offset prior
+            // this will enable the current desired behavior to
+            // retry the message 1 more time
+            //
+            // Note: without a more extensive look at handling of breakOnFirstError
+            // we will still need the lastResult so that we don't force
+            // retrying this message over and over
+            // commitManager.forceCommit(topicPartition, record.offset() - 1);
+
+            // we should just do a commit (vs the original forceCommit)
+            // when route uses NOOP Commit Manager it will rely
+            // on the route implementation to explicitly commit offset
+            // when route uses Synch/Asynch Commit Manager it will
+            // ALWAYS commit the offset for the failing record
+            // and will ALWAYS retry it
+            commitManager.commit(topicPartition);
         }

Review Comment: Thanks for adding the doc changes to this PR.
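The diff above replaces values read from `lastResult` with the offset and partition of the record that actually failed. As a rough illustration of why the source of the resume position matters, here is a toy model of a single partition. It is only a sketch: `ResumeOffsetSketch`, `firstFailingOffset`, `redeliveredFrom` and the stale offset value are hypothetical names and assumptions for this example, and no Camel or Kafka classes are involved.

```java
import java.util.List;

// Toy model of a single partition log; hypothetical names, no Kafka or Camel classes involved.
class ResumeOffsetSketch {

    /** Offset of the first record at or after startOffset whose body is "ERROR", or -1 if none. */
    static int firstFailingOffset(List<String> log, int startOffset) {
        for (int offset = startOffset; offset < log.size(); offset++) {
            if ("ERROR".equals(log.get(offset))) {
                return offset;
            }
        }
        return -1;
    }

    /** Records the consumer would see again if polling resumes at resumeOffset. */
    static List<String> redeliveredFrom(List<String> log, int resumeOffset) {
        return log.subList(resumeOffset, log.size());
    }

    public static void main(String[] args) {
        List<String> log = List.of("1", "2", "ERROR", "3");
        int failing = firstFailingOffset(log, 0);

        // Position taken from the failing record: only that record and later ones come back.
        System.out.println(redeliveredFrom(log, failing));

        // Position taken from a stale value (assumed to be 0 here): handled records are replayed.
        int staleOffset = 0;
        System.out.println(redeliveredFrom(log, staleOffset));
    }
}
```

In this model, resuming at the failing record's own offset redelivers only the failing record and what follows it, while resuming at a stale position replays records that were already handled, which is the symptom the test class name `KafkaBreakOnFirstErrorReplayOldMessagesIT` refers to.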
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
orpiske commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1808488277

> ### Components test results:
>
> | Total | Tested | Failed ❌ | Passed ✅ |
> | --- | --- | --- | --- |
> | 1 | 1 | 1 | 0 |

Adding the latest GitHub CI logs. It still seems to be complaining about checkstyle issues.

[components-camel-kafka.log](https://github.com/apache/camel/files/13336893/components-camel-kafka.log)
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1808814517

### Components test results:

| Total | Tested | Failed :x: | Passed :white_check_mark: |
| --- | --- | --- | --- |
| 1 | 1 | 1 | 0 |
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1808814558 :no_entry_sign: There are (likely) no changes in core to be tested in this PR
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
orpiske commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1808818498 [components-camel-kafka.log](https://github.com/apache/camel/files/13339902/components-camel-kafka.log) Last build / test log.
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
CodeSmell commented on code in PR #11959: URL: https://github.com/apache/camel/pull/11959#discussion_r1392702916

## components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java: ##
@@ -149,14 +146,30 @@ private boolean processException(
             Exception exc = exchange.getException();
             LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(), exc.getMessage());
-            LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
-                    lastResult.getPartitionLastOffset(), lastResult.getPartition());
+            LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
+                    record.offset(), record.partition());
         }
         // force commit, so we resume on next poll where we failed
         // except when the failure happened at the first message in a poll
         if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) {
-            commitManager.forceCommit(topicPartition, lastResult.getPartitionLastOffset());
+            // the record we are processing had the error
+            // so we will force commit the offset prior
+            // this will enable the current desired behavior to
+            // retry the message 1 more time
+            //
+            // Note: without a more extensive look at handling of breakOnFirstError
+            // we will still need the lastResult so that we don't force
+            // retrying this message over and over
+            // commitManager.forceCommit(topicPartition, record.offset() - 1);
+
+            // we should just do a commit (vs the original forceCommit)
+            // when route uses NOOP Commit Manager it will rely
+            // on the route implementation to explicitly commit offset
+            // when route uses Synch/Asynch Commit Manager it will
+            // ALWAYS commit the offset for the failing record
+            // and will ALWAYS retry it
+            commitManager.commit(topicPartition);
         }

Review Comment: np
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1810440562

### Components test results:

| Total | Tested | Failed :x: | Passed :white_check_mark: |
| --- | --- | --- | --- |
| 1 | 1 | 1 | 0 |
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1810440607 :no_entry_sign: There are (likely) no changes in core to be tested in this PR
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1810523994 :no_entry_sign: There are (likely) no changes in core to be tested in this PR
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1810523853

### Components test results:

| Total | Tested | Failed :x: | Passed :white_check_mark: |
| --- | --- | --- | --- |
| 1 | 1 | 1 | 0 |
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1810713033

### Components test results:

| Total | Tested | Failed :x: | Passed :white_check_mark: |
| --- | --- | --- | --- |
| 1 | 1 | 1 | 0 |
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1810713063 :no_entry_sign: There are (likely) no changes in core to be tested in this PR
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
orpiske commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1810775210 [test-logs.zip](https://github.com/apache/camel/files/13352613/test-logs.zip)
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
orpiske commented on code in PR #11959: URL: https://github.com/apache/camel/pull/11959#discussion_r1393023227

## components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java: ##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * this will test breakOnFirstError functionality and the issue that was surfaced in CAMEL-20044 regarding incorrectly
+ * handling the offset commit resulting in replaying messages
+ *
+ * mimics the reproduction of the problem in https://github.com/CodeSmell/CamelKafkaOffset
+ */
+class KafkaBreakOnFirstErrorReplayOldMessagesIT extends BaseEmbeddedKafkaTestSupport {
+
+    public static final String ROUTE_ID = "breakOnFirstError-20044";
+    public static final String TOPIC = "breakOnFirstError-20044";
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBreakOnFirstErrorReplayOldMessagesIT.class);
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=KafkaBreakOnFirstErrorIT"
+                    + "&autoOffsetReset=earliest"
+                    + "&autoCommitEnable=false"
+                    + "&allowManualCommit=true"
+                    + "&breakOnFirstError=true"
+                    + "&maxPollRecords=1"
+                    // here multiple threads was an issue
+                    + "&consumersCount=3"
+                    + "&pollTimeoutMs=1000"
+                    + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer producer;
+
+    @BeforeAll
+    public static void setupTopic() {
+        AdminClient kafkaAdminClient = createAdminClient(service);
+
+        // create the topic w/ 3 partitions
+        final NewTopic mytopic = new NewTopic(TOPIC, 3, (short) 1);
+        kafkaAdminClient.createTopics(Collections.singleton(mytopic));
+    }
+
+    @BeforeEach
+    public void init() {
+
+        // setup the producer
+        Properties props = getDefaultProperties();
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    @Test
+    void testCamel20044TestFix() throws Exception {
+        to.reset();
+        to.expectedMessageCount(13);
+        to.expectedBodiesReceivedInAnyOrder("1", "2", "3", "4", "5", "ERROR",
+                "6", "7", "ERROR", "8", "9", "10", "11");
+
+        context.getRouteController().stopRoute(ROU
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
orpiske commented on code in PR #11959: URL: https://github.com/apache/camel/pull/11959#discussion_r1393025011

## components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.java: ##
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * this will test basic breakOnFirstError functionality uses allowManualCommit and set Synch Commit Manager this allows
+ * Camel to handle when to commit an offset
+ */
+class KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT extends BaseEmbeddedKafkaTestSupport {
+    public static final String ROUTE_ID = "breakOnFirstErrorBatchIT";
+    public static final String TOPIC = "breakOnFirstErrorBatchIT";
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.class);
+
+    private final List<String> errorPayloads = new CopyOnWriteArrayList<>();
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=KafkaBreakOnFirstErrorIT"
+                    + "&autoOffsetReset=earliest"
+                    + "&autoCommitEnable=false"
+                    + "&allowManualCommit=true"
+                    + "&breakOnFirstError=true"
+                    + "&maxPollRecords=3"
+                    + "&pollTimeoutMs=1000"
+                    + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                    // asynch commit factory
+                    + "&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory"
+                    + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+    private Endpoint from;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer producer;
+
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    /**
+     * will continue to retry the message that is in error
+     */
+    @Test
+    public void kafkaBreakOnFirstErrorBasicCapability() throws Exception {
+        to.reset();
+        to.expectedMessageCount(3);
+        // message-3 causes an error
+        // and breakOnFirstError will cause it to be retried forever
+        // we will never get to message-4
+        to.expectedBodiesReceived("message-0", "message-1", "message-2");
+
+        context.getRouteController().stopRoute(ROUTE_ID);
+
+        this.publishMessagesToKafka();
+
+        context.getRouteController().startRoute(ROUTE_ID);
+
+        Awaitility.await()
+                .atMost(3, TimeUnit.SECONDS)
+                .until(() -> errorPayloads.size() > 3);
+
+        to.assertIsSatisfied();
+
+        for (String payload : erro
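The test comments above say that message-3 fails, is retried forever, and message-4 is never reached. The following is a minimal sketch of that expectation as a bounded simulation, assuming a consumer whose read position only advances past successfully handled records. `RetryForeverSketch`, `Outcome` and `run` are hypothetical names for this example; this is not how Camel's Kafka consumer is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Bounded simulation of "retry the failing record on every poll"; hypothetical names only.
class RetryForeverSketch {

    /** What a route would have seen after a fixed number of simulated polls. */
    static final class Outcome {
        final List<String> delivered = new ArrayList<>();
        final List<String> errorPayloads = new ArrayList<>();
    }

    static Outcome run(List<String> log, String poison, int maxPollRecords, int polls) {
        Outcome outcome = new Outcome();
        int position = 0; // next offset to read; advances only past successful records
        for (int poll = 0; poll < polls; poll++) {
            int end = Math.min(position + maxPollRecords, log.size());
            for (int offset = position; offset < end; offset++) {
                String body = log.get(offset);
                if (poison.equals(body)) {
                    outcome.errorPayloads.add(body);
                    break; // stop at the first error and leave the position on the failing record
                }
                outcome.delivered.add(body);
                position = offset + 1;
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        List<String> log = List.of("message-0", "message-1", "message-2", "message-3", "message-4");
        Outcome outcome = run(log, "message-3", 3, 6);
        System.out.println("delivered: " + outcome.delivered);
        System.out.println("errors seen: " + outcome.errorPayloads.size());
    }
}
```

With six simulated polls and batches of three, the first poll delivers message-0 to message-2 and each later poll hits message-3 again, so the error count keeps growing while message-4 is never delivered. That matches the shape of the assertions in the quoted test (`expectedBodiesReceived` for three messages and `errorPayloads.size() > 3`).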
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
CodeSmell commented on code in PR #11959: URL: https://github.com/apache/camel/pull/11959#discussion_r1393078767 ## components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java: ## @@ -0,0 +1,190 @@
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
CodeSmell commented on code in PR #11959: URL: https://github.com/apache/camel/pull/11959#discussion_r1393081195 ## components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java: ## @@ -0,0 +1,190 @@
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1811134385 :no_entry_sign: There are (likely) no changes in core to be tested in this PR
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1811134245

### Components test results:

| Total | Tested | Failed :x: | Passed :white_check_mark: |
| --- | --- | --- | --- |
| 1 | 1 | 0 | 1 |
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1811284133 :no_entry_sign: There are (likely) no changes in core to be tested in this PR
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1811284112

### Components test results:

| Total | Tested | Failed :x: | Passed :white_check_mark: |
| --- | --- | --- | --- |
| 1 | 1 | 1 | 0 |
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1811506665

### Components test results:

| Total | Tested | Failed :x: | Passed :white_check_mark: |
| --- | --- | --- | --- |
| 1 | 1 | 1 | 0 |
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
github-actions[bot] commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1811506701 :no_entry_sign: There are (likely) no changes in core to be tested in this PR
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
orpiske commented on PR #11959: URL: https://github.com/apache/camel/pull/11959#issuecomment-1812569845 [camel-kafka-test.log](https://github.com/apache/camel/files/13365493/camel-kafka-test.log) [components-camel-kafka.log](https://github.com/apache/camel/files/13365495/components-camel-kafka.log) Strange, I don't see the failure ... but uploading it anyway.
Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]
orpiske merged PR #11959: URL: https://github.com/apache/camel/pull/11959