phet commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1883504102
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -340,18 +340,29 @@ public void run() {
while (true) {
record = queue.take();
messagesRead.inc();
- HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);
- recordsProcessed.incrementAndGet();
+ try {
+ HighLevelConsumer.this.processMessage((DecodeableKafkaRecord) record);
+ recordsProcessed.incrementAndGet();
+ }
+ catch (Exception e) {
+ // Rethrow exception in case auto commit is disabled
+ if (!HighLevelConsumer.this.enableAutoCommit) {
+ throw e;
+ }
+ // Continue with processing next records in case auto commit is enabled
+ log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
+ }
- if(!HighLevelConsumer.this.enableAutoCommit) {
- KafkaPartition partition = new KafkaPartition.Builder().withId(record.getPartition()).withTopicName(HighLevelConsumer.this.topic).build();
+ if (!HighLevelConsumer.this.enableAutoCommit) {
+ KafkaPartition partition =
+ new KafkaPartition.Builder().withId(record.getPartition()).withTopicName(HighLevelConsumer.this.topic)
+ .build();
// Committed offset should always be the offset of the next record to be read (hence +1)
partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
}
}
- } catch (InterruptedException e) {
+ } catch(InterruptedException e){
Review Comment:
Needs whitespace: `} catch (InterruptedException e) {`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -340,18 +340,29 @@ public void run() {
while (true) {
record = queue.take();
messagesRead.inc();
- HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);
- recordsProcessed.incrementAndGet();
+ try {
+ HighLevelConsumer.this.processMessage((DecodeableKafkaRecord) record);
+ recordsProcessed.incrementAndGet();
+ }
+ catch (Exception e) {
+ // Rethrow exception in case auto commit is disabled
+ if (!HighLevelConsumer.this.enableAutoCommit) {
+ throw e;
+ }
+ // Continue with processing next records in case auto commit is enabled
+ log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
Review Comment:
It might be worth adding an external metric here so we can set up an alarm
on it. We don't want to keep churning through a long sequence of errors,
silently dropping messages with nobody noticing.
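
A rough sketch of the idea, not a drop-in patch: the class `FailureMetricSketch`, the method `processAll`, and the `failedRecords` counter are hypothetical names, and a plain `AtomicLong` stands in for whatever metric type the consumer would actually register. The point is only that the swallow-and-continue branch bumps a counter that can be alarmed on.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in for the queue-processing loop; this is not Gobblin code.
class FailureMetricSketch {

  /**
   * Processes each record in order. When processing fails:
   *  - auto commit disabled: rethrow, so the offset is not advanced past the record;
   *  - auto commit enabled: count the failure, log it, and continue.
   * Returns the number of records processed successfully.
   */
  static int processAll(List<String> records, Consumer<String> processMessage,
      boolean enableAutoCommit, AtomicLong failedRecords) {
    int processed = 0;
    for (String record : records) {
      try {
        processMessage.accept(record);
        processed++;
      } catch (RuntimeException e) {
        if (!enableAutoCommit) {
          throw e;
        }
        // Assumed metric: a counter that an external alarm could watch.
        failedRecords.incrementAndGet();
        System.err.println("Failed to process record " + record + ": " + e);
      }
    }
    return processed;
  }

  public static void main(String[] args) {
    AtomicLong failedRecords = new AtomicLong();
    Consumer<String> handler = r -> {
      if (r.startsWith("bad")) {
        throw new IllegalStateException("cannot process " + r);
      }
    };
    int processed = processAll(Arrays.asList("a", "bad-1", "b"), handler, true, failedRecords);
    System.out.println("processed=" + processed + " failed=" + failedRecords.get());
  }
}
```

With a counter like this exposed, an alert on its rate (or on the ratio of failures to messages read) would surface a run of dropped messages even when the consumer itself keeps running.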