Hi Jan,

Thank you for your response. I created a JIRA ticket: https://issues.apache.org/jira/browse/BEAM-11191

On Wed, Nov 4, 2020 at 2:17 PM Jan Lukavský <[email protected]> wrote:

> Hi Tobias,
>
> this looks like a bug, the clearGlobalState method has been introduced in
> 2.25.0, and it (seems to) might have issues related to rocksdb, can you
> file a Jira for that, please?
>
> Thanks,
>
> Jan
>
> On 11/4/20 9:50 AM, Kaymak, Tobias wrote:
>
> When running our Kafka-To-BigQuery pipeline with the Flink 1.11.2 Docker
> image, the following exception is visible for the failing job on the
> *job manager*:
>
> 2020-11-04 09:27:14
> java.lang.RuntimeException: Failed to cleanup global state.
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeNameSpace(RocksDBKeySerializationUtils.java:77)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getKeys(RocksDBKeyedStateBackend.java:291)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:242)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:141)
>     ... 17 more
>
> This is from the *task manager's* logs:
>
> 2020-11-04 08:46:31,250 WARN org.apache.flink.runtime.taskmanager.Task [] -
> BigQueryIO.Write/BatchLoads/JobIdCreationRoot_LOAD/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Splittable
> ueryIO.Write/BatchLoads/CreateJobId_LOAD/ParMultiDo(Anonymous) ->
> BigQueryIO.Write/BatchLoads/JobIdSideInput_LOAD/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)
> -> ToKeyedWorkItem (1/1) (bebac6c581d1b8ece88007ec0
> java.lang.RuntimeException: Failed to cleanup global state.
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.runtime.state.VoidNamespace
>     at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeNameSpace(RocksDBKeySerializationUtils.java:77) ~[flink-statebackend-rocksdb_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getKeys(RocksDBKeyedStateBackend.java:291) ~[flink-statebackend-rocksdb_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:242) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:141) ~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?]
>     ... 17 more
>
> I think it might be a "translation" problem. One last thing I want to try
> before downgrading to Flink 1.10.2 is using Flink 1.11.1 as an executor to
> see if this is caused by mismatched minor versions.
>
> Best,
> Tobi
