Coders should only be checked across the language boundaries.

On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <boyu...@google.com> wrote:

> Thanks Cham!
>
>  I just realized that *beam:window_fn:serialized_java:v1* is
> introduced by Java *Reshuffle.viaRandomKey()*. But
> *Reshuffle.viaRandomKey()* does rewindow into the original window
> strategy (which is *GlobalWindows* in my case). Is it expected that we
> also check intermediate PCollections rather than only the PCollection
> that crosses the language boundary?
>
> More about my PTransform:
>
> MyExternalPTransform  -- expand to --
>
>   ParDo() -> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixWindow)
>     -> ParDo() -> output void
>     -> ParDo() -> output PCollection to Python SDK
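>
> In Java, the expansion is shaped roughly like the sketch below (the DoFns
> and names are placeholders for illustration, not my actual code):
>
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.PTransform;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.Reshuffle;
> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
> import org.apache.beam.sdk.transforms.windowing.Window;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
>
> public class MyExternalPTransform
>     extends PTransform<PCollection<String>, PCollection<String>> {
>
>   // Placeholder DoFn standing in for the real processing steps.
>   private static class PassThroughFn extends DoFn<String, String> {
>     @ProcessElement
>     public void process(@Element String element, OutputReceiver<String> out) {
>       out.output(element);
>     }
>   }
>
>   @Override
>   public PCollection<String> expand(PCollection<String> input) {
>     PCollection<String> windowed =
>         input
>             .apply("ParDo1", ParDo.of(new PassThroughFn()))
>             // Reshuffle.viaRandomKey() applies its own WindowFn to an
>             // intermediate PCollection and then restores the input's
>             // original windowing strategy on its output.
>             .apply(Reshuffle.viaRandomKey())
>             .apply("ParDo2", ParDo.of(new PassThroughFn()))
>             .apply(Window.<String>into(
>                 FixedWindows.of(Duration.standardMinutes(1))));
>
>     // One branch terminates inside the transform (no output consumed).
>     windowed.apply("ParDo3", ParDo.of(new PassThroughFn()));
>
>     // The other branch is the PCollection handed back across the language
>     // boundary to the Python pipeline.
>     return windowed.apply("ParDo4", ParDo.of(new PassThroughFn()));
>   }
> }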
>
> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>> Also, it's strange that Java used "beam:window_fn:serialized_java:v1" for
>> the URN here instead of "beam:window_fn:fixed_windows:v1" [1], which is
>> what is registered by Python [2]. This seems to be the immediate
>> issue. The tracking bug for supporting custom windows is
>> https://issues.apache.org/jira/browse/BEAM-10507.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>> [2]
>> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
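>>
>> My rough understanding (a sketch of the idea, not the actual Beam
>> translation code) is that the Java side only emits a standard URN for
>> WindowFns it recognizes, and falls back to shipping the serialized Java
>> object for everything else, which non-Java SDKs cannot interpret:
>>
>> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
>> import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
>> import org.apache.beam.sdk.transforms.windowing.Sessions;
>> import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
>> import org.apache.beam.sdk.transforms.windowing.WindowFn;
>>
>> // Illustrative sketch only; the real translation lives elsewhere in Beam.
>> class WindowFnUrns {
>>   static String urnFor(WindowFn<?, ?> windowFn) {
>>     if (windowFn instanceof GlobalWindows) {
>>       return "beam:window_fn:global_windows:v1";
>>     } else if (windowFn instanceof FixedWindows) {
>>       return "beam:window_fn:fixed_windows:v1";
>>     } else if (windowFn instanceof SlidingWindows) {
>>       return "beam:window_fn:sliding_windows:v1";
>>     } else if (windowFn instanceof Sessions) {
>>       return "beam:window_fn:session_windows:v1";
>>     }
>>     // Anything else (e.g. the internal WindowFn used by Reshuffle) is
>>     // carried as a serialized Java object, which the Python SDK cannot
>>     // map back to a known WindowFn -- hence the KeyError below.
>>     return "beam:window_fn:serialized_java:v1";
>>   }
>> }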
>>
>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> Pipelines that use external WindowingStrategies might be failing during
>>> the proto -> object -> proto conversion we do today. This limitation will
>>> go away once Dataflow starts reading Beam protos directly. We are working
>>> on this now.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>
>>>> Thanks, Robert! I want to add more details on my External PTransform:
>>>>
>>>> MyExternalPTransform  -- expand to --
>>>>
>>>>   ParDo() -> WindowInto(FixWindow)
>>>>     -> ParDo() -> output void
>>>>     -> ParDo() -> output PCollection to Python SDK
>>>> The full stacktrace:
>>>>
>>>> INFO:root:Using Java SDK harness container image 
>>>> dataflow-dev.gcr.io/boyuanz/java:latest
>>>> Starting expansion service at localhost:53569
>>>> Aug 13, 2020 7:42:11 PM 
>>>> org.apache.beam.sdk.expansion.service.ExpansionService 
>>>> loadRegisteredTransforms
>>>> INFO: Registering external transforms: [beam:external:java:kafka:read:v1, 
>>>> beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, 
>>>> beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
>>>>    beam:external:java:kafka:read:v1: 
>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
>>>>    beam:external:java:kafka:write:v1: 
>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
>>>>    beam:external:java:jdbc:read_rows:v1: 
>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
>>>>    beam:external:java:jdbc:write:v1: 
>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
>>>>    beam:external:java:generate_sequence:v1: 
>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
>>>> WARNING:apache_beam.options.pipeline_options_validator:Option --zone is 
>>>> deprecated. Please use --worker_zone instead.
>>>> Aug 13, 2020 7:42:12 PM 
>>>> org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>> INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
>>>> Aug 13, 2020 7:42:14 PM 
>>>> org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>> INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'
>>>>
>>>> WARNING:root:Make sure that locally built Python SDK docker image has 
>>>> Python 3.6 interpreter.
>>>> INFO:root:Using Python SDK docker image: 
>>>> apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at 
>>>> local, we will try to pull from hub.docker.com
>>>> Traceback (most recent call last):
>>>>   File "<embedded module '_launcher'>", line 165, in run_filename_as_main
>>>>   File "<embedded module '_launcher'>", line 39, in _run_code_in_main
>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, 
>>>> in <module>
>>>>     run()
>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, 
>>>> in run
>>>>     test_method(beam.Pipeline(options=pipeline_options))
>>>>   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in 
>>>> run_xlang_kafkaio
>>>>     pipeline.run(False)
>>>>   File "apache_beam/pipeline.py", line 534, in run
>>>>     return self.runner.run_pipeline(self, self._options)
>>>>   File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in 
>>>> run_pipeline
>>>>     allow_proto_holders=True)
>>>>   File "apache_beam/pipeline.py", line 879, in from_runner_api
>>>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>     part = context.transforms.get_by_id(transform_id)
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>     part = context.transforms.get_by_id(transform_id)
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>     part = context.transforms.get_by_id(transform_id)
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>     part = context.transforms.get_by_id(transform_id)
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>     part = context.transforms.get_by_id(transform_id)
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>     part = context.transforms.get_by_id(transform_id)
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>     part = context.transforms.get_by_id(transform_id)
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1272, in from_runner_api
>>>>     id in proto.outputs.items()
>>>>   File "apache_beam/pipeline.py", line 1272, in <dictcomp>
>>>>     id in proto.outputs.items()
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pvalue.py", line 217, in from_runner_api
>>>>     proto.windowing_strategy_id),
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/transforms/core.py", line 2597, in from_runner_api
>>>>     windowfn=WindowFn.from_runner_api(proto.window_fn, context),
>>>>   File "apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>>
>>>>
>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> You should be able to use a WindowInto with any of the common
>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>>>> external transform. You should also be able to window into an
>>>>> arbitrary WindowFn as long as it produces standard window types, but
>>>>> if there's a bug here you could possibly work around it by windowing
>>>>> into a more standard windowing fn before returning.
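>>>>>
>>>>> A minimal sketch of that workaround, assuming the expansion currently
>>>>> returns a PCollection whose windowing strategy carries a Java-serialized
>>>>> WindowFn ("intermediate" is a placeholder for whatever PCollection the
>>>>> expansion has built so far):
>>>>>
>>>>> // Re-window into a standard WindowFn (FixedWindows here; GlobalWindows
>>>>> // works the same way) right before expand() returns, so the PCollection
>>>>> // crossing the language boundary carries a standard URN rather than
>>>>> // beam:window_fn:serialized_java:v1.
>>>>> return intermediate.apply(
>>>>>     "RewindowBeforeReturn",
>>>>>     Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));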
>>>>>
>>>>> What is the full traceback?
>>>>>
>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <boyu...@google.com>
>>>>> wrote:
>>>>> >
>>>>> > Hi team,
>>>>> >
>>>>> > I'm trying to create an external transform in the Java SDK, which
>>>>> > expands into several ParDos and a Window.into(FixWindow). When I use
>>>>> > this transform in the Python SDK, I get a pipeline construction error:
>>>>> >
>>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>>> >
>>>>> > Is it expected that I cannot use Window.into when building an external
>>>>> > PTransform? Or am I missing something here?
>>>>> >
>>>>> >
>>>>> > Thanks for your help!
>>>>>
>>>>
