[flink] 04/04: [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git commit 65e7004dd32633f9dfc87b0808ffcb587daf525c Author: wangfeifan AuthorDate: Fri Jun 16 11:48:58 2023 +0800 [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled Co-authored-by: Roman --- .../changelog/ChangelogKeyedStateBackend.java | 28 +++ .../changelog/ChangelogKeyedStateBackendTest.java | 56 ++ .../common/PeriodicMaterializationManager.java | 33 ++--- 3 files changed, 111 insertions(+), 6 deletions(-) diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index f286018eb31..f224098bb8e 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -190,6 +190,9 @@ public class ChangelogKeyedStateBackend private long lastConfirmedMaterializationId = -1L; +/** last failed or cancelled materialization. */ +private long lastFailedMaterializationId = -1L; + private final ChangelogTruncateHelper changelogTruncateHelper; public ChangelogKeyedStateBackend( @@ -728,6 +731,7 @@ public class ChangelogKeyedStateBackend materializationId = Math.max(materializationId, h.getMaterializationID()); } } +this.lastConfirmedMaterializationId = materializationId; this.materializedId = materializationId + 1; if (!localMaterialized.isEmpty() || !localRestoredNonMaterialized.isEmpty()) { @@ -758,6 +762,18 @@ public class ChangelogKeyedStateBackend */ @Override public Optional initMaterialization() throws Exception { +if (lastConfirmedMaterializationId < materializedId - 1 +&& lastFailedMaterializationId < materializedId - 1) { +// SharedStateRegistry potentially requires that the checkpoint's dependency on the +// shared file be continuous, it will be broken if we trigger a new materialization +// before the previous one has either confirmed or failed. See discussion in +// https://github.com/apache/flink/pull/22669#issuecomment-1593370772 . +LOG.info( +"materialization:{} not confirmed or failed or cancelled, skip trigger new one.", +materializedId - 1); +return Optional.empty(); +} + SequenceNumber upTo = stateChangelogWriter.nextSequenceNumber(); SequenceNumber lastMaterializedTo = changelogSnapshotState.lastMaterializedTo(); @@ -832,6 +848,18 @@ public class ChangelogKeyedStateBackend changelogTruncateHelper.materialized(upTo); } +@Override +public void handleMaterializationFailureOrCancellation( +long materializationID, SequenceNumber upTo, Throwable cause) { + +LOG.info( +"Task {} failed or cancelled materialization:{} which is upTo:{}", +subtaskName, +materializationID, +upTo); +lastFailedMaterializationId = Math.max(lastFailedMaterializationId, materializationID); +} + // TODO: this method may change after the ownership PR private List getMaterializedResult( @Nonnull SnapshotResult materializedSnapshot) { diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java index 03366b62be9..7581b2720f6 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java @@ -38,16 +38,21 @@ import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend; import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend.MockSnapshotSupplier; import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackendBuilder; import org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.DummyCheckpointingStorageAccess; +import org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationRunnable; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import
[flink] 04/04: [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git commit aad658910fd6967199bce69f33872d245315bca1 Author: wangfeifan AuthorDate: Fri Jun 16 11:48:58 2023 +0800 [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled Co-authored-by: Roman --- .../changelog/ChangelogKeyedStateBackend.java | 28 +++ .../changelog/ChangelogKeyedStateBackendTest.java | 56 ++ .../common/PeriodicMaterializationManager.java | 33 ++--- 3 files changed, 111 insertions(+), 6 deletions(-) diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index f286018eb31..f224098bb8e 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -190,6 +190,9 @@ public class ChangelogKeyedStateBackend private long lastConfirmedMaterializationId = -1L; +/** last failed or cancelled materialization. */ +private long lastFailedMaterializationId = -1L; + private final ChangelogTruncateHelper changelogTruncateHelper; public ChangelogKeyedStateBackend( @@ -728,6 +731,7 @@ public class ChangelogKeyedStateBackend materializationId = Math.max(materializationId, h.getMaterializationID()); } } +this.lastConfirmedMaterializationId = materializationId; this.materializedId = materializationId + 1; if (!localMaterialized.isEmpty() || !localRestoredNonMaterialized.isEmpty()) { @@ -758,6 +762,18 @@ public class ChangelogKeyedStateBackend */ @Override public Optional initMaterialization() throws Exception { +if (lastConfirmedMaterializationId < materializedId - 1 +&& lastFailedMaterializationId < materializedId - 1) { +// SharedStateRegistry potentially requires that the checkpoint's dependency on the +// shared file be continuous, it will be broken if we trigger a new materialization +// before the previous one has either confirmed or failed. See discussion in +// https://github.com/apache/flink/pull/22669#issuecomment-1593370772 . +LOG.info( +"materialization:{} not confirmed or failed or cancelled, skip trigger new one.", +materializedId - 1); +return Optional.empty(); +} + SequenceNumber upTo = stateChangelogWriter.nextSequenceNumber(); SequenceNumber lastMaterializedTo = changelogSnapshotState.lastMaterializedTo(); @@ -832,6 +848,18 @@ public class ChangelogKeyedStateBackend changelogTruncateHelper.materialized(upTo); } +@Override +public void handleMaterializationFailureOrCancellation( +long materializationID, SequenceNumber upTo, Throwable cause) { + +LOG.info( +"Task {} failed or cancelled materialization:{} which is upTo:{}", +subtaskName, +materializationID, +upTo); +lastFailedMaterializationId = Math.max(lastFailedMaterializationId, materializationID); +} + // TODO: this method may change after the ownership PR private List getMaterializedResult( @Nonnull SnapshotResult materializedSnapshot) { diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java index 03366b62be9..7581b2720f6 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java @@ -38,16 +38,21 @@ import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend; import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend.MockSnapshotSupplier; import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackendBuilder; import org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.DummyCheckpointingStorageAccess; +import org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationRunnable; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import
[flink] 04/04: [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled
This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 85f32d6bcb31708c9c2c845ea03ef117726b7c1a Author: wangfeifan AuthorDate: Fri Jun 16 11:48:58 2023 +0800 [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled Co-authored-by: Roman --- .../changelog/ChangelogKeyedStateBackend.java | 28 +++ .../changelog/ChangelogKeyedStateBackendTest.java | 56 ++ .../common/PeriodicMaterializationManager.java | 33 ++--- 3 files changed, 111 insertions(+), 6 deletions(-) diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index f998bc45cf7..599441b8970 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -191,6 +191,9 @@ public class ChangelogKeyedStateBackend private long lastConfirmedMaterializationId = -1L; +/** last failed or cancelled materialization. */ +private long lastFailedMaterializationId = -1L; + private final ChangelogTruncateHelper changelogTruncateHelper; public ChangelogKeyedStateBackend( @@ -729,6 +732,7 @@ public class ChangelogKeyedStateBackend materializationId = Math.max(materializationId, h.getMaterializationID()); } } +this.lastConfirmedMaterializationId = materializationId; this.materializedId = materializationId + 1; if (!localMaterialized.isEmpty() || !localRestoredNonMaterialized.isEmpty()) { @@ -759,6 +763,18 @@ public class ChangelogKeyedStateBackend */ @Override public Optional initMaterialization() throws Exception { +if (lastConfirmedMaterializationId < materializedId - 1 +&& lastFailedMaterializationId < materializedId - 1) { +// SharedStateRegistry potentially requires that the checkpoint's dependency on the +// shared file be continuous, it will be broken if we trigger a new materialization +// before the previous one has either confirmed or failed. See discussion in +// https://github.com/apache/flink/pull/22669#issuecomment-1593370772 . +LOG.info( +"materialization:{} not confirmed or failed or cancelled, skip trigger new one.", +materializedId - 1); +return Optional.empty(); +} + SequenceNumber upTo = stateChangelogWriter.nextSequenceNumber(); SequenceNumber lastMaterializedTo = changelogSnapshotState.lastMaterializedTo(); @@ -833,6 +849,18 @@ public class ChangelogKeyedStateBackend changelogTruncateHelper.materialized(upTo); } +@Override +public void handleMaterializationFailureOrCancellation( +long materializationID, SequenceNumber upTo, Throwable cause) { + +LOG.info( +"Task {} failed or cancelled materialization:{} which is upTo:{}", +subtaskName, +materializationID, +upTo); +lastFailedMaterializationId = Math.max(lastFailedMaterializationId, materializationID); +} + // TODO: this method may change after the ownership PR private List getMaterializedResult( @Nonnull SnapshotResult materializedSnapshot) { diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java index 980d81e0a5d..ff4c93c5460 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java @@ -38,16 +38,21 @@ import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend; import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend.MockSnapshotSupplier; import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackendBuilder; import org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.DummyCheckpointingStorageAccess; +import org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationRunnable; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import