This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new ec14e1b4c [hotfix] Tiering Source Enumerator should clear pending split when failover (#1820)
ec14e1b4c is described below

commit ec14e1b4c7b9b63a77d8db65234fccd2b7692442
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Oct 17 10:46:52 2025 +0800

    [hotfix] Tiering Source Enumerator should clear pending split when failover (#1820)
---
 .../tiering/committer/TieringCommitOperator.java   |  9 ++--
 ...RestoreEvent.java => TieringFailOverEvent.java} |  4 +-
 .../source/enumerator/TieringSourceEnumerator.java |  8 +--
 .../enumerator/TieringSourceEnumeratorTest.java    | 62 ++++++++++++++++++++++
 fluss-test-coverage/pom.xml                        |  1 -
 5 files changed, 75 insertions(+), 9 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index 779ca8913..0d3fe32d2 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -25,7 +25,7 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
 import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
 import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
-import org.apache.fluss.flink.tiering.event.TieringRestoreEvent;
+import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
 import org.apache.fluss.flink.tiering.source.TableBucketWriteResult;
 import org.apache.fluss.flink.tiering.source.TieringSource;
 import org.apache.fluss.lake.committer.BucketOffset;
@@ -129,10 +129,10 @@ public class TieringCommitOperator<WriteResult, Committable>
         super.setup(containingTask, config, output);
         int attemptNumber = getRuntimeContext().getAttemptNumber();
         if (attemptNumber > 0) {
-            LOG.info("Send TieringRestoreEvent");
+            LOG.info("Send TieringFailoverEvent, current attempt number: {}", attemptNumber);
             // attempt number is greater than zero, the job must failover
             operatorEventGateway.sendEventToCoordinator(
-                    new SourceEventWrapper(new TieringRestoreEvent()));
+                    new SourceEventWrapper(new TieringFailOverEvent()));
         }
     }
 
@@ -175,6 +175,9 @@ public class TieringCommitOperator<WriteResult, Committable>
                         new SourceEventWrapper(
                                 new FailedTieringEvent(
                                         tableId, ExceptionUtils.stringifyException(e))));
+                LOG.warn(
+                        "Fail to commit tiering write result, will try to tier again in next round.",
+                        e);
             } finally {
                 collectedTableBucketWriteResults.remove(tableId);
             }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringRestoreEvent.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringFailOverEvent.java
similarity index 89%
rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringRestoreEvent.java
rename to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringFailOverEvent.java
index 77a48f9e7..8cbe87fd1 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringRestoreEvent.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringFailOverEvent.java
@@ -20,7 +20,7 @@ package org.apache.fluss.flink.tiering.event;
 
 import org.apache.flink.api.connector.source.SourceEvent;
 
-/** SourceEvent used to represent tiering is restoring. */
-public class TieringRestoreEvent implements SourceEvent {
+/** SourceEvent used to represent tiering is failover. */
+public class TieringFailOverEvent implements SourceEvent {
     private static final long serialVersionUID = 1L;
 }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
index b0b58a092..7b670bc15 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
@@ -26,7 +26,7 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.metrics.FlinkMetricRegistry;
 import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
 import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
-import org.apache.fluss.flink.tiering.event.TieringRestoreEvent;
+import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
 import org.apache.fluss.flink.tiering.source.split.TieringSplit;
 import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator;
 import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
@@ -218,13 +218,15 @@ public class TieringSourceEnumerator
             }
         }
 
-        if (sourceEvent instanceof TieringRestoreEvent) {
+        if (sourceEvent instanceof TieringFailOverEvent) {
             LOG.info(
-                    "Receiving tiering restore event, mark current tiering table epoch {} as failed.",
+                    "Receiving tiering failover event, mark current tiering table epoch {} as failed.",
                     tieringTableEpochs);
             // we need to make all as failed
             failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs));
             tieringTableEpochs.clear();
