Actually, Reshuffle uses a custom non-merging WindowFn (IdentityWindowFn) [1]. Dataflow Runner v2 (which is required for multi-language pipelines on Dataflow) does not currently support custom windowing functions, I believe. So getting Reshuffle (and, by extension, connectors such as Snowflake) working for Dataflow Python as a cross-language transform will unfortunately require support for custom WindowFns on Dataflow Runner v2, in addition to https://issues.apache.org/jira/browse/BEAM-11360. I believe we are working on the former, but I'm not sure about the exact ETA. The latter should be done by the end of the quarter.
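
For reference, the rewindowing step inside Reshuffle's expansion looks roughly like this (paraphrased from [1]; imports omitted, see the linked source for the exact code). The IdentityWindowFn applied here has no standard URN, so it lands in the pipeline proto as beam:window_fn:serialized_java:v1, which non-Java SDKs cannot decode:

    // Paraphrased sketch of Reshuffle's expand() [1]: the input is rewindowed
    // with a custom, non-standard WindowFn for the shuffle, and the original
    // windowing strategy is restored afterwards.
    WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
    Window<KV<K, V>> rewindow =
        Window.<KV<K, V>>into(
                new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
            .triggering(new ReshuffleTrigger<>())
            .discardingFiredPanes()
            .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));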
+Harsh Vardhan <anan...@google.com> +Robert Bradshaw <rober...@google.com>

Thanks,
Cham

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L79

On Mon, Nov 30, 2020 at 10:35 AM Chamikara Jayalath <chamik...@google.com> wrote:

> Please follow https://issues.apache.org/jira/browse/BEAM-11360 instead.
>
> Thanks,
> Cham
>
> On Mon, Nov 30, 2020 at 10:26 AM Steve Niemitz <sniem...@apache.org> wrote:
>
>> Alright, thank you. Is BEAM-10507 the JIRA to watch for any progress on
>> that?
>>
>> On Mon, Nov 30, 2020 at 12:55 PM Boyuan Zhang <boyu...@google.com> wrote:
>>
>>> Hi Steve,
>>>
>>> Unfortunately, I don't think there is a workaround before we have the
>>> change that Cham mentions.
>>>
>>> On Mon, Nov 30, 2020 at 8:16 AM Steve Niemitz <sniem...@apache.org> wrote:
>>>
>>>> I'm trying to write an xlang transform that uses Reshuffle internally,
>>>> and ran into this as well. Is there any workaround for this for now (other
>>>> than removing the reshuffle), or do I just need to wait for what Chamikara
>>>> mentioned? I noticed the same issue was mentioned in the SnowflakeIO.Read
>>>> PR as well [1].
>>>>
>>>> [1] https://github.com/apache/beam/pull/12149#discussion_r463710165
>>>>
>>>> On Wed, Aug 26, 2020 at 10:55 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>>
>>>>> That explains a lot. Thanks, Cham!
>>>>>
>>>>> On Wed, Aug 26, 2020 at 7:44 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>
>>>>>> Due to the proto -> object -> proto conversion we do today, Python
>>>>>> needs to parse the full sub-graph from Java. We have hooks for PTransforms
>>>>>> and Coders, but not for windowing operations. This limitation will go away
>>>>>> after we have direct Beam proto to Dataflow proto conversion in place.
>>>>>>
>>>>>> On Wed, Aug 26, 2020 at 7:03 PM Robert Burke <rob...@frantil.com> wrote:
>>>>>>
>>>>>>> Coders should only be checked over the language boundaries.
>>>>>>>
>>>>>>> On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>>
>>>>>>>> Thanks, Cham!
>>>>>>>>
>>>>>>>> I just realized that the *beam:window_fn:serialized_java:v1* is
>>>>>>>> introduced by the Java *Reshuffle.viaRandomKey()*. But
>>>>>>>> *Reshuffle.viaRandomKey()* does rewindow into the original window
>>>>>>>> strategy (which is *GlobalWindows* in my case). Is it expected that
>>>>>>>> we also check intermediate PCollections rather than only the
>>>>>>>> PCollections that cross the language boundary?
>>>>>>>>
>>>>>>>> More about my PTransform:
>>>>>>>>
>>>>>>>> MyExternalPTransform -- expands to --
>>>>>>>>   ParDo() -> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>>>>>>>     |
>>>>>>>>     -> ParDo() -> output PCollection to Python SDK
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Also, it's strange that Java used (beam:window_fn:serialized_java:v1)
>>>>>>>>> for the URN here instead of "beam:window_fn:fixed_windows:v1" [1],
>>>>>>>>> which is what is being registered by Python [2]. This seems to be the
>>>>>>>>> immediate issue. The tracking bug for supporting custom windows is
>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-10507.
>>>>>>>>>
>>>>>>>>> [1] https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>>>>>>>>> [2] https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>>>>>>>>
>>>>>>>>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Pipelines that use external WindowingStrategies might be failing
>>>>>>>>>> during the proto -> object -> proto conversion we do today. This
>>>>>>>>>> limitation will go away once Dataflow starts reading Beam protos
>>>>>>>>>> directly. We are working on this now.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Cham
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks, Robert! I want to add more details on my External PTransform:
>>>>>>>>>>>
>>>>>>>>>>> MyExternalPTransform -- expands to --
>>>>>>>>>>>   ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
>>>>>>>>>>>     |
>>>>>>>>>>>     -> ParDo() -> output PCollection to Python SDK
>>>>>>>>>>>
>>>>>>>>>>> The full stack trace:
>>>>>>>>>>>
>>>>>>>>>>> INFO:root:Using Java SDK harness container image dataflow-dev.gcr.io/boyuanz/java:latest
>>>>>>>>>>> Starting expansion service at localhost:53569
>>>>>>>>>>> Aug 13, 2020 7:42:11 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
>>>>>>>>>>> INFO: Registering external transforms: [beam:external:java:kafka:read:v1, beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
>>>>>>>>>>> beam:external:java:kafka:read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
>>>>>>>>>>> beam:external:java:kafka:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
>>>>>>>>>>> beam:external:java:jdbc:read_rows:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
>>>>>>>>>>> beam:external:java:jdbc:write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
>>>>>>>>>>> beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
>>>>>>>>>>> WARNING:apache_beam.options.pipeline_options_validator:Option --zone is deprecated. Please use --worker_zone instead.
>>>>>>>>>>> Aug 13, 2020 7:42:12 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>>>>> INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
>>>>>>>>>>> Aug 13, 2020 7:42:14 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>>>>>>>>> INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'
>>>>>>>>>>> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
>>>>>>>>>>> INFO:root:Using Python SDK docker image: apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at local, we will try to pull from hub.docker.com
>>>>>>>>>>>
>>>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>>>   File "<embedded module '_launcher'>", line 165, in run_filename_as_main
>>>>>>>>>>>   File "<embedded module '_launcher'>", line 39, in _run_code_in_main
>>>>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, in <module>
>>>>>>>>>>>     run()
>>>>>>>>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, in run
>>>>>>>>>>>     test_method(beam.Pipeline(options=pipeline_options))
>>>>>>>>>>>   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in run_xlang_kafkaio
>>>>>>>>>>>     pipeline.run(False)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 534, in run
>>>>>>>>>>>     return self.runner.run_pipeline(self, self._options)
>>>>>>>>>>>   File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in run_pipeline
>>>>>>>>>>>     allow_proto_holders=True)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 879, in from_runner_api
>>>>>>>>>>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>>>>>>>>     part = context.transforms.get_by_id(transform_id)
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in from_runner_api
>>>>>>>>>>>     id in proto.outputs.items()
>>>>>>>>>>>   File "apache_beam/pipeline.py", line 1272, in <dictcomp>
>>>>>>>>>>>     id in proto.outputs.items()
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/pvalue.py", line 217, in from_runner_api
>>>>>>>>>>>     proto.windowing_strategy_id),
>>>>>>>>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>>>>>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>>>>>>>>   File "apache_beam/transforms/core.py", line 2597, in from_runner_api
>>>>>>>>>>>     windowfn=WindowFn.from_runner_api(proto.window_fn, context),
>>>>>>>>>>>   File "apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>>>>     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>>>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> You should be able to use a WindowInto with any of the common
>>>>>>>>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>>>>>>>>>>> external transform. You should also be able to window into an
>>>>>>>>>>>> arbitrary WindowFn as long as it produces standard window types, but
>>>>>>>>>>>> if there's a bug here you could possibly work around it by windowing
>>>>>>>>>>>> into a more standard windowing fn before returning.
>>>>>>>>>>>>
>>>>>>>>>>>> What is the full traceback?
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> > Hi team,
>>>>>>>>>>>> >
>>>>>>>>>>>> > I'm trying to create an External transform in the Java SDK, which
>>>>>>>>>>>> expands into several ParDos and a Window.into(FixWindow). When I use this
>>>>>>>>>>>> transform in the Python SDK, I get a pipeline construction error:
>>>>>>>>>>>> >
>>>>>>>>>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>>>>>>>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>>>>>>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>>>>>>>>>> >
>>>>>>>>>>>> > Is it expected that I cannot use a Window.into when building an
>>>>>>>>>>>> External PTransform? Or am I missing something here?
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thanks for your help!
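
A minimal Java sketch of the workaround Robert suggests above: end the external transform's expansion by windowing into a standard WindowFn (here FixedWindows, which maps to the portable URN beam:window_fn:fixed_windows:v1), so the PCollection returned across the language boundary carries no serialized-Java WindowFn. MyExternalTransform and MyDoFn below are hypothetical placeholders, not part of Beam:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    // Hypothetical external transform; only the final Window.into matters here.
    public class MyExternalTransform
        extends PTransform<PCollection<String>, PCollection<String>> {
      @Override
      public PCollection<String> expand(PCollection<String> input) {
        return input
            .apply("Process", ParDo.of(new MyDoFn()))
            // Re-window with a standard WindowFn before returning, so the
            // windowing strategy crossing the language boundary has a portable
            // URN (beam:window_fn:fixed_windows:v1) instead of serialized Java.
            .apply("Rewindow", Window.into(FixedWindows.of(Duration.standardMinutes(1))));
      }

      // Hypothetical DoFn standing in for the transform's real processing logic.
      private static class MyDoFn extends DoFn<String, String> {
        @ProcessElement
        public void processElement(@Element String element, OutputReceiver<String> out) {
          out.output(element);
        }
      }
    }

Note that this only helps when the nonstandard windowing strategy is the one returned across the boundary. As discussed upthread, the proto -> object -> proto conversion makes Python parse the full sub-graph, so a custom WindowFn on an intermediate PCollection (as with Reshuffle's IdentityWindowFn) still fails, which is why no workaround existed for the Reshuffle case.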