phet commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1883504102


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -340,18 +340,29 @@ public void run() {
         while (true) {
           record = queue.take();
           messagesRead.inc();
-          HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);
-          recordsProcessed.incrementAndGet();
+          try {
+            HighLevelConsumer.this.processMessage((DecodeableKafkaRecord) record);
+            recordsProcessed.incrementAndGet();
+          }
+          catch (Exception e) {
+            // Rethrow exception in case auto commit is disabled
+            if (!HighLevelConsumer.this.enableAutoCommit) {
+              throw e;
+            }
+            // Continue with processing next records in case auto commit is enabled
+            log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
+          }
 
-          if(!HighLevelConsumer.this.enableAutoCommit) {
-            KafkaPartition partition = new KafkaPartition.Builder().withId(record.getPartition()).withTopicName(HighLevelConsumer.this.topic).build();
+          if (!HighLevelConsumer.this.enableAutoCommit) {
+            KafkaPartition partition =
+                new KafkaPartition.Builder().withId(record.getPartition()).withTopicName(HighLevelConsumer.this.topic)
+                    .build();
             // Committed offset should always be the offset of the next record to be read (hence +1)
             partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
           }
         }
-      } catch (InterruptedException e) {
+      } catch(InterruptedException e){

Review Comment:
   Needs whitespace here: `catch (InterruptedException e) {`



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -340,18 +340,29 @@ public void run() {
         while (true) {
           record = queue.take();
           messagesRead.inc();
-          HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);
-          recordsProcessed.incrementAndGet();
+          try {
+            HighLevelConsumer.this.processMessage((DecodeableKafkaRecord) record);
+            recordsProcessed.incrementAndGet();
+          }
+          catch (Exception e) {
+            // Rethrow exception in case auto commit is disabled
+            if (!HighLevelConsumer.this.enableAutoCommit) {
+              throw e;
+            }
+            // Continue with processing next records in case auto commit is enabled
+            log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);

Review Comment:
   It might be worth adding an external metric here so we can set up an 
alarm. We don't want to keep churning through a long run of errors, silently 
dropping messages without anyone noticing.
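
   A minimal sketch of the idea, not the actual Gobblin change: the class, 
the `messagesFailed` counter name, and the use of `AtomicLong` as a stand-in 
for an externally reported metric are all assumptions for illustration. The 
point is only that the counter is bumped before the rethrow-or-continue 
decision, so an alarm could fire in either mode.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical stand-in for the consumer loop; none of these names come from Gobblin. */
class ProcessingFailureMetricSketch {
  // Stand-ins for externally reported metrics (assumed names).
  final AtomicLong recordsProcessed = new AtomicLong();
  final AtomicLong messagesFailed = new AtomicLong();

  private final boolean enableAutoCommit;
  private final Consumer<String> processor;

  ProcessingFailureMetricSketch(boolean enableAutoCommit, Consumer<String> processor) {
    this.enableAutoCommit = enableAutoCommit;
    this.processor = processor;
  }

  /** Processes each record, counting failures so they can be alarmed on. */
  void consume(List<String> records) {
    for (String record : records) {
      try {
        processor.accept(record);
        recordsProcessed.incrementAndGet();
      } catch (RuntimeException e) {
        // Count the failure first, regardless of the commit mode.
        messagesFailed.incrementAndGet();
        // Rethrow when auto commit is disabled, mirroring the diff above.
        if (!enableAutoCommit) {
          throw e;
        }
        // Otherwise log and move on to the next record.
        System.err.println("Encountered exception while processing record. Record: "
            + record + " Exception: " + e);
      }
    }
  }
}
```

   With a counter like this, an alarm on its rate of increase would surface 
a long run of dropped messages even though the loop keeps going.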


