Re: Beam/Python/Flink: Unable to deserialize UnboundedSource for PubSub source
You are using a proprietary connector which only works on Dataflow. You will
have to use apache_beam.io.external.gcp.pubsub.ReadFromPubSub instead. PubSub
support is experimental from Python.

-Max

On 09.06.20 06:40, Pradip Thachile wrote:
> Quick update: this test code works just fine on Dataflow as well as the
> DirectRunner. Looks like the FlinkRunner is problematic for some reason here.
> [...]
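Max's pointer above amounts to swapping the Dataflow-native transform for the cross-language one. A minimal, untested sketch of what that swap might look like in the repro code, assuming the apache_beam.io.external.gcp.pubsub module that ships with Beam 2.19 (the topic path is a placeholder, and parameter names may differ across Beam versions):

```python
# Sketch only: use the cross-language Pub/Sub read on the FlinkRunner
# instead of beam.io.gcp.pubsub.ReadFromPubSub, which only works on
# Dataflow. Assumes Beam 2.19's apache_beam.io.external.gcp.pubsub.

beam_options = [
    "--runner=FlinkRunner",
    "--flink_version=1.9",
    "--flink_submit_uber_jar",
    "--streaming",
]

def main():
    # Imports kept inside main() so the module loads without Beam installed.
    import apache_beam as beam
    from apache_beam.io.external.gcp.pubsub import ReadFromPubSub

    popts = beam.options.pipeline_options.PipelineOptions(flags=beam_options)
    with beam.Pipeline(options=popts) as p:
        (
            p
            | "Read from PS" >> ReadFromPubSub(
                topic="projects/<your-project>/topics/<your-topic>")
            | beam.Map(print)
        )

# Call main() against a running Flink cluster. Note the cross-language
# transform may also need a Java expansion service reachable at
# pipeline-construction time; check the Beam docs for your version.
```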
Re: Beam/Python/Flink: Unable to deserialize UnboundedSource for PubSub source
Quick update: this test code works just fine on Dataflow as well as the
DirectRunner. Looks like the FlinkRunner is problematic for some reason here.

On 2020/06/08 20:11:13, Pradip Thachile wrote:
> Hey folks,
> [...]
Beam/Python/Flink: Unable to deserialize UnboundedSource for PubSub source
Hey folks,

I posted this on the Flink user mailing list but didn't get any traction
there (potentially since this is Beam related?). I've got a Beam/Python
pipeline that works on the DirectRunner and now am trying to run this on a
local dev Flink cluster. Running this yields an error right out of the gate
about not being able to deserialize UnboundedSource (my PubSub source). I'm
not sure how to debug this and would love to get some feedback on how to
solve this issue. I'm also adding a simple example that reproduces this
error.

Beam SDK: 2.19
Flink: 1.9.3
Python: 3.7
Beam args: ['--runner=FlinkRunner', '--flink_version=1.9', '--flink_submit_uber_jar', '--streaming']

(Stacktrace below)

#!/usr/bin/env python3
import apache_beam as beam

class DummyPipeline(beam.PTransform):
    def expand(self, p):
        (
            p
            | "Read from PS" >> beam.io.gcp.pubsub.ReadFromPubSub(topic="")
            | beam.Map(print)
        )
        return p

def main():
    beam_options = [
        # "--runner=DirectRunner",
        "--runner=FlinkRunner",
        "--flink_version=1.9",
        "--flink_submit_uber_jar",
        "--streaming",
        "--save_main_session",
    ]
    popts = beam.options.pipeline_options.PipelineOptions(flags=beam_options)
    p = beam.Pipeline(options=popts)

    (
        p
        | "Do It" >> DummyPipeline()
    )
    job = p.run()
    job.wait_until_finish()

if __name__ == "__main__":
    main()

-Pradip

[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:55371
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:55372
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:55364
[grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-crossbow-0607202942-6ab5b807_f5df-b95e-4b60-ae87-d069f19d029f with pipeline runner org.apache.beam.runners.flink.FlinkPipelineRunner@292a28a1
[grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-crossbow-0607202942-6ab5b807_f5df-b95e-4b60-ae87-d069f19d029f
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Streaming Environment.
[flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-crossbow-0607202942-6ab5b807_f5df-b95e-4b60-ae87-d069f19d029f.
java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
    at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
    at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
    at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
    at org.xerial.sn