This is an automated email from the ASF dual-hosted git repository. hongshun pushed a commit to branch dlf-2.5 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 1effba51036f8f35fee09dda5226922e2bd593b7 Author: luoyuxia <[email protected]> AuthorDate: Tue Jul 22 15:04:58 2025 +0800 fix failover issue --- .../tiering/committer/TieringCommitOperator.java | 14 ++++++++++++ .../flink/tiering/event/TieringRestoreEvent.java | 25 ++++++++++++++++++++++ .../source/enumerator/TieringSourceEnumerator.java | 12 +++++++++++ 3 files changed, 51 insertions(+) diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java index 6f6720a9e..49fb2230e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java @@ -24,6 +24,7 @@ import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.LakeTableSnapshotNotExistException; import com.alibaba.fluss.flink.tiering.event.FailedTieringEvent; import com.alibaba.fluss.flink.tiering.event.FinishedTieringEvent; +import com.alibaba.fluss.flink.tiering.event.TieringRestoreEvent; import com.alibaba.fluss.flink.tiering.source.TableBucketWriteResult; import com.alibaba.fluss.flink.tiering.source.TieringSource; import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot; @@ -38,6 +39,7 @@ import com.alibaba.fluss.utils.ExceptionUtils; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.source.event.SourceEventWrapper; +import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; @@ -115,6 +117,18 @@ public class 
TieringCommitOperator<WriteResult, Committable> admin = connection.getAdmin(); } + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + int attemptNumber = getRuntimeContext().getAttemptNumber(); + LOG.info("Attempt number is {}", attemptNumber); + if (attemptNumber > 0) { + LOG.info("Send TieringRestoreEvent"); + operatorEventGateway.sendEventToCoordinator( + new SourceEventWrapper(new TieringRestoreEvent())); + } + } + @Override public void processElement(StreamRecord<TableBucketWriteResult<WriteResult>> streamRecord) throws Exception { diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/event/TieringRestoreEvent.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/event/TieringRestoreEvent.java new file mode 100644 index 000000000..6bf789a6e --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/event/TieringRestoreEvent.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.tiering.event; + +import org.apache.flink.api.connector.source.SourceEvent; + +/** SourceEvent used to represent tiering is restoring. 
*/ +public class TieringRestoreEvent implements SourceEvent { + + private static final long serialVersionUID = 1L; +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index 296f1580f..180b144e6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -26,6 +26,7 @@ import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.flink.metrics.FlinkMetricRegistry; import com.alibaba.fluss.flink.tiering.event.FailedTieringEvent; import com.alibaba.fluss.flink.tiering.event.FinishedTieringEvent; +import com.alibaba.fluss.flink.tiering.event.TieringRestoreEvent; import com.alibaba.fluss.flink.tiering.source.split.TieringSplit; import com.alibaba.fluss.flink.tiering.source.split.TieringSplitGenerator; import com.alibaba.fluss.flink.tiering.source.state.TieringSourceEnumeratorState; @@ -214,6 +215,14 @@ public class TieringSourceEnumerator failedTableEpochs.put(failedTableId, tieringEpoch); } } + if (sourceEvent instanceof TieringRestoreEvent) { + LOG.info( + "Receiving tiering restore event, current tiering table epoch is {} .", + tieringTableEpochs); + // on restore, mark every table currently being tiered as failed so the next heartbeat reports it + failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs)); + tieringTableEpochs.clear(); + } if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) { // call one round of heartbeat to notify table has been finished or failed @@ -256,6 +265,9 @@ public class TieringSourceEnumerator private @Nullable Tuple3<Long, Long, TablePath> requestTieringTableSplitsViaHeartBeat() { Map<Long, Long> currentFinishedTableEpochs = new
HashMap<>(this.finishedTableEpochs); Map<Long, Long> currentFailedTableEpochs = new HashMap<>(this.failedTableEpochs); + LOG.info( + "Requesting tiering table splits via heartbeat, failed table epoch is {}.", + currentFailedTableEpochs); LakeTieringHeartbeatRequest tieringHeartbeatRequest = tieringTableHeartBeat( basicHeartBeat(),
