[
https://issues.apache.org/jira/browse/GOBBLIN-2177?focusedWorklogId=947079&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-947079
]
ASF GitHub Bot logged work on GOBBLIN-2177:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 06/Dec/24 17:07
Start Date: 06/Dec/24 17:07
Worklog Time Spent: 10m
Work Description: abhishekmjain commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1873718377
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -348,13 +348,15 @@ record = queue.take();
// Committed offset should always be the offset of the next record to be read (hence +1)
partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
}
+ } catch (InterruptedException e) {
+ // Stop queue processor and return when encountered InterruptedException
+ log.warn("Thread interrupted while processing queue ", e);
+ Thread.currentThread().interrupt();
+ return;
+ } catch (Exception e) {
+ // Log the error and let the queue processor continue processing
+ log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
}
Review Comment:
I think we can keep the behaviour the same in the case of manual commit, since
in those cases a commit only happens once the message has been processed.
With autoCommit, however, messages are committed as soon as they are
consumed. Even when the `processQueue` infinite loop is broken, messages
continue to be consumed and committed. In such scenarios it would be better to
let future messages be processed if they can be.
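To make the difference between the two commit modes concrete, here is a deliberately simplified, single-threaded model. It does not use the Kafka client or any Gobblin class; `CommitModeSketch`, `Outcome`, and `simulate` are hypothetical names, and auto-commit is approximated as "every consumed offset ends up committed".

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the scenario in the comment above; hypothetical names, no Kafka involved.
class CommitModeSketch {

  // Outcome of one simulated run.
  static final class Outcome {
    final long committedOffset;   // offset of the next record to be read, as committed
    final List<Long> processed;   // offsets that were actually processed

    Outcome(long committedOffset, List<Long> processed) {
      this.committedOffset = committedOffset;
      this.processed = processed;
    }
  }

  // Consumes offsets 0..recordCount-1; processing of failingOffset throws in the real system.
  static Outcome simulate(int recordCount, long failingOffset,
                          boolean autoCommit, boolean stopOnError) {
    List<Long> processed = new ArrayList<>();
    long manuallyCommitted = 0;
    boolean processorAlive = true;

    for (long offset = 0; offset < recordCount; offset++) {
      // The consumer keeps consuming even if the processor loop has exited.
      if (!processorAlive) {
        continue;
      }
      if (offset == failingOffset) {
        // Either the loop breaks for good, or only this record is skipped.
        processorAlive = !stopOnError;
        continue;
      }
      processed.add(offset);
      manuallyCommitted = offset + 1;  // next record to be read (hence +1)
    }

    // Assumption: with auto-commit, every consumed offset is eventually committed.
    long committed = autoCommit ? recordCount : manuallyCommitted;
    return new Outcome(committed, processed);
  }
}
```

In this model, with auto-commit and a processor that stops at the first failure, all five offsets are committed although only two were processed. Letting the loop continue limits the loss to the single failing record. With manual commit and a stopped processor, the committed offset stays at the failing record, so nothing is silently skipped.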
Issue Time Tracking
-------------------
Worklog Id: (was: 947079)
Time Spent: 2h (was: 1h 50m)
> Avoid stopping Kafka HighLevelConsumer - QueueProcessor on
> non-InterruptedExceptions
> ------------------------------------------------------------------------------------
>
> Key: GOBBLIN-2177
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2177
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Abhishek Jain
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
> The QueueProcessor within HighLevelConsumer contains an infinite while loop
> that is enclosed in a try-catch block. When any exception is encountered,
> this loop breaks, which halts the processing of any consumed messages until
> the service is restarted.
> We should not break this infinite loop on all exceptions; rather, we should
> break it only on InterruptedException, which truly means the QueueProcessor
> should stop processing.
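The behaviour described in the issue can be sketched as follows. This is a minimal, self-contained illustration and not the actual `HighLevelConsumer` code: `QueueProcessorSketch`, `process`, and `runOver` are hypothetical names, records are plain strings, and logging is replaced with `System.err`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the QueueProcessor; not Gobblin's actual class.
class QueueProcessorSketch implements Runnable {
  private final BlockingQueue<String> queue;
  private final List<String> processed = Collections.synchronizedList(new ArrayList<>());

  QueueProcessorSketch(BlockingQueue<String> queue) {
    this.queue = queue;
  }

  @Override
  public void run() {
    while (true) {
      String record = null;
      try {
        record = queue.take();  // blocks; throws InterruptedException when interrupted
        process(record);
      } catch (InterruptedException e) {
        // The only signal that stops the loop: restore the interrupt flag and exit.
        Thread.currentThread().interrupt();
        return;
      } catch (Exception e) {
        // Any other failure is reported and the loop moves on to the next record.
        System.err.println("Failed to process record " + record + ": " + e);
      }
    }
  }

  // Assumed processing step: records starting with "bad" fail, to exercise the catch block.
  private void process(String record) {
    if (record.startsWith("bad")) {
      throw new IllegalStateException("cannot process " + record);
    }
    processed.add(record);
  }

  // Runs the processor over the given records, interrupts it once the queue is
  // drained, and returns the records that were processed successfully.
  static List<String> runOver(List<String> records) {
    QueueProcessorSketch processor = new QueueProcessorSketch(new LinkedBlockingQueue<>(records));
    Thread worker = new Thread(processor);
    worker.start();
    try {
      long deadline = System.nanoTime() + 5_000_000_000L;  // wait at most about 5 seconds
      while (!processor.queue.isEmpty() && System.nanoTime() < deadline) {
        Thread.sleep(10);
      }
      worker.interrupt();  // the loop exits at its next take()
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for the worker", e);
    }
    return new ArrayList<>(processor.processed);
  }
}
```

Calling `QueueProcessorSketch.runOver(List.of("a", "bad-1", "b"))` processes `a` and `b` and only reports the failure for `bad-1`. The worker thread ends only when it is interrupted.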