Thank you Boyuan for the explanation! This explains why it did not work: Samza does not yet wire in SamzaPipelineRunner when executing in portable mode.
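To make the failure mode concrete, here is a minimal, simplified model of the URN-based dispatch discussed in the quoted thread below. It is a sketch, not the real FnApiDoFnRunner: the class and method names are hypothetical, the three splittable URN strings are my assumed values for the constants Boyuan names, and treating all three expanded URNs uniformly as "splittable-aware" is a simplification.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Simplified model of URN-based handler selection; NOT the real FnApiDoFnRunner. */
final class UrnDispatchSketch {
  // URN quoted verbatim in the thread.
  static final String PAR_DO_URN = "beam:transform:pardo:v1";

  // Assumed string values for the constants named in the thread.
  static final String PAIR_WITH_RESTRICTION_URN =
      "beam:transform:sdf_pair_with_restriction:v1";
  static final String SPLIT_AND_SIZE_RESTRICTIONS_URN =
      "beam:transform:sdf_split_and_size_restrictions:v1";
  static final String PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN =
      "beam:transform:sdf_process_sized_element_and_restrictions:v1";

  private static final Set<String> SPLITTABLE_URNS =
      Collections.unmodifiableSet(
          new HashSet<>(
              Arrays.asList(
                  PAIR_WITH_RESTRICTION_URN,
                  SPLIT_AND_SIZE_RESTRICTIONS_URN,
                  PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN)));

  private UrnDispatchSketch() {}

  /**
   * Returns true if the URN is one of the expanded splittable URNs, i.e. the
   * modeled handler knows it is dealing with a splittable DoFn. Which real
   * handler method each URN selects is deliberately not modeled here.
   */
  static boolean isSplittableAware(String urn) {
    if (SPLITTABLE_URNS.contains(urn)) {
      return true;
    }
    if (PAR_DO_URN.equals(urn)) {
      return false; // plain ParDo path: no restriction tracker is set up
    }
    throw new IllegalArgumentException("URN not modeled in this sketch: " + urn);
  }

  /** Models invoking an SDF-wrapped read under the handler selected for the URN. */
  static String invokeSdfWrappedRead(String urn) {
    return isSplittableAware(urn)
        ? "ok: splittable handler selected"
        : "fails: tracker is null on the plain ParDo path";
  }
}
```

Calling `UrnDispatchSketch.invokeSdfWrappedRead(UrnDispatchSketch.PAR_DO_URN)` returns the "fails" outcome, which mirrors the null tracker I observed; the same call with any of the three expanded URNs returns the "ok" outcome. In other words, the fix belongs in the runner (run the expansion before submitting), not in the DoFn.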
I will create a ticket to update the Samza runner.

Best,
Ke

> On Feb 4, 2021, at 12:07 PM, Boyuan Zhang <[email protected]> wrote:
>
> Hi Ke,
>
> > is it expected that Create.of will be expanded to a SDF
>
> In the Java SDK, Create.of will be expanded into CreateSource, which will be wrapped into an SDF implementation.
>
> > with regular pardo:v1 urn?
>
> No, the runner should run SplittableParDoExpander[1] to expand the SDF into SPLITTABLE_PAIR_WITH_RESTRICTION_URN, SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN and SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.
>
> I do see SamzaPipelineRunner running the expansion[2]. Can you double check whether your job invokes that code path?
>
> [1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
> [2] https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java#L42-L47
>
> On Thu, Feb 4, 2021 at 11:31 AM Ke Wu <[email protected]> wrote:
> > Hello Beamers,
> >
> > I am trying out a simple pipeline to be executed on PortableRunner:
> >
> > ```
> > PortablePipelineOptions options =
> >     PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
> > options.setJobEndpoint(some_url);
> > options.setDefaultEnvironmentType("LOOPBACK");
> > options.setRunner(PortableRunner.class);
> >
> > Pipeline pipeline = Pipeline.create(options);
> >
> > pipeline
> >     .apply(Create.of("1", "2", "3"))
> >     .apply(…print to console...);
> >
> > pipeline.run();
> > ```
> >
> > This pipeline works with runners such as SamzaRunner; however, in portable mode it does not work.
> >
> > I did some debugging, and it turns out that it fails because when Read.BoundedSourceAsSDFWrapperFn processElement() runs, the corresponding RestrictionTracker is null. This seems to be caused by the expanded SDF transform having the URN "beam:transform:pardo:v1", in which case FnApiDoFnRunner is created with
> >
> > ```
> > mainInputConsumer = this::processElementForParDo;
> > ```
> >
> > which does not create a tracker at all. I do see that the other processing methods, such as
> >
> > processElementForSplitRestriction()
> > processElementForWindowObservingSplitRestriction()
> > processElementForTruncateRestriction()
> >
> > etc., create trackers properly before invoking the DoFn; however, they require a different URN for the transform.
> >
> > My questions here are: did I miss anything? Is it expected that Create.of will be expanded to an SDF with the regular pardo:v1 URN? If yes, then what is the expected behavior when FnApiDoFnRunner invokes Read.BoundedSourceAsSDFWrapperFn?
> >
> > Best,
> > Ke
