[ https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yun Tang updated FLINK-25478:
-----------------------------
    Summary: Same materialized state handle should not register multi times  (was: Same materialized state handle should register multi times)

> Same materialized state handle should not register multi times
> --------------------------------------------------------------
>
>                 Key: FLINK-25478
>                 URL: https://issues.apache.org/jira/browse/FLINK-25478
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing, Runtime / State Backends
>            Reporter: Yun Tang
>            Priority: Critical
>             Fix For: 1.15.0
>
>
> Currently, changelog materialization calls the RocksDB state backend's
> snapshot method to generate an {{IncrementalRemoteKeyedStateHandle}} as the
> materialized artifact of {{ChangelogStateBackendHandleImpl}}. Until the next
> materialization, it always reports the same
> {{IncrementalRemoteKeyedStateHandle}} as before.
> Registering this handle for the first time is fine. However, when the same
> {{IncrementalRemoteKeyedStateHandle}} is registered a second time (via
> {{ChangelogStateBackendHandleImpl#registerSharedStates}}), its private state
> artifacts are discarded without checking the registration reference.
> In {{IncrementalRemoteKeyedStateHandle}}:
> {code:java}
> public void discardState() throws Exception {
>     try {
>         StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
>     } catch (Exception e) {
>         LOG.warn("Could not properly discard misc file states.", e);
>     }
> }
> {code}
> As a result, the private state (such as RocksDB's MANIFEST file) is deleted,
> and on restore the job fails with a FileNotFoundException:
>
> {code:java}
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403)
>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477)
>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
>     at org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153)
>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68)
>     at org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221)
>     at org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>     ... 10 more
> Caused by: java.io.FileNotFoundException: xxxxx
>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) ~[?:1.8.0_102]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) ~[?:1.8.0_102]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) ~[?:1.8.0_102]
>     ... 1 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
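The failure mode described in this issue can be illustrated with a small, self-contained Java model. This is a hypothetical sketch, not Flink's SharedStateRegistry or IncrementalRemoteKeyedStateHandle: the Handle and Registry classes, the registration key string, and the identityGuard flag are all invented for illustration. It assumes a registry that treats any second registration under an existing key as a duplicate upload and discards the incoming handle. If the incoming handle is the very same object that is already registered (as happens when the same materialized handle is reported again before the next materialization), that discard deletes live private state. A reference-equality check is shown as one possible guard.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of shared-state registration; NOT Flink's API. */
public class MaterializedHandleRegistrySketch {

    /** Hypothetical stand-in for a materialized handle such as IncrementalRemoteKeyedStateHandle. */
    static final class Handle {
        final String key;            // hypothetical registration key
        boolean privateStateDeleted; // set once discardState() "deletes" private files (e.g. MANIFEST)

        Handle(String key) {
            this.key = key;
        }

        // Like the quoted discardState(): deletes private state unconditionally.
        void discardState() {
            privateStateDeleted = true;
        }
    }

    /** Hypothetical registry keyed by handle key. */
    static final class Registry {
        private final Map<String, Handle> entries = new HashMap<>();
        private final boolean identityGuard; // hypothetical switch: check the reference before discarding

        Registry(boolean identityGuard) {
            this.identityGuard = identityGuard;
        }

        void register(Handle handle) {
            Handle existing = entries.putIfAbsent(handle.key, handle);
            if (existing == null) {
                return; // first registration: the registry now tracks this handle
            }
            if (identityGuard && existing == handle) {
                return; // same object reported again: treat as a no-op
            }
            // Otherwise the incoming handle is treated as a duplicate upload and discarded.
            handle.discardState();
        }
    }

    public static void main(String[] args) {
        Handle reported = new Handle("materialization-1");
        Registry naive = new Registry(false);
        naive.register(reported); // 1st checkpoint after materialization
        naive.register(reported); // 2nd checkpoint, same materialized handle
        System.out.println("naive: private state deleted = " + reported.privateStateDeleted);

        Handle reportedAgain = new Handle("materialization-1");
        Registry guarded = new Registry(true);
        guarded.register(reportedAgain);
        guarded.register(reportedAgain);
        System.out.println("guarded: private state deleted = " + reportedAgain.privateStateDeleted);
    }
}
```

With the guard enabled, a distinct Handle registered under an already-registered key is still discarded, so genuine duplicate uploads are cleaned up; only repeated reports of the same object are ignored.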