This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 65e7004dd32633f9dfc87b0808ffcb587daf525c
Author: wangfeifan <zoltar9...@163.com>
AuthorDate: Fri Jun 16 11:48:58 2023 +0800

    [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled
    
    Co-authored-by: Roman <khachatryan.ro...@gmail.com>
---
 .../changelog/ChangelogKeyedStateBackend.java      | 28 +++++++++++
 .../changelog/ChangelogKeyedStateBackendTest.java  | 56 ++++++++++++++++++++++
 .../common/PeriodicMaterializationManager.java     | 33 ++++++++++---
 3 files changed, 111 insertions(+), 6 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index f286018eb31..f224098bb8e 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -190,6 +190,9 @@ public class ChangelogKeyedStateBackend<K>
 
     private long lastConfirmedMaterializationId = -1L;
 
+    /** last failed or cancelled materialization. */
+    private long lastFailedMaterializationId = -1L;
+
     private final ChangelogTruncateHelper changelogTruncateHelper;
 
     public ChangelogKeyedStateBackend(
@@ -728,6 +731,7 @@ public class ChangelogKeyedStateBackend<K>
                 materializationId = Math.max(materializationId, 
h.getMaterializationID());
             }
         }
+        this.lastConfirmedMaterializationId = materializationId;
         this.materializedId = materializationId + 1;
 
         if (!localMaterialized.isEmpty() || 
!localRestoredNonMaterialized.isEmpty()) {
@@ -758,6 +762,18 @@ public class ChangelogKeyedStateBackend<K>
      */
     @Override
     public Optional<MaterializationRunnable> initMaterialization() throws 
Exception {
+        if (lastConfirmedMaterializationId < materializedId - 1
+                && lastFailedMaterializationId < materializedId - 1) {
+            // SharedStateRegistry potentially requires that the checkpoint's 
dependency on the
+            // shared file be continuous, it will be broken if we trigger a 
new materialization
+            // before the previous one has either confirmed or failed. See 
discussion in
+            // 
https://github.com/apache/flink/pull/22669#issuecomment-1593370772 .
+            LOG.info(
+                    "materialization:{} not confirmed or failed or cancelled, 
skip trigger new one.",
+                    materializedId - 1);
+            return Optional.empty();
+        }
+
         SequenceNumber upTo = stateChangelogWriter.nextSequenceNumber();
         SequenceNumber lastMaterializedTo = 
changelogSnapshotState.lastMaterializedTo();
 
@@ -832,6 +848,18 @@ public class ChangelogKeyedStateBackend<K>
         changelogTruncateHelper.materialized(upTo);
     }
 
+    @Override
+    public void handleMaterializationFailureOrCancellation(
+            long materializationID, SequenceNumber upTo, Throwable cause) {
+        // Recording the id lets initMaterialization() stop waiting on this materialization.
+        LOG.info(
+                "Task {} failed or cancelled materialization:{} which is upTo:{}",
+                subtaskName,
+                materializationID,
+                upTo);
+        lastFailedMaterializationId = Math.max(lastFailedMaterializationId, materializationID);
+    }
+
     // TODO: this method may change after the ownership PR
     private List<KeyedStateHandle> getMaterializedResult(
             @Nonnull SnapshotResult<KeyedStateHandle> materializedSnapshot) {
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
index 03366b62be9..7581b2720f6 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
@@ -38,16 +38,21 @@ import 
org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend;
 import 
org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend.MockSnapshotSupplier;
 import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackendBuilder;
 import 
org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.DummyCheckpointingStorageAccess;
+import 
org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationRunnable;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 
+import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.RunnableFuture;
 
 import static java.util.Collections.emptyList;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /** {@link ChangelogKeyedStateBackend} test. */
 @RunWith(Parameterized.class)
@@ -85,6 +90,57 @@ public class ChangelogKeyedStateBackendTest {
         }
     }
 
+    @Test
+    public void testInitMaterialization() throws Exception {
+        MockKeyedStateBackend<Integer> delegatedBackend = createMock();
+        ChangelogKeyedStateBackend<Integer> backend = createChangelog(delegatedBackend);
+
+        try {
+            Optional<MaterializationRunnable> runnable;
+
+            appendMockStateChange(backend); // ensure there is non-materialized changelog
+
+            runnable = backend.initMaterialization();
+            // 1. should trigger first materialization
+            assertTrue("first materialization should be triggered.", runnable.isPresent());
+
+            appendMockStateChange(backend); // ensure there is non-materialized changelog
+
+            // 2. should not trigger new one until the previous one has been confirmed or failed
+            assertFalse(backend.initMaterialization().isPresent());
+
+            backend.handleMaterializationFailureOrCancellation(
+                    runnable.get().getMaterializationID(),
+                    runnable.get().getMaterializedTo(),
+                    null);
+            runnable = backend.initMaterialization();
+            // 3. should trigger new one after previous one failed
+            assertTrue(runnable.isPresent());
+
+            appendMockStateChange(backend); // ensure there is non-materialized changelog
+
+            // 4. should not trigger new one until the previous one has been confirmed or failed
+            assertFalse(backend.initMaterialization().isPresent());
+
+            backend.handleMaterializationResult(
+                    SnapshotResult.empty(),
+                    runnable.get().getMaterializationID(),
+                    runnable.get().getMaterializedTo());
+            checkpoint(backend, checkpointId).get().discardState();
+            backend.notifyCheckpointComplete(checkpointId);
+            // 5. should trigger new one after previous one has been confirmed
+            assertTrue(backend.initMaterialization().isPresent());
+        } finally {
+            backend.close();
+            backend.dispose();
+        }
+    }
+
+    /** Appends one dummy change so that the backend has non-materialized changelog. */
+    private void appendMockStateChange(ChangelogKeyedStateBackend<?> backend) throws IOException {
+        backend.getChangelogWriter().append(0, new byte[] {'s'});
+    }
+
     private MockKeyedStateBackend<Integer> createMock() {
         return new MockKeyedStateBackendBuilder<>(
                         new KvStateRegistry().createTaskRegistry(new JobID(), 
new JobVertexID()),
diff --git a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java
index d7af80c1752..68bee356208 100644
--- a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java
+++ b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java
@@ -33,6 +33,8 @@ import org.apache.flink.util.concurrent.FutureUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import java.io.Closeable;
 import java.util.Optional;
 import java.util.concurrent.CancellationException;
@@ -51,23 +53,21 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public class PeriodicMaterializationManager implements Closeable {
 
     /** {@link MaterializationRunnable} provider and consumer, i.e. state 
backend. */
+    @NotThreadSafe
     public interface MaterializationTarget {
 
         /**
          * Initialize state materialization so that materialized data can be 
persisted durably and
          * included into the checkpoint.
          *
-         * <p>This method is not thread safe. It should be called either under 
a lock or through
-         * task mailbox executor.
-         *
          * @return a tuple of - future snapshot result from the underlying 
state backend - a {@link
          *     SequenceNumber} identifying the latest change in the changelog
          */
         Optional<MaterializationRunnable> initMaterialization() throws 
Exception;
 
         /**
-         * This method is not thread safe. It should be called either under a 
lock or through task
-         * mailbox executor.
+         * Implementations should not trigger materialization until the 
previous one has been
+         * confirmed or failed.
          */
         void handleMaterializationResult(
                 SnapshotResult<KeyedStateHandle> materializedSnapshot,
@@ -75,6 +75,9 @@ public class PeriodicMaterializationManager implements 
Closeable {
                 SequenceNumber upTo)
                 throws Exception;
 
+        void handleMaterializationFailureOrCancellation(
+                long materializationID, SequenceNumber upTo, Throwable cause);
+
         MaterializationTarget NO_OP =
                 new MaterializationTarget() {
                     @Override
@@ -87,6 +90,10 @@ public class PeriodicMaterializationManager implements 
Closeable {
                             SnapshotResult<KeyedStateHandle> 
materializedSnapshot,
                             long materializationID,
                             SequenceNumber upTo) {}
+
+                    @Override
+                    public void handleMaterializationFailureOrCancellation(
+                            long materializationID, SequenceNumber upTo, 
Throwable cause) {}
                 };
     }
 
@@ -268,9 +275,11 @@ public class PeriodicMaterializationManager implements 
Closeable {
                             } else if (throwable instanceof 
CancellationException) {
                                 // can happen e.g. due to task cancellation
                                 LOG.info("materialization cancelled", 
throwable);
+                                notifyFailureOrCancellation(materializationID, 
upTo, throwable);
                                 scheduleNextMaterialization();
                             } else {
                                 // if failed
+                                notifyFailureOrCancellation(materializationID, 
upTo, throwable);
                                 metrics.reportFailedMaterialization();
                                 int retryTime = 
numberOfConsecutiveFailures.incrementAndGet();
 
@@ -295,6 +304,18 @@ public class PeriodicMaterializationManager implements 
Closeable {
                         });
     }
 
+    private void notifyFailureOrCancellation( // report a failed or cancelled run to target
+            long materializationId, SequenceNumber upTo, Throwable cause) {
+        mailboxExecutor.execute( // via task mailbox: MaterializationTarget is @NotThreadSafe
+                () ->
+                        target.handleMaterializationFailureOrCancellation(
+                                materializationId, upTo, cause),
+                "Task {} materialization:{},upTo:{} failed or canceled.",
+                subtaskName,
+                materializationId,
+                upTo);
+    }
+
     private CompletableFuture<SnapshotResult<KeyedStateHandle>> uploadSnapshot(
             RunnableFuture<SnapshotResult<KeyedStateHandle>> 
materializedRunnableFuture) {
 
@@ -385,7 +406,7 @@ public class PeriodicMaterializationManager implements 
Closeable {
             return materializationRunnable;
         }
 
-        SequenceNumber getMaterializedTo() {
+        public SequenceNumber getMaterializedTo() {
             return materializedTo;
         }
 

Reply via email to