[PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-09 Thread via GitHub


CodeSmell opened a new pull request, #11959:
URL: https://github.com/apache/camel/pull/11959

   # Description
   
   This PR offers a fix for breakOnFirstError, where the incorrect partition:offset is used when a forceCommit is done.
   
   
   # Target
   
   This PR is off the camel-3.21.x branch
   
   # Tracking
   CAMEL-20044 
   CAMEL-19894


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-09 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1804324086

   :star2: Thank you for your contribution to the Apache Camel project! :star2: 
   
   :warning: Please note that the changes on this PR may be **tested 
automatically**. 
   
   If necessary Apache Camel Committers may access logs and test results in the 
job summaries!





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-09 Thread via GitHub


CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1388410012


##
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java:
##
@@ -149,14 +146,30 @@ private boolean processException(
 Exception exc = exchange.getException();
 LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(),
 exc.getMessage());
-LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
-lastResult.getPartitionLastOffset(), lastResult.getPartition());
+LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
+record.offset(), record.partition());
 }
 
 // force commit, so we resume on next poll where we failed 
 // except when the failure happened at the first message in a poll
 if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) {
-commitManager.forceCommit(topicPartition, lastResult.getPartitionLastOffset());
+// the record we are processing had the error 
+// so we will force commit the offset prior 
+// this will enable the current desired behavior to 
+// retry the message 1 more time
+//
+// Note: without a more extensive look at handling of breakOnFirstError
+// we will still need the lastResult so that we don't force 
+// retrying this message over and over
+// commitManager.forceCommit(topicPartition, record.offset() - 1);

Review Comment:
   This change to use record.offset instead of the lastResult offset fixes CAMEL-19894 and CAMEL-20044, as it will correctly manage the offset. The lastResult will still occasionally get corrupted, however.
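
   To illustrate why the offset handed to the commit matters, here is a minimal sketch in plain Java with no Kafka dependency. It assumes the usual Kafka convention that a committed offset names the next record to be read, and it assumes that force-committing a last processed offset N stores N + 1. The class, its methods, and the sample offsets are hypothetical and are not taken from camel-kafka.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-partition committed positions; not camel-kafka code.
class OffsetCommitSketch {
    // partition -> next offset the consumer would read after a restart or seek
    private final Map<Integer, Long> committed = new HashMap<>();

    // Assumption: committing "last processed offset" N stores N + 1 as the resume position.
    void forceCommit(int partition, long lastProcessedOffset) {
        committed.put(partition, lastProcessedOffset + 1);
    }

    // Returns 0 when nothing has been committed for the partition yet.
    long resumePosition(int partition) {
        return committed.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        OffsetCommitSketch sketch = new OffsetCommitSketch();
        int failingPartition = 1;
        long failingOffset = 5; // record that threw during processing

        // A stale or incorrect "last result" offset (the value 9 is made up)
        // would move the resume position past the failing record.
        long staleLastOffset = 9;
        sketch.forceCommit(failingPartition, staleLastOffset);
        System.out.println("stale commit resumes at " + sketch.resumePosition(failingPartition));

        // Committing the offset just before the failing record resumes at the failing record.
        sketch.forceCommit(failingPartition, failingOffset - 1);
        System.out.println("record-based commit resumes at " + sketch.resumePosition(failingPartition));
    }
}
```

   Under these assumptions, committing `record.offset() - 1` leaves the consumer positioned on the failing record, which is the retry behavior the diff comments describe.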






Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-09 Thread via GitHub


CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1388411473


##
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java:
##
@@ -0,0 +1,146 @@
+package org.apache.camel.component.kafka.integration;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * this will test breakOnFirstError functionality
+ * and the issue that was surfaced in CAMEL-19894
+ * regarding failure to correctly commit the offset
+ * in a batch using the Synch Commit Manager
+ * 
+ *  mimics the reproduction of the problem in 
+ *  https://github.com/Krivda/camel-bug-reproduction
+ */
+class KafkaBreakOnFirstErrorSeekIssueIT extends BaseEmbeddedKafkaTestSupport {
+

Review Comment:
   I was adding integration tests for this fix.
   There are 3 that are basic and show how breakOnFirstError will work.
   
   I am still working on adding ITs for the 2 issues.
   Note: these need multiple partitions to be created in the BaseEmbeddedKafkaTestSupport, and that doesn't seem to work.
   
   The 2 basic tests and the 2 tests that were part of reproducing the issues in [CAMEL-20044](https://issues.apache.org/jira/browse/CAMEL-20044) and [CAMEL-19894](https://issues.apache.org/jira/browse/CAMEL-19894) are captured in this repo:
   
   https://github.com/CodeSmell/CamelKafkaOffset






Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-09 Thread via GitHub


CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1388412530


##
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java:
##
@@ -149,14 +146,30 @@ private boolean processException(
 Exception exc = exchange.getException();
 LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(),
 exc.getMessage());
-LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
-lastResult.getPartitionLastOffset(), lastResult.getPartition());
+LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
+record.offset(), record.partition());
 }
 
 // force commit, so we resume on next poll where we failed 
 // except when the failure happened at the first message in a poll
 if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) {
-commitManager.forceCommit(topicPartition, lastResult.getPartitionLastOffset());
+// the record we are processing had the error 
+// so we will force commit the offset prior 
+// this will enable the current desired behavior to 
+// retry the message 1 more time
+//
+// Note: without a more extensive look at handling of breakOnFirstError
+// we will still need the lastResult so that we don't force 
+// retrying this message over and over
+// commitManager.forceCommit(topicPartition, record.offset() - 1);
+
+// we should just do a commit (vs the original forceCommit)
+// when route uses NOOP Commit Manager it will rely
+// on the route implementation to explicitly commit offset
+// when route uses Synch/Asynch Commit Manager it will 
+// ALWAYS commit the offset for the failing record
+// and will ALWAYS retry it
+commitManager.commit(topicPartition);
 }

Review Comment:
   This is the key change in behavior.
   It goes beyond a fix and gives route implementers the ability to choose how breakOnFirstError should work depending on the commit manager they use.
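
   The difference between the commit managers can be sketched roughly as follows. This is plain Java with hypothetical stand-in types, not the camel-kafka classes: it assumes a no-op manager whose `commit` does nothing (so the route has to commit explicitly) and a sync manager whose `commit` stores the offset most recently recorded for the partition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the commit managers discussed above; names are illustrative only.
interface CommitStrategy {
    void recordOffset(int partition, long offset);
    void commit(int partition);
    Long committed(int partition); // null when nothing has been committed
}

// Models a no-op manager: commit() does nothing, so the route must commit explicitly.
class NoopCommitStrategy implements CommitStrategy {
    public void recordOffset(int partition, long offset) { }
    public void commit(int partition) { }
    public Long committed(int partition) { return null; }
}

// Models a sync manager: commit() stores whatever offset was last recorded for the partition.
class SyncCommitStrategy implements CommitStrategy {
    private final Map<Integer, Long> recorded = new HashMap<>();
    private final Map<Integer, Long> committed = new HashMap<>();

    public void recordOffset(int partition, long offset) { recorded.put(partition, offset); }

    public void commit(int partition) {
        Long offset = recorded.get(partition);
        if (offset != null) {
            committed.put(partition, offset);
        }
    }

    public Long committed(int partition) { return committed.get(partition); }
}

class BreakOnFirstErrorSketch {
    // Simulates the error path: record the failing offset, then call commit() once.
    static Long onError(CommitStrategy strategy, int partition, long failingOffset) {
        strategy.recordOffset(partition, failingOffset);
        strategy.commit(partition);
        return strategy.committed(partition);
    }

    public static void main(String[] args) {
        System.out.println("noop: " + onError(new NoopCommitStrategy(), 0, 5));
        System.out.println("sync: " + onError(new SyncCommitStrategy(), 0, 5));
    }
}
```

   With the same error path, the no-op variant leaves nothing committed while the sync variant commits on every failure, which is the choice the change hands to route implementers.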






Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-09 Thread via GitHub


orpiske commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1804335053

   I am marking it as a draft just to prevent us from merging this inadvertently.





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-09 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1804451240

   ### Components test results:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 1 | 1 | 1 | 0 |





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-09 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1804451295

   :no_entry_sign: There are (likely) no changes in core to be tested in this PR





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-10 Thread via GitHub


orpiske commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1389114928


##
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java:
##
@@ -149,14 +146,30 @@ private boolean processException(
 Exception exc = exchange.getException();
 LOG.warn("Error during processing {} from topic: {} due to 
{}", exchange, topicPartition.topic(),
 exc.getMessage());
-LOG.warn("Will seek consumer to offset {} on partition {} and 
start polling again.",
-lastResult.getPartitionLastOffset(), 
lastResult.getPartition());
+LOG.warn("Will seek consumer to offset {} on partition {} and 
start polling again.", 
+record.offset(), record.partition());
 }
 
 // force commit, so we resume on next poll where we failed 
 // except when the failure happened at the first message in a poll
 if (lastResult.getPartitionLastOffset() != 
AbstractCommitManager.START_OFFSET) {
-commitManager.forceCommit(topicPartition, 
lastResult.getPartitionLastOffset());
+// the record we are processing had the error 
+// so we will force commit the offset prior 
+// this will enable the current desired behavior to 
+// retry the message 1 more time
+//
+// Note: without a more extensive look at handling of 
breakOnFirstError
+// we will still need the lastResult so that we don't force 
+// retrying this message over and over
+// commitManager.forceCommit(topicPartition, record.offset() - 
1);
+
+// we should just do a commit (vs the original forceCommit)
+// when route uses NOOP Commit Manager it will rely
+// on the route implementation to explicitly commit offset
+// when route uses Synch/Asynch Commit Manager it will 
+// ALWAYS commit the offset for the failing record
+// and will ALWAYS retry it
+commitManager.commit(topicPartition);
 }

Review Comment:
   > This is the key change in behavior. It goes beyond a fix and gives route implementers ability to choose how breakOnFirstError should work depending on the commit manager they use
   
   Thanks for the heads up. 
   
   I am OK with this given the scope of the change. The important thing here is that it needs to be documented in both the component doc and the upgrade guide.
   
   



Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-10 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1805385597

   :no_entry_sign: There are (likely) no changes in core to be tested in this PR





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-10 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1805385569

   ### Components test results:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 1 | 1 | 1 | 0 |





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-10 Thread via GitHub


orpiske commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1805393172

   
   [components-camel-kafka.log](https://github.com/apache/camel/files/13317850/components-camel-kafka.log)
   
   Attaching test logs from GitHub.
   





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-10 Thread via GitHub


CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1389321489


##
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java:
##
@@ -149,14 +146,30 @@ private boolean processException(
 Exception exc = exchange.getException();
 LOG.warn("Error during processing {} from topic: {} due to 
{}", exchange, topicPartition.topic(),
 exc.getMessage());
-LOG.warn("Will seek consumer to offset {} on partition {} and 
start polling again.",
-lastResult.getPartitionLastOffset(), 
lastResult.getPartition());
+LOG.warn("Will seek consumer to offset {} on partition {} and 
start polling again.", 
+record.offset(), record.partition());
 }
 
 // force commit, so we resume on next poll where we failed 
 // except when the failure happened at the first message in a poll
 if (lastResult.getPartitionLastOffset() != 
AbstractCommitManager.START_OFFSET) {
-commitManager.forceCommit(topicPartition, 
lastResult.getPartitionLastOffset());
+// the record we are processing had the error 
+// so we will force commit the offset prior 
+// this will enable the current desired behavior to 
+// retry the message 1 more time
+//
+// Note: without a more extensive look at handling of 
breakOnFirstError
+// we will still need the lastResult so that we don't force 
+// retrying this message over and over
+// commitManager.forceCommit(topicPartition, record.offset() - 
1);
+
+// we should just do a commit (vs the original forceCommit)
+// when route uses NOOP Commit Manager it will rely
+// on the route implementation to explicitly commit offset
+// when route uses Synch/Asynch Commit Manager it will 
+// ALWAYS commit the offset for the failing record
+// and will ALWAYS retry it
+commitManager.commit(topicPartition);
 }

Review Comment:
   Happy to do that. Can you point me to where those should be updated?






Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-10 Thread via GitHub


orpiske commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1389342201


##
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java:
##
@@ -149,14 +146,30 @@ private boolean processException(
 Exception exc = exchange.getException();
 LOG.warn("Error during processing {} from topic: {} due to 
{}", exchange, topicPartition.topic(),
 exc.getMessage());
-LOG.warn("Will seek consumer to offset {} on partition {} and 
start polling again.",
-lastResult.getPartitionLastOffset(), 
lastResult.getPartition());
+LOG.warn("Will seek consumer to offset {} on partition {} and 
start polling again.", 
+record.offset(), record.partition());
 }
 
 // force commit, so we resume on next poll where we failed 
 // except when the failure happened at the first message in a poll
 if (lastResult.getPartitionLastOffset() != 
AbstractCommitManager.START_OFFSET) {
-commitManager.forceCommit(topicPartition, 
lastResult.getPartitionLastOffset());
+// the record we are processing had the error 
+// so we will force commit the offset prior 
+// this will enable the current desired behavior to 
+// retry the message 1 more time
+//
+// Note: without a more extensive look at handling of 
breakOnFirstError
+// we will still need the lastResult so that we don't force 
+// retrying this message over and over
+// commitManager.forceCommit(topicPartition, record.offset() - 
1);
+
+// we should just do a commit (vs the original forceCommit)
+// when route uses NOOP Commit Manager it will rely
+// on the route implementation to explicitly commit offset
+// when route uses Synch/Asynch Commit Manager it will 
+// ALWAYS commit the offset for the failing record
+// and will ALWAYS retry it
+commitManager.commit(topicPartition);
 }

Review Comment:
   The following docs need to be adjusted:
   
   * [The Camel 3.21 upgrade guide](https://github.com/apache/camel/blob/main/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_21.adoc)
   * The [`breakOnFirstError`](https://github.com/apache/camel/blob/main/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java#L856-L866) documentation.
   
   In the 3.21 upgrade guide it is OK to be very brief: just say that the behavior of `breakOnFirstError` has changed and how, and then tell users to consult the component documentation.
   
   You can add the details about how it was changed, how it behaves, what to expect, etc. to the `breakOnFirstError` documentation. If necessary, you can also add a section to [the Kafka Component doc](https://github.com/apache/camel/blob/main/components/camel-kafka/src/main/docs/kafka-component.adoc).






Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-10 Thread via GitHub


CodeSmell commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1805726128

   > components-camel-kafka.log
   
   Looks like checkstyle-related issues.
   I also see that I need to add the Apache license to the new test classes.





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-10 Thread via GitHub


CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1389613976


##
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java:
##
@@ -149,14 +146,30 @@ private boolean processException(
 Exception exc = exchange.getException();
 LOG.warn("Error during processing {} from topic: {} due to 
{}", exchange, topicPartition.topic(),
 exc.getMessage());
-LOG.warn("Will seek consumer to offset {} on partition {} and 
start polling again.",
-lastResult.getPartitionLastOffset(), 
lastResult.getPartition());
+LOG.warn("Will seek consumer to offset {} on partition {} and 
start polling again.", 
+record.offset(), record.partition());
 }
 
 // force commit, so we resume on next poll where we failed 
 // except when the failure happened at the first message in a poll
 if (lastResult.getPartitionLastOffset() != 
AbstractCommitManager.START_OFFSET) {
-commitManager.forceCommit(topicPartition, 
lastResult.getPartitionLastOffset());
+// the record we are processing had the error 
+// so we will force commit the offset prior 
+// this will enable the current desired behavior to 
+// retry the message 1 more time
+//
+// Note: without a more extensive look at handling of 
breakOnFirstError
+// we will still need the lastResult so that we don't force 
+// retrying this message over and over
+// commitManager.forceCommit(topicPartition, record.offset() - 
1);
+
+// we should just do a commit (vs the original forceCommit)
+// when route uses NOOP Commit Manager it will rely
+// on the route implementation to explicitly commit offset
+// when route uses Synch/Asynch Commit Manager it will 
+// ALWAYS commit the offset for the failing record
+// and will ALWAYS retry it
+commitManager.commit(topicPartition);
 }

Review Comment:
   Updated docs here
   https://github.com/apache/camel/pull/11970






Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-10 Thread via GitHub


orpiske commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1389620969


##
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java:
##
@@ -0,0 +1,146 @@
+package org.apache.camel.component.kafka.integration;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * this will test breakOnFirstError functionality
+ * and the issue that was surfaced in CAMEL-19894
+ * regarding failure to correctly commit the offset
+ * in a batch using the Synch Commit Manager
+ * 
+ *  mimics the reproduction of the problem in 
+ *  https://github.com/Krivda/camel-bug-reproduction
+ */
+class KafkaBreakOnFirstErrorSeekIssueIT extends BaseEmbeddedKafkaTestSupport {
+

Review Comment:
   At this point, they should still fail, right? I noticed this one failed in my manual tests ...






Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-10 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1806129992

   :no_entry_sign: There are (likely) no changes in core to be tested in this PR





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-10 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1806129965

   ### Components test results:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 1 | 1 | 1 | 0 |





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-10 Thread via GitHub


CodeSmell commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1806288578

   Per request, moved the doc changes out of this PR and into:
   - https://github.com/apache/camel/pull/11970





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-11 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1806761949

   :no_entry_sign: There are (likely) no changes in core to be tested in this PR





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-11 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1806761947

   ### Components test results:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 1 | 1 | 1 | 0 |





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-11 Thread via GitHub


CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1390232215


##
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java:
##
@@ -0,0 +1,146 @@
+package org.apache.camel.component.kafka.integration;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * this will test breakOnFirstError functionality
+ * and the issue that was surfaced in CAMEL-19894
+ * regarding failure to correctly commit the offset
+ * in a batch using the Synch Commit Manager
+ * 
+ *  mimics the reproduction of the problem in 
+ *  https://github.com/Krivda/camel-bug-reproduction
+ */
+class KafkaBreakOnFirstErrorSeekIssueIT extends BaseEmbeddedKafkaTestSupport {
+

Review Comment:
   That's correct. This test will fail as-is.
   I have not had a chance to look at the infra test to get the partitions increased.
   The same test is in my personal repo (link above). That was passing when I ran it with my local build of the camel-kafka component with these changes.
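
A small plain-Java model may help show why the partition count matters for a test like this. It is only a sketch under stated assumptions: `PartitionOffsetSketch`, its methods, and the `NONE` sentinel are hypothetical names invented for illustration, and nothing here is taken from Camel's source. The idea is that a tracker that remembers only the most recent offset, regardless of partition, cannot be told apart from a per-partition tracker until records from a second partition are interleaved.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of offset bookkeeping; not Camel's implementation. */
final class PartitionOffsetSketch {

    /** Hypothetical sentinel meaning "nothing processed yet". */
    static final long NONE = -1L;

    // Remembers only the most recent offset, whatever partition it came from.
    private long lastOffsetAnyPartition = NONE;

    // Remembers the most recent offset separately for each partition.
    private final Map<Integer, Long> lastOffsetByPartition = new HashMap<>();

    /** Records that the record at (partition, offset) was processed successfully. */
    void processed(int partition, long offset) {
        lastOffsetAnyPartition = offset;
        lastOffsetByPartition.put(partition, offset);
    }

    /** Offset a shared tracker would report, regardless of the partition asked about. */
    long lastOffsetAnyPartition() {
        return lastOffsetAnyPartition;
    }

    /** Offset last processed on the given partition, or NONE if there was none. */
    long lastOffsetFor(int partition) {
        return lastOffsetByPartition.getOrDefault(partition, NONE);
    }

    public static void main(String[] args) {
        PartitionOffsetSketch tracker = new PartitionOffsetSketch();
        tracker.processed(0, 41);
        tracker.processed(1, 7);
        // Suppose the next record, on partition 0, now fails.
        System.out.println("shared tracker offset:   " + tracker.lastOffsetAnyPartition());
        System.out.println("partition 0 last offset: " + tracker.lastOffsetFor(0));
    }
}
```

With a single partition the two lookups always agree, so a mismatched partition:offset pair of this kind could not be observed; with two or more partitions they can diverge.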






Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-13 Thread via GitHub


orpiske commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1391339271


##
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java:
##
@@ -149,14 +146,30 @@ private boolean processException(
 Exception exc = exchange.getException();
 LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(),
 exc.getMessage());
-LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
-lastResult.getPartitionLastOffset(), lastResult.getPartition());
+LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
+record.offset(), record.partition());
 }
 
 // force commit, so we resume on next poll where we failed
 // except when the failure happened at the first message in a poll
 if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) {
-commitManager.forceCommit(topicPartition, lastResult.getPartitionLastOffset());
+// the record we are processing had the error
+// so we will force commit the offset prior
+// this will enable the current desired behavior to
+// retry the message 1 more time
+//
+// Note: without a more extensive look at handling of breakOnFirstError
+// we will still need the lastResult so that we don't force
+// retrying this message over and over
+// commitManager.forceCommit(topicPartition, record.offset() - 1);
+
+// we should just do a commit (vs the original forceCommit)
+// when route uses NOOP Commit Manager it will rely
+// on the route implementation to explicitly commit offset
+// when route uses Synch/Asynch Commit Manager it will 
+// ALWAYS commit the offset for the failing record
+// and will ALWAYS retry it
+commitManager.commit(topicPartition);
 }

Review Comment:
   Thanks for adding the doc changes to this PR.
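
The retry behavior described in the code comments above can be sketched with a plain-Java model of a single partition. This is an illustration under assumptions, not Camel's implementation: `BreakOnFirstErrorSketch` and `pollUntilFirstError` are hypothetical names, the poll/seek cycle is collapsed into one loop, and the only Kafka fact relied on is that a committed offset names the next record to read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical single-partition model of "stop at first error, then resume". */
final class BreakOnFirstErrorSketch {

    /**
     * Delivers records in offset order from startOffset and stops at the first
     * record whose body equals failingBody. Returns the offset to resume from,
     * following the convention that a committed offset is the NEXT record to read.
     */
    static long pollUntilFirstError(List<String> log, long startOffset,
                                    String failingBody, List<String> delivered) {
        long committed = startOffset;
        for (long offset = startOffset; offset < log.size(); offset++) {
            String body = log.get((int) offset);
            if (body.equals(failingBody)) {
                // Break on first error: the failing record is not skipped,
                // so the resume position stays at its offset.
                break;
            }
            delivered.add(body);
            committed = offset + 1;
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
                "message-0", "message-1", "message-2", "message-3", "message-4");

        List<String> firstRun = new ArrayList<>();
        long committed = pollUntilFirstError(log, 0, "message-3", firstRun);
        System.out.println(firstRun + " -> resume at " + committed);

        // Resuming from the committed offset hits the same failing record again.
        List<String> secondRun = new ArrayList<>();
        committed = pollUntilFirstError(log, committed, "message-3", secondRun);
        System.out.println(secondRun + " -> resume at " + committed);
    }
}
```

In this model the resume offset stays at the failing record for as long as it keeps failing, so later records are never delivered, which corresponds to the "ALWAYS retry it" note in the diff comments.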






Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-13 Thread via GitHub


orpiske commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1808488277

   > ### Components test results:
   > Total  Tested  Failed ❌Passed ✅
   > 1  1   1   0
   
   
   Adding the latest GitHub CI logs. It still seems to be complaining about checkstyle issues.
   
   
[components-camel-kafka.log](https://github.com/apache/camel/files/13336893/components-camel-kafka.log)
   
   





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-13 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1808814517

   ### Components test results:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 1 | 1 | 1 | 0 |





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-13 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1808814558

   :no_entry_sign: There are (likely) no changes in core to be tested in this PR





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-13 Thread via GitHub


orpiske commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1808818498

   
[components-camel-kafka.log](https://github.com/apache/camel/files/13339902/components-camel-kafka.log)
   
   Last build / test log.





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1392702916


##
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java:
##
@@ -149,14 +146,30 @@ private boolean processException(
 Exception exc = exchange.getException();
 LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(),
 exc.getMessage());
-LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
-lastResult.getPartitionLastOffset(), lastResult.getPartition());
+LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
+record.offset(), record.partition());
 }
 
 // force commit, so we resume on next poll where we failed
 // except when the failure happened at the first message in a poll
 if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) {
-commitManager.forceCommit(topicPartition, lastResult.getPartitionLastOffset());
+// the record we are processing had the error
+// so we will force commit the offset prior
+// this will enable the current desired behavior to
+// retry the message 1 more time
+//
+// Note: without a more extensive look at handling of breakOnFirstError
+// we will still need the lastResult so that we don't force
+// retrying this message over and over
+// commitManager.forceCommit(topicPartition, record.offset() - 1);
+
+// we should just do a commit (vs the original forceCommit)
+// when route uses NOOP Commit Manager it will rely
+// on the route implementation to explicitly commit offset
+// when route uses Synch/Asynch Commit Manager it will 
+// ALWAYS commit the offset for the failing record
+// and will ALWAYS retry it
+commitManager.commit(topicPartition);
 }

Review Comment:
   np






Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1810440562

   ### Components test results:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 1 | 1 | 1 | 0 |





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1810440607

   :no_entry_sign: There are (likely) no changes in core to be tested in this PR





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1810523994

   :no_entry_sign: There are (likely) no changes in core to be tested in this PR





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1810523853

   ### Components test results:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 1 | 1 | 1 | 0 |





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1810713033

   ### Components test results:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 1 | 1 | 1 | 0 |





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1810713063

   :no_entry_sign: There are (likely) no changes in core to be tested in this PR





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


orpiske commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1810775210

   [test-logs.zip](https://github.com/apache/camel/files/13352613/test-logs.zip)
   





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


orpiske commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1393023227


##
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java:
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * this will test breakOnFirstError functionality and the issue that was surfaced in CAMEL-20044 regarding incorrectly
+ * handling the offset commit resulting in replaying messages
+ * 
+ * mimics the reproduction of the problem in https://github.com/CodeSmell/CamelKafkaOffset
+ */
+class KafkaBreakOnFirstErrorReplayOldMessagesIT extends BaseEmbeddedKafkaTestSupport {
+
+public static final String ROUTE_ID = "breakOnFirstError-20044";
+public static final String TOPIC = "breakOnFirstError-20044";
+
+private static final Logger LOG = LoggerFactory.getLogger(KafkaBreakOnFirstErrorReplayOldMessagesIT.class);
+
+@EndpointInject("kafka:" + TOPIC
++ "?groupId=KafkaBreakOnFirstErrorIT"
++ "&autoOffsetReset=earliest"
++ "&autoCommitEnable=false"
++ "&allowManualCommit=true"
++ "&breakOnFirstError=true"
++ "&maxPollRecords=1"
+// here multiple threads was an issue
++ "&consumersCount=3"
++ "&pollTimeoutMs=1000"
++ "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
++ "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
++ "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+private Endpoint from;
+
+@EndpointInject("mock:result")
+private MockEndpoint to;
+
+private org.apache.kafka.clients.producer.KafkaProducer producer;
+
+@BeforeAll
+public static void setupTopic() {
+AdminClient kafkaAdminClient = createAdminClient(service);
+
+// create the topic w/ 3 partitions
+final NewTopic mytopic = new NewTopic(TOPIC, 3, (short) 1);
+kafkaAdminClient.createTopics(Collections.singleton(mytopic));
+}
+
+@BeforeEach
+public void init() {
+
+// setup the producer
+Properties props = getDefaultProperties();
+producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+MockConsumerInterceptor.recordsCaptured.clear();
+}
+
+@AfterEach
+public void after() {
+if (producer != null) {
+producer.close();
+}
+// clean all test topics
+kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+}
+
+@Test
+void testCamel20044TestFix() throws Exception {
+to.reset();
+to.expectedMessageCount(13);
+to.expectedBodiesReceivedInAnyOrder("1", "2", "3", "4", "5", "ERROR",
+"6", "7", "ERROR", "8", "9", "10", "11");
+
+context.getRouteController().stopRoute(ROU

Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


orpiske commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1393025011


##
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.java:
##
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * this will test basic breakOnFirstError functionality uses allowManualCommit and set Synch Commit Manager this allows
+ * Camel to handle when to commit an offset
+ */
+class KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT extends BaseEmbeddedKafkaTestSupport {
+public static final String ROUTE_ID = "breakOnFirstErrorBatchIT";
+public static final String TOPIC = "breakOnFirstErrorBatchIT";
+
+private static final Logger LOG = LoggerFactory.getLogger(KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.class);
+
+private final List errorPayloads = new CopyOnWriteArrayList<>();
+
+@EndpointInject("kafka:" + TOPIC
++ "?groupId=KafkaBreakOnFirstErrorIT"
++ "&autoOffsetReset=earliest"
++ "&autoCommitEnable=false"
++ "&allowManualCommit=true"
++ "&breakOnFirstError=true"
++ "&maxPollRecords=3"
++ "&pollTimeoutMs=1000"
++ "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
++ "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+// asynch commit factory
++ "&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory"
++ "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+private Endpoint from;
+
+@EndpointInject("mock:result")
+private MockEndpoint to;
+
+private org.apache.kafka.clients.producer.KafkaProducer producer;
+
+@BeforeEach
+public void before() {
+Properties props = getDefaultProperties();
+producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+MockConsumerInterceptor.recordsCaptured.clear();
+}
+
+@AfterEach
+public void after() {
+if (producer != null) {
+producer.close();
+}
+// clean all test topics
+kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+}
+
+/**
+ * will continue to retry the message that is in error
+ */
+@Test
+public void kafkaBreakOnFirstErrorBasicCapability() throws Exception {
+to.reset();
+to.expectedMessageCount(3);
+// message-3 causes an error 
+// and breakOnFirstError will cause it to be retried forever
+// we will never get to message-4
+to.expectedBodiesReceived("message-0", "message-1", "message-2");
+
+context.getRouteController().stopRoute(ROUTE_ID);
+
+this.publishMessagesToKafka();
+
+context.getRouteController().startRoute(ROUTE_ID);
+
+Awaitility.await()
+.atMost(3, TimeUnit.SECONDS)
+.until(() -> errorPayloads.size() > 3);
+
+to.assertIsSatisfied();
+
+for (String payload : erro

Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1393078767


##
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java:
##
@@ -0,0 +1,190 @@

Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1393081195


##
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java:
##
@@ -0,0 +1,190 @@

Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1811134385

   :no_entry_sign: There are (likely) no changes in core to be tested in this PR





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1811134245

   ### Components test results:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 1 | 1 | 0 | 1 |





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1811284133

   :no_entry_sign: There are (likely) no changes in core to be tested in this PR





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1811284112

   ### Components test results:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 1 | 1 | 1 | 0 |





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1811506665

   ### Components test results:
   
   | Total | Tested | Failed :x: | Passed :white_check_mark: | 
   | --- | --- | --- |  --- |
   | 1 | 1 | 1 | 0 |





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-14 Thread via GitHub


github-actions[bot] commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1811506701

   :no_entry_sign: There are (likely) no changes in core to be tested in this PR





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-15 Thread via GitHub


orpiske commented on PR #11959:
URL: https://github.com/apache/camel/pull/11959#issuecomment-1812569845

   
   [camel-kafka-test.log](https://github.com/apache/camel/files/13365493/camel-kafka-test.log)
   
   [components-camel-kafka.log](https://github.com/apache/camel/files/13365495/components-camel-kafka.log)
   
   Strange, I don't see the failure ... but uploading it anyway.
   





Re: [PR] CAMEL-20044 fix to handle commit on breakOnFirstError [camel]

2023-11-15 Thread via GitHub


orpiske merged PR #11959:
URL: https://github.com/apache/camel/pull/11959

