Alright, thank you. Is BEAM-10507 the Jira to watch for any progress on that?
On Mon, Nov 30, 2020 at 12:55 PM Boyuan Zhang <boyu...@google.com> wrote:

Hi Steve,

Unfortunately I don't think there is a workaround before we have the change that Cham mentions.

On Mon, Nov 30, 2020 at 8:16 AM Steve Niemitz <sniem...@apache.org> wrote:

I'm trying to write an xlang transform that uses Reshuffle internally, and ran into this as well. Is there any workaround to this for now (other than removing the reshuffle), or do I just need to wait for what Chamikara mentioned? I noticed the same issue was mentioned in the SnowflakeIO.Read PR as well [1].

[1] https://github.com/apache/beam/pull/12149#discussion_r463710165

On Wed, Aug 26, 2020 at 10:55 PM Boyuan Zhang <boyu...@google.com> wrote:

That explains a lot. Thanks, Cham!

On Wed, Aug 26, 2020 at 7:44 PM Chamikara Jayalath <chamik...@google.com> wrote:

Due to the proto -> object -> proto conversion we do today, Python needs to parse the full sub-graph from Java. We have hooks for PTransforms and Coders but not for windowing operations. This limitation will go away after we have direct Beam proto to Dataflow proto conversion in place.

On Wed, Aug 26, 2020 at 7:03 PM Robert Burke <rob...@frantil.com> wrote:

Coders should only be checked over the language boundaries.

On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <boyu...@google.com> wrote:

Thanks Cham!

I just realized that the beam:window_fn:serialized_java:v1 is introduced by Java Reshuffle.viaRandomKey(). But Reshuffle.viaRandomKey() does rewindow into the original window strategy (which is GlobalWindows in my case). Is it expected that we also check intermediate PCollections rather than only the PCollection that crosses the language boundary?
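The failure under discussion boils down to a registry lookup: the Python SDK resolves each windowing strategy's window-fn URN against a table of URNs it knows how to deserialize, and an unregistered URN such as beam:window_fn:serialized_java:v1 surfaces as a bare KeyError. A minimal sketch with plain dicts (the names below are illustrative stand-ins, not the real apache_beam internals):

```python
# Minimal stand-in for the SDK's window-fn URN registry. The dict and
# function below are illustrative, not the real apache_beam internals.
KNOWN_WINDOW_FN_URNS = {
    "beam:window_fn:global_windows:v1": "GlobalWindows",
    "beam:window_fn:fixed_windows:v1": "FixedWindows",
}

def window_fn_from_urn(urn):
    # Mirrors the lookup that raises in apache_beam/utils/urns.py:
    # an unknown URN surfaces as a bare KeyError.
    return KNOWN_WINDOW_FN_URNS[urn]

# The PCollection crossing the language boundary uses a standard URN...
assert window_fn_from_urn("beam:window_fn:global_windows:v1") == "GlobalWindows"

# ...but an intermediate PCollection carrying a Java-serialized window fn
# fails, matching the reported stack trace.
try:
    window_fn_from_urn("beam:window_fn:serialized_java:v1")
except KeyError as err:
    print("KeyError:", err)  # prints: KeyError: 'beam:window_fn:serialized_java:v1'
```

This is why the intermediate Reshuffle matters even though the final output is rewindowed: the walk over the sub-graph touches every strategy, not just the one at the boundary.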
More about my PTransform:

MyExternalPTransform -- expand to --
    ParDo() -> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
         |
         -> ParDo() -> output PCollection to Python SDK

On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <chamik...@google.com> wrote:

Also it's strange that Java used "beam:window_fn:serialized_java:v1" for the URN here instead of "beam:window_fn:fixed_windows:v1" [1], which is what is being registered by Python [2]. This seems to be the immediate issue. The tracking bug for supporting custom windows is https://issues.apache.org/jira/browse/BEAM-10507.

[1] https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
[2] https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449

On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <chamik...@google.com> wrote:

Pipelines that use external WindowingStrategies might be failing during the proto -> object -> proto conversion we do today. This limitation will go away once Dataflow starts reading Beam protos directly. We are working on this now.

Thanks,
Cham

On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <boyu...@google.com> wrote:

Thanks, Robert!
I want to add more details on my External PTransform:

MyExternalPTransform -- expand to --
    ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
         |
         -> ParDo() -> output PCollection to Python SDK

The full stacktrace:

INFO:root:Using Java SDK harness container image dataflow-dev.gcr.io/boyuanz/java:latest
Starting expansion service at localhost:53569
Aug 13, 2020 7:42:11 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO: Registering external transforms: [beam:external:java:kafka:read:v1, beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
	beam:external:java:kafka:read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
	beam:external:java:kafka:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
	beam:external:java:jdbc:read_rows:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
	beam:external:java:jdbc:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
	beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
WARNING:apache_beam.options.pipeline_options_validator:Option --zone is deprecated. Please use --worker_zone instead.
Aug 13, 2020 7:42:12 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
Aug 13, 2020 7:42:14 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'

WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
INFO:root:Using Python SDK docker image: apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at local, we will try to pull from hub.docker.com
Traceback (most recent call last):
  File "<embedded module '_launcher'>", line 165, in run_filename_as_main
  File "<embedded module '_launcher'>", line 39, in _run_code_in_main
  File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, in <module>
    run()
  File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, in run
    test_method(beam.Pipeline(options=pipeline_options))
  File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in run_xlang_kafkaio
    pipeline.run(False)
  File "apache_beam/pipeline.py", line 534, in run
    return self.runner.run_pipeline(self, self._options)
  File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in run_pipeline
    allow_proto_holders=True)
  File "apache_beam/pipeline.py", line 879, in from_runner_api
    p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1272, in from_runner_api
    id in proto.outputs.items()
  File "apache_beam/pipeline.py", line 1272, in <dictcomp>
    id in proto.outputs.items()
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pvalue.py", line 217, in from_runner_api
    proto.windowing_strategy_id),
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/transforms/core.py", line 2597, in from_runner_api
    windowfn=WindowFn.from_runner_api(proto.window_fn, context),
  File "apache_beam/utils/urns.py", line 186, in from_runner_api
    parameter_type, constructor = cls._known_urns[fn_proto.urn]
KeyError: 'beam:window_fn:serialized_java:v1'

On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <rober...@google.com> wrote:

You should be able to use a WindowInto with any of the common windowing operations (e.g. global, fixed, sliding, sessions) in an external transform. You should also be able to window into an arbitrary WindowFn as long as it produces standard window types, but if there's a bug here you could possibly work around it by windowing into a more standard windowing fn before returning.

What is the full traceback?

On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <boyu...@google.com> wrote:

Hi team,

I'm trying to create an External transform in the Java SDK, which expands into several ParDos and a Window.into(FixWindow).
When I use this transform in the Python SDK, I get a pipeline construction error:

apache_beam/utils/urns.py", line 186, in from_runner_api
    parameter_type, constructor = cls._known_urns[fn_proto.urn]
KeyError: 'beam:window_fn:serialized_java:v1'

Is it expected that I cannot use Window.into when building an External PTransform? Or am I missing anything here?

Thanks for your help!
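One practical way to see which part of an expanded pipeline would trip the foreign SDK, as discussed in this thread, is to walk its windowing strategies and flag any window-fn URN outside the standard set from standard_window_fns.proto. The sketch below models the pipeline proto as nested dicts rather than the real beam_runner_api_pb2 messages; the function name and dict layout are illustrative assumptions:

```python
# Illustrative diagnostic: walk a pipeline "proto" (modeled as nested
# dicts instead of the real beam_runner_api_pb2 messages) and report
# windowing strategies whose window-fn URN is not a standard one.
# The standard URNs here follow standard_window_fns.proto.
STANDARD_WINDOW_FN_URNS = {
    "beam:window_fn:global_windows:v1",
    "beam:window_fn:fixed_windows:v1",
    "beam:window_fn:sliding_windows:v1",
    "beam:window_fn:session_windows:v1",
}

def nonstandard_window_fns(pipeline):
    """Return {strategy_id: urn} for strategies a foreign SDK cannot parse."""
    return {
        ws_id: ws["window_fn_urn"]
        for ws_id, ws in pipeline["windowing_strategies"].items()
        if ws["window_fn_urn"] not in STANDARD_WINDOW_FN_URNS
    }

# A shape like the thread's example: the output PCollection is globally
# windowed, but an intermediate strategy carries a Java-serialized fn.
pipeline = {
    "windowing_strategies": {
        "ws_output": {"window_fn_urn": "beam:window_fn:global_windows:v1"},
        "ws_intermediate": {"window_fn_urn": "beam:window_fn:serialized_java:v1"},
    },
}
print(nonstandard_window_fns(pipeline))
# prints: {'ws_intermediate': 'beam:window_fn:serialized_java:v1'}
```

Until the direct Beam-proto-to-Dataflow-proto conversion mentioned above lands, any strategy this kind of check flags, even on an intermediate PCollection, can fail pipeline construction on the Python side.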