Hi Ke,

> is it expected that Create.of will be expanded to an SDF

In the Java SDK, Create.of is expanded into a CreateSource, which is then
wrapped into an SDF implementation.
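To illustrate the idea of "a source wrapped into an SDF", here is a plain-Java sketch with no Beam dependency: the values given to Create.of become a restriction that can be split and then claimed piece by piece through a tracker. OffsetRange, OffsetTracker, split() and process() are hypothetical stand-ins, not Beam classes, and the real wrapper uses its own restriction and tracker types rather than an index range.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (no Beam dependency). OffsetRange, OffsetTracker, split()
// and process() are illustrative stand-ins, not Beam classes or methods.
public class CreateSourceSdfSketch {

  /** The restriction: a half-open range [from, to) of element indices. */
  static final class OffsetRange {
    final long from;
    final long to;

    OffsetRange(long from, long to) {
      if (from > to) throw new IllegalArgumentException("from > to");
      this.from = from;
      this.to = to;
    }
  }

  /** Minimal tracker: hands out positions inside its range, in increasing order. */
  static final class OffsetTracker {
    final OffsetRange range;
    private long nextUnclaimed;

    OffsetTracker(OffsetRange range) {
      this.range = range;
      this.nextUnclaimed = range.from;
    }

    boolean tryClaim(long position) {
      if (position < nextUnclaimed) throw new IllegalArgumentException("claims must increase");
      if (position >= range.to) return false; // past the end of this restriction
      nextUnclaimed = position + 1;
      return true;
    }
  }

  /** Initial restriction: every element handed to Create.of. */
  static OffsetRange initialRestriction(List<String> elements) {
    return new OffsetRange(0, elements.size());
  }

  /** Splits a restriction into pieces of at most chunkSize elements. */
  static List<OffsetRange> split(OffsetRange r, long chunkSize) {
    List<OffsetRange> parts = new ArrayList<>();
    for (long start = r.from; start < r.to; start += chunkSize) {
      parts.add(new OffsetRange(start, Math.min(start + chunkSize, r.to)));
    }
    return parts;
  }

  /** "processElement": emits only the elements the tracker lets it claim. */
  static List<String> process(List<String> elements, OffsetTracker tracker) {
    List<String> out = new ArrayList<>();
    for (long i = tracker.range.from; tracker.tryClaim(i); i++) {
      out.add(elements.get((int) i));
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> elements = List.of("1", "2", "3");
    List<String> output = new ArrayList<>();
    for (OffsetRange part : split(initialRestriction(elements), 2)) {
      output.addAll(process(elements, new OffsetTracker(part)));
    }
    System.out.println(output); // [1, 2, 3]
  }
}
```

The point of the model is that processing is only correct when a tracker exists for each piece of the restriction; a runner path that skips tracker creation cannot drive such a DoFn.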

> with regular pardo:v1 urn?

No. The runner should run SplittableParDoExpander [1] to expand the SDF
into the three transforms identified by
SPLITTABLE_PAIR_WITH_RESTRICTION_URN,
SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN, and
SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.

I do see SamzaPipelineRunner running the expansion [2]. Can you
double-check whether your job invokes that code path?
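As a rough mental model of that expansion, and of the check worth doing on your job, here is a plain-Java sketch with no Beam dependency. The real SplittableParDoExpander rewrites RunnerApi pipeline protos; Transform, the isSdf flag, expand() and fullyExpanded() are hypothetical, and the three constants below are symbolic placeholders named after the Beam constants, not their actual URN strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the SDF expansion (no Beam dependency).
// Transform, isSdf, expand() and fullyExpanded() are illustrative only.
public class SdfExpansionSketch {

  static final String PAR_DO_URN = "beam:transform:pardo:v1";
  // Symbolic placeholders named after the Beam constants; the real constants hold URN
  // strings, not these names.
  static final String PAIR_WITH_RESTRICTION = "SPLITTABLE_PAIR_WITH_RESTRICTION_URN";
  static final String SPLIT_AND_SIZE = "SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN";
  static final String PROCESS_SIZED = "SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN";

  static final class Transform {
    final String name;
    final String urn;
    final boolean isSdf; // in this model, marks a step that runs a splittable DoFn

    Transform(String name, String urn, boolean isSdf) {
      this.name = name;
      this.urn = urn;
      this.isSdf = isSdf;
    }
  }

  /** Replaces each SDF that still carries the plain pardo:v1 URN with three SDF steps. */
  static List<Transform> expand(List<Transform> pipeline) {
    List<Transform> out = new ArrayList<>();
    for (Transform t : pipeline) {
      if (PAR_DO_URN.equals(t.urn) && t.isSdf) {
        out.add(new Transform(t.name + "/PairWithRestriction", PAIR_WITH_RESTRICTION, true));
        out.add(new Transform(t.name + "/SplitAndSizeRestrictions", SPLIT_AND_SIZE, true));
        out.add(new Transform(t.name + "/ProcessSizedElements", PROCESS_SIZED, true));
      } else {
        out.add(t); // ordinary ParDos keep the pardo:v1 URN
      }
    }
    return out;
  }

  /** True if no SDF is left behind the plain pardo:v1 URN. */
  static boolean fullyExpanded(List<Transform> pipeline) {
    return pipeline.stream().noneMatch(t -> PAR_DO_URN.equals(t.urn) && t.isSdf);
  }

  public static void main(String[] args) {
    List<Transform> before = List.of(
        new Transform("Create", PAR_DO_URN, true),
        new Transform("Print", PAR_DO_URN, false));
    List<Transform> after = expand(before);
    for (Transform t : after) {
      System.out.println(t.name + " -> " + t.urn);
    }
    System.out.println(fullyExpanded(before) + " " + fullyExpanded(after)); // false true
  }
}
```

If the job's pipeline still looks like `before` when it reaches the SDK harness, the SDF is handled by the plain ParDo path.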
[1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
[2] https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java#L42-L47


On Thu, Feb 4, 2021 at 11:31 AM Ke Wu wrote:

> Hello Beamers,
>
> I am trying out a simple pipeline to be executed on PortableRunner:
>
> ````
> PortablePipelineOptions options =
> PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
> options.setJobEndpoint(some_url);
> options.setDefaultEnvironmentType("LOOPBACK");
> options.setRunner(PortableRunner.class);
>
> Pipeline pipeline = Pipeline.create(options);
>
> pipeline
>     .apply(Create.of("1", "2", "3”))
>     .apply(…print to console...);
>
> pipeline.run()
> ```
>
> This pipeline works with runners such as SamzaRunner; however, in
> portable mode it does not work.
>
> I did some debugging, and it turns out that it fails because when
> Read.BoundedSourceAsSDFWrapperFn runs processElement(), the corresponding
> RestrictionTracker is null. This seems to be caused by the expanded SDF
> transform having the URN "beam:transform:pardo:v1", in which case
> FnApiDoFnRunner is created with
>
> ```
> mainInputConsumer = this::processElementForParDo;
> ```
>
> which does not create a tracker at all. I do see that the other processing
> methods, such as
>
>    - processElementForSplitRestriction()
>    - processElementForWindowObservingSplitRestriction()
>    - processElementForTruncateRestriction()
>
>
> etc., create trackers properly before invoking the DoFn; however, they
> require a different URN for the transform.
>
> My questions are: did I miss anything? Is it expected that Create.of
> will be expanded to an SDF with the regular pardo:v1 URN? If yes, what is
> the expected behavior when FnApiDoFnRunner
> invokes Read.BoundedSourceAsSDFWrapperFn?
>
> Best,
> Ke
>
>
>
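To make the failure mode in the quoted message concrete, here is a heavily simplified plain-Java sketch (no Beam dependency) of a runner that picks an element consumer by transform URN. UrnDispatchSketch, consumerFor(), sdfProcessElement() and OffsetTracker are hypothetical names, PROCESS_SIZED is a symbolic placeholder rather than the real URN string, and the real FnApiDoFnRunner is far more involved; the sketch only shows why an SDF reaching the pardo:v1 path sees no tracker.

```java
import java.util.function.Function;

// Hypothetical, heavily simplified model of URN-based consumer selection (no Beam
// dependency). None of these names are Beam classes or methods.
public class UrnDispatchSketch {

  static final String PAR_DO_URN = "beam:transform:pardo:v1";
  // Symbolic placeholder named after the Beam constant; not the real URN string.
  static final String PROCESS_SIZED = "SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN";

  /** Minimal tracker over [from, to). */
  static final class OffsetTracker {
    private final long to;
    private long nextUnclaimed;

    OffsetTracker(long from, long to) {
      this.nextUnclaimed = from;
      this.to = to;
    }

    boolean tryClaim(long position) {
      if (position < nextUnclaimed || position >= to) return false;
      nextUnclaimed = position + 1;
      return true;
    }
  }

  /** Stand-in for an SDF such as Read.BoundedSourceAsSDFWrapperFn: it needs a tracker. */
  static String sdfProcessElement(String element, OffsetTracker tracker) {
    if (tracker == null) {
      // This model reports the missing tracker explicitly; the real failure may look different.
      throw new IllegalStateException("RestrictionTracker is null for element " + element);
    }
    return tracker.tryClaim(0) ? "claimed " + element : "nothing to claim";
  }

  /** Picks the element consumer from the transform URN. */
  static Function<String, String> consumerFor(String urn) {
    switch (urn) {
      case PAR_DO_URN:
        // Like processElementForParDo: invokes the DoFn without creating a tracker.
        return element -> sdfProcessElement(element, null);
      case PROCESS_SIZED:
        // Like the SDF-specific paths: creates a tracker before invoking the DoFn.
        return element -> sdfProcessElement(element, new OffsetTracker(0, 1));
      default:
        throw new IllegalArgumentException("unknown URN: " + urn);
    }
  }

  public static void main(String[] args) {
    System.out.println(consumerFor(PROCESS_SIZED).apply("1")); // claimed 1
    try {
      consumerFor(PAR_DO_URN).apply("1");
    } catch (IllegalStateException e) {
      System.out.println("pardo:v1 path failed: " + e.getMessage());
    }
  }
}
```

In this model the fix is not in the dispatch itself but upstream: once the runner expands the SDF, it never arrives under the pardo:v1 URN.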
