lightzhao commented on code in PR #7273:
URL: https://github.com/apache/seatunnel/pull/7273#discussion_r1719615436


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java:
##########
@@ -58,8 +61,7 @@ public void run() {
                         task.accept(consumer);
                     }
                 } catch (Exception e) {
-                    throw new KafkaConnectorException(
-                            KafkaConnectorErrorCode.CONSUME_THREAD_RUN_ERROR, 
e);

Review Comment:
   > I think we should just let job failed when find an exception like
   > 
   > 
https://github.com/apache/seatunnel/blob/16eeb1c12323b462474ad23103de96c0060a4537/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java#L71
   > 
   > In other cases, the behavior not change.
   
   This exception is triggered when consumer.commitSync(offsets). Now there are 
two problems:
   1. This exception is thrown during the execution of the KafkaConsumerThread 
thread. To obtain the result of the thread execution exception, Future or 
callback is required. This has a great impact on the current 
KafkaConsumerThread code structure transformation. I suggest that it keep the 
thread from exiting abnormally until the Kafka cluster is restored and continue 
to execute
   2. consumer.commitSync is triggered at checkpoint, so even if an exception 
is captured, the task may not exit abnormally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to