This is an automated email from the ASF dual-hosted git repository. yuanmei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 4ca3604a6af [FLINK-30729][state/changelog] Refactor checkState() to WARN of StateChangelogWriter to avoid confusing IllegalException 4ca3604a6af is described below commit 4ca3604a6afb7dace03464fafe7c5f14ab95fc5b Author: fredia <fredia...@gmail.com> AuthorDate: Mon Feb 6 14:28:23 2023 +0800 [FLINK-30729][state/changelog] Refactor checkState() to WARN of StateChangelogWriter to avoid confusing IllegalException --- .../org/apache/flink/changelog/fs/FsStateChangelogWriter.java | 10 ++++++++-- .../state/changelog/inmemory/InMemoryStateChangelogWriter.java | 10 ++++++++-- .../state/changelog/inmemory/StateChangelogStorageTest.java | 2 ++ 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java index 83a4b2d5861..aebb9adf58f 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java @@ -169,8 +169,11 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl @Override public void appendMeta(byte[] value) throws IOException { + if (closed) { + LOG.warn("{} is closed.", logId); + return; + } LOG.trace("append metadata to {}: {} bytes", logId, value.length); - checkState(!closed, "%s is closed", logId); activeChangeSet.add(StateChange.ofMetadataChange(value)); preEmptiveFlushIfNeeded(value); } @@ -178,7 +181,10 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl @Override public void append(int keyGroup, byte[] value) throws IOException { LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length); - checkState(!closed, "%s is closed", logId); + if (closed) { + LOG.warn("{} is closed.", logId); + return; + } activeChangeSet.add(StateChange.ofDataChange(keyGroup, value)); preEmptiveFlushIfNeeded(value); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java index 7fdb3307640..24d675d9a6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java @@ -61,8 +61,11 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang @Override public void appendMeta(byte[] value) throws IOException { - Preconditions.checkState(!closed, "LogWriter is closed"); LOG.trace("append metadata: {} bytes", value.length); + if (closed) { + LOG.warn("LogWriter is closed."); + return; + } changesByKeyGroup .computeIfAbsent(META_KEY_GROUP, unused -> new TreeMap<>()) .put(sqn, value); @@ -71,8 +74,11 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang @Override public void append(int keyGroup, byte[] value) { - Preconditions.checkState(!closed, "LogWriter is closed"); LOG.trace("append, keyGroup={}, {} bytes", keyGroup, value.length); + if (closed) { + LOG.warn("LogWriter is closed."); + return; + } changesByKeyGroup.computeIfAbsent(keyGroup, unused -> new TreeMap<>()).put(sqn, value); sqn = sqn.next(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java index f0c92ab55ad..44e8ed8319a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.changelog.StateChangelogStorage; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -61,6 +62,7 @@ public class StateChangelogStorageTest<T extends ChangelogStateHandle> { return Stream.of(true); } + @Disabled("FLINK-30729") @MethodSource("parameters") @ParameterizedTest(name = "compression = {0}") public void testNoAppendAfterClose(boolean compression) throws IOException {