lightzhao commented on code in PR #7273:
URL: https://github.com/apache/seatunnel/pull/7273#discussion_r1719615436
##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java:
##########
@@ -58,8 +61,7 @@ public void run() {
task.accept(consumer);
}
} catch (Exception e) {
- throw new KafkaConnectorException(
- KafkaConnectorErrorCode.CONSUME_THREAD_RUN_ERROR,
e);
Review Comment:
> I think we should just let job failed when find an exception like
>
>
https://github.com/apache/seatunnel/blob/16eeb1c12323b462474ad23103de96c0060a4537/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java#L71
>
> In other cases, the behavior not change.
This exception is triggered when consumer.commitSync(offsets). Now there are
two problems:
1. This exception is thrown during the execution of the KafkaConsumerThread
thread. To obtain the result of the thread execution exception, Future or
callback is required. This has a great impact on the current
KafkaConsumerThread code structure transformation. I suggest that it keep the
thread from exiting abnormally until the Kafka cluster is restored and continue
to execute
2. consumer.commitSync is triggered at checkpoint, so even if an exception
is captured, the task may not exit abnormally.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]