Coders should only be checked over the language boundaries.

On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang <boyu...@google.com> wrote:
> Thanks Cham!
>
> I just realized that the beam:window_fn:serialized_java:v1 is
> introduced by Java Reshuffle.viaRandomKey(). But
> Reshuffle.viaRandomKey() does rewindow into the original windowing
> strategy (which is GlobalWindows in my case). Is it expected that we
> also check intermediate PCollections rather than only the PCollections
> that cross the language boundary?
>
> More about my PTransform:
> MyExternalPTransform -- expand to -- ParDo() -> Reshuffle.viaRandomKey()
> -> ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void
> |
> -> ParDo() -> output PCollection to Python SDK
>
> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>> Also it's strange that Java used (beam:window_fn:serialized_java:v1) for
>> the URN here instead of "beam:window_fn:fixed_windows:v1" [1], which is
>> what is being registered by Python [2]. This seems to be the immediate
>> issue. The tracking bug for supporting custom windows is
>> https://issues.apache.org/jira/browse/BEAM-10507.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>> [2]
>> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>
>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> Pipelines that use external WindowingStrategies might be failing during
>>> the proto -> object -> proto conversion we do today. This limitation will
>>> go away once Dataflow starts reading Beam protos directly. We are working
>>> on this now.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>
>>>> Thanks, Robert!
>>>> I want to add more details on my External PTransform:
>>>>
>>>> MyExternalPTransform -- expand to -- ParDo() -> WindowInto(FixWindow)
>>>> -> ParDo() -> output void
>>>> |
>>>> -> ParDo() -> output PCollection to Python SDK
>>>>
>>>> The full stacktrace:
>>>>
>>>> INFO:root:Using Java SDK harness container image
>>>> dataflow-dev.gcr.io/boyuanz/java:latest
>>>> Starting expansion service at localhost:53569
>>>> Aug 13, 2020 7:42:11 PM
>>>> org.apache.beam.sdk.expansion.service.ExpansionService
>>>> loadRegisteredTransforms
>>>> INFO: Registering external transforms: [beam:external:java:kafka:read:v1,
>>>> beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1,
>>>> beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
>>>> beam:external:java:kafka:read:v1:
>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@4ac68d3e
>>>> beam:external:java:kafka:write:v1:
>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@277c0f21
>>>> beam:external:java:jdbc:read_rows:v1:
>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@6073f712
>>>> beam:external:java:jdbc:write:v1:
>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@43556938
>>>> beam:external:java:generate_sequence:v1:
>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x0000000800b2a440@3d04a311
>>>> WARNING:apache_beam.options.pipeline_options_validator:Option --zone is
>>>> deprecated. Please use --worker_zone instead.
>>>> Aug 13, 2020 7:42:12 PM
>>>> org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>> INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
>>>> Aug 13, 2020 7:42:14 PM
>>>> org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>> INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'
>>>>
>>>> WARNING:root:Make sure that locally built Python SDK docker image has
>>>> Python 3.6 interpreter.
>>>> INFO:root:Using Python SDK docker image:
>>>> apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at
>>>> local, we will try to pull from hub.docker.com
>>>> Traceback (most recent call last):
>>>>   File "<embedded module '_launcher'>", line 165, in run_filename_as_main
>>>>   File "<embedded module '_launcher'>", line 39, in _run_code_in_main
>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 87, in <module>
>>>>     run()
>>>>   File "apache_beam/integration/cross_language_kafkaio_test.py", line 82, in run
>>>>     test_method(beam.Pipeline(options=pipeline_options))
>>>>   File "apache_beam/io/external/xlang_kafkaio_it_test.py", line 94, in run_xlang_kafkaio
>>>>     pipeline.run(False)
>>>>   File "apache_beam/pipeline.py", line 534, in run
>>>>     return self.runner.run_pipeline(self, self._options)
>>>>   File "apache_beam/runners/dataflow/dataflow_runner.py", line 496, in run_pipeline
>>>>     allow_proto_holders=True)
>>>>   File "apache_beam/pipeline.py", line 879, in from_runner_api
>>>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>     part = context.transforms.get_by_id(transform_id)
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>     part = context.transforms.get_by_id(transform_id)
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>     part = context.transforms.get_by_id(transform_id)
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>     part = context.transforms.get_by_id(transform_id)
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>     part = context.transforms.get_by_id(transform_id)
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>     part = context.transforms.get_by_id(transform_id)
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1266, in from_runner_api
>>>>     part = context.transforms.get_by_id(transform_id)
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pipeline.py", line 1272, in from_runner_api
>>>>     id in proto.outputs.items()
>>>>   File "apache_beam/pipeline.py", line 1272, in <dictcomp>
>>>>     id in proto.outputs.items()
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/pvalue.py", line 217, in from_runner_api
>>>>     proto.windowing_strategy_id),
>>>>   File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
>>>>     self._id_to_proto[id], self._pipeline_context)
>>>>   File "apache_beam/transforms/core.py", line 2597, in from_runner_api
>>>>     windowfn=WindowFn.from_runner_api(proto.window_fn, context),
>>>>   File "apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>> KeyError: 'beam:window_fn:serialized_java:v1'
>>>>
>>>>
>>>> On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> You should be able to use a WindowInto with any of the common
>>>>> windowing operations (e.g. global, fixed, sliding, sessions) in an
>>>>> external transform. You should also be able to window into an
>>>>> arbitrary WindowFn as long as it produces standard window types, but
>>>>> if there's a bug here you could possibly work around it by windowing
>>>>> into a more standard windowing fn before returning.
>>>>>
>>>>> What is the full traceback?
>>>>>
>>>>> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang <boyu...@google.com>
>>>>> wrote:
>>>>> >
>>>>> > Hi team,
>>>>> >
>>>>> > I'm trying to create an External transform in the Java SDK, which
>>>>> > expands into several ParDos and a Window.into(FixWindow). When I use
>>>>> > this transform in the Python SDK, I get a pipeline construction error:
>>>>> >
>>>>> > apache_beam/utils/urns.py", line 186, in from_runner_api
>>>>> >     parameter_type, constructor = cls._known_urns[fn_proto.urn]
>>>>> > KeyError: 'beam:window_fn:serialized_java:v1'
>>>>> >
>>>>> > Is it expected that I cannot use a Window.into when building an
>>>>> > External PTransform? Or am I missing something here?
>>>>> >
>>>>> >
>>>>> > Thanks for your help!
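
A minimal, stdlib-only sketch of the failure mode discussed in this thread, under heavily simplifying assumptions: each PCollection is reduced to the URN of its WindowFn, `window_fn_from_urn` is a hypothetical stand-in for the `cls._known_urns[fn_proto.urn]` lookup shown in the traceback, and all PCollection ids (and the windowing of the Python-bound output) are invented for illustration. This is not Beam's implementation. It contrasts rebuilding every PCollection of the expanded Java transform, which fails on the Reshuffle-internal PCollection with the same KeyError, against rebuilding only the PCollection that crosses the language boundary.

```python
# Hypothetical, simplified model of the cross-language windowing failure.
# NOT Beam's implementation: PCollections are reduced to their WindowFn URN,
# and every PCollection id below is made up for illustration.

# Standard window-fn URNs (see standard_window_fns.proto) that this toy
# "Python side" can rebuild; any other URN is unknown to it.
KNOWN_WINDOW_FN_URNS = {
    "beam:window_fn:global_windows:v1",
    "beam:window_fn:fixed_windows:v1",
    "beam:window_fn:sliding_windows:v1",
    "beam:window_fn:session_windows:v1",
}


def window_fn_from_urn(urn):
    """Stand-in for the registry lookup in the traceback.

    Raises KeyError for URNs this toy registry does not know, such as a
    Java-serialized WindowFn.
    """
    if urn not in KNOWN_WINDOW_FN_URNS:
        raise KeyError(urn)
    return urn  # a real SDK would construct a WindowFn object here


def rebuild_windowing(pcollections, only_ids=None):
    """Rebuild windowing for every PCollection, or only for `only_ids`."""
    ids = list(pcollections) if only_ids is None else only_ids
    return {pc_id: window_fn_from_urn(pcollections[pc_id]) for pc_id in ids}


# Hypothetical expansion of MyExternalPTransform. Per the thread, the
# Reshuffle-internal PCollection carries beam:window_fn:serialized_java:v1,
# and Reshuffle rewindows back to GlobalWindows afterwards.
expanded = {
    "pardo_1": "beam:window_fn:global_windows:v1",
    "reshuffle_internal": "beam:window_fn:serialized_java:v1",
    "after_reshuffle": "beam:window_fn:global_windows:v1",
    "windowed": "beam:window_fn:fixed_windows:v1",
    # Assumed fixed-windowed purely for illustration.
    "to_python": "beam:window_fn:fixed_windows:v1",
}
# In this model, only this PCollection crosses the Java -> Python boundary.
boundary_ids = ["to_python"]

if __name__ == "__main__":
    try:
        rebuild_windowing(expanded)  # converts every PCollection
    except KeyError as err:
        print("full conversion failed:", err)
    print(rebuild_windowing(expanded, only_ids=boundary_ids))
```

In this toy model, restricting the check to `boundary_ids` succeeds, which mirrors the suggestion that checks be limited to the language boundary; it says nothing about how the Dataflow runner or the Beam SDKs actually decide which PCollections to convert.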