Hi,

I am using the State Processor API to examine a savepoint. My code works fine 
with a HashMapStateBackend, but for larger savepoints I don’t have enough 
memory, so I need to use an EmbeddedRocksDBStateBackend. Even then, I am able to 
process some smaller states, but this one:

operatorID,parallelism,maxParallelism,coordinatorState (bytes),sub task states,total size (bytes)
6030185956219c0e7d5d37d16df14a69,1,128,(none),1,16201253369

…hangs with a thread stuck here:

org.rocksdb.RocksDB.disposeInternal(long) RocksDB.java (native)
org.rocksdb.RocksObject.disposeInternal() RocksObject.java:37
org.rocksdb.AbstractImmutableNativeReference.close() AbstractImmutableNativeReference.java:57
org.apache.flink.util.IOUtils.closeQuietly(AutoCloseable) IOUtils.java:275
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose() RocksDBKeyedStateBackend.java:456
org.apache.flink.state.api.input.operator.StateReaderOperator.close() StateReaderOperator.java:120
org.apache.flink.state.api.input.KeyedStateInputFormat.close() KeyedStateInputFormat.java:206
org.apache.flink.runtime.operators.DataSourceTask.invoke() DataSourceTask.java:219
org.apache.flink.runtime.taskmanager.Task.doRun() Task.java:779
org.apache.flink.runtime.taskmanager.Task.run() Task.java:566
java.lang.Thread.run() Thread.java:829

I found this issue:

https://issues.apache.org/jira/browse/FLINK-20044

which seems to imply that an error has occurred, but I don’t see any sign of an 
error in my logs. My code basically looks like this:

DataSource<StateReport> source =
    savepoint.readKeyedState("re-process-1", new ReFunctionStateReader());
…
source.writeAsFormattedText(…);
env.execute();

And below is how the logs end. Any suggestions as to what I might do to resolve 
this issue?

Thanks,
Mike

9281 INFO  [flink-akka.actor.default-dispatcher-3] 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - DataSink 
(TextOutputFormat (savepoint-re-fn-report.csv) - UTF-8) (1/1) 
(9373e99f41ebc27a485fdd3bb3496a1a) switched from DEPLOYING to INITIALIZING.
9281 INFO  [flink-akka.actor.default-dispatcher-3] 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - DataSink 
(TextOutputFormat (savepoint-re-fn-report.csv) - UTF-8) (1/1) 
(9373e99f41ebc27a485fdd3bb3496a1a) switched from INITIALIZING to RUNNING.
9291 WARN  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.metrics.MetricGroup [] - The operator name DataSource (at 
readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) exceeded the 80 
characters length limit and was truncated.
9291 WARN  [CHAIN DataSource (at runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1)#0] 
org.apache.flink.metrics.MetricGroup [] - The operator name DataSource (at 
runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) exceeded the 80 
characters length limit and was truncated.
9299 INFO  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.runtime.io.disk.FileChannelManagerImpl [] - FileChannelManager 
uses directory 
/var/folders/sr/mwx4cq6s4qv_q_d6bgc5ptv80000gn/T/flink-io-ae624c6f-0ec3-46e2-9b67-a1828a712280
 for spill files.
9308 INFO  [CHAIN DataSource (at runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1)#0] 
org.apache.flink.runtime.taskmanager.Task [] - CHAIN DataSource (at 
runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1)#0 
(9b8eb1c7a89c2b36a94049376c5dae6c) switched from RUNNING to FINISHED.
9308 INFO  [CHAIN DataSource (at runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1)#0] 
org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for CHAIN 
DataSource (at runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1)#0 
(9b8eb1c7a89c2b36a94049376c5dae6c).
9311 INFO  [flink-akka.actor.default-dispatcher-3] 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and 
sending final execution state FINISHED to JobManager for task CHAIN DataSource 
(at runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1)#0 
9b8eb1c7a89c2b36a94049376c5dae6c.
9313 INFO  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
Attempting to load RocksDB native library and store it under 
'/var/folders/sr/mwx4cq6s4qv_q_d6bgc5ptv80000gn/T/flink-io-ae624c6f-0ec3-46e2-9b67-a1828a712280'
9321 INFO  [flink-akka.actor.default-dispatcher-6] 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource 
(at runStateReport(SavepointReport.java:147) 
(org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:147)) (1/1) 
(9b8eb1c7a89c2b36a94049376c5dae6c) switched from RUNNING to FINISHED.
9561 INFO  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
Successfully loaded RocksDB native library
9570 INFO  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
Getting managed memory shared cache for RocksDB.
9573 INFO  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
Obtained shared RocksDB cache of size 67108864 bytes
9824 INFO  [CHAIN DataSource (at readKeyedState(ExistingSavepoint.java:314) 
(org.apache.flink.state.api.input.KeyedStateInputFormat)) -> Map (Map at 
runStateReport(SavepointReport.java:151)) (1/1)#0] 
org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] - 
Starting to restore from state handle: 
KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
 endKeyGroup=127}, offsets=[84184, 97979226, 217371561, 295595622, 439574029, 
558281882, 733284918, 974056904, 1077346944, 1207588358, 1309072112, 
1404981561, 1555072389, 1648312051, 1770101727, 1871634781, 2087771527, 
2187184752, 2414219813, 2515316945, 2631804199, 2729979188, 2922225187, 
3045642428, 3262427401, 3358964955, 3466713255, 3593016146, 3714840493, 
3818058962, 3911382076, 4011213147, 4141534943, 4273318080, 4381607680, 
4486938271, 4573049384, 4708982976, 4887623520, 5041168778, 5154668536, 
5288860823, 5374337001, 5484378918, 5613171728, 5799983770, 5914309992, 
6027189739, 6147120423, 6245180677, 6359314082, 6473151610, 6584315864, 
6745588200, 6849061980, 6945922434, 7076120744, 7189799673, 7298032068, 
7401483802, 7523698152, 7625978817, 7851967489, 8048030846, 8163864487, 
8258026739, 8361709853, 8488631881, 8587903996, 8717003177, 8832686400, 
8924435995, 9008880140, 9098243461, 9209217981, 9321370064, 9528996685, 
9669462006, 9771717841, 9924405572, 10006668961, 10102187175, 10208713272, 
10315707983, 10448831942, 10581272525, 10694443074, 10803791729, 10925152867, 
11028906474, 11191213540, 11296313712, 11448709650, 11574331504, 11844259366, 
11927519634, 12015857009, 12121419982, 12235944888, 12311643736, 12422467285, 
12535417272, 12628866385, 12741620669, 12830885243, 12910504273, 13012574020, 
13152586397, 13270425326, 13392334040, 13509000111, 13626241487, 13727493803, 
13916299006, 14059489943, 14190951395, 14378881966, 14575613436, 14682829220, 
14912173683, 15224725149, 15403872561, 15498873556, 15619714817, 15735403180, 
15842446504, 15970747949, 16085182172]}, stateHandle=RelativeFileStateHandle 
State: 
file:/Users/mikeb/Documents/braid/braid-job/savepoints/savepoint-1e4955-ac043eda3ee4/0fa5d199-32b9-4176-b80b-d8f7bdd21f3b,
 0fa5d199-32b9-4176-b80b-d8f7bdd21f3b [16201253369 bytes]}.
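
The offsets in the restore log line above can be differenced to get a rough idea of how much data each key group holds. The sketch below assumes that each offset marks where one key group's data starts in the savepoint file, so the gap between two consecutive offsets approximates the size of the earlier key group. The class and method names are made up for illustration and are not part of Flink, and only the first four offsets from the log are used.

```java
import java.util.Arrays;

public class KeyGroupSizes {
    /**
     * Returns the byte distance between consecutive offsets.
     * Element i is offsets[i + 1] - offsets[i].
     */
    public static long[] consecutiveGaps(long[] offsets) {
        if (offsets.length < 2) {
            return new long[0];
        }
        long[] gaps = new long[offsets.length - 1];
        for (int i = 0; i < gaps.length; i++) {
            gaps[i] = offsets[i + 1] - offsets[i];
        }
        return gaps;
    }

    public static void main(String[] args) {
        // First four offsets copied from the log line above (key groups 0 to 3).
        long[] offsets = {84184L, 97979226L, 217371561L, 295595622L};
        long[] gaps = consecutiveGaps(offsets);

        // Approximate sizes in bytes of key groups 0, 1 and 2 (assumption: see lead-in).
        System.out.println(Arrays.toString(gaps));
        System.out.println("largest gap: " + Arrays.stream(gaps).max().orElse(0L));
    }
}
```

Pasting the full offset list into the array would show whether the data is spread evenly across the 128 key groups or concentrated in a few of them.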
