Florian Schmidt created FLINK-9336:
--------------------------------------
Summary: Queryable state fails with Exception after task manager
restore
Key: FLINK-9336
URL: https://issues.apache.org/jira/browse/FLINK-9336
Project: Flink
Issue Type: Bug
Components: Queryable State
Affects Versions: 1.5.0
Reporter: Florian Schmidt
The following example can be used to reproduce the Exception:
# Run an app with a FlatMap which exposes its MapState through Queryable State
with RocksDB
# Run a queryable state client: ✔️
# Kill the TM
# Start a new TM
# Run a queryable state client: ❌ (StackTrace below)
This happens if we run our queryable state client after the new TM has started
and the app is running but *before the FlatMap has processed elements again*
With a little help and some debugging I found out that very likely the reason
is that in the _RocksDBMapState_ there is a private field _userKeyOffset,_
**which is initialised through the code path of
_RocksDBMapState::serializeCurrentKeyAndNamespace. **_This will only be called
when processing an element and therefore accessing the state. If now the state
is accessed before that through queryable state, this value will be 0 and
therefore the deserialization of the user key will fail as seen below.
The observed stacktrace
{code:java}
Exception in thread "main" java.util.concurrent.ExecutionException:
java.lang.RuntimeException: Failed request 0. Caused by:
java.lang.RuntimeException: Failed request 0. Caused by:
java.lang.RuntimeException: Error while processing request with ID 0. Caused
by: java.lang.RuntimeException: Error while deserializing the user key. at
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414)
at
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220)
at
org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at
java.io.DataInputStream.readFully(DataInputStream.java:197) at
java.io.DataInputStream.readUTF(DataInputStream.java:609) at
java.io.DataInputStream.readUTF(DataInputStream.java:564) at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
at
org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341)
at
org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61)
at
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412)
... 11 more at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748) at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748) at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at
org.apache.flink.streaming.tests.queryablestate.QsStateClient.getMapState(QsStateClient.java:122)
at
org.apache.flink.streaming.tests.queryablestate.QsStateClient.main(QsStateClient.java:75)
Caused by: java.lang.RuntimeException: Failed request 0. Caused by:
java.lang.RuntimeException: Failed request 0. Caused by:
java.lang.RuntimeException: Error while processing request with ID 0. Caused
by: java.lang.RuntimeException: Error while deserializing the user key. at
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414)
at
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220)
at
org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at
java.io.DataInputStream.readFully(DataInputStream.java:197) at
java.io.DataInputStream.readUTF(DataInputStream.java:609) at
java.io.DataInputStream.readUTF(DataInputStream.java:564) at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
at
org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341)
at
org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61)
at
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412)
... 11 more at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
at
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748) at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748) at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)