This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch dlf-2.5
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 1effba51036f8f35fee09dda5226922e2bd593b7
Author: luoyuxia <[email protected]>
AuthorDate: Tue Jul 22 15:04:58 2025 +0800

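    Fix tiering failover issue: fail in-progress tiering tables when the committer restarts

    On a restarted attempt (attempt number > 0), TieringCommitOperator sends a
    TieringRestoreEvent to the source coordinator. TieringSourceEnumerator then
    moves every table in tieringTableEpochs into failedTableEpochs and reports
    them as failed via a tiering heartbeat.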
---
 .../tiering/committer/TieringCommitOperator.java   | 14 ++++++++++++
 .../flink/tiering/event/TieringRestoreEvent.java   | 25 ++++++++++++++++++++++
 .../source/enumerator/TieringSourceEnumerator.java | 12 +++++++++++
 3 files changed, 51 insertions(+)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
index 6f6720a9e..49fb2230e 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -24,6 +24,7 @@ import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.exception.LakeTableSnapshotNotExistException;
 import com.alibaba.fluss.flink.tiering.event.FailedTieringEvent;
 import com.alibaba.fluss.flink.tiering.event.FinishedTieringEvent;
+import com.alibaba.fluss.flink.tiering.event.TieringRestoreEvent;
 import com.alibaba.fluss.flink.tiering.source.TableBucketWriteResult;
 import com.alibaba.fluss.flink.tiering.source.TieringSource;
 import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
@@ -38,6 +39,7 @@ import com.alibaba.fluss.utils.ExceptionUtils;
 
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
@@ -115,6 +117,18 @@ public class TieringCommitOperator<WriteResult, Committable>
         admin = connection.getAdmin();
     }
 
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+        int attemptNumber = getRuntimeContext().getAttemptNumber();
+        LOG.info("Attempt number is {}", attemptNumber);
+        if (attemptNumber > 0) {
+            LOG.info("Send TieringRestoreEvent");
+            operatorEventGateway.sendEventToCoordinator(
+                    new SourceEventWrapper(new TieringRestoreEvent()));
+        }
+    }
+
     @Override
     public void 
processElement(StreamRecord<TableBucketWriteResult<WriteResult>> streamRecord)
             throws Exception {
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/event/TieringRestoreEvent.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/event/TieringRestoreEvent.java
new file mode 100644
index 000000000..6bf789a6e
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/event/TieringRestoreEvent.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.tiering.event;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+/** Tells the tiering enumerator that the commit operator restarted (attempt number > 0). */
+public class TieringRestoreEvent implements SourceEvent {
+    // Carries no payload: the event type alone signals the restart.
+    private static final long serialVersionUID = 1L;
+}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
index 296f1580f..180b144e6 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
@@ -26,6 +26,7 @@ import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.flink.metrics.FlinkMetricRegistry;
 import com.alibaba.fluss.flink.tiering.event.FailedTieringEvent;
 import com.alibaba.fluss.flink.tiering.event.FinishedTieringEvent;
+import com.alibaba.fluss.flink.tiering.event.TieringRestoreEvent;
 import com.alibaba.fluss.flink.tiering.source.split.TieringSplit;
 import com.alibaba.fluss.flink.tiering.source.split.TieringSplitGenerator;
 import 
com.alibaba.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
@@ -214,6 +215,14 @@ public class TieringSourceEnumerator
                 failedTableEpochs.put(failedTableId, tieringEpoch);
             }
         }
+        if (sourceEvent instanceof TieringRestoreEvent) {
+            LOG.info(
+                    "Receiving tiering restore event, current tiering table 
epoch is {} .",
+                    tieringTableEpochs);
+            // we need to make all as failed
+            failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs));
+            tieringTableEpochs.clear();
+        }
 
         if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) {
             // call one round of heartbeat to notify table has been finished 
or failed
@@ -256,6 +265,9 @@ public class TieringSourceEnumerator
     private @Nullable Tuple3<Long, Long, TablePath> 
requestTieringTableSplitsViaHeartBeat() {
         Map<Long, Long> currentFinishedTableEpochs = new 
HashMap<>(this.finishedTableEpochs);
         Map<Long, Long> currentFailedTableEpochs = new 
HashMap<>(this.failedTableEpochs);
+        LOG.info(
+                "Requesting tiering table splits via heartbeat, failed table 
epoch is {}.",
+                currentFailedTableEpochs);
         LakeTieringHeartbeatRequest tieringHeartbeatRequest =
                 tieringTableHeartBeat(
                         basicHeartBeat(),

Reply via email to