Pipelines that use external WindowingStrategies might be failing during the
proto -> object -> proto conversion we do today. This limitation will go
away once Dataflow starts reading Beam protos directly; we are working on
this now.

Thanks,
Cham

On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <boyu...@google.com> wrote:

> Thanks, Robert! I want to add more details on my External PTransform:
>
> MyExternalPTransform  -- expands to --
>
>   ParDo() -> WindowInto(FixedWindows) -> ParDo() -> output void
>                                       |
>                                       -> ParDo() -> output PCollection to Python SDK
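>
> In code, the expansion looks roughly like this (a minimal sketch with a
> placeholder DoFn and illustrative names; not the real implementation):
>
>   import org.apache.beam.sdk.transforms.DoFn;
>   import org.apache.beam.sdk.transforms.PTransform;
>   import org.apache.beam.sdk.transforms.ParDo;
>   import org.apache.beam.sdk.transforms.windowing.FixedWindows;
>   import org.apache.beam.sdk.transforms.windowing.Window;
>   import org.apache.beam.sdk.values.PCollection;
>   import org.joda.time.Duration;
>
>   public class MyExternalPTransform
>       extends PTransform<PCollection<String>, PCollection<String>> {
>
>     @Override
>     public PCollection<String> expand(PCollection<String> input) {
>       PCollection<String> windowed =
>           input
>               .apply("Pre", ParDo.of(new IdentityFn()))
>               .apply("Window",
>                   Window.into(FixedWindows.of(Duration.standardMinutes(1))));
>       // One branch ends here; its output is discarded (the "output void" arm).
>       windowed.apply("Sink", ParDo.of(new IdentityFn()));
>       // The other branch is the PCollection handed back to the Python SDK.
>       return windowed.apply("Out", ParDo.of(new IdentityFn()));
>     }
>
>     // Placeholder standing in for the real processing logic.
>     private static class IdentityFn extends DoFn<String, String> {
>       @ProcessElement
>       public void process(@Element String e, OutputReceiver<String> out) {
>         out.output(e);
>       }
>     }
>   }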
> The full stacktrace:
>
> INFO:root:Using Java SDK harness container image 
> dataflow-dev.gcr.io/boyuanz/java:latest
> Starting expansion service at localhost:53569
> Aug 13, 2020 7:42:11 PM 
> org.apache.beam.sdk.expansion.service.ExpansionService 
> loadRegisteredTransforms
> INFO: Registering external transforms: [beam:external:java:kafka:read:v1, 
> beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, 
> beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
>       beam:external:java:kafka:read:v1: 
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
>       beam:external:java:kafka:write:v1: 
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
>       beam:external:java:jdbc:read_rows:v1: 
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
>       beam:external:java:jdbc:write:v1: 
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
>       beam:external:java:generate_sequence:v1: 
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
> WARNING:apache_beam.options.pipeline_options_validator:Option --zone is 
> deprecated. Please use --worker_zone instead.
> Aug 13, 2020 7:42:12 PM 
> org.apache.beam.sdk.expansion.service.ExpansionService expand
> INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
> Aug 13, 2020 7:42:14 PM 
> org.apache.beam.sdk.expansion.service.ExpansionService expand
> INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'
>
> WARNING:root:Make sure that locally built Python SDK docker image has Python 
> 3.6 interpreter.
> INFO:root:Using Python SDK docker image: 
> apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at local, 
> we will try to pull from hub.docker.com
> Traceback (most recent call last):
>   File "<embedded module '_launcher'>", line 165, in run_filename_as_main
>   File "<embedded module '_launcher'>", line 39, in _run_code_in_main
>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, in 
> <module>
>     run()
>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, in 
> run
>     test_method(beam.Pipeline(options=pipeline_options))
>   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in 
> run_xlang_kafkaio
>     pipeline.run(False)
>   File "apache_beam/pipeline.py", line 534, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in 
> run_pipeline
>     allow_proto_holders=True)
>   File "apache_beam/pipeline.py", line 879, in from_runner_api
>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "apache_beam/pipeline.py", line 1272, in from_runner_api
>     id in proto.outputs.items()
>   File "apache_beam/pipeline.py", line 1272, in <dictcomp>
>     id in proto.outputs.items()
>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "apache_beam/pvalue.py", line 217, in from_runner_api
>     proto.windowing_strategy_id),
>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "apache_beam/transforms/core.py", line 2597, in from_runner_api
>     windowfn=WindowFn.from_runner_api(proto.window_fn, context),
>   File "apache_beam/utils/urns.py", line 186, in from_runner_api
>     parameter_type, constructor = cls._known_urns[fn_proto.urn]
> KeyError: 'beam:window_fn:serialized_java:v1'
>
>
> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> You should be able to use a WindowInto with any of the common
>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>> external transform. You should also be able to window into an
>> arbitrary WindowFn as long as it produces standard window types, but
>> if there's a bug here you could possibly work around it by windowing
>> into a more standard WindowFn before returning.
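>>
>> For instance (a sketch; "result" stands for whatever PCollection your
>> expand() currently returns, and FixedWindows is just one standard choice):
>>
>>   import org.apache.beam.sdk.transforms.windowing.FixedWindows;
>>   import org.apache.beam.sdk.transforms.windowing.Window;
>>   import org.joda.time.Duration;
>>
>>   // At the end of the Java transform's expand(), re-window into a
>>   // standard WindowFn so no Java-only WindowFn crosses the language
>>   // boundary:
>>   return result.apply("Rewindow",
>>       Window.into(FixedWindows.of(Duration.standardMinutes(1))));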
>>
>> What is the full traceback?
>>
>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <boyu...@google.com> wrote:
>> >
>> > Hi team,
>> >
>> > I'm trying to create an External transform in the Java SDK, which
>> expands into several ParDos and a Window.into(FixedWindows). When I use
>> this transform in the Python SDK, I get a pipeline construction error:
>> >
>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>> > KeyError: 'beam:window_fn:serialized_java:v1'
>> >
>> > Is it expected that I cannot use a Window.into when building an
>> External PTransform? Or am I missing something here?
>> >
>> >
>> > Thanks for your help!
>>
>
