When running our Kafka-To-BigQuery pipeline with the Flink 1.11.2 Docker image, the following exception is visible for the failing job on the *job manager*:
2020-11-04 09:27:14
java.lang.RuntimeException: Failed to cleanup global state.
    at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.runtime.state.VoidNamespace
    at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
    at org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeNameSpace(RocksDBKeySerializationUtils.java:77)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getKeys(RocksDBKeyedStateBackend.java:291)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:242)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:141)
    ... 17 more

This is from the *task manager's* logs:

2020-11-04 08:46:31,250 WARN org.apache.flink.runtime.taskmanager.Task [] - BigQueryIO.Write/BatchLoads/JobIdCreationRoot_LOAD/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Splittable ueryIO.Write/BatchLoads/CreateJobId_LOAD/ParMultiDo(Anonymous) -> BigQueryIO.Write/BatchLoads/JobIdSideInput_LOAD/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) -> ToKeyedWorkItem (1/1) (bebac6c581d1b8ece88007ec0
java.lang.RuntimeException: Failed to cleanup global state.
    at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.runtime.state.VoidNamespace
    at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeNameSpace(RocksDBKeySerializationUtils.java:77) ~[flink-statebackend-rocksdb_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getKeys(RocksDBKeyedStateBackend.java:291) ~[flink-statebackend-rocksdb_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:242) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:141) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
    ... 17 more

I think it might be a "translation" problem: in both traces, Beam's FlinkStateInternals.clearGlobalState ends up handing a String namespace to RocksDB, which tries to serialize it with Flink's VoidNamespaceSerializer. One last thing I want to try before downgrading to Flink 1.10.2 is running the job on a Flink 1.11.1 executor, to see whether this is caused by a patch-version mismatch (1.11.1 vs. 1.11.2).

Best,
Tobi
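P.S. To make the "Caused by" part concrete: a serializer that is typed for a single void-like namespace class will fail with a ClassCastException as soon as a String namespace reaches it through an unchecked, raw-typed call path. The Java sketch below illustrates only that mechanism. VoidNamespaceStandIn, NamespaceSerializer, VoidNamespaceSerializerStandIn, NamespaceMismatchDemo and the "global-window" string are all hypothetical stand-ins, not Flink's or Beam's real classes, and this is not how either project actually implements namespace handling.

```java
// Hypothetical stand-in for Flink's VoidNamespace: a singleton with no data.
final class VoidNamespaceStandIn {
    static final VoidNamespaceStandIn INSTANCE = new VoidNamespaceStandIn();

    private VoidNamespaceStandIn() {}
}

// Hypothetical generic serializer interface (NOT Flink's TypeSerializer API).
interface NamespaceSerializer<N> {
    byte[] serialize(N namespace);
}

// Hypothetical stand-in for VoidNamespaceSerializer. Because it implements
// NamespaceSerializer<VoidNamespaceStandIn>, the compiler generates a bridge
// method serialize(Object) that casts its argument to VoidNamespaceStandIn.
final class VoidNamespaceSerializerStandIn implements NamespaceSerializer<VoidNamespaceStandIn> {
    @Override
    public byte[] serialize(VoidNamespaceStandIn namespace) {
        // A void namespace carries no information, so nothing is written.
        return new byte[0];
    }
}

public class NamespaceMismatchDemo {
    // Sketch of the failure shape: the state was set up with a void-namespace
    // serializer, but the caller passes whatever namespace object it has.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static String tryClear(Object namespace) {
        NamespaceSerializer serializer = new VoidNamespaceSerializerStandIn();
        try {
            // Raw call goes through the bridge method, which performs the cast.
            serializer.serialize(namespace);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryClear(VoidNamespaceStandIn.INSTANCE));
        System.out.println(tryClear("global-window"));
    }
}
```

Running main prints "ok" for the void-namespace stand-in and "ClassCastException" for the String, which has the same shape as the exception in the traces above.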