bowenli86 commented on code in PR #274:
URL: 
https://github.com/apache/flink-connector-kafka/pull/274#discussion_r3455868212


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java:
##########
@@ -754,6 +864,8 @@ public void handleSourceEvent(int subtaskId, SourceEvent 
sourceEvent) {
     @Override
     public void close() throws IOException {
         try {
+            kafkaMetadataServiceDiscoveryContext.close();

Review Comment:
   DynamicKafkaSourceEnumerator.close() waits for 
kafkaMetadataServiceDiscoveryContext.close() before closing 
kafkaMetadataService. Since the discovery context uses shutdownNow() and then 
waits indefinitely, a custom KafkaMetadataService with an in-flight metadata 
call that does not respond to interruption, or only unblocks when close() is 
called, could hang source coordinator shutdown.
   
   Could we either close/unblock the metadata service before waiting for the 
discovery worker, avoid serializing close() behind active metadata calls, or 
use a bounded wait with explicit failure/logging?



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java:
##########
@@ -587,17 +653,38 @@ private void startAllEnumerators() {
     }
 
     private void closeAllEnumeratorsAndContexts() {
-        clusterEnumeratorMap.forEach(
+        Map<String, StoppableKafkaEnumContextProxy> 
closingClusterEnumContextMap =
+                new HashMap<>(clusterEnumContextMap);
+        Map<String, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>>
+                closingClusterEnumeratorMap = new 
HashMap<>(clusterEnumeratorMap);
+        closingClusterEnumContextMap
+                .values()
+                .forEach(StoppableKafkaEnumContextProxy::prepareForClose);
+        clusterEnumContextMap.clear();
+        clusterEnumeratorMap.clear();
+
+        enumeratorClosingExecutor.execute(
+                () ->
+                        closeEnumeratorsAndContexts(
+                                closingClusterEnumContextMap, 
closingClusterEnumeratorMap));
+    }
+
+    private void closeEnumeratorsAndContexts(
+            Map<String, StoppableKafkaEnumContextProxy> 
closingClusterEnumContextMap,
+            Map<String, SplitEnumerator<KafkaPartitionSplit, 
KafkaSourceEnumState>>
+                    closingClusterEnumeratorMap) {
+        closingClusterEnumeratorMap.forEach(
                 (cluster, subEnumerator) -> {
                     try {
-                        clusterEnumContextMap.get(cluster).close();
+                        closingClusterEnumContextMap.get(cluster).close();
                         subEnumerator.close();
                     } catch (Exception e) {
-                        throw new RuntimeException(e);
+                        enumContext.runInCoordinatorThread(

Review Comment:
   The async stale-enumerator close path catches exceptions and posts a 
throwing runnable back through enumContext.runInCoordinatorThread(). During 
coordinator shutdown, that posted runnable can race with context shutdown and 
potentially be dropped/rejected, while DynamicKafkaSourceEnumerator.close() 
only waits for enumeratorClosingExecutor termination.
   
   Could we retain the first async close failure, for example with a Future or 
AtomicReference, and rethrow or at least log it from close() after awaiting the 
close executor? That would make close failures observable even during shutdown.



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java:
##########
@@ -725,6 +812,29 @@ private Set<SplitAndAssignmentStatus> filterStateByTopics(
                 .collect(Collectors.toSet());
     }
 
+    private void tuneEnumeratorAdminClientTimeouts(Properties consumerProps) {
+        tuneEnumeratorAdminClientTimeout(consumerProps, 
ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        tuneEnumeratorAdminClientTimeout(
+                consumerProps, 
CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG);
+    }
+
+    private void tuneEnumeratorAdminClientTimeout(Properties consumerProps, 
String propertyKey) {
+        String configuredTimeoutMs = consumerProps.getProperty(propertyKey);
+        if (configuredTimeoutMs == null) {
+            return;
+        }
+
+        long readerTimeoutMs = Long.parseLong(configuredTimeoutMs);
+        long enumeratorTimeoutMs =
+                Math.max(1L, readerTimeoutMs / 
ENUMERATOR_ADMIN_CLIENT_TIMEOUT_DIVISOR);
+        consumerProps.setProperty(propertyKey, 
Long.toString(enumeratorTimeoutMs));

Review Comment:
   The sub-enumerator timeout tuning silently rewrites `user/cluster-provided 
request.timeout.ms` and `default.api.timeout.ms` to half their configured 
values before constructing the Kafka sub-enumerator. I understand the 
mitigation goal, but this is a behavior change for jobs that intentionally 
configured those Kafka client timeouts.
   
   Could we make this opt-in / dynamic-source-specific, or document why it is 
safe to override user-supplied Kafka timeout settings here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to