That's great. Thanks for pushing this effort forward :)

On Thu, Feb 4, 2021 at 3:29 PM Ke Wu <[email protected]> wrote:

> Thank you, Boyuan, for the explanation! This explains why it did not work:
> Samza does not yet wire in SamzaPipelineRunner when executing in portable
> mode.
>
> I will create a ticket to update Samza runner.
>
> Best,
> Ke
>
> On Feb 4, 2021, at 12:07 PM, Boyuan Zhang <[email protected]> wrote:
>
> Hi Ke,
>
>  is it expected that Create.of will be expanded to an SDF
>
> In the Java SDK, Create.of will be expanded into CreateSource, which will be
> wrapped into an SDF implementation.
>
>  with regular pardo:v1 urn?
>
> No, the runner should run SplittableParDoExpander[1] to expand SDF
> into SPLITTABLE_PAIR_WITH_RESTRICTION_URN, 
> SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
> and SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.
>
> I do see SamzaPipelineRunner running the expansion[2]. Can you double
> check whether your job invokes that code path?
> [1]
> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
> [2]
> https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java#L42-L47
>
>
> On Thu, Feb 4, 2021 at 11:31 AM Ke Wu <[email protected]> wrote:
>
>> Hello Beamers,
>>
>> I am trying out a simple pipeline to be executed on PortableRunner:
>>
>> ```
>> PortablePipelineOptions options =
>> PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
>> options.setJobEndpoint(some_url);
>> options.setDefaultEnvironmentType("LOOPBACK");
>> options.setRunner(PortableRunner.class);
>>
>> Pipeline pipeline = Pipeline.create(options);
>>
>> pipeline
>>     .apply(Create.of("1", "2", "3"))
>>     .apply(…print to console...);
>>
>> pipeline.run();
>> ```
>>
>> This pipeline works with runners such as SamzaRunner; however, it does
>> not work in portable mode.
>>
>> I did some debugging, and it turns out that it fails because, when
>> Read.BoundedSourceAsSDFWrapperFn runs processElement(), the corresponding
>> RestrictionTracker is null. This seems to be caused by the expanded SDF
>> transform having the urn "beam:transform:pardo:v1", in which case
>> FnApiDoFnRunner is created with
>>
>> ```
>> mainInputConsumer = this::processElementForParDo;
>> ```
>>
>> which does not create a tracker at all. I do see that the other processing
>> methods, such as
>>
>>    - processElementForSplitRestriction()
>>    - processElementForWindowObservingSplitRestriction()
>>    - processElementForTruncateRestriction()
>>
>>
>> etc., create trackers properly before invoking the DoFn; however, they
>> require a different urn for the transform.
>>
>> My questions are: did I miss anything? Is it expected that Create.of
>> will be expanded to an SDF with the regular pardo:v1 urn? If yes, then what is
>> the expected behavior when FnApiDoFnRunner
>> invokes Read.BoundedSourceAsSDFWrapperFn?
>>
>> Best,
>> Ke
>>
>>
>>
>
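
For anyone finding this thread later, here is a minimal, self-contained sketch of the failure mode discussed above: the handler chosen for a transform depends on its urn, and the plain ParDo path never sets up a tracker. This is not Beam's FnApiDoFnRunner. UrnDispatchSketch, OffsetTracker, and dispatch() are hypothetical names made up for illustration, and the splittable urn string is my assumption for the value behind SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN (only "beam:transform:pardo:v1" is quoted in the thread).

```java
/** Simplified, hypothetical model of urn-based dispatch; not Beam's actual runner code. */
final class UrnDispatchSketch {
    static final String PAR_DO_URN = "beam:transform:pardo:v1";
    // Assumed value for SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.
    static final String PROCESS_SIZED_URN =
        "beam:transform:sdf_process_sized_element_and_restrictions:v1";

    /** Hypothetical stand-in for a restriction tracker over the offset range [from, to). */
    static final class OffsetTracker {
        private long next;
        private final long to;

        OffsetTracker(long from, long to) {
            this.next = from;
            this.to = to;
        }

        /** Claims a position if it is inside the remaining range. */
        boolean tryClaim(long position) {
            if (position < next || position >= to) {
                return false;
            }
            next = position + 1;
            return true;
        }
    }

    /** Stand-in for a splittable DoFn body: it needs a tracker to claim work. */
    static String processElement(String element, OffsetTracker tracker) {
        if (tracker == null) {
            return "FAILED: tracker is null for " + element;
        }
        return tracker.tryClaim(0) ? "processed " + element : "nothing to claim for " + element;
    }

    /** Picks a processing path based on the transform's urn. */
    static String dispatch(String urn, String element) {
        switch (urn) {
            case PAR_DO_URN:
                // Plain ParDo path: the element goes straight to the DoFn, no tracker exists.
                return processElement(element, null);
            case PROCESS_SIZED_URN:
                // Splittable path: a tracker is created from the restriction first.
                return processElement(element, new OffsetTracker(0, 1));
            default:
                throw new IllegalArgumentException("Unsupported urn: " + urn);
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(PAR_DO_URN, "1"));
        System.out.println(dispatch(PROCESS_SIZED_URN, "1"));
    }
}
```

In this toy model, the same DoFn body fails under the pardo:v1 urn and succeeds under the splittable urn, which is why the runner has to perform the SDF expansion before the transform reaches the SDK harness.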
