Hi Brian,

Thanks so much for your quick response!

I've tried both Apache Beam 2.20.0 and 2.21.0, and both result in the
exact same error. Here is the full stack trace:

(metadata-persistor) ➜  metadata-persistor git:(feature/DEV-1249) ✗ metadata_persistor --project XXXXX --environments XXXXX --window_size 1 --input_subscription projects/XXXXXXX/subscriptions/mds-internal-item-metadata-subscription --runner DataflowRunner --temp_location gs://XXXXXXXX/item-metadata/temp --staging_location gs://XXXXXXXXXXX/item-metadata/staging
/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1431: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
  experiments = p.options.view_as(DebugOptions).experiments or []
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
Traceback (most recent call last):
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/bin/metadata_persistor", line 8, in <module>
    sys.exit(run())
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/metadata_persistor/item_metadata_mds_persistor.py", line 246, in run
    batch_size=500,
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 524, in __exit__
    self.run().wait_until_finish()
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 497, in run
    self._options).run(False)
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 510, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 484, in run_pipeline
    allow_proto_holders=True)
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 858, in from_runner_api
    p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1238, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1244, in from_runner_api
    id in proto.outputs.items()
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1244, in <dictcomp>
    id in proto.outputs.items()
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pvalue.py", line 214, in from_runner_api
    element_type=context.element_type_from_coder_id(proto.coder_id),
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 227, in element_type_from_coder_id
    self.coders[coder_id].to_type_hint())
  File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 221, in to_type_hint
    raise NotImplementedError('BEAM-2717')
NotImplementedError: BEAM-2717

While debugging by commenting out different steps, I noticed that the
line in my code that the traceback blames keeps changing. Here it points
at the WriteToBigQuery step (batch_size=500), but if I comment out that
step, it points at the step above it instead. The error seems to be
consistently reported on the last step in the pipeline (not sure if that's
helpful, just thought I'd mention it).
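The moving location is consistent with the traceback above: the error is raised from `Pipeline.__exit__` (`self.run().wait_until_finish()`), i.e. when the `with` block closes and the pipeline is submitted to the runner. Python therefore attributes it to the end of the block (here `batch_size=500`) rather than to whichever step actually introduced the problem. A stdlib-only toy sketch of that behaviour; `ToyPipeline`, `apply`, and the step names are hypothetical stand-ins, not Beam's API:

```python
import traceback
from typing import List, Tuple


class ToyPipeline:
    """Hypothetical stand-in for a pipeline that only runs when its `with` block exits."""

    def __init__(self) -> None:
        self.steps: List[str] = []

    def apply(self, name: str) -> "ToyPipeline":
        # Defining a step only records it; nothing is validated or executed yet.
        self.steps.append(name)
        return self

    def __enter__(self) -> "ToyPipeline":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Leaving the block without an error triggers run(), as in the traceback.
        if exc_type is None:
            self.run()
        return False

    def run(self) -> None:
        # Simulated submission-time failure, raised after every step was defined.
        raise NotImplementedError("BEAM-2717")


def build_and_run() -> Tuple[List[str], List[str]]:
    pipeline = ToyPipeline()
    try:
        with pipeline:
            pipeline.apply("ReadFromPubSub")
            pipeline.apply("ParseMessages")
            pipeline.apply("WriteToBigQuery")
    except NotImplementedError as err:
        # Function names of the frames between this handler and the raise.
        frames = [frame.name for frame in traceback.extract_tb(err.__traceback__)]
        return pipeline.steps, frames
    return pipeline.steps, []
```

In the toy, all three steps are recorded before the error, and the innermost frames are `__exit__` and `run`; removing the last `apply` call would not change where the error comes from, only which line of the block Python reports.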

After adding beam.typehints.disable_type_annotations(), it still throws the
same error.

Another thing I forgot to mention in my first email: I registered a
ProtoCoder for ActionWrapper, as suggested at the bottom of this page
(https://beam.apache.org/documentation/sdks/python-type-safety/):

beam.coders.registry.register_coder(ActionWrapper, ProtoCoder)
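Reading the traceback bottom-up suggests this failure path: `DataflowRunner.run_pipeline` rebuilds the pipeline with `Pipeline.from_runner_api`, each output PCollection's element type is recovered through `element_type_from_coder_id`, which calls the coder's `to_type_hint()`, and the implementation reached in `coders.py` raises `NotImplementedError('BEAM-2717')`. That rebuild appears on the Dataflow path in the trace, which may be why the DirectRunner starts fine. The stdlib-only toy below models only that mechanism; every class and function in it is hypothetical and none of it is Beam's code. The trace alone does not confirm that the registered ProtoCoder is the coder involved.

```python
from typing import Dict


class ToyCoder:
    """Hypothetical base class; its fallback mirrors the error in the traceback."""

    def to_type_hint(self) -> type:
        raise NotImplementedError("BEAM-2717")


class ToyBytesCoder(ToyCoder):
    """A toy coder that can report the element type it encodes."""

    def to_type_hint(self) -> type:
        return bytes


class ToyMessageCoder(ToyCoder):
    """Hypothetical coder for a message type that does not override to_type_hint."""

    def __init__(self, message_type: type) -> None:
        self.message_type = message_type


class ToyTypedMessageCoder(ToyMessageCoder):
    """The same toy coder, but able to report its element type."""

    def to_type_hint(self) -> type:
        return self.message_type


def rebuild_element_types(coders: Dict[str, ToyCoder]) -> Dict[str, type]:
    # Toy version of the rebuild step: ask every PCollection's coder for its type.
    return {name: coder.to_type_hint() for name, coder in coders.items()}
```

With `ToyMessageCoder` on the parsed PCollection, `rebuild_element_types` raises `NotImplementedError: BEAM-2717`; with `ToyTypedMessageCoder` it returns the message type. Only the rebuild step fails in the toy, which matches the error surfacing at submission time rather than while steps are being defined.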

Thanks again, really appreciate your help!
Lien

On Mon, Jun 8, 2020 at 5:26 PM Brian Hulette <bhule...@google.com> wrote:

> Hi Lien,
>
> > First time writing to the email list, so please tell me if I'm doing this
> > all wrong.
> Not at all! This is exactly the kind of question this list is for.
>
> I have a couple of questions that may help us debug:
> - Can you share the full stacktrace?
> - What version of Beam are you using?
>
> There were some changes to the way we use typehints in the most recent
> Beam release (2.21) that might be causing this [1]. If you're using 2.21,
> could you try reverting to the old behavior (call
> `apache_beam.typehints.disable_type_annotations()` before constructing the
> pipeline) to see if that helps?
>
> Thanks,
> Brian
>
> [1] https://beam.apache.org/blog/python-typing/
>
> On Mon, Jun 8, 2020 at 4:15 AM Lien Michiels <lien.michi...@froomle.com>
> wrote:
>
>>
>> Hi everyone,
>>
>> First time writing to the email list, so please tell me if I'm doing this
>> all wrong.
>>
>> I'm building a streaming pipeline to be run on the DataflowRunner that
>> reads from PubSub and writes to BQ using the Python 3 SDK.
>>
>> I can get the pipeline started fine with the DirectRunner, but as soon as
>> I try to deploy to Dataflow, it throws the following error:
>>
>>   File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 221, in to_type_hint
>>     raise NotImplementedError('BEAM-2717')
>>
>> I've tried narrowing down what exactly could be causing the issue and it
>> appears to be caused by the second step in my pipeline, which transforms
>> the bytes read from PubSub to my own internal Proto format:
>>
>> def parse_message_blobs(x: bytes) -> ActionWrapper:
>>     action_wrapper = ActionWrapper()
>>     action_wrapper.ParseFromString(x)
>>
>>     return action_wrapper
>>
>> which is applied as a Map step.
>>
>> I've added typehints to all downstream steps as follows:
>> def partition_by_environment(
>>     x: ActionWrapper, num_partitions: int, environments: List[str]
>> ) -> int:
>>
>> I'd really appreciate it if anyone could let me know what I'm doing
>> wrong, or what exactly this error is referring to. I read the
>> JIRA ticket, but did not understand how it relates to my issue here.
>>
>> Thanks!
>> Kind regards,
>> Lien
>>
>
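Since Beam 2.21, annotations like the ones quoted in the original message are used as type hints [1]. A stdlib-only sketch of the types those annotations expose; the `ActionWrapper` class here is a hypothetical stand-in for the generated protobuf class, and the body of `partition_by_environment` is left out, as in the original message:

```python
from typing import List, get_type_hints


class ActionWrapper:
    """Hypothetical stand-in for the generated protobuf message class."""

    def __init__(self) -> None:
        self.payload = b""

    def ParseFromString(self, data: bytes) -> None:
        # The real protobuf method decodes fields; this stand-in just stores the bytes.
        self.payload = data


def parse_message_blobs(x: bytes) -> ActionWrapper:
    action_wrapper = ActionWrapper()
    action_wrapper.ParseFromString(x)
    return action_wrapper


def partition_by_environment(
    x: ActionWrapper, num_partitions: int, environments: List[str]
) -> int:
    # Body not shown in the original message.
    raise NotImplementedError("body elided in the original message")


# Resolve the annotations the same way any annotation-aware tool would see them.
parse_hints = get_type_hints(parse_message_blobs)
partition_hints = get_type_hints(partition_by_environment)
```

Here the declared output type of the parsing step (`parse_hints["return"]`) and the declared input type of the partition step (`partition_hints["x"]`) are the same class, so the annotations themselves are consistent between the two steps.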
