This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new ec14e1b4c [hotfix] Tiering Source Enumerator should clear pending split when failover (#1820)
ec14e1b4c is described below
commit ec14e1b4c7b9b63a77d8db65234fccd2b7692442
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Oct 17 10:46:52 2025 +0800
[hotfix] Tiering Source Enumerator should clear pending split when failover (#1820)
---
.../tiering/committer/TieringCommitOperator.java | 9 ++--
...RestoreEvent.java => TieringFailOverEvent.java} | 4 +-
.../source/enumerator/TieringSourceEnumerator.java | 8 +--
.../enumerator/TieringSourceEnumeratorTest.java | 62 ++++++++++++++++++++++
fluss-test-coverage/pom.xml | 1 -
5 files changed, 75 insertions(+), 9 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index 779ca8913..0d3fe32d2 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -25,7 +25,7 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
-import org.apache.fluss.flink.tiering.event.TieringRestoreEvent;
+import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
import org.apache.fluss.flink.tiering.source.TableBucketWriteResult;
import org.apache.fluss.flink.tiering.source.TieringSource;
import org.apache.fluss.lake.committer.BucketOffset;
@@ -129,10 +129,10 @@ public class TieringCommitOperator<WriteResult,
Committable>
super.setup(containingTask, config, output);
int attemptNumber = getRuntimeContext().getAttemptNumber();
if (attemptNumber > 0) {
- LOG.info("Send TieringRestoreEvent");
+ LOG.info("Send TieringFailoverEvent, current attempt number: {}",
attemptNumber);
// attempt number is greater than zero, the job must failover
operatorEventGateway.sendEventToCoordinator(
- new SourceEventWrapper(new TieringRestoreEvent()));
+ new SourceEventWrapper(new TieringFailOverEvent()));
}
}
@@ -175,6 +175,9 @@ public class TieringCommitOperator<WriteResult, Committable>
new SourceEventWrapper(
new FailedTieringEvent(
tableId,
ExceptionUtils.stringifyException(e))));
+ LOG.warn(
+ "Fail to commit tiering write result, will try to tier
again in next round.",
+ e);
} finally {
collectedTableBucketWriteResults.remove(tableId);
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringRestoreEvent.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringFailOverEvent.java
similarity index 89%
rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringRestoreEvent.java
rename to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringFailOverEvent.java
index 77a48f9e7..8cbe87fd1 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringRestoreEvent.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringFailOverEvent.java
@@ -20,7 +20,7 @@ package org.apache.fluss.flink.tiering.event;
import org.apache.flink.api.connector.source.SourceEvent;
-/** SourceEvent used to represent tiering is restoring. */
-public class TieringRestoreEvent implements SourceEvent {
+/** SourceEvent used to represent tiering is failover. */
+public class TieringFailOverEvent implements SourceEvent {
private static final long serialVersionUID = 1L;
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
index b0b58a092..7b670bc15 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
@@ -26,7 +26,7 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.metrics.FlinkMetricRegistry;
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
-import org.apache.fluss.flink.tiering.event.TieringRestoreEvent;
+import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator;
import
org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
@@ -218,13 +218,15 @@ public class TieringSourceEnumerator
}
}
- if (sourceEvent instanceof TieringRestoreEvent) {
+ if (sourceEvent instanceof TieringFailOverEvent) {
LOG.info(
- "Receiving tiering restore event, mark current tiering
table epoch {} as failed.",
+ "Receiving tiering failover event, mark current tiering
table epoch {} as failed.",
tieringTableEpochs);
// we need to make all as failed
failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs));
tieringTableEpochs.clear();
+ // also clean all pending splits since we mark all as failed
+ pendingSplits.clear();
}
if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) {
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
index 1d04c7cf8..79c49c45f 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.tiering.source.enumerator;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
+import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
import org.apache.fluss.flink.tiering.source.TieringTestBase;
import org.apache.fluss.flink.tiering.source.split.TieringLogSplit;
import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit;
@@ -644,6 +645,48 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
}
}
+ @Test
+ void testHandleFailOverEvent() throws Throwable {
+ TablePath tablePath1 = TablePath.of(DEFAULT_DB,
"tiering-failover-test-log-table1");
+ createTable(tablePath1, DEFAULT_LOG_TABLE_DESCRIPTOR);
+
+ TablePath tablePath2 = TablePath.of(DEFAULT_DB,
"tiering-failover-test-log-table2");
+ createTable(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR);
+
+ int numSubtasks = 1;
+ try (MockSplitEnumeratorContext<TieringSplit> context =
+ new MockSplitEnumeratorContext<>(numSubtasks)) {
+ TieringSourceEnumerator enumerator =
+ new TieringSourceEnumerator(flussConf, context, 500);
+
+ enumerator.start();
+ assertThat(context.getSplitsAssignmentSequence()).isEmpty();
+
+ // register one reader
+ int subtaskId = 0;
+ registerReader(context, enumerator, subtaskId, "localhost-" +
subtaskId);
+
+ // handle split request
+ enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
+
+ // should get one tiering split, and the split is for tablePath1
+ verifyTieringSplitAssignment(context, 1, tablePath1);
+
+ // clean assignment
+ context.getSplitsAssignmentSequence().clear();
+
+ // enumerator handle TieringFailOverEvent, which will mark current
tiering tablePath1 as
+ // fail, and all pending splits should be clear
+ enumerator.handleSourceEvent(subtaskId, new
TieringFailOverEvent());
+
+ // handle split request
+ enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
+ // now, should get another one tiering split, the split is for
tablePath2 since all
+ // pending split for tablePath1 is clear
+ verifyTieringSplitAssignment(context, 1, tablePath2);
+ }
+ }
+
private static CommitLakeTableSnapshotRequest
genCommitLakeTableSnapshotRequest(
long tableId,
@Nullable Long partitionId,
@@ -697,4 +740,23 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
.sorted(Comparator.comparing(Object::toString))
.collect(Collectors.toList());
}
+
+ private void verifyTieringSplitAssignment(
+ MockSplitEnumeratorContext<TieringSplit> context,
+ int expectedSplitSize,
+ TablePath expectedTablePath)
+ throws Throwable {
+ waitUntilTieringTableSplitAssignmentReady(context, 1, 200);
+ List<SplitsAssignment<TieringSplit>> actualAssignment =
+ context.getSplitsAssignmentSequence();
+
+ List<TieringSplit> allTieringSplits =
+ actualAssignment.stream()
+ .flatMap(assignments ->
assignments.assignment().values().stream())
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
+ assertThat(allTieringSplits).hasSize(expectedSplitSize);
+ assertThat(allTieringSplits)
+ .allMatch(tieringSplit ->
tieringSplit.getTablePath().equals(expectedTablePath));
+ }
}
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index c4577f324..ee38b0c54 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -395,7 +395,6 @@
<exclude>org.apache.fluss.flink.tiering.source.TieringSourceOptions</exclude>
<exclude>org.apache.fluss.flink.tiering.source.TieringSource.Builder</exclude>
<exclude>org.apache.fluss.flink.tiering.source.TieringSource</exclude>
-
<exclude>org.apache.fluss.flink.tiering.event.TieringRestoreEvent</exclude>
<exclude>
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator
</exclude>