phet commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1871905332


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -348,13 +348,15 @@ record = queue.take();
             // Committed offset should always be the offset of the next record to be read (hence +1)
             partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
           }
+        } catch (InterruptedException e) {
+          // Stop queue processor and return when encountered InterruptedException
+          log.warn("Thread interrupted while processing queue ", e);
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          // Log the error and let the queue processor continue processing
+          log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
         }

Review Comment:
   Yes, exactly. Right now it's an infinite loop with no recovery (so, obviously, no retries).
   
   This PR would change that infinite loop to recover indefinitely, but it would NEVER retry a given message (any message that hits an error gets skipped).
   
   I'm suggesting we change the infinite loop to keep recovering (possibly indefinitely), but with bounded retries on any given message. Recovery continues only as long as each message's retries succeed within a bounded time.
   
   I'd let retries continue for a configurable duration, probably up to 15 minutes or so in the case of our Gobblin cluster. At that point, fail loudly by tripping a metrics condition we can alert on. (I'd set the alert threshold very low there, as little as a single event.)
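A minimal Java sketch of the bounded-retry idea above, assuming a configurable per-record retry deadline (for example `Duration.ofMinutes(15)`) and exponential backoff between attempts. `BoundedRetryProcessor`, `RetriesExhaustedException`, and the `AtomicLong` counter standing in for an alertable metric are hypothetical names for illustration, not Gobblin or Kafka APIs.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical sketch: bounded retries per record, then fail loudly. */
final class BoundedRetryProcessor<R> {

  /** Thrown when a single record keeps failing past its retry deadline. */
  static final class RetriesExhaustedException extends RuntimeException {
    RetriesExhaustedException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  private static final long MAX_BACKOFF_MILLIS = 60_000L;

  private final Duration maxRetryDuration;   // configurable, e.g. Duration.ofMinutes(15)
  private final Duration initialBackoff;
  private final AtomicLong exhaustedRetries; // hypothetical stand-in for an alertable metric

  BoundedRetryProcessor(Duration maxRetryDuration, Duration initialBackoff, AtomicLong exhaustedRetries) {
    this.maxRetryDuration = maxRetryDuration;
    this.initialBackoff = initialBackoff;
    this.exhaustedRetries = exhaustedRetries;
  }

  /** Retries one record with exponential backoff until it succeeds or the deadline passes. */
  void processWithRetries(R record, Consumer<R> handler) throws InterruptedException {
    long deadlineNanos = System.nanoTime() + maxRetryDuration.toNanos();
    long backoffMillis = Math.max(1L, initialBackoff.toMillis());
    while (true) {
      try {
        handler.accept(record);
        return; // success: the caller may now mark this record's offset for commit
      } catch (RuntimeException e) {
        long remainingNanos = deadlineNanos - System.nanoTime();
        if (remainingNanos <= 0) {
          exhaustedRetries.incrementAndGet(); // trip the alert condition
          throw new RetriesExhaustedException("Retries exhausted for record " + record, e);
        }
        // Never sleep past the deadline.
        long remainingMillis = TimeUnit.NANOSECONDS.toMillis(remainingNanos);
        Thread.sleep(Math.max(1L, Math.min(backoffMillis, remainingMillis)));
        backoffMillis = Math.min(backoffMillis * 2, MAX_BACKOFF_MILLIS);
      }
    }
  }

  /**
   * Processing loop: recovers from transient errors via retries, stops on interrupt,
   * and lets RetriesExhaustedException propagate so the failure is loud, not silent.
   */
  void run(BlockingQueue<R> queue, Consumer<R> handler) {
    try {
      while (true) {
        processWithRetries(queue.take(), handler);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status for the owning executor
    }
  }
}
```

A message that recovers within the window is processed exactly once more successfully and the loop moves on; one that never recovers bumps the counter and stops the processor instead of being skipped. In a real integration the counter would be replaced by whatever metric the cluster alerts on.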



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -336,8 +336,8 @@ public QueueProcessor(BlockingQueue queue) {
     public void run() {
      log.info("Starting queue processing.. " + Thread.currentThread().getName());
       KafkaConsumerRecord record = null;
-      try {
-        while (true) {
+      while (true) {
+        try {
           record = queue.take();

Review Comment:
   Right now, once an exception escapes the loop, the `QueueProcessor` thread is dead, so we no longer run:
   ```
   partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
   ```
   which means that, whether or not we continue consuming, after a system restart we'll rewind offsets and take another pass at that first problematic message.
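To make the offset consequence concrete, here is a simplified Java model of the bookkeeping, assuming (as the hunk context suggests) that `partitionOffsetsToCommit` is updated only after a record is processed successfully and that it is the sole source of committed offsets. `OffsetCommitModel`, `Rec`, and the `skipFailures` flag are hypothetical; the flag contrasts the current dead-thread behavior with the skip-on-error behavior this PR would introduce.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical model of per-partition offset bookkeeping in a queue processor. */
final class OffsetCommitModel {

  /** Minimal stand-in for a consumed Kafka record. */
  static final class Rec {
    final int partition;
    final long offset;

    Rec(int partition, long offset) {
      this.partition = partition;
      this.offset = offset;
    }
  }

  /**
   * Returns the offsets that would be committed after processing records in order.
   * An offset is marked only after its record succeeds. With skipFailures == false the
   * first failure ends processing (like a processor thread that dies); with true, a
   * failed record is skipped and later successes advance the offset past it.
   */
  static Map<Integer, Long> offsetsToCommit(List<Rec> records, Predicate<Rec> succeeds, boolean skipFailures) {
    Map<Integer, Long> partitionOffsetsToCommit = new HashMap<>();
    for (Rec rec : records) {
      if (!succeeds.test(rec)) {
        if (skipFailures) {
          continue; // failed record is never retried
        }
        break; // thread is dead: nothing after this point is marked
      }
      // Committed offset is the offset of the next record to be read (hence +1)
      partitionOffsetsToCommit.put(rec.partition, rec.offset + 1);
    }
    return partitionOffsetsToCommit;
  }
}
```

In this model, with records at offsets 10 through 13 on partition 0 and a failure at 12, the dead-thread case leaves 12 as the offset to commit, so a restart re-reads the problematic message; the skip case advances to 14, so that message is never revisited.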



-- 
This is an automated message from the Apache Git Service.