Savonitar commented on code in PR #279:
URL: 
https://github.com/apache/flink-connector-kafka/pull/279#discussion_r3505979200


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java:
##########
@@ -796,26 +857,303 @@ private void retainRemovedClusterEnumeratorStates(
         }
 
         long retainedUntilMs = System.currentTimeMillis() + 
removedClusterStateRetentionMs;
-        activeClusterEnumeratorStates.entrySet().stream()
-                .filter(entry -> 
!activeKafkaClusterIds.contains(entry.getKey()))
-                .forEach(
-                        entry ->
-                                retainedClusterEnumeratorStates.put(
-                                        entry.getKey(),
-                                        new 
DynamicKafkaSourceEnumState.RetainedClusterState(
-                                                entry.getValue(), 
retainedUntilMs)));
+        for (Entry<String, KafkaSourceEnumState> entry : 
activeClusterEnumeratorStates.entrySet()) {
+            if (activeKafkaClusterIds.contains(entry.getKey())) {
+                continue;
+            }
+            retainedClusterEnumeratorStates.put(
+                    entry.getKey(),
+                    new DynamicKafkaSourceEnumState.RetainedClusterState(
+                            entry.getValue(), retainedUntilMs));
+            retainedSplitOffsetHandoffs.remove(entry.getKey());
+        }
     }
 
     private void pruneExpiredRetainedClusterEnumeratorStates() {
         if (removedClusterStateRetentionMs <= 0) {
             retainedClusterEnumeratorStates.clear();
+            retainedSplitOffsetHandoffs.clear();
             return;
         }
 
         long currentTimeMillis = System.currentTimeMillis();
         retainedClusterEnumeratorStates
                 .entrySet()
                 .removeIf(entry -> entry.getValue().getRetainedUntilMs() <= 
currentTimeMillis);
+        
retainedSplitOffsetHandoffs.keySet().retainAll(retainedClusterEnumeratorStates.keySet());
+    }
+
+    private void pruneExpiredRetainedSplitOffsetHandoffs() {
+        long currentTimeMillis = System.currentTimeMillis();
+        retainedSplitOffsetHandoffs
+                .entrySet()
+                .removeIf(
+                        entry -> {
+                            RetainedSplitOffsetHandoff handoff = 
entry.getValue();
+                            if (!handoff.isExpired(currentTimeMillis)) {
+                                return false;
+                            }
+                            logger.debug(
+                                    "Discarding timed out retained split 
offset handoff for cluster {}: handoffId={}",
+                                    entry.getKey(),
+                                    handoff.handoffId);
+                            handoff.offsetsByReader.clear();
+                            return true;
+                        });
+    }
+
+    private boolean isRetainedClusterReadyForAssignment(
+            String kafkaClusterId,
+            DynamicKafkaSourceEnumState.RetainedClusterState 
retainedClusterState) {
+        Set<String> activeTopics =
+                latestClusterTopicsMap.getOrDefault(kafkaClusterId, 
Collections.emptySet());
+        return 
filterStateByTopics(retainedClusterState.getKafkaSourceEnumState(), 
activeTopics)
+                .stream()
+                .noneMatch(
+                        splitStatus ->
+                                
splitStatus.assignmentStatus().equals(AssignmentStatus.ASSIGNED));
+    }
+
+    private void startRetainedSplitOffsetHandoff(String kafkaClusterId) {
+        if (retainedSplitOffsetHandoffs.containsKey(kafkaClusterId)) {
+            return;
+        }
+
+        // Keep the attempt bounded without making a fast metadata refresh 
interval shorter than a
+        // reader source-event round trip.
+        long handoffTimeoutMs =
+                Math.max(
+                        kafkaMetadataServiceDiscoveryIntervalMs,
+                        RETAINED_SPLIT_OFFSET_HANDOFF_MIN_TIMEOUT_MS);
+        long deadlineMs = System.currentTimeMillis() + handoffTimeoutMs;
+        RetainedSplitOffsetHandoff handoff =
+                new 
RetainedSplitOffsetHandoff(++nextRetainedSplitOffsetHandoffId, deadlineMs);
+        retainedSplitOffsetHandoffs.put(kafkaClusterId, handoff);
+        scheduleRetainedSplitOffsetHandoffRetryIfNeeded();
+        for (int readerId : enumContext.registeredReaders().keySet()) {
+            sendRetainedSplitOffsetRequestToReader(kafkaClusterId, handoff, 
readerId);
+        }
+    }
+
+    private void scheduleRetainedSplitOffsetHandoffRetryIfNeeded() {
+        if (kafkaMetadataServiceDiscoveryIntervalMs > 0
+                || retainedSplitOffsetHandoffRetryScheduled) {
+            return;
+        }
+
+        // One-time metadata discovery has no later refresh to discard an 
expired handoff. Keep a
+        // single lightweight retry loop after the first handoff; it does not 
fetch metadata.
+        retainedSplitOffsetHandoffRetryScheduled = true;
+        kafkaMetadataServiceDiscoveryContext.<Void>callAsync(
+                () -> null,
+                (ignored, t) -> {
+                    if (t != null) {
+                        throw new RuntimeException("Retained split offset 
handoff retry failed", t);
+                    }
+                    pruneExpiredRetainedClusterEnumeratorStates();
+                    pruneExpiredRetainedSplitOffsetHandoffs();
+                    maybeStartReadyRetainedClusterEnumerators();
+                },
+                RETAINED_SPLIT_OFFSET_HANDOFF_MIN_TIMEOUT_MS,
+                RETAINED_SPLIT_OFFSET_HANDOFF_MIN_TIMEOUT_MS);
+    }
+
+    private void sendPendingRetainedSplitOffsetRequestsToReader(int readerId) {
+        retainedSplitOffsetHandoffs.forEach(
+                (kafkaClusterId, handoff) ->
+                        sendRetainedSplitOffsetRequestToReader(kafkaClusterId, 
handoff, readerId));
+    }
+
+    private void sendRetainedSplitOffsetRequestToReader(
+            String kafkaClusterId, RetainedSplitOffsetHandoff handoff, int 
readerId) {
+        RequestRetainedSplitOffsetsEvent requestEvent =
+                new RequestRetainedSplitOffsetsEvent(handoff.handoffId, 
kafkaClusterId);
+        logger.debug(
+                "Requesting retained split offsets from reader {}: {}", 
readerId, requestEvent);
+        enumContext.sendEventToSourceReader(readerId, requestEvent);
+    }
+
+    private void handleRetainedSplitOffsetsEvent(
+            int subtaskId, RetainedSplitOffsetsEvent 
retainedSplitOffsetsEvent) {
+        if (!enumContext.registeredReaders().containsKey(subtaskId)) {
+            logger.debug("Ignoring retained split offsets from unavailable 
reader {}", subtaskId);
+            return;
+        }
+        pruneExpiredRetainedClusterEnumeratorStates();
+        String kafkaClusterId = retainedSplitOffsetsEvent.getKafkaClusterId();
+        RetainedSplitOffsetHandoff handoff = 
retainedSplitOffsetHandoffs.get(kafkaClusterId);
+        if (handoff == null || handoff.handoffId != 
retainedSplitOffsetsEvent.getHandoffId()) {
+            logger.debug(
+                    "Ignoring stale retained split offsets from reader {}: {}",
+                    subtaskId,
+                    retainedSplitOffsetsEvent);
+            return;
+        }
+        if (handoff.isExpired(System.currentTimeMillis())) {
+            logger.debug(
+                    "Ignoring retained split offsets from timed out handoff 
{}: {}",
+                    handoff.handoffId,
+                    retainedSplitOffsetsEvent);
+            clearRetainedSplitOffsetHandoff(kafkaClusterId, handoff);
+            return;
+        }
+
+        handoff.offsetsByReader.put(subtaskId, 
retainedSplitOffsetsEvent.getRetainedSplitOffsets());
+        if (handoff.offsetsByReader.size() >= 
enumContext.currentParallelism()) {
+            applyRetainedSplitOffsetHandoff(kafkaClusterId, handoff);
+        }
+        maybeStartReadyRetainedClusterEnumerators();
+    }
+
+    private void applyRetainedSplitOffsetHandoff(
+            String kafkaClusterId, RetainedSplitOffsetHandoff handoff) {
+        DynamicKafkaSourceEnumState.RetainedClusterState retainedClusterState =
+                retainedClusterEnumeratorStates.get(kafkaClusterId);
+        if (retainedClusterState == null) {
+            clearRetainedSplitOffsetHandoff(kafkaClusterId, handoff);
+            return;
+        }
+
+        KafkaSourceEnumState kafkaSourceEnumState = 
retainedClusterState.getKafkaSourceEnumState();
+        Map<String, Long> retainedSplitOffsets = handoff.mergedOffsets();
+        Set<String> activeTopics =
+                latestClusterTopicsMap.getOrDefault(kafkaClusterId, 
Collections.emptySet());
+        Set<SplitAndAssignmentStatus> updatedSplits = new HashSet<>();
+        for (SplitAndAssignmentStatus splitStatus : 
kafkaSourceEnumState.splits()) {
+            if (!activeTopics.contains(splitStatus.split().getTopic())) {
+                updatedSplits.add(splitStatus);
+                continue;
+            }
+            if 
(splitStatus.assignmentStatus().equals(AssignmentStatus.ASSIGNED)) {
+                Long retainedSplitOffset =
+                        retainedSplitOffsets.get(
+                                toDynamicSplitId(kafkaClusterId, 
splitStatus.split()));
+                if (retainedSplitOffset == null) {
+                    // No reader retains this offset anymore; let normal 
discovery recreate it.

Review Comment:
   1. Is my understanding correct that, in this case, the split will be 
re-discovered as a "new" partition, so it starts from 
newDiscoveryOffsetsInitializer (earliest) rather than from the configured 
starting offset (e.g. latest()), and the source would rewind to the start of 
the topic?
   Could this null branch fall back to the split's own retained offset instead 
of dropping it, so that it resumes from the last known position?
   2. In this case, will the sink topic contain duplicates even in EOS mode?



##########
docs/content/docs/connectors/datastream/dynamic-kafka.md:
##########
@@ -242,6 +242,8 @@ By default, metadata removal also removes that cluster's 
split offsets from subs
 To keep removed cluster offsets available for a later re-add or restore, set
 `stream-metadata-removed-cluster-retention-ms` to a positive duration. For 
example,
 `604800000` retains removed cluster state for seven days before the source 
stops checkpointing it.
+If the cluster is re-added, the source uses the retained offsets but computes 
fresh reader
+assignments instead of reusing their previous owners.

Review Comment:
   Should we mention this branch/reset behavior 
(https://github.com/apache/flink-connector-kafka/pull/279/changes#r3505979200), 
which sounds like an exception, if we go with the suggested workflow?



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java:
##########
@@ -748,6 +796,19 @@ public void addReader(int subtaskId) {
         // assign pending splits from the sub enumerator
         clusterEnumeratorMap.forEach(
                 (cluster, subEnumerator) -> 
subEnumerator.addReader(subtaskId));
+        if (!retainedSplitOffsetHandoffs.isEmpty()) {
+            pruneExpiredRetainedSplitOffsetHandoffs();
+            // A reader can join while a re-added cluster is waiting for 
offset handoff. Send
+            // metadata first so the reader has reconciled its local retained 
state before it
+            // answers the request. Restart the attempt so delayed responses 
from the reader's
+            // previous attempt cannot count as its replacement report.
+            retainedSplitOffsetHandoffs
+                    .values()
+                    .forEach(handoff -> handoff.offsetsByReader.clear());

Review Comment:
   Can I clarify: can a single slow subtask that keeps going down and coming 
back up repeatedly clear the other readers' reports and cause a delay?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to