That's great. Thanks for pushing this effort forward :)

On Thu, Feb 4, 2021 at 3:29 PM Ke Wu <[email protected]> wrote:
> Thank you, Boyuan, for the explanation! This explains why it did not work, since the Samza runner does not yet wire in SamzaPipelineRunner when executing in portable mode.
>
> I will create a ticket to update the Samza runner.
>
> Best,
> Ke
>
> On Feb 4, 2021, at 12:07 PM, Boyuan Zhang <[email protected]> wrote:
>
> Hi Ke,
>
>> is it expected that Create.of will be expanded to an SDF
>
> In the Java SDK, Create.of is expanded into a CreateSource, which is then wrapped in an SDF implementation.
>
>> with regular pardo:v1 urn?
>
> No. The runner should run SplittableParDoExpander [1] to expand the SDF into SPLITTABLE_PAIR_WITH_RESTRICTION_URN, SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN, and SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.
>
> I do see that SamzaPipelineRunner runs the expansion [2]. Can you double-check whether your job invokes that code path?
>
> [1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
> [2] https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java#L42-L47
>
> On Thu, Feb 4, 2021 at 11:31 AM Ke Wu <[email protected]> wrote:
>
>> Hello Beamers,
>>
>> I am trying out a simple pipeline to be executed on PortableRunner:
>>
>> ```
>> import org.apache.beam.runners.portability.PortableRunner;
>> import org.apache.beam.sdk.Pipeline;
>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>> import org.apache.beam.sdk.options.PortablePipelineOptions;
>> import org.apache.beam.sdk.transforms.Create;
>> import org.apache.beam.sdk.transforms.MapElements;
>> import org.apache.beam.sdk.values.TypeDescriptors;
>>
>> PortablePipelineOptions options =
>>     PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
>> options.setJobEndpoint(some_url);
>> options.setDefaultEnvironmentType("LOOPBACK");
>> options.setRunner(PortableRunner.class);
>>
>> Pipeline pipeline = Pipeline.create(options);
>>
>> pipeline
>>     .apply(Create.of("1", "2", "3"))
>>     .apply(MapElements.into(TypeDescriptors.strings()).via((String s) -> {
>>       System.out.println(s); // print each element to the console
>>       return s;
>>     }));
>>
>> pipeline.run();
>> ```
>>
>> This pipeline works with runners such as SamzaRunner; however, it does not work in portable mode.
>>
>> I did some debugging, and it turns out that it failed because, when Read.BoundedSourceAsSDFWrapperFn's processElement() is invoked, the corresponding RestrictionTracker is null. This seems to be caused by the expanded SDF transform having the urn "beam:transform:pardo:v1", in which case FnApiDoFnRunner is created with
>>
>> ```
>> mainInputConsumer = this::processElementForParDo;
>> ```
>>
>> which does not create a tracker at all. I do see that the other processing methods, such as
>>
>> - processElementForSplitRestriction()
>> - processElementForWindowObservingSplitRestriction()
>> - processElementForTruncateRestriction()
>>
>> etc., create trackers properly before invoking the DoFn; however, they require a different urn for the transform.
>>
>> My questions here are: did I miss anything? Is it expected that Create.of will be expanded to an SDF with the regular pardo:v1 urn? If yes, then what is the expected behavior when FnApiDoFnRunner invokes Read.BoundedSourceAsSDFWrapperFn?
>>
>> Best,
>> Ke
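To make Ke's diagnosis concrete: FnApiDoFnRunner picks its main-input consumer from the transform's urn, and per the thread only the SDF component urns lead to a path that builds a restriction tracker. The toy model below illustrates that dispatch. It is a hypothetical, heavily simplified sketch, not Beam's FnApiDoFnRunner; the names UrnDispatchSketch and OffsetTracker, the fixed restriction [0, 3), and the use of a constant name in place of the real SDF urn string are all assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical toy model of urn-based dispatch; NOT Beam's FnApiDoFnRunner.
final class UrnDispatchSketch {
  // Urn quoted in the thread for a plain ParDo.
  static final String PAR_DO_URN = "beam:transform:pardo:v1";
  // Symbolic stand-in for the SDF "process sized elements and restrictions" urn;
  // it is the constant's name, not the real urn string.
  static final String PROCESS_SIZED_URN = "SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN";

  /** Minimal stand-in for a restriction tracker over the offset range [from, to). */
  static final class OffsetTracker {
    final long from;
    final long to;

    OffsetTracker(long from, long to) {
      this.from = from;
      this.to = to;
    }
  }

  /** Stand-in for a source-wrapping SDF whose processElement needs a tracker. */
  static String processElement(String element, OffsetTracker tracker) {
    if (tracker == null) {
      // The real wrapper DoFn would fail here; the sketch reports the condition instead.
      return "tracker is null";
    }
    return element + " claimed [" + tracker.from + ", " + tracker.to + ")";
  }

  /** Selects the main-input consumer based on the transform urn. */
  static Function<String, String> consumerFor(String urn) {
    Map<String, Function<String, String>> consumers = new HashMap<>();
    // Plain ParDo path: invokes the DoFn directly and never builds a tracker.
    consumers.put(PAR_DO_URN, element -> processElement(element, null));
    // SDF component path: builds a tracker (fixed illustrative restriction [0, 3)) first.
    consumers.put(PROCESS_SIZED_URN, element -> processElement(element, new OffsetTracker(0, 3)));
    Function<String, String> consumer = consumers.get(urn);
    if (consumer == null) {
      throw new IllegalArgumentException("Unknown urn: " + urn);
    }
    return consumer;
  }

  public static void main(String[] args) {
    System.out.println(consumerFor(PAR_DO_URN).apply("source"));        // tracker is null
    System.out.println(consumerFor(PROCESS_SIZED_URN).apply("source")); // source claimed [0, 3)
  }
}
```

Running main prints `tracker is null` for the pardo:v1 path and `source claimed [0, 3)` for the SDF path. In the real runner, the wrapper DoFn would presumably fail at that point rather than return a message.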

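Boyuan's answer places the fix in the runner: before execution, each splittable ParDo has to be rewritten into the three SDF component transforms, so the SDK harness never receives the source-wrapping DoFn under the plain pardo:v1 urn. The following sketch models that rewrite on a flat list of nodes. It is hypothetical and much simpler than SplittableParDoExpander, which operates on the pipeline proto (see the SamzaPipelineRunner lines linked in the thread); the Transform class, the node and component names, and the constant names used in place of the real urn strings are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the graph rewrite SplittableParDoExpander performs; NOT Beam's code.
final class SdfExpansionSketch {
  static final String PAR_DO_URN = "beam:transform:pardo:v1";
  // Symbolic stand-ins named after the constants in the thread; not the real urn strings.
  static final String PAIR_URN = "SPLITTABLE_PAIR_WITH_RESTRICTION_URN";
  static final String SPLIT_AND_SIZE_URN = "SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN";
  static final String PROCESS_SIZED_URN = "SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN";

  /** A simplified transform node: a name, a urn, and whether its DoFn is splittable. */
  static final class Transform {
    final String name;
    final String urn;
    final boolean splittable;

    Transform(String name, String urn, boolean splittable) {
      this.name = name;
      this.urn = urn;
      this.splittable = splittable;
    }
  }

  /** Replaces every splittable pardo:v1 node with the three SDF component nodes. */
  static List<Transform> expand(List<Transform> transforms) {
    List<Transform> expanded = new ArrayList<>();
    for (Transform t : transforms) {
      if (t.urn.equals(PAR_DO_URN) && t.splittable) {
        // Component names are hypothetical; only the urn change matters here.
        expanded.add(new Transform(t.name + "/PairWithRestriction", PAIR_URN, true));
        expanded.add(new Transform(t.name + "/SplitAndSizeRestrictions", SPLIT_AND_SIZE_URN, true));
        expanded.add(new Transform(t.name + "/Process", PROCESS_SIZED_URN, true));
      } else {
        // Non-splittable ParDos (and everything else) pass through unchanged.
        expanded.add(t);
      }
    }
    return expanded;
  }

  /** Lists the urns of a node sequence, i.e. what a harness would dispatch on. */
  static List<String> urns(List<Transform> transforms) {
    List<String> result = new ArrayList<>();
    for (Transform t : transforms) {
      result.add(t.urn);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Transform> pipeline = new ArrayList<>();
    pipeline.add(new Transform("Create/Read", PAR_DO_URN, true));    // SDF-wrapped source (hypothetical name)
    pipeline.add(new Transform("PrintToConsole", PAR_DO_URN, false)); // ordinary DoFn (hypothetical name)
    System.out.println(urns(pipeline));
    System.out.println(urns(expand(pipeline)));
  }
}
```

In main, the splittable Create/Read node becomes three nodes carrying the SDF component urns, while PrintToConsole keeps pardo:v1. That is consistent with Ke's conclusion: if the portable Samza path skips this step, the source-wrapping DoFn reaches FnApiDoFnRunner under pardo:v1.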