This is an automated email from the ASF dual-hosted git repository.

yuanmei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ca3604a6af [FLINK-30729][state/changelog] Refactor checkState() to WARN of StateChangelogWriter to avoid confusing IllegalException
4ca3604a6af is described below

commit 4ca3604a6afb7dace03464fafe7c5f14ab95fc5b
Author: fredia <fredia...@gmail.com>
AuthorDate: Mon Feb 6 14:28:23 2023 +0800

    [FLINK-30729][state/changelog] Refactor checkState() to WARN of StateChangelogWriter to avoid confusing IllegalException
---
 .../org/apache/flink/changelog/fs/FsStateChangelogWriter.java  | 10 ++++++++--
 .../state/changelog/inmemory/InMemoryStateChangelogWriter.java | 10 ++++++++--
 .../state/changelog/inmemory/StateChangelogStorageTest.java    |  2 ++
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
index 83a4b2d5861..aebb9adf58f 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
@@ -169,8 +169,11 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
 
     @Override
     public void appendMeta(byte[] value) throws IOException {
+        if (closed) {
+            LOG.warn("{} is closed.", logId);
+            return;
+        }
         LOG.trace("append metadata to {}: {} bytes", logId, value.length);
-        checkState(!closed, "%s is closed", logId);
         activeChangeSet.add(StateChange.ofMetadataChange(value));
         preEmptiveFlushIfNeeded(value);
     }
@@ -178,7 +181,10 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
     @Override
     public void append(int keyGroup, byte[] value) throws IOException {
         LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length);
-        checkState(!closed, "%s is closed", logId);
+        if (closed) {
+            LOG.warn("{} is closed.", logId);
+            return;
+        }
         activeChangeSet.add(StateChange.ofDataChange(keyGroup, value));
         preEmptiveFlushIfNeeded(value);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
index 7fdb3307640..24d675d9a6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -61,8 +61,11 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang
 
     @Override
     public void appendMeta(byte[] value) throws IOException {
-        Preconditions.checkState(!closed, "LogWriter is closed");
         LOG.trace("append metadata: {} bytes", value.length);
+        if (closed) {
+            LOG.warn("LogWriter is closed.");
+            return;
+        }
         changesByKeyGroup
                 .computeIfAbsent(META_KEY_GROUP, unused -> new TreeMap<>())
                 .put(sqn, value);
@@ -71,8 +74,11 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang
 
     @Override
     public void append(int keyGroup, byte[] value) {
-        Preconditions.checkState(!closed, "LogWriter is closed");
         LOG.trace("append, keyGroup={}, {} bytes", keyGroup, value.length);
+        if (closed) {
+            LOG.warn("LogWriter is closed.");
+            return;
+        }
         changesByKeyGroup.computeIfAbsent(keyGroup, unused -> new TreeMap<>()).put(sqn, value);
         sqn = sqn.next();
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
index f0c92ab55ad..44e8ed8319a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.util.CloseableIterator;
 
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -61,6 +62,7 @@ public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
         return Stream.of(true);
     }
 
+    @Disabled("FLINK-30729")
     @MethodSource("parameters")
     @ParameterizedTest(name = "compression = {0}")
     public void testNoAppendAfterClose(boolean compression) throws IOException {

Reply via email to