[ https://issues.apache.org/jira/browse/FLINK-31810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17727905#comment-17727905 ]

Yun Tang commented on FLINK-31810:
----------------------------------

[~david.artiga] One possible cause is the duplicating stream that is used when 
local recovery is enabled: it may have written the file corruptly, leaving the 
restored file broken. However, I have not verified this by comparing the 
changed code. Also, what kind of DFS do you use to store the checkpoints: S3, 
HDFS, or something else?
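
For context, here is a minimal sketch of the duplicating-stream pattern I mean 
(illustrative only, not Flink's actual DuplicatingCheckpointOutputStream 
implementation): every checkpoint write is mirrored to a local copy, so a bug 
anywhere in the duplication path could leave one copy truncated or corrupt.
{code:java}
import java.io.IOException;
import java.io.OutputStream;

// Sketch of a stream that mirrors checkpoint writes to a primary (DFS)
// stream and a secondary (local-recovery) stream.
public class DuplicatingStreamSketch extends OutputStream {
    private final OutputStream primary;   // e.g. the DFS checkpoint stream
    private final OutputStream secondary; // e.g. the local-recovery copy

    public DuplicatingStreamSketch(OutputStream primary, OutputStream secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    @Override
    public void write(int b) throws IOException {
        primary.write(b);
        // If writes to one side fail or are skipped inconsistently, the two
        // copies diverge -- the suspected cause of the corrupt SST file.
        secondary.write(b);
    }

    @Override
    public void close() throws IOException {
        primary.close();
        secondary.close();
    }
}
{code}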

Please keep the broken file for analysis, or try disabling local recovery to 
see whether the problem happens again.
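
For that test, a minimal sketch of disabling local recovery programmatically 
(this assumes the standard CheckpointingOptions.LOCAL_RECOVERY option, which 
corresponds to state.backend.local-recovery in flink-conf.yaml):
{code:java}
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;

public class DisableLocalRecovery {
    public static void main(String[] args) {
        // Same effect as "state.backend.local-recovery: false" in
        // flink-conf.yaml (local recovery is off unless explicitly enabled).
        Configuration config = new Configuration();
        config.set(CheckpointingOptions.LOCAL_RECOVERY, false);
        // Pass "config" to your cluster / execution environment as usual.
    }
}
{code}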

> RocksDBException: Bad table magic number on checkpoint rescale
> --------------------------------------------------------------
>
>                 Key: FLINK-31810
>                 URL: https://issues.apache.org/jira/browse/FLINK-31810
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.15.2
>            Reporter: Robert Metzger
>            Priority: Major
>
> While rescaling a job from a checkpoint, I ran into this exception:
> {code:java}
> SinkMaterializer[7] -> rob-result[7]: Writer -> rob-result[7]: Committer 
> (4/4)#3 (c1b348f7eed6e1ce0e41ef75338ae754) switched from INITIALIZING to 
> FAILED with failure cause: java.lang.Exception: Exception while creating 
> StreamOperatorStateContext.
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265)
>       at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:703)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:679)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:646)
>       at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>       at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>       at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for 
> SinkUpsertMaterializer_7d9b7588bc2ff89baed50d7a4558caa4_(4/4) from any of the 
> 1 provided restore options.
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
>       ... 11 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
> unexpected exception.
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
>       at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
>       at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>       ... 13 more
> Caused by: java.io.IOException: Error while opening RocksDB instance.
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:92)
>       at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreDBInstanceFromStateHandle(RocksDBIncrementalRestoreOperation.java:465)
>       at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithRescaling(RocksDBIncrementalRestoreOperation.java:321)
>       at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:164)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:315)
>       ... 18 more
> Caused by: org.rocksdb.RocksDBException: Bad table magic number: expected 
> 9863518390377041911, found 4096 in 
> /tmp/job_00000000000000000000000000000000_op_SinkUpsertMaterializer_7d9b7588bc2ff89baed50d7a4558caa4__4_4__uuid_d5587dfc-78b3-427c-8cb6-35507b71bc4b/46475654-5515-430e-b215-389d42cddb97/000232.sst
>       at org.rocksdb.RocksDB.open(Native Method)
>       at org.rocksdb.RocksDB.open(RocksDB.java:306)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:80)
>       ... 22 more
> {code}
> I haven't found any other cases of this issue on the internet, except for 
> https://github.com/facebook/rocksdb/issues/3438, which is not related to 
> Flink.
> I'm posting this to track whether other users are affected by this as well, 
> and to collect information about the circumstances under which this issue 
> occurs.
> What I did to run into this situation:
> a) killed the JobManager
> b) removed the jobgraph information from the cluster config map so that the 
> JobGraph gets regenerated with the changed parallelism
> c) restarted the JobManager, which regenerated the job graph with the new 
> parallelism
> d) during state recovery, the above exception occurred.
> In my understanding, rescaling from a checkpoint is legal, because that is 
> what reactive mode does as well.
> Also, removing the jobgraph information from the cluster config map is ok, 
> because the Flink Kubernetes operator does the same.
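>
> For reference, reactive mode is enabled via the scheduler-mode option; a 
> minimal sketch of setting it programmatically (assuming the standard 
> JobManagerOptions and SchedulerExecutionMode classes; verify against your 
> Flink version):
> {code:java}
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.JobManagerOptions;
> import org.apache.flink.configuration.SchedulerExecutionMode;
>
> public class EnableReactiveMode {
>     public static void main(String[] args) {
>         // Same effect as "scheduler-mode: reactive" in flink-conf.yaml.
>         Configuration config = new Configuration();
>         config.set(JobManagerOptions.SCHEDULER_MODE,
>                 SchedulerExecutionMode.REACTIVE);
>         // Reactive mode rescales the job from the latest checkpoint
>         // whenever the available resources change.
>     }
> }
> {code}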



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
