[flink] 04/04: [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled

2023-08-05 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 65e7004dd32633f9dfc87b0808ffcb587daf525c
Author: wangfeifan 
AuthorDate: Fri Jun 16 11:48:58 2023 +0800

[hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled

Co-authored-by: Roman 
---
 .../changelog/ChangelogKeyedStateBackend.java  | 28 +++
 .../changelog/ChangelogKeyedStateBackendTest.java  | 56 ++
 .../common/PeriodicMaterializationManager.java | 33 ++---
 3 files changed, 111 insertions(+), 6 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index f286018eb31..f224098bb8e 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -190,6 +190,9 @@ public class ChangelogKeyedStateBackend
 
 private long lastConfirmedMaterializationId = -1L;
 
+/** last failed or cancelled materialization. */
+private long lastFailedMaterializationId = -1L;
+
 private final ChangelogTruncateHelper changelogTruncateHelper;
 
 public ChangelogKeyedStateBackend(
@@ -728,6 +731,7 @@ public class ChangelogKeyedStateBackend
 materializationId = Math.max(materializationId, 
h.getMaterializationID());
 }
 }
+this.lastConfirmedMaterializationId = materializationId;
 this.materializedId = materializationId + 1;
 
 if (!localMaterialized.isEmpty() || 
!localRestoredNonMaterialized.isEmpty()) {
@@ -758,6 +762,18 @@ public class ChangelogKeyedStateBackend
  */
 @Override
 public Optional initMaterialization() throws 
Exception {
+if (lastConfirmedMaterializationId < materializedId - 1
+&& lastFailedMaterializationId < materializedId - 1) {
+// SharedStateRegistry potentially requires that the checkpoint's 
dependency on the
+// shared file be continuous, it will be broken if we trigger a 
new materialization
+// before the previous one has either confirmed or failed. See 
discussion in
+// 
https://github.com/apache/flink/pull/22669#issuecomment-1593370772 .
+LOG.info(
+"materialization:{} not confirmed or failed or cancelled, 
skip trigger new one.",
+materializedId - 1);
+return Optional.empty();
+}
+
 SequenceNumber upTo = stateChangelogWriter.nextSequenceNumber();
 SequenceNumber lastMaterializedTo = 
changelogSnapshotState.lastMaterializedTo();
 
@@ -832,6 +848,18 @@ public class ChangelogKeyedStateBackend
 changelogTruncateHelper.materialized(upTo);
 }
 
+@Override
+public void handleMaterializationFailureOrCancellation(
+long materializationID, SequenceNumber upTo, Throwable cause) {
+
+LOG.info(
+"Task {} failed or cancelled materialization:{} which is 
upTo:{}",
+subtaskName,
+materializationID,
+upTo);
+lastFailedMaterializationId = Math.max(lastFailedMaterializationId, 
materializationID);
+}
+
 // TODO: this method may change after the ownership PR
 private List getMaterializedResult(
 @Nonnull SnapshotResult materializedSnapshot) {
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
index 03366b62be9..7581b2720f6 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
@@ -38,16 +38,21 @@ import 
org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend;
 import 
org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend.MockSnapshotSupplier;
 import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackendBuilder;
 import 
org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.DummyCheckpointingStorageAccess;
+import 
org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationRunnable;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import 

[flink] 04/04: [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled

2023-08-05 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aad658910fd6967199bce69f33872d245315bca1
Author: wangfeifan 
AuthorDate: Fri Jun 16 11:48:58 2023 +0800

[hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled

Co-authored-by: Roman 
---
 .../changelog/ChangelogKeyedStateBackend.java  | 28 +++
 .../changelog/ChangelogKeyedStateBackendTest.java  | 56 ++
 .../common/PeriodicMaterializationManager.java | 33 ++---
 3 files changed, 111 insertions(+), 6 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index f286018eb31..f224098bb8e 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -190,6 +190,9 @@ public class ChangelogKeyedStateBackend
 
 private long lastConfirmedMaterializationId = -1L;
 
+/** last failed or cancelled materialization. */
+private long lastFailedMaterializationId = -1L;
+
 private final ChangelogTruncateHelper changelogTruncateHelper;
 
 public ChangelogKeyedStateBackend(
@@ -728,6 +731,7 @@ public class ChangelogKeyedStateBackend
 materializationId = Math.max(materializationId, 
h.getMaterializationID());
 }
 }
+this.lastConfirmedMaterializationId = materializationId;
 this.materializedId = materializationId + 1;
 
 if (!localMaterialized.isEmpty() || 
!localRestoredNonMaterialized.isEmpty()) {
@@ -758,6 +762,18 @@ public class ChangelogKeyedStateBackend
  */
 @Override
 public Optional initMaterialization() throws 
Exception {
+if (lastConfirmedMaterializationId < materializedId - 1
+&& lastFailedMaterializationId < materializedId - 1) {
+// SharedStateRegistry potentially requires that the checkpoint's 
dependency on the
+// shared file be continuous, it will be broken if we trigger a 
new materialization
+// before the previous one has either confirmed or failed. See 
discussion in
+// 
https://github.com/apache/flink/pull/22669#issuecomment-1593370772 .
+LOG.info(
+"materialization:{} not confirmed or failed or cancelled, 
skip trigger new one.",
+materializedId - 1);
+return Optional.empty();
+}
+
 SequenceNumber upTo = stateChangelogWriter.nextSequenceNumber();
 SequenceNumber lastMaterializedTo = 
changelogSnapshotState.lastMaterializedTo();
 
@@ -832,6 +848,18 @@ public class ChangelogKeyedStateBackend
 changelogTruncateHelper.materialized(upTo);
 }
 
+@Override
+public void handleMaterializationFailureOrCancellation(
+long materializationID, SequenceNumber upTo, Throwable cause) {
+
+LOG.info(
+"Task {} failed or cancelled materialization:{} which is 
upTo:{}",
+subtaskName,
+materializationID,
+upTo);
+lastFailedMaterializationId = Math.max(lastFailedMaterializationId, 
materializationID);
+}
+
 // TODO: this method may change after the ownership PR
 private List getMaterializedResult(
 @Nonnull SnapshotResult materializedSnapshot) {
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
index 03366b62be9..7581b2720f6 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
@@ -38,16 +38,21 @@ import 
org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend;
 import 
org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend.MockSnapshotSupplier;
 import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackendBuilder;
 import 
org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.DummyCheckpointingStorageAccess;
+import 
org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationRunnable;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import 

[flink] 04/04: [hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled

2023-08-03 Thread roman
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 85f32d6bcb31708c9c2c845ea03ef117726b7c1a
Author: wangfeifan 
AuthorDate: Fri Jun 16 11:48:58 2023 +0800

[hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled

Co-authored-by: Roman 
---
 .../changelog/ChangelogKeyedStateBackend.java  | 28 +++
 .../changelog/ChangelogKeyedStateBackendTest.java  | 56 ++
 .../common/PeriodicMaterializationManager.java | 33 ++---
 3 files changed, 111 insertions(+), 6 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index f998bc45cf7..599441b8970 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -191,6 +191,9 @@ public class ChangelogKeyedStateBackend
 
 private long lastConfirmedMaterializationId = -1L;
 
+/** last failed or cancelled materialization. */
+private long lastFailedMaterializationId = -1L;
+
 private final ChangelogTruncateHelper changelogTruncateHelper;
 
 public ChangelogKeyedStateBackend(
@@ -729,6 +732,7 @@ public class ChangelogKeyedStateBackend
 materializationId = Math.max(materializationId, 
h.getMaterializationID());
 }
 }
+this.lastConfirmedMaterializationId = materializationId;
 this.materializedId = materializationId + 1;
 
 if (!localMaterialized.isEmpty() || 
!localRestoredNonMaterialized.isEmpty()) {
@@ -759,6 +763,18 @@ public class ChangelogKeyedStateBackend
  */
 @Override
 public Optional initMaterialization() throws 
Exception {
+if (lastConfirmedMaterializationId < materializedId - 1
+&& lastFailedMaterializationId < materializedId - 1) {
+// SharedStateRegistry potentially requires that the checkpoint's 
dependency on the
+// shared file be continuous, it will be broken if we trigger a 
new materialization
+// before the previous one has either confirmed or failed. See 
discussion in
+// 
https://github.com/apache/flink/pull/22669#issuecomment-1593370772 .
+LOG.info(
+"materialization:{} not confirmed or failed or cancelled, 
skip trigger new one.",
+materializedId - 1);
+return Optional.empty();
+}
+
 SequenceNumber upTo = stateChangelogWriter.nextSequenceNumber();
 SequenceNumber lastMaterializedTo = 
changelogSnapshotState.lastMaterializedTo();
 
@@ -833,6 +849,18 @@ public class ChangelogKeyedStateBackend
 changelogTruncateHelper.materialized(upTo);
 }
 
+@Override
+public void handleMaterializationFailureOrCancellation(
+long materializationID, SequenceNumber upTo, Throwable cause) {
+
+LOG.info(
+"Task {} failed or cancelled materialization:{} which is 
upTo:{}",
+subtaskName,
+materializationID,
+upTo);
+lastFailedMaterializationId = Math.max(lastFailedMaterializationId, 
materializationID);
+}
+
 // TODO: this method may change after the ownership PR
 private List getMaterializedResult(
 @Nonnull SnapshotResult materializedSnapshot) {
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
index 980d81e0a5d..ff4c93c5460 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
@@ -38,16 +38,21 @@ import 
org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend;
 import 
org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend.MockSnapshotSupplier;
 import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackendBuilder;
 import 
org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.DummyCheckpointingStorageAccess;
+import 
org.apache.flink.state.common.PeriodicMaterializationManager.MaterializationRunnable;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import