Hello All,
While looking at translating composite transforms in a portable pipeline, I
realized that TrivialNativeTransformExpander [1] is used to identify
native transforms by transform URN, and that it removes the sub-transforms and
environment id from the corresponding transform node. However, QueryablePipeline
seems to identify primitive transforms using a different approach [2], which
requires us to register runner-native transforms again [3][4] in addition to
registering the transform translators.
It occurred to me that we should be able to identify a primitive/native
transform by looking at its environment, according to the protobuf model [5]:
    // Environment where the current PTransform should be executed in.
    //
    // Transforms that are required to be implemented by a runner must omit this.
    // All other transforms are required to specify this.
    string environment_id = 7;
Therefore, I updated the logic:
  private static boolean isPrimitiveTransform(PTransform transform) {
    String urn = PTransformTranslation.urnForTransformOrNull(transform);
-   return PRIMITIVE_URNS.contains(urn) || NativeTransforms.isNative(transform);
+   return transform.getEnvironmentId().isEmpty();
  }
However, tests started to fail on SQL cases, where I found that external
transforms seem to have an empty environment id as well [6], which does not
seem to conform to the protobuf model.
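The mismatch can be illustrated with a minimal, self-contained sketch. It does not use Beam classes: the Transform class, the KNOWN_URNS set, and the "example:external:composite:v1" URN are hypothetical stand-ins for the PTransform proto, PRIMITIVE_URNS plus the registered native URNs, and an external transform respectively. It only shows how the two checks can disagree on a transform that has an empty environment id but an unregistered URN.

```java
import java.util.Set;

public class PrimitiveCheckSketch {

  // Hypothetical stand-in for the PTransform proto; only the two fields
  // relevant to this discussion are modeled.
  static final class Transform {
    final String urn;
    final String environmentId;

    Transform(String urn, String environmentId) {
      this.urn = urn;
      this.environmentId = environmentId;
    }
  }

  // Assumption: stands in for PRIMITIVE_URNS plus runner-registered native URNs.
  static final Set<String> KNOWN_URNS =
      Set.of("beam:transform:impulse:v1", "beam:transform:group_by_key:v1");

  // URN-based check, in the spirit of the original logic.
  static boolean isPrimitiveByUrn(Transform t) {
    return KNOWN_URNS.contains(t.urn);
  }

  // Environment-based check, in the spirit of the proposed logic.
  static boolean isPrimitiveByEnvironment(Transform t) {
    return t.environmentId.isEmpty();
  }

  public static void main(String[] args) {
    Transform gbk = new Transform("beam:transform:group_by_key:v1", "");
    Transform parDo = new Transform("beam:transform:pardo:v1", "java_env");
    // Made-up URN: models an external transform that arrives with no environment id.
    Transform external = new Transform("example:external:composite:v1", "");

    for (Transform t : new Transform[] {gbk, parDo, external}) {
      System.out.println(
          t.urn + " byUrn=" + isPrimitiveByUrn(t) + " byEnv=" + isPrimitiveByEnvironment(t));
    }
  }
}
```

In this sketch the two checks agree on the group-by-key and ParDo stand-ins, but the external stand-in is reported as primitive only by the environment-based check, which is the kind of disagreement that would explain the failing tests.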
My questions here are:
1. Is registration with NativeTransforms required for a primitive/native
transform, in addition to registering it with the translators?
2. Is an empty environment_id a good enough indicator to identify a
native/primitive transform?
3. Is an external transform supposed to have an empty or a non-empty
environment_id?
Best,
Ke
[1]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L44
[2]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java#L186
[3]
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
[4]
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L412
[5]
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L194
[6]
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py#L392