Re: Beam/Python/Flink: Unable to deserialize UnboundedSource for PubSub source

2020-06-17 Thread Maximilian Michels
You are using a proprietary connector which only works on Dataflow. You
will have to use io.external.gcp.pubsub.ReadFromPubSub instead. Pub/Sub
support in the Python SDK is experimental.

-Max
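
For readers following the thread, the suggestion above might look roughly like the sketch below. It is not a verified fix. It assumes the installed Beam release ships the module apache_beam.io.external.gcp.pubsub with a ReadFromPubSub transform that accepts a topic argument, and that the job server's Java expansion service is reachable (depending on the release, it may have to be passed to the transform explicitly). The helper names flink_streaming_flags and run are hypothetical and exist only for this illustration.

```python
#!/usr/bin/env python3
"""Sketch: read Pub/Sub on the FlinkRunner via the external transform."""


def flink_streaming_flags(flink_version="1.9"):
    """Return the Beam flags used in the original report as a list."""
    return [
        "--runner=FlinkRunner",
        "--flink_version=" + flink_version,
        "--flink_submit_uber_jar",
        "--streaming",
        "--save_main_session",
    ]


def run(topic):
    """Build and run the pipeline; `topic` is a full Pub/Sub topic path."""
    # Imported lazily so the flag helper above works without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    # Assumption: this module path exists in the Beam release in use.
    from apache_beam.io.external.gcp.pubsub import ReadFromPubSub

    options = PipelineOptions(flags=flink_streaming_flags())
    p = beam.Pipeline(options=options)
    _ = (
        p
        | "Read from PS" >> ReadFromPubSub(topic=topic)
        | "Print" >> beam.Map(print)
    )
    p.run().wait_until_finish()
```

Calling run() with the topic path from the original pipeline would submit the job. Only the import and the read step differ from the reported code.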


Re: Beam/Python/Flink: Unable to deserialize UnboundedSource for PubSub source

2020-06-09 Thread Pradip Thachile
Quick update: this test code works just fine on Dataflow as well as the 
DirectRunner. Looks like the FlinkRunner is problematic for some reason here.


Beam/Python/Flink: Unable to deserialize UnboundedSource for PubSub source

2020-06-08 Thread Pradip Thachile
Hey folks, 

I posted this on the Flink user mailing list but didn't get any traction there 
(potentially since this is Beam related?). I've got a Beam/Python pipeline that 
works on the DirectRunner, and I'm now trying to run it on a local dev Flink 
cluster. Running it fails right out of the gate with an error about not being 
able to deserialize UnboundedSource (my PubSub source). I'm not sure how to 
debug this and would love some feedback on how to solve it. I'm also including 
a simple example that reproduces the error.

Beam SDK: 2.19
Flink: 1.9.3
Python: 3.7
Beam args: ['--runner=FlinkRunner', '--flink_version=1.9', 
'--flink_submit_uber_jar', '--streaming']
(Stacktrace below)

#!/usr/bin/env python3
import apache_beam as beam


class DummyPipeline(beam.PTransform):
    def expand(self, p):
        # Return the resulting PCollection rather than the input.
        return (
            p
            | "Read from PS" >> beam.io.gcp.pubsub.ReadFromPubSub(
                topic="")
            | beam.Map(print)
        )


def main():
    beam_options = [
        # "--runner=DirectRunner",
        "--runner=FlinkRunner",
        "--flink_version=1.9",
        "--flink_submit_uber_jar",
        "--streaming",
        "--save_main_session",
    ]
    popts = beam.options.pipeline_options.PipelineOptions(flags=beam_options)
    p = beam.Pipeline(options=popts)

    (
        p
        | "Do It" >> DummyPipeline()
    )
    job = p.run()
    job.wait_until_finish()


if __name__ == "__main__":
    main()

-Pradip

[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:55371
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:55372
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:55364
[grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-crossbow-0607202942-6ab5b807_f5df-b95e-4b60-ae87-d069f19d029f with pipeline runner org.apache.beam.runners.flink.FlinkPipelineRunner@292a28a1
[grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-crossbow-0607202942-6ab5b807_f5df-b95e-4b60-ae87-d069f19d029f
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Streaming Environment.
[flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-crossbow-0607202942-6ab5b807_f5df-b95e-4b60-ae87-d069f19d029f.
java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
at org.xerial.sn