Cross-language support for PubSub is not yet implemented, but it could be done 
similarly to ReadFromKafka. There are still some limitations regarding 
coders: only coders that are available in both the Java and the Python SDK 
(i.e. the standard coders) can be used.
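
For the curious, such a transform would presumably mirror how the 
ReadFromKafka external transform (BEAM-7029) works: the Python side only 
collects configuration and hands it to the Java expansion service, which 
expands the actual PubSubIO read. A minimal sketch of that Python side — 
the transform name, parameters, and port below are all assumptions, not an 
existing API:

```python
# Sketch only: a cross-language ReadFromPubSub does not exist yet, so the
# names and parameters here are assumptions modelled on ReadFromKafka.

def pubsub_read_config(topic=None, subscription=None, id_label=None):
    """Build the configuration dict a hypothetical external
    ReadFromPubSub would forward to the Java expansion service."""
    if (topic is None) == (subscription is None):
        # PubSubIO reads from exactly one of topic or subscription.
        raise ValueError("specify exactly one of topic or subscription")
    config = {}
    if topic is not None:
        config["topic"] = topic
    if subscription is not None:
        config["subscription"] = subscription
    if id_label is not None:
        config["id_label"] = id_label
    return config

# In a pipeline it would then be used roughly like this (not runnable
# today; expansion service address is illustrative):
#
#   with beam.Pipeline(options=portable_options) as p:
#       (p
#        | ReadFromPubSub(topic="projects/my-project/topics/my-topic",
#                         expansion_service="localhost:8097")
#        | beam.Map(process))
```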

As of now the user experience is a bit rough, but we will be improving that 
very soon. Happy to help out if you want to contribute a cross-language 
ReadFromPubSub. 

-Max

On July 13, 2019 6:05:26 PM GMT+02:00, Chad Dombrova <chad...@gmail.com> wrote:
>Thanks for the response, Max.  I saw that KafkaIO is now supported in
>python via an external transform (
>https://jira.apache.org/jira/browse/BEAM-7029), but I thought I read
>somewhere that it was only supported in batch mode (though I don't see
>that
>mentioned in the ticket or the PR, so not sure where I got that
>impression).
>
>Do I have to modify the source along the lines of the KafkaIO PRs to
>work
>with PubSubIO, or is it already supported via some flag?
>
>-chad
>
>
>On Sat, Jul 13, 2019 at 8:43 AM Maximilian Michels <m...@apache.org>
>wrote:
>
>> Hi Chad,
>>
>> This stub will only be replaced by the Dataflow service. It's an
>artifact
>> of the pre-portability era.
>>
>> That said, we now have the option to replace ReadFromPubSub with an
>> external transform which would utilize Java's PubSubIO via the new
>> cross-language feature.
>>
>> Thanks,
>> Max
>>
>> On 12.07.19 19:32, Chad Dombrova wrote:
>> > Hi all,
>> > This error came as a bit of a surprise.
>> >
>> > Here’s a snippet of the traceback (full traceback below).
>> >
>> >   File "apache_beam/runners/common.py", line 751, in apache_beam.runners.common.DoFnRunner.process
>> >     return self.do_fn_invoker.invoke_process(windowed_value)
>> >   File "apache_beam/runners/common.py", line 423, in apache_beam.runners.common.SimpleInvoker.invoke_process
>> >     windowed_value, self.process_method(windowed_value.value))
>> >   File "/Users/chad/dev/beam-tests/.venv/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 860, in split_source
>> > AttributeError: '_PubSubSource' object has no attribute 'estimate_size' [while running 'PubSubInflow/Read/Split']
>> >
>> > Flink is using _PubSubSource which is, as far as I can tell, a stub
>that
>> > should be replaced at runtime by an actual streaming source. Is
>this
>> > error a bug or a known limitation? If the latter, is there a Jira
>issue
>> > and any momentum to solve this?
>> >
>> > I’m pretty confused by this because the Apache Beam Portability
>Support
>> > Matrix [1] makes it pretty clear that Flink supports streaming, and
>the
>> > docs for “Built-in I/O Transforms” [2] list Google PubSub and BigQuery
>as
>> > the only IO transforms that support streaming, so if streaming
>works
>> > with Flink, PubSub should probably be the thing it works with.
>> >
>> > I'm using beam 2.13.0 and flink 1.8.
>> >
>> > thanks,
>> > chad
>> >
>> > [1]
>> >
>>
>https://docs.google.com/spreadsheets/d/1KDa_FGn1ShjomGd-UUDOhuh2q73de2tPz6BqHpzqvNI/edit#gid=0
>> > [2] https://beam.apache.org/documentation/io/built-in/
>> >
>> > Full traceback:
>> >
>> > Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 5: Traceback (most recent call last):
>> >   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 157, in _execute
>> >     response = task()
>> >   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 190, in <lambda>
>> >     self._execute(lambda: worker.do_instruction(work), work)
>> >   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 333, in do_instruction
>> >     request.instruction_id)
>> >   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 359, in process_bundle
>> >     bundle_processor.process_bundle(instruction_id))
>> >   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 589, in process_bundle
>> >     ].process_encoded(data.data)
>> >   File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
>> >     self.output(decoded_value)
>> >   File "apache_beam/runners/worker/operations.py", line 246, in apache_beam.runners.worker.operations.Operation.output
>> >     def output(self, windowed_value, output_index=0):
>> >   File "apache_beam/runners/worker/operations.py", line 247, in apache_beam.runners.worker.operations.Operation.output
>> >     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
>> >   File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>> >     self.consumer.process(windowed_value)
>> >   File "apache_beam/runners/worker/operations.py", line 583, in apache_beam.runners.worker.operations.DoOperation.process
>> >     with self.scoped_process_state:
>> >   File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.DoOperation.process
>> >     delayed_application = self.dofn_receiver.receive(o)
>> >   File "apache_beam/runners/common.py", line 747, in apache_beam.runners.common.DoFnRunner.receive
>> >     self.process(windowed_value)
>> >   File "apache_beam/runners/common.py", line 753, in apache_beam.runners.common.DoFnRunner.process
>> >     self._reraise_augmented(exn)
>> >   File "apache_beam/runners/common.py", line 807, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>> >     raise_with_traceback(new_exn)
>> >   File "apache_beam/runners/common.py", line 751, in apache_beam.runners.common.DoFnRunner.process
>> >     return self.do_fn_invoker.invoke_process(windowed_value)
>> >   File "apache_beam/runners/common.py", line 423, in apache_beam.runners.common.SimpleInvoker.invoke_process
>> >     windowed_value, self.process_method(windowed_value.value))
>> >   File "/Users/chad/dev/beam-tests/.venv/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 860, in split_source
>> > AttributeError: '_PubSubSource' object has no attribute 'estimate_size' [while running 'PubSubInflow/Read/Split']
>> >
>>
>>
