This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 65e7004dd32633f9dfc87b0808ffcb587daf525c Author: wangfeifan <zoltar9...@163.com> AuthorDate: Fri Jun 16 11:48:58 2023 +0800 [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled Co-authored-by: Roman <khachatryan.ro...@gmail.com> --- .../changelog/ChangelogKeyedStateBackend.java | 28 +++++++++++ .../changelog/ChangelogKeyedStateBackendTest.java | 56 ++++++++++++++++++++++ .../common/PeriodicMaterializationManager.java | 33 ++++++++++--- 3 files changed, 111 insertions(+), 6 deletions(-) diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index f286018eb31..f224098bb8e 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -190,6 +190,9 @@ public class ChangelogKeyedStateBackend<K> private long lastConfirmedMaterializationId = -1L; + /** last failed or cancelled materialization. */ + private long lastFailedMaterializationId = -1L; + private final ChangelogTruncateHelper changelogTruncateHelper; public ChangelogKeyedStateBackend( @@ -728,6 +731,7 @@ public class ChangelogKeyedStateBackend<K> materializationId = Math.max(materializationId, h.getMaterializationID()); } } + this.lastConfirmedMaterializationId = materializationId; this.materializedId = materializationId + 1; if (!localMaterialized.isEmpty() || !localRestoredNonMaterialized.isEmpty()) { @@ -758,6 +762,18 @@ public class ChangelogKeyedStateBackend<K> */ @Override public Optional<MaterializationRunnable> initMaterialization() throws Exception { + if (lastConfirmedMaterializationId < materializedId - 1 + && lastFailedMaterializationId < materializedId - 1) { + // SharedStateRegistry potentially requires that the checkpoint's dependency on the + // shared file be continuous, it will be broken if we trigger a new materialization + // before the previous one has either confirmed or failed. See discussion in + // https://github.com/apache/flink/pull/22669#issuecomment-1593370772 . + LOG.info( + "materialization:{} not confirmed or failed or cancelled, skip trigger new one.", + materializedId - 1); + return Optional.empty(); + } + SequenceNumber upTo = stateChangelogWriter.nextSequenceNumber(); SequenceNumber lastMaterializedTo = changelogSnapshotState.lastMaterializedTo(); @@ -832,6 +848,18 @@ public class ChangelogKeyedStateBackend<K> changelogTruncateHelper.materialized(upTo); } + @Override + public void handleMaterializationFailureOrCancellation( + long materializationID, SequenceNumber upTo, Throwable cause) { + + LOG.info( + "Task {} failed or cancelled materialization:{} which is upTo:{}", + subtaskName, + materializationID, + upTo); + lastFailedMaterializationId = Math.max(lastFailedMaterializationId, materializationID); + } + // TODO: this method may change after the ownership PR private List<KeyedStateHandle> getMaterializedResult( @Nonnull SnapshotResult<KeyedStateHandle> materializedSnapshot) { diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java index 03366b62be9..7581b2720f6 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java @@ -38,16 +38,21 @@ import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend; import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend.MockSnapshotSupplier; import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackendBuilder; import org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.DummyCheckpointingStorageAccess; +import org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationRunnable; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; +import java.io.IOException; +import java.util.Optional; import java.util.concurrent.RunnableFuture; import static java.util.Collections.emptyList; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** {@link ChangelogKeyedStateBackend} test. */ @RunWith(Parameterized.class) @@ -85,6 +90,57 @@ public class ChangelogKeyedStateBackendTest { } } + @Test + public void testInitMaterialization() throws Exception { + MockKeyedStateBackend<Integer> delegatedBackend = createMock(); + ChangelogKeyedStateBackend<Integer> backend = createChangelog(delegatedBackend); + + try { + Optional<MaterializationRunnable> runnable; + + appendMockStateChange(backend); // ensure there is non-materialized changelog + + runnable = backend.initMaterialization(); + // 1. should trigger first materialization + assertTrue("first materialization should be trigger.", runnable.isPresent()); + + appendMockStateChange(backend); // ensure there is non-materialized changelog + + // 2. should not trigger new one until the previous one has been confirmed or failed + assertFalse(backend.initMaterialization().isPresent()); + + backend.handleMaterializationFailureOrCancellation( + runnable.get().getMaterializationID(), + runnable.get().getMaterializedTo(), + null); + runnable = backend.initMaterialization(); + // 3. should trigger new one after previous one failed + assertTrue(runnable.isPresent()); + + appendMockStateChange(backend); // ensure there is non-materialized changelog + + // 4. should not trigger new one until the previous one has been confirmed or failed + assertFalse(backend.initMaterialization().isPresent()); + + backend.handleMaterializationResult( + SnapshotResult.empty(), + runnable.get().getMaterializationID(), + runnable.get().getMaterializedTo()); + checkpoint(backend, checkpointId).get().discardState(); + backend.notifyCheckpointComplete(checkpointId); + // 5. should trigger new one after previous one has been confirmed + assertTrue(backend.initMaterialization().isPresent()); + } finally { + backend.close(); + backend.dispose(); + } + } + + private void appendMockStateChange(ChangelogKeyedStateBackend changelogKeyedBackend) + throws IOException { + changelogKeyedBackend.getChangelogWriter().append(0, new byte[] {'s'}); + } + private MockKeyedStateBackend<Integer> createMock() { return new MockKeyedStateBackendBuilder<>( new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()), diff --git a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java index d7af80c1752..68bee356208 100644 --- a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java +++ b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java @@ -33,6 +33,8 @@ import org.apache.flink.util.concurrent.FutureUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.concurrent.NotThreadSafe; + import java.io.Closeable; import java.util.Optional; import java.util.concurrent.CancellationException; @@ -51,23 +53,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public class PeriodicMaterializationManager implements Closeable { /** {@link MaterializationRunnable} provider and consumer, i.e. state backend. */ + @NotThreadSafe public interface MaterializationTarget { /** * Initialize state materialization so that materialized data can be persisted durably and * included into the checkpoint. * - * <p>This method is not thread safe. It should be called either under a lock or through - * task mailbox executor. - * * @return a tuple of - future snapshot result from the underlying state backend - a {@link * SequenceNumber} identifying the latest change in the changelog */ Optional<MaterializationRunnable> initMaterialization() throws Exception; /** - * This method is not thread safe. It should be called either under a lock or through task - * mailbox executor. + * Implementations should not trigger materialization until the previous one has been + * confirmed or failed. */ void handleMaterializationResult( SnapshotResult<KeyedStateHandle> materializedSnapshot, @@ -75,6 +75,9 @@ public class PeriodicMaterializationManager implements Closeable { SequenceNumber upTo) throws Exception; + void handleMaterializationFailureOrCancellation( + long materializationID, SequenceNumber upTo, Throwable cause); + MaterializationTarget NO_OP = new MaterializationTarget() { @Override @@ -87,6 +90,10 @@ public class PeriodicMaterializationManager implements Closeable { SnapshotResult<KeyedStateHandle> materializedSnapshot, long materializationID, SequenceNumber upTo) {} + + @Override + public void handleMaterializationFailureOrCancellation( + long materializationID, SequenceNumber upTo, Throwable cause) {} }; } @@ -268,9 +275,11 @@ public class PeriodicMaterializationManager implements Closeable { } else if (throwable instanceof CancellationException) { // can happen e.g. due to task cancellation LOG.info("materialization cancelled", throwable); + notifyFailureOrCancellation(materializationID, upTo, throwable); scheduleNextMaterialization(); } else { // if failed + notifyFailureOrCancellation(materializationID, upTo, throwable); metrics.reportFailedMaterialization(); int retryTime = numberOfConsecutiveFailures.incrementAndGet(); @@ -295,6 +304,18 @@ public class PeriodicMaterializationManager implements Closeable { }); } + private void notifyFailureOrCancellation( + long materializationId, SequenceNumber upTo, Throwable cause) { + mailboxExecutor.execute( + () -> + target.handleMaterializationFailureOrCancellation( + materializationId, upTo, cause), + "Task {} materialization:{},upTo:{} failed or canceled.", + subtaskName, + materializationId, + upTo); + } + private CompletableFuture<SnapshotResult<KeyedStateHandle>> uploadSnapshot( RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture) { @@ -385,7 +406,7 @@ public class PeriodicMaterializationManager implements Closeable { return materializationRunnable; } - SequenceNumber getMaterializedTo() { + public SequenceNumber getMaterializedTo() { return materializedTo; }