Thank you, Robert, for the response.
After some more digging, I realized that the Flink and Spark runners make
composite transforms translatable by first running:
    // Don't let the fuser fuse any subcomponents of native transforms.
    Pipeline trimmedPipeline =
        TrivialNativeTransformExpander.forKnownUrns(
            pipelineWithSdfExpanded, translator.knownUrns());
This marks any composite transform whose URN is in the translator's list of
known URNs as a primitive, i.e., a leaf node, so the later traversal returns
it for translation.
I believe this step is currently missing in the Samza runner, which is why
composite transform translation does not work properly there.
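To make the effect concrete, here is a minimal toy sketch of what that
trimming step does. It does not use Beam's classes: Node, trim, leaves, and
the "example:" URNs are all made up for illustration, and as far as I can
tell the real expander works on the portable pipeline representation rather
than on a tree like this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only: these classes are hypothetical and are NOT Beam's API.
final class TrimSketch {

  /** A transform node: a name, a URN, and zero or more subtransforms. */
  static final class Node {
    final String name;
    final String urn;
    final List<Node> children;

    Node(String name, String urn, Node... children) {
      this.name = name;
      this.urn = urn;
      this.children = Arrays.asList(children);
    }
  }

  /** Returns a copy in which every node with a known URN loses its subtransforms. */
  static Node trim(Node node, Set<String> knownUrns) {
    if (knownUrns.contains(node.urn)) {
      // The runner translates this composite natively, so treat it as a leaf.
      return new Node(node.name, node.urn);
    }
    Node[] trimmed = new Node[node.children.size()];
    for (int i = 0; i < trimmed.length; i++) {
      trimmed[i] = trim(node.children.get(i), knownUrns);
    }
    return new Node(node.name, node.urn, trimmed);
  }

  /** Collects leaf names depth-first; only leaves would reach the translator. */
  static List<String> leaves(Node node) {
    List<String> out = new ArrayList<>();
    collect(node, out);
    return out;
  }

  private static void collect(Node node, List<String> out) {
    if (node.children.isEmpty()) {
      out.add(node.name);
      return;
    }
    for (Node child : node.children) {
      collect(child, out);
    }
  }

  /** A small hypothetical pipeline: Read followed by a composite CombinePerKey. */
  static Node example() {
    return new Node("root", "example:root",
        new Node("Read", "example:read"),
        new Node("CombinePerKey", "example:combine_per_key",
            new Node("GroupByKey", "example:group_by_key"),
            new Node("CombineValues", "example:combine_values")));
  }

  public static void main(String[] args) {
    Node root = example();
    Set<String> knownUrns =
        new HashSet<>(Collections.singletonList("example:combine_per_key"));
    System.out.println(leaves(root));                  // leaves without trimming
    System.out.println(leaves(trim(root, knownUrns))); // leaves after trimming
  }
}
```

Without trimming, the leaves are Read, GroupByKey, and CombineValues, so
CombinePerKey never reaches the translator. After trimming with its URN in
the known set, the leaves are Read and CombinePerKey.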
Since I am working on the Samza runner, I will post a PR to fix it.
Best,
Ke
> On Jul 26, 2021, at 5:45 PM, Robert Bradshaw <[email protected]> wrote:
>
> You can think of composite transforms like subroutines--they're useful
> concepts for representing the logical structure of the pipeline, but
> for the purposes of execution it is just as valid to inline them all
> as a single monolithic function/pipeline composed of nothing but
> primitive calls. Flink/Spark/Samza have no native notion of composite
> transforms, so this is what they do. If you can preserve the richer
> structure, that has advantages (e.g. for monitoring, debugging, rolling
> up counters and messages, visualizing the pipeline).
>
> There is one other important case for composites that runners may want
> to take advantage of: runners may recognize higher-level transforms
> and substitute their own (equivalent, of course) implementations. The
> prototypical example of this is combiner lifting, where CombinePerKey
> is naively implemented as GroupByKey + CombineAllValuesDoFn, but most
> runners have more sophisticated ways of handling associative,
> commutative CombineFn aggregations (See
> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_0_260
> )
>
> - Robert
>
> On Mon, Jul 26, 2021 at 5:27 PM Ke Wu <[email protected]> wrote:
>>
>> Hello All,
>>
>> I noticed that the Flink/Spark/Samza runners translate portable pipelines
>> in a similar manner:
>>
>>     QueryablePipeline p =
>>         QueryablePipeline.forTransforms(
>>             pipeline.getRootTransformIdsList(), pipeline.getComponents());
>>
>>     for (PipelineNode.PTransformNode transform :
>>         p.getTopologicallyOrderedTransforms()) {
>>       // Translation logic
>>     }
>>
>> However, IIUC, this only iterates through leaf nodes of the pipeline, i.e.
>> composite transforms are NOT being translated at all.
>>
>> Is this the expected way for a runner to implement translation logic for
>> portable pipelines? If yes, what is suggested when a runner needs
>> to translate composite transforms?
>>
>> Best,
>> Ke