Hello guys, I'm using the Flink runner. I had to update my application to fix some minor bugs (incorrect assignments on the output fields).
Then, when I tried to update my pipeline with the latest code, I found this error:

org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Error adding to state.
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:189)
    at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:693)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Error adding to state.
    at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.add(FlinkStateInternals.java:760)
    at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:119)
    at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:613)
    at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:360)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
Caused by: org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB.
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:91)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.add(FlinkStateInternals.java:753)
    at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:119)
    at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:613)
    at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:360)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:189)
    at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
    at org.apa…

My app was running fine and then I just updated the code. The failed task belongs to a side input.
(I'm using an AWS Kinesis application.) This is not the first issue I have found; others are sometimes related to null fields that should not be null (and because I'm using AutoValue builders, building the objects fails).

So my questions are:
- How can I prevent this from happening?
- How should we deal with incompatible state when it does happen?

Currently I had to restart the application from scratch, but I'm afraid of losing data by doing it that way.

I would really appreciate some help. (I'm using KafkaIO to read and write my data.)

Regards.
