Hi Ke,

> is it expected that Create.of will be expanded to a SDF

In the Java SDK, Create.of is expanded into a CreateSource, which is then wrapped in an SDF implementation.

> with regular pardo:v1 urn?

No, the runner should run SplittableParDoExpander[1] to expand the SDF into SPLITTABLE_PAIR_WITH_RESTRICTION_URN, SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN and SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN. I do see SamzaPipelineRunner running the expansion[2]. Can you double-check whether your job invokes that code path?

[1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
[2] https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java#L42-L47

On Thu, Feb 4, 2021 at 11:31 AM Ke Wu <[email protected]> wrote:
> Hello Beamers,
>
> I am trying out a simple pipeline to be executed on PortableRunner:
>
> ```
> PortablePipelineOptions options =
>     PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
> options.setJobEndpoint(some_url);
> options.setDefaultEnvironmentType("LOOPBACK");
> options.setRunner(PortableRunner.class);
>
> Pipeline pipeline = Pipeline.create(options);
>
> pipeline
>     .apply(Create.of("1", "2", "3"))
>     .apply(…print to console...);
>
> pipeline.run();
> ```
>
> This pipeline works with runners such as SamzaRunner; however, it does not
> work in portable mode.
>
> I did some debugging, and it turns out that it fails because when
> Read.BoundedSourceAsSDFWrapperFn.processElement() is invoked, the corresponding
> RestrictionTracker is null. This seems to be caused by the expanded SDF
> transform having the urn "beam:transform:pardo:v1", in which case
> FnApiDoFnRunner is created with
>
> ```
> mainInputConsumer = this::processElementForParDo;
> ```
>
> which does not create a tracker at all.
> I do see that the other processing methods, such as
>
> - processElementForSplitRestriction()
> - processElementForWindowObservingSplitRestriction()
> - processElementForTruncateRestriction()
>
> etc., create trackers properly before invoking the DoFn; however, they
> require a different URN for the transform.
>
> My questions here are: did I miss anything? Is it expected that Create.of
> will be expanded to an SDF with the regular pardo:v1 urn? If yes, then what
> is the expected behavior when FnApiDoFnRunner invokes
> Read.BoundedSourceAsSDFWrapperFn?
>
> Best,
> Ke
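To make the failure mode in the quoted message concrete, here is a toy sketch in plain Java with no Beam dependency. Every class, enum and method in it (`ToyUrnDispatch`, `Urn`, `OffsetTracker`, `runConsumer`) is hypothetical and is not Beam's FnApiDoFnRunner or Read.BoundedSourceAsSDFWrapperFn; it only mimics the behavior described in the thread, where the consumer is chosen by transform URN and only the SDF-specific path builds a restriction tracker before invoking the DoFn.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, NOT Beam's FnApiDoFnRunner or Read.BoundedSourceAsSDFWrapperFn.
// It only mimics the behavior described in this thread: the consumer is picked by URN,
// and only the SDF-specific consumer builds a restriction tracker before calling the DoFn.
public class ToyUrnDispatch {

  /** Simplified stand-ins for the URNs discussed in the thread (hypothetical enum). */
  enum Urn {
    PARDO_V1, // plays the role of "beam:transform:pardo:v1"
    SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS
  }

  /** Toy tracker that claims consecutive offsets in [from, to). */
  static final class OffsetTracker {
    private long next;
    private final long to;

    OffsetTracker(long from, long to) {
      this.next = from;
      this.to = to;
    }

    /** Claims offset only if it is the next unclaimed offset inside the range. */
    boolean tryClaim(long offset) {
      if (offset != next || offset >= to) {
        return false;
      }
      next++;
      return true;
    }

    long nextUnclaimed() {
      return next;
    }
  }

  /** Toy splittable DoFn body: emits every offset it manages to claim. */
  static List<Long> splittableProcess(OffsetTracker tracker) {
    if (tracker == null) {
      // Toy analogue of the null-tracker failure reported in the thread.
      throw new IllegalStateException("RestrictionTracker is null");
    }
    List<Long> out = new ArrayList<>();
    for (long i = tracker.nextUnclaimed(); tracker.tryClaim(i); i++) {
      out.add(i);
    }
    return out;
  }

  /** Picks a consumer by URN; only the SDF consumer creates a tracker for [from, to). */
  static List<Long> runConsumer(Urn urn, long from, long to) {
    switch (urn) {
      case SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS:
        return splittableProcess(new OffsetTracker(from, to));
      case PARDO_V1:
      default:
        // Toy analogue of processElementForParDo: the DoFn runs without a tracker.
        return splittableProcess(null);
    }
  }

  public static void main(String[] args) {
    System.out.println(runConsumer(Urn.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS, 0, 3));
    try {
      runConsumer(Urn.PARDO_V1, 0, 3);
    } catch (IllegalStateException e) {
      System.out.println("pardo:v1 path failed: " + e.getMessage());
    }
  }
}
```

The same DoFn body succeeds or fails depending only on which URN the harness sees, which is why the fix has to happen before the harness, in the runner's pipeline rewrite.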

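The reply points to a runner-side rewrite: a ParDo whose DoFn is splittable should be replaced by the three SPLITTABLE_* stages before the pipeline reaches the SDK harness. The following is a minimal sketch of that idea, assuming a pipeline can be modeled as a flat list of stages. `ToySdfExpansion`, `Stage`, `Urn`, `expand` and the stage-name suffixes are hypothetical and are not the API of Beam's SplittableParDoExpander, which operates on the pipeline proto and also wires up the intermediate PCollections; none of that is modeled here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the runner-side rewrite, NOT Beam's SplittableParDoExpander:
// a ParDo whose DoFn is splittable is replaced by the three stages named in the thread,
// while ordinary ParDos are left untouched.
public class ToySdfExpansion {

  /** Simplified stand-ins for the URN constants named in the thread (hypothetical enum). */
  enum Urn {
    PARDO,
    SPLITTABLE_PAIR_WITH_RESTRICTION,
    SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS,
    SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS
  }

  /** A toy transform: a name, a URN, and whether it is an SDF ParDo still awaiting expansion. */
  static final class Stage {
    final String name;
    final Urn urn;
    final boolean needsExpansion;

    Stage(String name, Urn urn, boolean needsExpansion) {
      this.name = name;
      this.urn = urn;
      this.needsExpansion = needsExpansion;
    }

    @Override
    public String toString() {
      return name + "[" + urn + "]";
    }
  }

  /** Replaces each unexpanded SDF ParDo with three SDF stages; keeps everything else. */
  static List<Stage> expand(List<Stage> stages) {
    List<Stage> out = new ArrayList<>();
    for (Stage s : stages) {
      if (s.urn == Urn.PARDO && s.needsExpansion) {
        out.add(new Stage(s.name + "/PairWithRestriction", Urn.SPLITTABLE_PAIR_WITH_RESTRICTION, false));
        out.add(new Stage(s.name + "/SplitAndSize", Urn.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS, false));
        out.add(new Stage(s.name + "/Process", Urn.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS, false));
      } else {
        out.add(s);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Stage> pipeline = new ArrayList<>();
    pipeline.add(new Stage("Create", Urn.PARDO, true)); // SDF-wrapped bounded source
    pipeline.add(new Stage("Print", Urn.PARDO, false)); // ordinary ParDo
    System.out.println(expand(pipeline));
  }
}
```

Ordinary ParDos pass through unchanged, so in this model only the SDF-wrapped source ends up under the SPLITTABLE_* URNs, which are the ones whose harness-side consumers create a tracker.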