Hi, I am using the State Processor API to examine a savepoint. My code works fine when I use a HashMapStateBackend, but for larger savepoints I don’t have enough memory, so I need to use an EmbeddedRocksDBStateBackend. Even then, I am able to process some smaller states, but this one:

operatorID,parallelism,maxParallelism,coordinatorState (bytes),sub task states,total size (bytes)
6030185956219c0e7d5d37d16df14a69,1,128,(none),1,16201253369

…hangs with a thread stuck here:

org.rocksdb.RocksDB.disposeInternal(long) RocksDB.java (native)
org.rocksdb.RocksObject.disposeInternal() RocksObject.java:37
org.rocksdb.AbstractImmutableNativeReference.close() AbstractImmutableNativeReference.java:57
org.apache.flink.util.IOUtils.closeQuietly(AutoCloseable) IOUtils.java:275
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose() RocksDBKeyedStateBackend.java:456
org.apache.flink.state.api.input.operator.StateReaderOperator.close() StateReaderOperator.java:120
org.apache.flink.state.api.input.KeyedStateInputFormat.close() KeyedStateInputFormat.java:206
org.apache.flink.runtime.operators.DataSourceTask.invoke() DataSourceTask.java:219
org.apache.flink.runtime.taskmanager.Task.doRun() Task.java:779
org.apache.flink.runtime.taskmanager.Task.run() Task.java:566
java.lang.Thread.run() Thread.java:829

I found this issue: https://issues.apache.org/jira/browse/FLINK-20044 which seems to imply that an error has occurred, but I don’t see any sign of an error in my logs.

My code basically looks like this:

DataSource<StateReport> source = savepoint.readKeyedState("re-process-1", new ReFunctionStateReader());
…
source.writeAsFormattedText(…);
env.execute();

And below is how the logs end. Any suggestions as to what I might do to resolve this issue?

Thanks,
Mike

9281 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.executiongraph.ExecutionGraph [] - DataSink (TextOutputFormat (savepoint-re-fn-report.csv) - UTF-8) (1/1) (9373e99f41ebc27a485fdd3bb3496a1a) switched from DEPLOYING to INITIALIZING.
9281 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.executiongraph.ExecutionGraph [] - DataSink (TextOutputFormat (savepoint-re-fn-report.csv) - UTF-8) (1/1) (9373e99f41ebc27a485fdd3bb3496a1a) switched from INITIALIZING to RUNNING.
9291 WARN [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:151)) (1/1)#0] org.apache.flink.metrics.MetricGroup [] - The operator name DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) exceeded the 80 characters length limit and was truncated.
9291 WARN [CHAIN DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:147)) (1/1)#0] org.apache.flink.metrics.MetricGroup [] - The operator name DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) exceeded the 80 characters length limit and was truncated.
9299 INFO [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:151)) (1/1)#0] org.apache.flink.runtime.io.disk.FileChannelManagerImpl [] - FileChannelManager uses directory /var/folders/sr/mwx4cq6s4qv_q_d6bgc5ptv80000gn/T/flink-io-ae624c6f-0ec3-46e2-9b67-a1828a712280 for spill files.
9308 INFO [CHAIN DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:147)) (1/1)#0] org.apache.flink.runtime.taskmanager.Task [] - CHAIN DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:147)) (1/1)#0 (9b8eb1c7a89c2b36a94049376c5dae6c) switched from RUNNING to FINISHED.
9308 INFO [CHAIN DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:147)) (1/1)#0] org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for CHAIN DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:147)) (1/1)#0 (9b8eb1c7a89c2b36a94049376c5dae6c).
9311 INFO [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task CHAIN DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:147)) (1/1)#0 9b8eb1c7a89c2b36a94049376c5dae6c.
9313 INFO [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:151)) (1/1)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Attempting to load RocksDB native library and store it under '/var/folders/sr/mwx4cq6s4qv_q_d6bgc5ptv80000gn/T/flink-io-ae624c6f-0ec3-46e2-9b67-a1828a712280'
9321 INFO [flink-akka.actor.default-dispatcher-6] org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at runStateReport(SavepointReport.java:147) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:147)) (1/1) (9b8eb1c7a89c2b36a94049376c5dae6c) switched from RUNNING to FINISHED.
9561 INFO [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:151)) (1/1)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Successfully loaded RocksDB native library
9570 INFO [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:151)) (1/1)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Getting managed memory shared cache for RocksDB.
9573 INFO [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:151)) (1/1)#0] org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Obtained shared RocksDB cache of size 67108864 bytes 9824 INFO [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at runStateReport(SavepointReport.java:151)) (1/1)#0] org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] - Starting to restore from state handle: KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}, offsets=[84184, 97979226, 217371561, 295595622, 439574029, 558281882, 733284918, 974056904, 1077346944, 1207588358, 1309072112, 1404981561, 1555072389, 1648312051, 1770101727, 1871634781, 2087771527, 2187184752, 2414219813, 2515316945, 2631804199, 2729979188, 2922225187, 3045642428, 3262427401, 3358964955, 3466713255, 3593016146, 3714840493, 3818058962, 3911382076, 4011213147, 4141534943, 4273318080, 4381607680, 4486938271, 4573049384, 4708982976, 4887623520, 5041168778, 5154668536, 5288860823, 5374337001, 5484378918, 5613171728, 5799983770, 5914309992, 6027189739, 6147120423, 6245180677, 6359314082, 6473151610, 6584315864, 6745588200, 6849061980, 6945922434, 7076120744, 7189799673, 7298032068, 7401483802, 7523698152, 7625978817, 7851967489, 8048030846, 8163864487, 8258026739, 8361709853, 8488631881, 8587903996, 8717003177, 8832686400, 8924435995, 9008880140, 9098243461, 9209217981, 9321370064, 9528996685, 9669462006, 9771717841, 9924405572, 10006668961, 10102187175, 10208713272, 10315707983, 10448831942, 10581272525, 10694443074, 10803791729, 10925152867, 11028906474, 11191213540, 11296313712, 11448709650, 11574331504, 11844259366, 11927519634, 12015857009, 12121419982, 12235944888, 12311643736, 12422467285, 
12535417272, 12628866385, 12741620669, 12830885243, 12910504273, 13012574020, 13152586397, 13270425326, 13392334040, 13509000111, 13626241487, 13727493803, 13916299006, 14059489943, 14190951395, 14378881966, 14575613436, 14682829220, 14912173683, 15224725149, 15403872561, 15498873556, 15619714817, 15735403180, 15842446504, 15970747949, 16085182172]}, stateHandle=RelativeFileStateHandle State: file:/Users/mikeb/Documents/braid/braid-job/savepoints/savepoint-1e4955-ac043eda3ee4/0fa5d199-32b9-4176-b80b-d8f7bdd21f3b, 0fa5d199-32b9-4176-b80b-d8f7bdd21f3b [16201253369 bytes]}.
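
A side note on reading the last log line: the offsets in the state handle can be turned into rough per-key-group sizes by subtracting neighbouring values. The sketch below does that arithmetic for the first three offsets and for the average over all 128 key groups. It assumes that key groups are written back to back in the savepoint file, so that the gap between two offsets approximates one key group's size; the class and method names (KeyGroupSizes, gaps, averageSize) are made up for illustration and are not part of Flink.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink: plain arithmetic on the
// key-group offsets printed in the "Starting to restore" log line.
final class KeyGroupSizes {

    // Gap between each pair of neighbouring offsets. Assumption: key groups
    // are stored contiguously, so each gap approximates one key group's bytes.
    static long[] gaps(long[] offsets) {
        long[] result = new long[Math.max(0, offsets.length - 1)];
        for (int i = 0; i + 1 < offsets.length; i++) {
            result[i] = offsets[i + 1] - offsets[i];
        }
        return result;
    }

    // Average bytes per key group, counting from the first offset to the
    // end of the file (integer division, so the result is rounded down).
    static long averageSize(long firstOffset, long totalBytes, int keyGroups) {
        return (totalBytes - firstOffset) / keyGroups;
    }

    public static void main(String[] args) {
        // First three offsets and the total file size, copied from the log.
        long[] firstOffsets = {84184L, 97979226L, 217371561L};
        long totalBytes = 16201253369L;

        System.out.println("first gaps: " + Arrays.toString(gaps(firstOffsets)));
        System.out.println("average per key group: "
                + averageSize(firstOffsets[0], totalBytes, 128));
    }
}
```

Under that assumption the first key group comes to roughly 98 MB and the average to roughly 127 MB, so the 16 GB appears to be spread fairly evenly across the 128 key groups, all of which land in the single subtask because the operator's parallelism is 1.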