Hello Beamers,
I am trying out a simple pipeline to be executed on PortableRunner:
```
PortablePipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
options.setJobEndpoint(some_url);
options.setDefaultEnvironmentType("LOOPBACK");
options.setRunner(PortableRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
    .apply(Create.of("1", "2", "3"))
    .apply(…print to console...);
pipeline.run();
```
This pipeline works with runners such as SamzaRunner; however, it does not work
in portable mode.
I did some debugging, and it turns out that it fails because the corresponding
RestrictionTracker is null when Read.BoundedSourceAsSDFWrapperFn's
processElement() is invoked. This seems to be caused by the expanded SDF
transform having the urn "beam:transform:pardo:v1", in which case
FnApiDoFnRunner is created with
```
mainInputConsumer = this::processElementForParDo;
```
which does not create a tracker at all. I do see that the other processing
methods, such as
processElementForSplitRestriction()
processElementForWindowObservingSplitRestriction()
processElementForTruncateRestriction()
etc., create trackers properly before invoking the DoFn; however, they
require a different urn for the transform.
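To make the observation above concrete, here is a toy model of the dispatch as I understand it. It is not Beam code and not how FnApiDoFnRunner is actually written: `UrnDispatchSketch`, `Runner`, `Tracker`, `processElementForSdf`, `doFnSeesTracker` and the `example:sdf_process:v1` urn are all made up for illustration. Only `beam:transform:pardo:v1` and `processElementForParDo` are taken from what I saw while debugging.

```java
import java.util.function.Consumer;

public class UrnDispatchSketch {
    // The urn I observed on the expanded transform.
    static final String PAR_DO_URN = "beam:transform:pardo:v1";
    // Hypothetical stand-in for an SDF-specific urn; NOT a real Beam urn.
    static final String SDF_PROCESS_URN = "example:sdf_process:v1";

    /** Hypothetical stand-in for a restriction tracker. */
    static final class Tracker {}

    /** Toy runner that picks its element handler from the transform urn. */
    static final class Runner {
        Tracker currentTracker;          // stays null unless a handler creates it
        Consumer<String> mainInputConsumer;
        boolean trackerSeenByDoFn;       // what the "DoFn" observed on its last call

        Runner(String urn) {
            switch (urn) {
                case PAR_DO_URN:
                    mainInputConsumer = this::processElementForParDo;
                    break;
                case SDF_PROCESS_URN:
                    mainInputConsumer = this::processElementForSdf;
                    break;
                default:
                    throw new IllegalArgumentException("Unknown urn: " + urn);
            }
        }

        // Plain ParDo path: the DoFn is invoked without any tracker being set up.
        void processElementForParDo(String element) {
            invokeDoFn(element);
        }

        // SDF-style path: a tracker is created before the DoFn is invoked.
        void processElementForSdf(String element) {
            currentTracker = new Tracker();
            try {
                invokeDoFn(element);
            } finally {
                currentTracker = null;
            }
        }

        // Stand-in for the wrapped DoFn; it only records whether a tracker exists.
        void invokeDoFn(String element) {
            trackerSeenByDoFn = currentTracker != null;
        }
    }

    /** Runs one element through a runner built for the given urn. */
    static boolean doFnSeesTracker(String urn) {
        Runner runner = new Runner(urn);
        runner.mainInputConsumer.accept("1");
        return runner.trackerSeenByDoFn;
    }

    public static void main(String[] args) {
        System.out.println(doFnSeesTracker(PAR_DO_URN));      // prints "false"
        System.out.println(doFnSeesTracker(SDF_PROCESS_URN)); // prints "true"
    }
}
```

In this model, a DoFn that needs a tracker can only work if the transform reaches the runner with an SDF-specific urn, which is the situation I am asking about.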
My questions are: did I miss anything? Is it expected that Create.of will
be expanded to an SDF with the regular pardo:v1 urn? If yes, then what is the
expected behavior when FnApiDoFnRunner invokes Read.BoundedSourceAsSDFWrapperFn?
Best,
Ke