Hi all,
This error came as a bit of a surprise.

Here’s a snippet of the traceback (full traceback below).

  File "apache_beam/runners/common.py", line 751, in
apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 423, in
apache_beam.runners.common.SimpleInvoker.invoke_process
    windowed_value, self.process_method(windowed_value.value))
  File 
"/Users/chad/dev/beam-tests/.venv/lib/python2.7/site-packages/apache_beam/io/iobase.py",
line 860, in split_source
AttributeError: '_PubSubSource' object has no attribute
'estimate_size' [while running 'PubSubInflow/Read/Split']


Flink is using _PubSubSource, which, as far as I can tell, is a stub that
should be replaced at runtime by an actual streaming source. Is this error
a bug or a known limitation? If the latter, is there a Jira issue tracking
it, and is there any momentum toward solving it?
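
For reference, here's a minimal sketch of how the PubSub read is wired up
(simplified; the subscription name is a placeholder, and the 'PubSubInflow'
label matches the step name in the traceback):

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

  # Placeholder subscription, not the real one.
  SUBSCRIPTION = 'projects/my-project/subscriptions/my-subscription'

  options = PipelineOptions()
  options.view_as(StandardOptions).streaming = True

  with beam.Pipeline(options=options) as p:
      # 'PubSubInflow' is the label that shows up as 'PubSubInflow/Read/Split' above.
      _ = p | 'PubSubInflow' >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)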

I’m pretty confused by this, because the Apache Beam Portability Support
Matrix [1] makes it clear that Flink supports streaming, and the
“Built-in I/O Transforms” docs [2] list Google PubSub and BigQuery as the
only IO transforms that support streaming. So if streaming works with
Flink, PubSub should presumably be the thing it works with.

I'm using Beam 2.13.0 and Flink 1.8.
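
In case it matters, this is roughly how the job gets submitted to the
Flink portable runner (the job endpoint is a placeholder for my local job
server):

  from apache_beam.options.pipeline_options import PipelineOptions

  # Assumed submission flags for the portable runner; the endpoint is a placeholder.
  options = PipelineOptions([
      '--runner=PortableRunner',
      '--job_endpoint=localhost:8099',
      '--streaming',
  ])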

thanks,
chad

[1]
https://docs.google.com/spreadsheets/d/1KDa_FGn1ShjomGd-UUDOhuh2q73de2tPz6BqHpzqvNI/edit#gid=0
[2] https://beam.apache.org/documentation/io/built-in/

Full traceback:

Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException: Error received from SDK harness for
instruction 5: Traceback (most recent call last):
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 157, in _execute
    response = task()
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 190, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 333, in do_instruction
    request.instruction_id)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 359, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 589, in process_bundle
    ].process_encoded(data.data)
  File 
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 143, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 246, in
apache_beam.runners.worker.operations.Operation.output
    def output(self, windowed_value, output_index=0):
  File "apache_beam/runners/worker/operations.py", line 247, in
apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 143, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 583, in
apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 584, in
apache_beam.runners.worker.operations.DoOperation.process
    delayed_application = self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 747, in
apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 753, in
apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 807, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 751, in
apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 423, in
apache_beam.runners.common.SimpleInvoker.invoke_process
    windowed_value, self.process_method(windowed_value.value))
  File 
"/Users/chad/dev/beam-tests/.venv/lib/python2.7/site-packages/apache_beam/io/iobase.py",
line 860, in split_source
AttributeError: '_PubSubSource' object has no attribute
'estimate_size' [while running 'PubSubInflow/Read/Split']
