chamikaramj commented on code in PR #31398:
URL: https://github.com/apache/beam/pull/31398#discussion_r1614065093


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java:
##########
@@ -522,11 +521,24 @@ public RunnerApi.PTransform translate(
         }
 
         if (spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) {
+          ExternalTransforms.SchemaTransformPayload payload =
+              ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload());
+          String identifier = payload.getIdentifier();
           transformBuilder.putAnnotations(
-              SCHEMATRANSFORM_URN_KEY,
-              ByteString.copyFromUtf8(
-                  ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload())
-                      .getIdentifier()));
+              BeamUrns.getConstant(Annotations.Enum.SCHEMATRANSFORM_URN_KEY),
+              ByteString.copyFromUtf8(identifier));
+          if (identifier.equals(MANAGED_TRANSFORM_URN)) {
+            Schema configSchema =
+                SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+            Row configRow =
+                RowCoder.of(configSchema).decode(payload.getConfigurationRow().newInput());
+            String underlyingIdentifier =
+                MoreObjects.firstNonNull(
+                    configRow.getString("transform_identifier"), "unknown_identifier");
+            transformBuilder.putAnnotations(

Review Comment:
   Is there a valid case where "transform_identifier" would not be set? If not, we should just error out.
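
   A minimal sketch of the fail-fast alternative to the `firstNonNull(..., "unknown_identifier")` fallback. So that it compiles without Beam on the classpath, a plain `Map<String, String>` stands in for the decoded config `Row`; the class name, method name, and error message are hypothetical and not part of this PR:

```java
import java.util.HashMap;
import java.util.Map;

public class ManagedIdentifierCheck {

  // Hypothetical helper: returns the underlying identifier, or fails fast
  // instead of silently substituting a placeholder value.
  // The Map is a stand-in (assumption) for the decoded configuration Row.
  static String requireTransformIdentifier(Map<String, String> configRow) {
    String identifier = configRow.get("transform_identifier");
    if (identifier == null) {
      throw new IllegalStateException(
          "Managed transform configuration is missing 'transform_identifier'");
    }
    return identifier;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("transform_identifier", "example:identifier");
    System.out.println(requireTransformIdentifier(config));

    try {
      requireTransformIdentifier(new HashMap<>());
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

   If a missing value can legitimately occur, the fallback should stay; otherwise an exception surfaces a malformed payload at translation time rather than in the annotations.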



##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto:
##########
@@ -111,6 +111,15 @@ message BuilderMethod {
   bytes payload = 3;
 }
 
+message Annotations {
+  enum Enum {
+    CONFIG_ROW_KEY = 0 [(org.apache.beam.model.pipeline.v1.beam_constant) = "config_row"];

Review Comment:
   Let's add a short description for each of these.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java:
##########
@@ -522,11 +521,24 @@ public RunnerApi.PTransform translate(
         }
 
         if (spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) {
+          ExternalTransforms.SchemaTransformPayload payload =
+              ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload());
+          String identifier = payload.getIdentifier();
           transformBuilder.putAnnotations(
-              SCHEMATRANSFORM_URN_KEY,
-              ByteString.copyFromUtf8(
-                  ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload())
-                      .getIdentifier()));
+              BeamUrns.getConstant(Annotations.Enum.SCHEMATRANSFORM_URN_KEY),
+              ByteString.copyFromUtf8(identifier));
+          if (identifier.equals(MANAGED_TRANSFORM_URN)) {
+            Schema configSchema =
+                SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+            Row configRow =
+                RowCoder.of(configSchema).decode(payload.getConfigurationRow().newInput());
+            String underlyingIdentifier =
+                MoreObjects.firstNonNull(
+                    configRow.getString("transform_identifier"), "unknown_identifier");

Review Comment:
   Let's add unit tests to make sure that the annotations get added correctly.
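
   A rough sketch of the two cases such a test could cover, written as plain Java so it runs without Beam or JUnit. Everything here is a stand-in: `annotate` only mimics the shape of the branch in the diff, and the three constants are placeholder strings, not Beam's real annotation keys or URNs:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class AnnotationSketch {

  // Placeholder values (assumptions), not the real Beam constants.
  static final String URN_KEY = "urn_key_placeholder";
  static final String UNDERLYING_KEY = "underlying_key_placeholder";
  static final String MANAGED_URN = "managed_urn_placeholder";

  // Stand-in for the translation step: always records the identifier, and
  // for the managed transform also records the underlying identifier.
  static Map<String, String> annotate(String identifier, Map<String, String> config) {
    Map<String, String> annotations = new LinkedHashMap<>();
    annotations.put(URN_KEY, identifier);
    if (identifier.equals(MANAGED_URN)) {
      annotations.put(UNDERLYING_KEY, config.get("transform_identifier"));
    }
    return annotations;
  }

  static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }

  public static void main(String[] args) {
    // Case 1: an ordinary schema transform gets only the URN annotation.
    Map<String, String> plain =
        annotate("some:transform", Collections.<String, String>emptyMap());
    check("some:transform".equals(plain.get(URN_KEY)), "URN annotation missing");
    check(!plain.containsKey(UNDERLYING_KEY), "unexpected underlying annotation");

    // Case 2: the managed transform also exposes the underlying identifier.
    Map<String, String> managed =
        annotate(MANAGED_URN, Collections.singletonMap("transform_identifier", "inner:transform"));
    check(MANAGED_URN.equals(managed.get(URN_KEY)), "URN annotation missing");
    check("inner:transform".equals(managed.get(UNDERLYING_KEY)), "underlying annotation missing");

    System.out.println("all checks passed");
  }
}
```

   A real test would call `PTransformTranslation` on an actual transform and assert on the resulting `RunnerApi.PTransform` annotations instead of this stand-in.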



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