+            // also clean all pending splits since we mark all as failed
+            pendingSplits.clear();
         }
 
         if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) {
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
index 1d04c7cf8..79c49c45f 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.tiering.source.enumerator;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
 import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
+import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
 import org.apache.fluss.flink.tiering.source.TieringTestBase;
 import org.apache.fluss.flink.tiering.source.split.TieringLogSplit;
 import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit;
@@ -644,6 +645,48 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
         }
     }
 
+    @Test
+    void testHandleFailOverEvent() throws Throwable {
+        TablePath tablePath1 = TablePath.of(DEFAULT_DB, "tiering-failover-test-log-table1");
+        createTable(tablePath1, DEFAULT_LOG_TABLE_DESCRIPTOR);
+
+        TablePath tablePath2 = TablePath.of(DEFAULT_DB, "tiering-failover-test-log-table2");
+        createTable(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR);
+
+        int numSubtasks = 1;
+        try (MockSplitEnumeratorContext<TieringSplit> context =
+                new MockSplitEnumeratorContext<>(numSubtasks)) {
+            TieringSourceEnumerator enumerator =
+                    new TieringSourceEnumerator(flussConf, context, 500);
+
+            enumerator.start();
+            assertThat(context.getSplitsAssignmentSequence()).isEmpty();
+
+            // register one reader
+            int subtaskId = 0;
+            registerReader(context, enumerator, subtaskId, "localhost-" + subtaskId);
+
+            // handle split request
+            enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
+
+            // should get one tiering split, and the split is for tablePath1
+            verifyTieringSplitAssignment(context, 1, tablePath1);
+
+            // clean assignment
+            context.getSplitsAssignmentSequence().clear();
+
+            // enumerator handle TieringFailOverEvent, which will mark current tiering tablePath1 as
+            // fail, and all pending splits should be clear
+            enumerator.handleSourceEvent(subtaskId, new TieringFailOverEvent());
+
+            // handle split request
+            enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
+            // now, should get another one tiering split, the split is for tablePath2 since all
+            // pending split for tablePath1 is clear
+            verifyTieringSplitAssignment(context, 1, tablePath2);
+        }
+    }
+
     private static CommitLakeTableSnapshotRequest genCommitLakeTableSnapshotRequest(
             long tableId,
             @Nullable Long partitionId,
@@ -697,4 +740,23 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
                 .sorted(Comparator.comparing(Object::toString))
                 .collect(Collectors.toList());
     }
+
+    private void verifyTieringSplitAssignment(
+            MockSplitEnumeratorContext<TieringSplit> context,
+            int expectedSplitSize,
+            TablePath expectedTablePath)
+            throws Throwable {
+        waitUntilTieringTableSplitAssignmentReady(context, 1, 200);
+        List<SplitsAssignment<TieringSplit>> actualAssignment =
+                context.getSplitsAssignmentSequence();
+
+        List<TieringSplit> allTieringSplits =
+                actualAssignment.stream()
+                        .flatMap(assignments -> assignments.assignment().values().stream())
+                        .flatMap(List::stream)
+                        .collect(Collectors.toList());
+        assertThat(allTieringSplits).hasSize(expectedSplitSize);
+        assertThat(allTieringSplits)
+                .allMatch(tieringSplit -> tieringSplit.getTablePath().equals(expectedTablePath));
+    }
 }
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index c4577f324..ee38b0c54 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -395,7 +395,6 @@
                                         <exclude>org.apache.fluss.flink.tiering.source.TieringSourceOptions</exclude>
                                         <exclude>org.apache.fluss.flink.tiering.source.TieringSource.Builder</exclude>
                                         <exclude>org.apache.fluss.flink.tiering.source.TieringSource</exclude>
-                                        <exclude>org.apache.fluss.flink.tiering.event.TieringRestoreEvent</exclude>
                                         <exclude>
                                             org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator
                                         </exclude>

Reply via email to