Hello guys

I'm using the Flink runner. I had to update my application to fix some minor bugs (incorrect assignments on the output fields).

Then, when I tried to update my pipeline with the latest code, I got this error:

org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Error adding to state.
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:189)
    at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:693)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Error adding to state.
    at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.add(FlinkStateInternals.java:760)
    at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:119)
    at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:613)
    at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:360)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
Caused by: org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB.
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:91)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.add(FlinkStateInternals.java:753)
    at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:119)
    at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:613)
    at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:360)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:189)
    at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
    at org.apa…

My app was running fine until I updated the code. The failed task belongs to a side input (I'm running this as an AWS Kinesis Data Analytics application). This isn't the first issue of this kind I've hit: others are sometimes related to null fields that should not be null, and because I'm using AutoValue builders, building those objects fails (there's a rough sketch of what I mean below, after my questions). So my questions are:


- How can I prevent this from happening?
- How should I deal with incompatible state if it comes to that?
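
To make the AutoValue point concrete, here's a rough sketch of the kind of value class I mean (class and field names are made up for illustration, not my actual code):

import com.google.auto.value.AutoValue;

@AutoValue
abstract class OutputRecord {
  // Non-nullable properties: the generated setters reject null,
  // and build() throws if a property was never set.
  abstract String id();
  abstract long amount();

  static Builder builder() {
    return new AutoValue_OutputRecord.Builder();
  }

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setId(String id);
    abstract Builder setAmount(long amount);
    abstract OutputRecord build();
  }
}

// The "incorrect assignment" bugs were of this flavor: a DoFn copied the wrong
// source field into setId(...), and whenever that source value was null the
// generated builder threw instead of building the record.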

For now I've had to restart the application from scratch, but I'm afraid of losing data that way.

I'd really appreciate some help (I'm using KafkaIO to read/write my data).

Regards.
