[ 
https://issues.apache.org/jira/browse/GOBBLIN-2177?focusedWorklogId=947079&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-947079
 ]

ASF GitHub Bot logged work on GOBBLIN-2177:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Dec/24 17:07
            Start Date: 06/Dec/24 17:07
    Worklog Time Spent: 10m 
      Work Description: abhishekmjain commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1873718377


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -348,13 +348,15 @@ record = queue.take();
             // Committed offset should always be the offset of the next record to be read (hence +1)
             partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
           }
+        } catch (InterruptedException e) {
+          // Stop queue processor and return when encountered InterruptedException
+          log.warn("Thread interrupted while processing queue ", e);
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          // Log the error and let the queue processor continue processing
+          log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
         }

Review Comment:
   I think we can keep the behaviour the same for manual commit, since in that 
case an offset is committed only after the message has been processed.
   
   With autoCommit, however, messages are committed as soon as they are 
consumed. Even after the `processQueue` infinite loop is broken, messages 
continue to be consumed and committed. In that scenario it would be better to 
let subsequent messages be processed where possible.
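
   To illustrate the loop shape being discussed, here is a minimal, 
self-contained sketch. It is not the actual `HighLevelConsumer` code: 
`QueueProcessorSketch`, `RecordHandler`, and `drain` are hypothetical names, 
records are plain strings, and offset commits are left out. It only shows a 
failing record being skipped while an `InterruptedException` stops the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch; not the actual HighLevelConsumer.QueueProcessor.
class QueueProcessorSketch {

  /** Hypothetical stand-in for whatever processes a consumed record. */
  interface RecordHandler {
    void process(String record) throws Exception;
  }

  /**
   * Takes records from the queue until the thread is interrupted.
   * Returns the records that were processed successfully.
   */
  static List<String> drain(BlockingQueue<String> queue, RecordHandler handler) {
    List<String> processed = new ArrayList<>();
    while (true) {
      String record = null;
      try {
        record = queue.take();
        handler.process(record);
        processed.add(record);
      } catch (InterruptedException e) {
        // Interruption is the only signal to stop: restore the flag and exit.
        Thread.currentThread().interrupt();
        return processed;
      } catch (Exception e) {
        // Any other failure is logged and the loop moves on to the next record.
        System.err.println("Failed to process record " + record + ": " + e);
      }
    }
  }
}
```

   With this shape, one bad record costs only that record, which matters most 
under autoCommit because later offsets are committed whether or not the 
processor is still running.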





Issue Time Tracking
-------------------

    Worklog Id:     (was: 947079)
    Time Spent: 2h  (was: 1h 50m)

> Avoid stopping Kafka HighLevelConsumer - QueueProcessor on 
> non-InterruptedExceptions
> ------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2177
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2177
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Abhishek Jain
>            Priority: Major
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> The QueueProcessor within HighLevelConsumer contains an infinite while loop 
> that is enclosed in a try-catch block. When any exception is encountered, 
> this loop breaks, which halts the processing of any consumed messages until 
> the service is restarted.
> We should not break this infinite loop on all exceptions; rather, we should 
> break it only on InterruptedException, which truly means the QueueProcessor 
> should stop processing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
