chamikaramj commented on code in PR #31398:
URL: https://github.com/apache/beam/pull/31398#discussion_r1614065093

##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java:
##########
@@ -522,11 +521,24 @@ public RunnerApi.PTransform translate(
       }
 
       if (spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) {
+        ExternalTransforms.SchemaTransformPayload payload =
+            ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload());
+        String identifier = payload.getIdentifier();
         transformBuilder.putAnnotations(
-            SCHEMATRANSFORM_URN_KEY,
-            ByteString.copyFromUtf8(
-                ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload())
-                    .getIdentifier()));
+            BeamUrns.getConstant(Annotations.Enum.SCHEMATRANSFORM_URN_KEY),
+            ByteString.copyFromUtf8(identifier));
+        if (identifier.equals(MANAGED_TRANSFORM_URN)) {
+          Schema configSchema =
+              SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+          Row configRow =
+              RowCoder.of(configSchema).decode(payload.getConfigurationRow().newInput());
+          String underlyingIdentifier =
+              MoreObjects.firstNonNull(
+                  configRow.getString("transform_identifier"), "unknown_identifier");
+          transformBuilder.putAnnotations(

Review Comment:
   Is there a valid case where "transform_identifier" would not be set? If not we should just error out.

##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto:
##########
@@ -111,6 +111,15 @@ message BuilderMethod {
   bytes payload = 3;
 }
 
+message Annotations {
+  enum Enum {
+    CONFIG_ROW_KEY = 0 [(org.apache.beam.model.pipeline.v1.beam_constant) = "config_row"];

Review Comment:
   Let's add short descriptions regarding each of these.

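One possible way to act on the first comment above (erroring out when "transform_identifier" is unset) is to replace the `MoreObjects.firstNonNull(...)` fallback with an explicit check that fails fast. The following is only a minimal sketch: it uses a plain `Map` as a stand-in for the Beam `Row`, and the class `RequireIdentifier`, the helper `requireField`, and the sample value are hypothetical names that do not appear in the PR.

```java
import java.util.HashMap;
import java.util.Map;

public class RequireIdentifier {
  // Hypothetical helper: returns the field's value, or throws if it is unset.
  // A plain Map stands in for the Beam Row used in the PR (assumption).
  public static String requireField(Map<String, String> config, String field) {
    String value = config.get(field);
    if (value == null) {
      throw new IllegalStateException("Required field '" + field + "' is not set");
    }
    return value;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    // Hypothetical identifier value, for illustration only.
    config.put("transform_identifier", "example:identifier");
    System.out.println(requireField(config, "transform_identifier"));

    try {
      // An empty config has no identifier, so this call throws.
      requireField(new HashMap<>(), "transform_identifier");
    } catch (IllegalStateException e) {
      System.out.println("error: " + e.getMessage());
    }
  }
}
```

Compared with defaulting to "unknown_identifier", throwing surfaces a malformed configuration during translation instead of recording a placeholder annotation. Whether that is the right behavior depends on the answer to the reviewer's question.
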
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java:
##########
@@ -522,11 +521,24 @@ public RunnerApi.PTransform translate(
       }
 
       if (spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) {
+        ExternalTransforms.SchemaTransformPayload payload =
+            ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload());
+        String identifier = payload.getIdentifier();
         transformBuilder.putAnnotations(
-            SCHEMATRANSFORM_URN_KEY,
-            ByteString.copyFromUtf8(
-                ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload())
-                    .getIdentifier()));
+            BeamUrns.getConstant(Annotations.Enum.SCHEMATRANSFORM_URN_KEY),
+            ByteString.copyFromUtf8(identifier));
+        if (identifier.equals(MANAGED_TRANSFORM_URN)) {
+          Schema configSchema =
+              SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+          Row configRow =
+              RowCoder.of(configSchema).decode(payload.getConfigurationRow().newInput());
+          String underlyingIdentifier =
+              MoreObjects.firstNonNull(
+                  configRow.getString("transform_identifier"), "unknown_identifier");

Review Comment:
   Let's add unit tests to make sure that the annotations get added correctly.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org