Hi everyone, For the last few days I have been trying to run a streaming pipeline (code on GitHub <https://github.com/matthiasa4/beam-demo>) on the Flink Runner.
I am running a Flink cluster locally (v1.5.6 <https://flink.apache.org/downloads.html>). I have built the SDK Harness Container with *./gradlew :beam-sdks-python-container:docker* and started the JobServer with *./gradlew :beam-runners-flink_2.11-job-server:runShadow -PflinkMasterUrl=localhost:8081*. I run my pipeline with: *env/bin/python streaming_pipeline.py --runner=PortableRunner --job_endpoint=localhost:8099 --output xxx --input_subscription xxx --output_subscription xxx* All of this is running inside Ubuntu (Bionic) in a VirtualBox VM. The job submits fine, but unfortunately it fails after a few seconds with the error below. Is there anything I am missing or doing wrong? Many thanks. Best, Matthias
TimerException{java.lang.RuntimeException: Failed to finish remote bundle} at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Failed to finish remote bundle at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:624) at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:87) at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118) at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:679) at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:673) at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:378) at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330) ... 
7 more Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 5: Traceback (most recent call last): File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute response = task() File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda> self._execute(lambda: worker.do_instruction(work), work) File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction request.instruction_id) File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 272, in process_bundle bundle_processor.process_bundle(instruction_id) File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 489, in process_bundle ].process_encoded(data.data) File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 126, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 182, in apache_beam.runners.worker.operations.Operation.output def output(self, windowed_value, output_index=0): File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.Operation.output cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) File "apache_beam/runners/worker/operations.py", line 89, in apache_beam.runners.worker.operations.ConsumerSet.receive cython.cast(Operation, consumer).process(windowed_value) File "apache_beam/runners/worker/operations.py", line 497, in apache_beam.runners.worker.operations.DoOperation.process with self.scoped_process_state: File "apache_beam/runners/worker/operations.py", line 498, in apache_beam.runners.worker.operations.DoOperation.process self.dofn_receiver.receive(o) File "apache_beam/runners/common.py", line 678, in 
apache_beam.runners.common.DoFnRunner.receive self.process(windowed_value) File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.DoFnRunner.process self._reraise_augmented(exn) File "apache_beam/runners/common.py", line 722, in apache_beam.runners.common.DoFnRunner._reraise_augmented raise_with_traceback(new_exn) File "apache_beam/runners/common.py", line 682, in apache_beam.runners.common.DoFnRunner.process self.do_fn_invoker.invoke_process(windowed_value) File "apache_beam/runners/common.py", line 419, in apache_beam.runners.common.SimpleInvoker.invoke_process windowed_value, self.process_method(windowed_value.value)) File "/home/matthias/Coding/GDE-demo-beam/env/local/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 859, in split_source AttributeError: '_PubSubSource' object has no attribute 'estimate_size' [while running 'Read from PubSub/Read/Split'] at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:263) at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:621) ... 
13 more Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 5: Traceback (most recent call last): File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute response = task() File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda> self._execute(lambda: worker.do_instruction(work), work) File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction request.instruction_id) File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 272, in process_bundle bundle_processor.process_bundle(instruction_id) File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 489, in process_bundle ].process_encoded(data.data) File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 126, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 182, in apache_beam.runners.worker.operations.Operation.output def output(self, windowed_value, output_index=0): File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.Operation.output cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) File "apache_beam/runners/worker/operations.py", line 89, in apache_beam.runners.worker.operations.ConsumerSet.receive cython.cast(Operation, consumer).process(windowed_value) File "apache_beam/runners/worker/operations.py", line 497, in apache_beam.runners.worker.operations.DoOperation.process with self.scoped_process_state: File "apache_beam/runners/worker/operations.py", line 498, in apache_beam.runners.worker.operations.DoOperation.process self.dofn_receiver.receive(o) File "apache_beam/runners/common.py", line 678, in apache_beam.runners.common.DoFnRunner.receive self.process(windowed_value) 
File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.DoFnRunner.process self._reraise_augmented(exn) File "apache_beam/runners/common.py", line 722, in apache_beam.runners.common.DoFnRunner._reraise_augmented raise_with_traceback(new_exn) File "apache_beam/runners/common.py", line 682, in apache_beam.runners.common.DoFnRunner.process self.do_fn_invoker.invoke_process(windowed_value) File "apache_beam/runners/common.py", line 419, in apache_beam.runners.common.SimpleInvoker.invoke_process windowed_value, self.process_method(windowed_value.value)) File "/home/matthias/Coding/GDE-demo-beam/env/local/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 859, in split_source AttributeError: '_PubSubSource' object has no attribute 'estimate_size' [while running 'Read from PubSub/Read/Split'] at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140) at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248) at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263) at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683) at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ... 3 more