orpiske commented on code in PR #11935:
URL: https://github.com/apache/camel/pull/11935#discussion_r1386300217


##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java:
##########
@@ -77,6 +77,9 @@ public ProcessingResult processPolledRecords(
 
                 lastResult = processRecord(partition, 
partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
                         kafkaRecordProcessor, record);
+                
+                LOG.debug("processed record on partition {} and offset {} and 
got result for partition {} and offset {}",

Review Comment:
   Same note.



##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java:
##########
@@ -112,30 +115,56 @@ public ProcessingResult processExchange(
             exchange.setException(e);
         }
         if (exchange.getException() != null) {
-            boolean breakOnErrorExit = processException(exchange, partition, 
lastResult.getPartitionLastOffset(),
+            
+            LOG.debug("an exception was thrown for record at partition {} and 
offset {}",

Review Comment:
   Minor thing for consistency: capitalize the first letter of the log message. 
This is pattern we use throughout the code base.



##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java:
##########
@@ -112,30 +115,56 @@ public ProcessingResult processExchange(
             exchange.setException(e);
         }
         if (exchange.getException() != null) {
-            boolean breakOnErrorExit = processException(exchange, partition, 
lastResult.getPartitionLastOffset(),
+            
+            LOG.debug("an exception was thrown for record at partition {} and 
offset {}",
+                    record.partition(), record.offset());
+            
+            boolean breakOnErrorExit = processException(exchange, 
topicPartition, record, lastResult,
                     exceptionHandler);
-            return new ProcessingResult(breakOnErrorExit, 
lastResult.getPartitionLastOffset(), true);
+            
+            return new ProcessingResult(breakOnErrorExit, 
lastResult.getPartition(), lastResult.getPartitionLastOffset(), true);
         } else {
-            return new ProcessingResult(false, record.offset(), 
exchange.getException() != null);
+            return new ProcessingResult(false, record.partition(), 
record.offset(), exchange.getException() != null);
         }
     }
 
     private boolean processException(
-            Exchange exchange, TopicPartition partition, long 
partitionLastOffset,
+            Exchange exchange, TopicPartition topicPartition, 
+            ConsumerRecord<Object, Object> record, ProcessingResult lastResult,
             ExceptionHandler exceptionHandler) {
 
         // processing failed due to an unhandled exception, what should we do
         if (configuration.isBreakOnFirstError()) {
+            
+            if (lastResult.getPartition() != -1 &&
+                lastResult.getPartition() != record.partition()) {
+                LOG.error("about to process an exception with UNEXPECTED 
partition & offset. Got topic partition {}. " + 

Review Comment:
   Same note.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to