abhishekmjain commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1873718377
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -348,13 +348,15 @@ record = queue.take();
// Committed offset should always be the offset of the next record
to be read (hence +1)
partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
}
+ } catch (InterruptedException e) {
+ // Stop queue processor and return when encountered
InterruptedException
+ log.warn("Thread interrupted while processing queue ", e);
+ Thread.currentThread().interrupt();
+ return;
+ } catch (Exception e) {
+ // Log the error and let the queue processor continue processing
+ log.error("Encountered exception while processing record. Record: {}
Exception: {}", record, e);
}
Review Comment:
I think we can keep the behaviour same in case of manual commit. Since in
those cases commit only happens when the message is processed.
But in case of autoCommit, messages are committed as soon as they are
consumed. Even when the `processQueue` infinite loop is broken, messages are
consumed and committed. In such scenarios it would be better to let future
messages be processed if they could.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]