This is an automated email from the ASF dual-hosted git repository. austin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new bdbbef9b54d [YAML] - PubSubLite proto (#30129) bdbbef9b54d is described below commit bdbbef9b54dea5f76a2836047f5c56645b6f0d10 Author: Ferran Fernández Garrido <ffernandez....@gmail.com> AuthorDate: Thu Feb 1 19:07:37 2024 +0100 [YAML] - PubSubLite proto (#30129) * [YAML] - PubSubLite proto * [YAML] - PubSubLite proto --- sdks/java/extensions/protobuf/build.gradle | 1 + .../sdk/extensions/protobuf/ProtoByteUtils.java | 79 +++++++++++++++++-- .../extensions/protobuf/ProtoByteUtilsTest.java | 72 +++++++++++++++++ .../PubsubLiteReadSchemaTransformProvider.java | 84 ++++++++++++++++---- .../PubsubLiteWriteSchemaTransformProvider.java | 51 ++++++++++++- .../gcp/pubsublite/internal/PubsubLiteDlqTest.java | 89 ++++++++++++++++++++++ .../internal/PubsubLiteWriteDlqTest.java | 31 ++++++++ sdks/python/apache_beam/yaml/standard_io.yaml | 8 ++ 8 files changed, 392 insertions(+), 23 deletions(-) diff --git a/sdks/java/extensions/protobuf/build.gradle b/sdks/java/extensions/protobuf/build.gradle index 1582492c293..088f5aca63c 100644 --- a/sdks/java/extensions/protobuf/build.gradle +++ b/sdks/java/extensions/protobuf/build.gradle @@ -38,6 +38,7 @@ dependencies { implementation library.java.commons_compress implementation library.java.slf4j_api implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation("com.google.cloud:google-cloud-storage:2.32.1") implementation library.java.protobuf_java implementation("com.squareup.wire:wire-schema-jvm:4.9.3") implementation("io.apicurio:apicurio-registry-protobuf-schema-utilities:3.0.0.M2") diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java index 02419ec0f61..dd73739246d 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java @@ -20,6 +20,9 @@ package org.apache.beam.sdk.extensions.protobuf; import static java.util.stream.Collectors.toList; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; @@ -41,15 +44,15 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.apache.commons.compress.utils.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Utility class for working with Protocol Buffer (Proto) data in the context of Apache Beam. This - * class provides methods to retrieve Beam Schemas from Proto messages, convert Proto bytes to Beam - * Rows, and vice versa. It also includes utilities for handling Protocol Buffer schemas and related - * file operations. + * Utility class for working with Protocol Buffer (Proto) data. This class provides methods to + * retrieve Beam Schemas from Proto messages, convert Proto bytes to Beam Rows, and vice versa. It + * also includes utilities for handling Protocol Buffer schemas and related file operations. * * <p>Users can utilize the methods in this class to facilitate the integration of Proto data * processing within Apache Beam pipelines, allowing for the seamless transformation of Proto @@ -105,7 +108,11 @@ public class ProtoByteUtils { try { Descriptors.FileDescriptor fileDescriptor = FileDescriptorUtils.protoFileToFileDescriptor(result); - return fileDescriptor.findMessageTypeByName(messageName); + + List<String> messageElements = Splitter.on('.').splitToList(messageName); + String messageTypeByName = messageElements.get(messageElements.size() - 1); + + return fileDescriptor.findMessageTypeByName(messageTypeByName); } catch (Descriptors.DescriptorValidationException e) { throw new RuntimeException(e); } @@ -148,10 +155,12 @@ public class ProtoByteUtils { @Override public Row apply(byte[] input) { try { + List<String> messageElements = Splitter.on('.').splitToList(messageName); + String messageTypeByName = messageElements.get(messageElements.size() - 1); final Descriptors.Descriptor descriptor = protoDomain .getFileDescriptor(dynamicProtoDomain.getFileName()) - .findMessageTypeByName(messageName); + .findMessageTypeByName(messageTypeByName); DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, input); SerializableFunction<DynamicMessage, Row> res = protoDynamicMessageSchema.getToRowFunction(); @@ -243,6 +252,41 @@ public class ProtoByteUtils { * @throws RuntimeException if an error occurs while finding or opening the file. */ private static ReadableByteChannel getFileByteChannel(String filePath) { + if (isGcsPath(filePath)) { + return openGcsFile(filePath); + } else { + return openLocalFile(filePath); + } + } + + private static boolean isGcsPath(String filePath) { + return filePath.startsWith("gs://"); + } + + /** + * Opens a ReadableByteChannel for reading from a Google Cloud Storage (GCS) file. + * + * @param filePath The GCS file path (e.g., "gs://your-bucket-name/your-object-name"). + * @return A ReadableByteChannel for reading from the specified GCS file. + */ + private static ReadableByteChannel openGcsFile(String filePath) { + Storage storage = StorageOptions.getDefaultInstance().getService(); + String bucketName = getBucketName(filePath); + String objectName = getObjectName(filePath); + Blob blob = storage.get(bucketName, objectName); + return blob.reader(); + } + + /** + * Opens a ReadableByteChannel for reading from a local file using the Apache Beam FileSystems + * API. + * + * @param filePath The local file path. + * @return A ReadableByteChannel for reading from the specified local file. + * @throws IllegalArgumentException If no files match the specified pattern or if more than one + * file matches. + */ + private static ReadableByteChannel openLocalFile(String filePath) { try { MatchResult result = FileSystems.match(filePath); checkArgument( @@ -259,6 +303,29 @@ public class ProtoByteUtils { } } + /** + * Extracts the bucket name from a Google Cloud Storage (GCS) file path. + * + * @param gcsPath The GCS file path (e.g., "gs://your-bucket-name/your-object-name"). + * @return The bucket name extracted from the GCS path. + */ + private static String getBucketName(String gcsPath) { + int startIndex = "gs://".length(); + int endIndex = gcsPath.indexOf('/', startIndex); + return gcsPath.substring(startIndex, endIndex); + } + + /** + * Extracts the object name from a Google Cloud Storage (GCS) file path. + * + * @param gcsPath The GCS file path (e.g., "gs://your-bucket-name/your-object-name"). + * @return The object name extracted from the GCS path. + */ + private static String getObjectName(String gcsPath) { + int startIndex = gcsPath.indexOf('/', "gs://".length()) + 1; + return gcsPath.substring(startIndex); + } + /** * Represents metadata associated with a Protocol Buffer schema, including the File Name and * ProtoDomain.
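For reference, the new GCS support in ProtoByteUtils resolves "gs://" paths with plain string arithmetic: getBucketName takes everything between the scheme and the first slash, and getObjectName takes everything after it. A minimal standalone sketch of that parsing (class name and sample path are illustrative, not part of the commit):

    // Illustrative sketch only; mirrors getBucketName/getObjectName above.
    public class GcsPathParseSketch {
      public static void main(String[] args) {
        String gcsPath = "gs://my-bucket/dir/schema.pb"; // hypothetical path

        int startIndex = "gs://".length();
        int endIndex = gcsPath.indexOf('/', startIndex);

        String bucketName = gcsPath.substring(startIndex, endIndex); // "my-bucket"
        String objectName = gcsPath.substring(endIndex + 1);         // "dir/schema.pb"

        System.out.println(bucketName + " / " + objectName);
      }
    }

Note that the helpers assume an object name is present: for a path like "gs://my-bucket" with no trailing slash, indexOf returns -1 and the substring call would fail.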
diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtilsTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtilsTest.java index 04bcde6a0fe..6105208d836 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtilsTest.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtilsTest.java @@ -48,6 +48,26 @@ public class ProtoByteUtilsTest { + " Address address = 4;\n" + "}"; + private static final String PROTO_STRING_PACKAGE_SCHEMA = + "syntax = \"proto3\";\n" + + "package com.test.proto;" + + "\n" + + "message MyMessage {\n" + + " int32 id = 1;\n" + + " string name = 2;\n" + + " bool active = 3;\n" + + "\n" + + " // Nested field\n" + + " message Address {\n" + + " string street = 1;\n" + + " string city = 2;\n" + + " string state = 3;\n" + + " string zip_code = 4;\n" + + " }\n" + + "\n" + + " Address address = 4;\n" + + "}"; + private static final String DESCRIPTOR_PATH = Objects.requireNonNull( ProtoByteUtilsTest.class.getResource( @@ -84,6 +104,14 @@ public class ProtoByteUtilsTest { Assert.assertEquals(schema.getFieldNames(), SCHEMA.getFieldNames()); } + @Test + public void testProtoSchemaWithPackageStringToBeamSchema() { + Schema schema = + ProtoByteUtils.getBeamSchemaFromProtoSchema( + PROTO_STRING_PACKAGE_SCHEMA, "com.test.proto.MyMessage"); + Assert.assertEquals(schema.getFieldNames(), SCHEMA.getFieldNames()); + } + @Test public void testProtoBytesToRowFunctionGenerateSerializableFunction() { SerializableFunction<byte[], Row> protoBytesToRowFunction = @@ -111,6 +139,22 @@ public class ProtoByteUtilsTest { protoBytesToRowFunction.apply(inputBytes); } + @Test + public void testProtoBytesToRowFunctionReturnsRowSuccess() { + // Create a proto bytes to row function + SerializableFunction<byte[], Row> protoBytesToRowFunction = + ProtoByteUtils.getProtoBytesToRowFunction(DESCRIPTOR_PATH, MESSAGE_NAME); + + byte[] byteArray = { + 8, -46, 9, 18, 3, 68, 111, 101, 34, 35, 10, 7, 115, 101, 97, 116, 116, 108, 101, 18, 11, 102, + 97, 107, 101, 32, 115, 116, 114, 101, 101, 116, 26, 2, 119, 97, 34, 7, 84, 79, 45, 49, 50, 51, + 52 + }; + + Row row = protoBytesToRowFunction.apply(byteArray); + Assert.assertEquals("Doe", row.getValue("name")); + } + @Test public void testRowToProtoFunction() { Row row = @@ -144,4 +188,32 @@ public class ProtoByteUtilsTest { Assert.assertNotNull( ProtoByteUtils.getRowToProtoBytesFromSchema(PROTO_STRING_SCHEMA, "MyMessage").apply(row)); } + + @Test + public void testRowToProtoSchemaWithPackageFunction() { + Row row = + Row.withSchema(SCHEMA) + .withFieldValue("id", 1234) + .withFieldValue("name", "Doe") + .withFieldValue("active", false) + .withFieldValue("address.city", "seattle") + .withFieldValue("address.street", "fake street") + .withFieldValue("address.zip_code", "TO-1234") + .withFieldValue("address.state", "wa") + .build(); + + byte[] byteArray = { + 8, -46, 9, 18, 3, 68, 111, 101, 34, 35, 10, 7, 115, 101, 97, 116, 116, 108, 101, 18, 11, 102, + 97, 107, 101, 32, 115, 116, 114, 101, 101, 116, 26, 2, 119, 97, 34, 7, 84, 79, 45, 49, 50, 51, + 52 + }; + + byte[] resultBytes = + ProtoByteUtils.getRowToProtoBytesFromSchema( + PROTO_STRING_PACKAGE_SCHEMA, "com.test.proto.MyMessage") + .apply(row); + + Assert.assertNotNull(resultBytes); + Assert.assertArrayEquals(byteArray, resultBytes); + } }
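Taken together, the new tests exercise a schema-string round trip; under the same assumptions as the tests (inline .proto text plus a fully qualified message name, which the commit resolves by its last path element), the API usage looks roughly like this (a sketch, not a test from the commit):

    import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.Row;

    public class ProtoRoundTripSketch {
      private static final String PROTO_SCHEMA =
          "syntax = \"proto3\";\n"
              + "package com.test.proto;\n"
              + "message MyMessage {\n"
              + "  string name = 1;\n"
              + "}";

      public static void main(String[] args) {
        // Schema extraction accepts the fully qualified name.
        Schema beamSchema =
            ProtoByteUtils.getBeamSchemaFromProtoSchema(PROTO_SCHEMA, "com.test.proto.MyMessage");
        Row row = Row.withSchema(beamSchema).withFieldValue("name", "Doe").build();

        SerializableFunction<Row, byte[]> toBytes =
            ProtoByteUtils.getRowToProtoBytesFromSchema(PROTO_SCHEMA, "com.test.proto.MyMessage");
        SerializableFunction<byte[], Row> toRow =
            ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(PROTO_SCHEMA, "com.test.proto.MyMessage");

        Row roundTripped = toRow.apply(toBytes.apply(row));
        String name = roundTripped.getValue("name");
        System.out.println(name); // Doe
      }
    }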
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java index 98c5f8a6b38..8afe730f32c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java @@ -38,6 +38,7 @@ import java.util.Set; import java.util.function.Consumer; import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils; import org.apache.beam.sdk.io.gcp.pubsublite.internal.Uuid; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; @@ -77,7 +78,7 @@ public class PubsubLiteReadSchemaTransformProvider private static final Logger LOG = LoggerFactory.getLogger(PubsubLiteReadSchemaTransformProvider.class); - public static final String VALID_FORMATS_STR = "RAW,AVRO,JSON"; + public static final String VALID_FORMATS_STR = "RAW,AVRO,JSON,PROTO"; public static final Set<String> VALID_DATA_FORMATS = Sets.newHashSet(VALID_FORMATS_STR.split(",")); @@ -207,26 +208,39 @@ public class PubsubLiteReadSchemaTransformProvider Schema beamSchema; if (format != null && format.equals("RAW")) { - if (inputSchema != null) { - throw new IllegalArgumentException( - "To read from PubSubLite in RAW format, you can't provide a schema."); - } + beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build(); valueMapper = getRawBytesToRowFunction(beamSchema); + } else if (format != null && format.equals("PROTO")) { + String fileDescriptorPath = configuration.getFileDescriptorPath(); + String messageName = configuration.getMessageName(); + + if (fileDescriptorPath != null && messageName != null) { + beamSchema = ProtoByteUtils.getBeamSchemaFromProto(fileDescriptorPath, messageName); + valueMapper = ProtoByteUtils.getProtoBytesToRowFunction(fileDescriptorPath, messageName); + } else if (inputSchema != null && messageName != null) { + beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema, messageName); + valueMapper = ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName); + } else { + throw new IllegalArgumentException( + "To read from PubSubLite in PROTO format, either descriptorPath or schema must be provided."); + } + } else { - if (inputSchema == null) { + if (inputSchema != null) { + beamSchema = + Objects.equals(configuration.getFormat(), "JSON") + ? JsonUtils.beamSchemaFromJsonSchema(inputSchema) + : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema)); + valueMapper = + Objects.equals(configuration.getFormat(), "JSON") + ? JsonUtils.getJsonBytesToRowFunction(beamSchema) + : AvroUtils.getAvroBytesToRowFunction(beamSchema); + } else { throw new IllegalArgumentException( - "To read from PubSubLite in JSON or AVRO format, you must provide a schema."); + "To read from Pubsub Lite in JSON or AVRO format, you must provide a schema."); } - beamSchema = - Objects.equals(configuration.getFormat(), "JSON") - ? JsonUtils.beamSchemaFromJsonSchema(inputSchema) - : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema)); - valueMapper = - Objects.equals(configuration.getFormat(), "JSON") - ? JsonUtils.getJsonBytesToRowFunction(beamSchema) - : AvroUtils.getAvroBytesToRowFunction(beamSchema); } return new SchemaTransform() { @Override @@ -404,13 +418,33 @@ public class PubsubLiteReadSchemaTransformProvider @AutoValue @DefaultSchema(AutoValueSchema.class) public abstract static class PubsubLiteReadSchemaTransformConfiguration { + + public void validate() { + final String dataFormat = this.getFormat(); + assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat) + : "Valid data formats are " + VALID_DATA_FORMATS; + + final String inputSchema = this.getSchema(); + final String messageName = this.getMessageName(); + + if (dataFormat != null && dataFormat.equals("RAW")) { + assert inputSchema == null + : "To read from Pubsub Lite in RAW format, you can't provide a schema."; + } + + if (dataFormat != null && dataFormat.equals("PROTO")) { + assert messageName != null + : "To read from Pubsub Lite in PROTO format, messageName must be provided."; + } + } + @SchemaFieldDescription( "The encoding format for the data stored in Pubsub Lite. Valid options are: " + VALID_FORMATS_STR) public abstract String getFormat(); @SchemaFieldDescription( - "The schema in which the data is encoded in the Kafka topic. " + "The schema in which the data is encoded in the Pubsub Lite topic. " + "For AVRO data, this is a schema defined with AVRO schema syntax " + "(https://avro.apache.org/docs/1.10.2/spec.html#schemas). " + "For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/).") @@ -459,6 +493,18 @@ public class PubsubLiteReadSchemaTransformProvider + "case, deduplication of the stream will be strictly best effort.") public abstract @Nullable String getAttributeId(); + @SchemaFieldDescription( + "The path to the Protocol Buffer File Descriptor Set file. This file is used for schema" + + " definition and message serialization.") + @Nullable + public abstract String getFileDescriptorPath(); + + @SchemaFieldDescription( + "The name of the Protocol Buffer message to be used for schema" + + " extraction and data conversion.") + @Nullable + public abstract String getMessageName(); + public static Builder builder() { return new AutoValue_PubsubLiteReadSchemaTransformProvider_PubsubLiteReadSchemaTransformConfiguration .Builder(); } @@ -486,6 +532,12 @@ public class PubsubLiteReadSchemaTransformProvider @SuppressWarnings("unused") public abstract Builder setAttributeId(String attributeId); + @SuppressWarnings("unused") + public abstract Builder setFileDescriptorPath(String fileDescriptorPath); + + @SuppressWarnings("unused") + public abstract Builder setMessageName(String messageName); + public abstract PubsubLiteReadSchemaTransformConfiguration build(); } }
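Condensed, the PROTO branch added to the read provider above dispatches on which inputs are set; the following sketch restates that three-way decision in isolation (it is not the provider code itself):

    import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.Row;

    public class ReadProtoDispatchSketch {
      // Picks the byte[] -> Row mapper the same way the transform above does.
      static SerializableFunction<byte[], Row> protoValueMapper(
          String fileDescriptorPath, String inputSchema, String messageName) {
        if (fileDescriptorPath != null && messageName != null) {
          // Descriptor set file; a local path or, with this commit, a gs:// URI.
          return ProtoByteUtils.getProtoBytesToRowFunction(fileDescriptorPath, messageName);
        } else if (inputSchema != null && messageName != null) {
          // Inline .proto schema string.
          return ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName);
        }
        throw new IllegalArgumentException(
            "To read from PubSubLite in PROTO format, either descriptorPath or schema must be provided.");
      }
    }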
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java index c669f84f330..8ba8176035d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; +import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils; import org.apache.beam.sdk.io.gcp.pubsublite.internal.Uuid; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; @@ -71,7 +72,7 @@ public class PubsubLiteWriteSchemaTransformProvider extends TypedSchemaTransformProvider< PubsubLiteWriteSchemaTransformProvider.PubsubLiteWriteSchemaTransformConfiguration> { - public static final String SUPPORTED_FORMATS_STR = "RAW,JSON,AVRO"; + public static final String SUPPORTED_FORMATS_STR = "RAW,JSON,AVRO,PROTO"; public static final Set<String> SUPPORTED_FORMATS = Sets.newHashSet(SUPPORTED_FORMATS_STR.split(",")); public static final TupleTag<PubSubMessage> OUTPUT_TAG = new TupleTag<PubSubMessage>() {}; @@ -211,6 +212,19 @@ public class PubsubLiteWriteSchemaTransformProvider "The input schema must have exactly one field of type byte."); } toBytesFn = getRowToRawBytesFunction(inputSchema.getField(0).getName()); + } else if (configuration.getFormat().equals("PROTO")) { + String descriptorPath = configuration.getFileDescriptorPath(); + String schema = configuration.getSchema(); + String messageName = configuration.getMessageName(); + + if (descriptorPath != null && messageName != null) { + toBytesFn = ProtoByteUtils.getRowToProtoBytes(descriptorPath, messageName); + } else if (schema != null && messageName != null) { + toBytesFn = ProtoByteUtils.getRowToProtoBytesFromSchema(schema, messageName); + } else { + throw new IllegalArgumentException( + "Either a descriptorPath or a PROTO schema is required."); + } } else if (configuration.getFormat().equals("JSON")) { toBytesFn = JsonUtils.getRowToJsonBytesFunction(inputSchema); } else { @@ -322,6 +336,20 @@ public class PubsubLiteWriteSchemaTransformProvider @AutoValue @DefaultSchema(AutoValueSchema.class) public abstract static class PubsubLiteWriteSchemaTransformConfiguration { + + public void validate() { + final String dataFormat = this.getFormat(); + final String inputSchema = this.getSchema(); + final String messageName = this.getMessageName(); + final String descriptorPath = this.getFileDescriptorPath(); + + if (dataFormat != null && dataFormat.equals("PROTO")) { + assert messageName != null : "Expecting messageName to be non-null."; + assert descriptorPath != null || inputSchema != null + : "You must include a descriptorPath or a PROTO schema."; + } + } + @SchemaFieldDescription( "The GCP project where the Pubsub Lite reservation resides. This can be a " + "project number or a project ID.") @@ -358,6 +386,18 @@ public class PubsubLiteWriteSchemaTransformProvider + "in a ReadFromPubSubLite PTransform to deduplicate messages.") public abstract @Nullable String getAttributeId(); + @SchemaFieldDescription( + "The path to the Protocol Buffer File Descriptor Set file. This file is used for schema" + + " definition and message serialization.") + public abstract @Nullable String getFileDescriptorPath(); + + @SchemaFieldDescription( + "The name of the Protocol Buffer message to be used for schema" + + " extraction and data conversion.") + public abstract @Nullable String getMessageName(); + + public abstract @Nullable String getSchema(); + public static Builder builder() { return new AutoValue_PubsubLiteWriteSchemaTransformProvider_PubsubLiteWriteSchemaTransformConfiguration .Builder(); } @@ -380,6 +420,15 @@ public class PubsubLiteWriteSchemaTransformProvider @SuppressWarnings("unused") public abstract Builder setAttributeId(String attributeId); + @SuppressWarnings("unused") + public abstract Builder setFileDescriptorPath(String fileDescriptorPath); + + @SuppressWarnings("unused") + public abstract Builder setMessageName(String messageName); + + @SuppressWarnings("unused") + public abstract Builder setSchema(String schema); + public abstract PubsubLiteWriteSchemaTransformConfiguration build(); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java index e23536d800e..4acf0a1149e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java @@ -31,6 +31,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils; import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO; import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider; import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider.ErrorFn; @@ -252,6 +253,67 @@ public class PubsubLiteDlqTest { .build()) .build()); + private static final String PROTO_STRING_SCHEMA = + "syntax = \"proto3\";\n" + + "package com.test.proto;" + + "\n" + + "message MyMessage {\n" + + " int32 id = 1;\n" + + " string name = 2;\n" + + " bool active = 3;\n" + + "\n" + + " // Nested field\n" + + " message Address {\n" + + " string street = 1;\n" + + " string city = 2;\n" + + " string state = 3;\n" + + " string zip_code = 4;\n" + + " }\n" + + "\n" + + " Address address = 4;\n" + + "}"; + + private static final Schema BEAM_PROTO_SCHEMA = + Schema.builder() + .addField("id", Schema.FieldType.INT32) + .addField("name", Schema.FieldType.STRING) + .addField("active", Schema.FieldType.BOOLEAN) + .addField( + "address", + Schema.FieldType.row( + Schema.builder() + .addField("city", Schema.FieldType.STRING) + .addField("street", Schema.FieldType.STRING) + .addField("state", Schema.FieldType.STRING) + .addField("zip_code", Schema.FieldType.STRING) + .build())) + .build(); + + private static final Row INPUT_ROW = + Row.withSchema(BEAM_PROTO_SCHEMA) + .withFieldValue("id", 1234) + .withFieldValue("name", "Doe") + .withFieldValue("active", false) + .withFieldValue("address.city", "seattle") + .withFieldValue("address.street", "fake street") + .withFieldValue("address.zip_code", "TO-1234") + .withFieldValue("address.state", "wa") + .build(); + private static final SerializableFunction<Row, byte[]> INPUT_MAPPER = + ProtoByteUtils.getRowToProtoBytesFromSchema(PROTO_STRING_SCHEMA, "com.test.proto.MyMessage"); + + private static final byte[] INPUT_SOURCE = INPUT_MAPPER.apply(INPUT_ROW); + + private static final List<SequencedMessage> INPUT_MESSAGES = + Collections.singletonList( + SequencedMessage.newBuilder() + .setMessage( + PubSubMessage.newBuilder() + .setData(ByteString.copyFrom(INPUT_SOURCE)) + .putAllAttributes(ATTRIBUTE_VALUES_MAP) + .build()) + .build()); + final SerializableFunction<byte[], Row> valueMapper = JsonUtils.getJsonBytesToRowFunction(BEAM_SCHEMA); @@ -471,4 +533,31 @@ public class PubsubLiteDlqTest { p.run().waitUntilFinish(); } + + @Test + public void testPubSubLiteErrorFnReadProto() { + Schema errorSchema = ErrorHandling.errorSchemaBytes(); + + List<String> attributes = new ArrayList<>(); + String attributesMap = ""; + Schema beamAttributeSchema = + PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes( + BEAM_PROTO_SCHEMA, attributes, attributesMap); + + SerializableFunction<byte[], Row> protoValueMapper = + ProtoByteUtils.getProtoBytesToRowFromSchemaFunction( + PROTO_STRING_SCHEMA, "com.test.proto.MyMessage"); + + PCollection<SequencedMessage> input = p.apply(Create.of(INPUT_MESSAGES)); + PCollectionTuple output = + input.apply( + ParDo.of(new ErrorFn("Read-Error-Counter", protoValueMapper, errorSchema, Boolean.TRUE)) + .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); + + output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema); + output.get(ERROR_TAG).setRowSchema(errorSchema); + + PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(INPUT_ROW); + p.run().waitUntilFinish(); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteWriteDlqTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteWriteDlqTest.java index eac0e4264c1..5afa4b7e516 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteWriteDlqTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteWriteDlqTest.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils; import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteWriteSchemaTransformProvider; import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteWriteSchemaTransformProvider.ErrorCounterFn; import org.apache.beam.sdk.schemas.Schema; @@ -111,6 +112,14 @@ public class PubsubLiteWriteDlqTest { .withFieldValue("key2", "second_key")
.build()); + private static final String PROTO_STRING_SCHEMA = + "syntax = \"proto3\";\n" + + "package com.test.proto;" + + "\n" + + "message MyMessage {\n" + + " string name = 1;\n" + + "}"; + private static final Map<String, AttributeValues> ATTRIBUTE_VALUES_MAP = new HashMap<>(); static { @@ -239,4 +248,26 @@ public class PubsubLiteWriteDlqTest { PAssert.that(count).containsInAnyOrder(Collections.singletonList(3L)); p.run().waitUntilFinish(); } + + @Test + public void testPubsubLiteErrorFnSuccessProto() { + Schema errorSchema = ErrorHandling.errorSchemaBytes(); + + SerializableFunction<Row, byte[]> valueMapperProto = + ProtoByteUtils.getRowToProtoBytesFromSchema( + PROTO_STRING_SCHEMA, "com.test.proto.MyMessage"); + + PCollection<Row> input = p.apply(Create.of(ROWS)); + PCollectionTuple output = + input.apply( + ParDo.of( + new ErrorCounterFn("ErrorCounter", valueMapperProto, errorSchema, Boolean.TRUE)) + .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); + + output.get(ERROR_TAG).setRowSchema(errorSchema); + + PAssert.that(output.get(OUTPUT_TAG).apply(Count.globally())) + .containsInAnyOrder(Collections.singletonList(3L)); + p.run().waitUntilFinish(); + } } diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 4d26ce96b67..d63729f1676 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -71,6 +71,7 @@ 'topic': 'topic' 'bootstrap_servers': 'bootstrapServers' 'producer_config_updates': 'producerConfigUpdates' + 'error_handling': 'errorHandling' 'file_descriptor_path': 'fileDescriptorPath' 'message_name': 'messageName' 'schema': 'schema' @@ -97,6 +98,9 @@ 'attributes': 'attributes' 'attribute_map': 'attributeMap' 'attribute_id': 'attributeId' + 'error_handling': 'errorHandling' + 'file_descriptor_path': 'fileDescriptorPath' + 'message_name': 'messageName' 'WriteToPubSubLite': 'project': 'project' 'format': 'format' @@ -104,6 +108,10 @@ 'location': 'location' 'attributes': 'attributes' 'attribute_id': 'attributeId' + 'error_handling': 'errorHandling' + 'file_descriptor_path': 'fileDescriptorPath' + 'message_name': 'messageName' + 'schema': 'schema' underlying_provider: type: beamJar transforms:
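The standard_io.yaml entries above map the YAML keys file_descriptor_path, message_name and schema onto the Java configuration fields, so the write side performs the mirror image of the read-side dispatch. A sketch of that Row-to-bytes choice (illustrative, not the provider code itself; the ProtoByteUtils calls are the ones used in the write provider above):

    import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.Row;

    public class WriteProtoDispatchSketch {
      // Picks the Row -> byte[] serializer as PubsubLiteWriteSchemaTransformProvider does.
      static SerializableFunction<Row, byte[]> protoToBytesFn(
          String descriptorPath, String schema, String messageName) {
        if (descriptorPath != null && messageName != null) {
          return ProtoByteUtils.getRowToProtoBytes(descriptorPath, messageName);
        } else if (schema != null && messageName != null) {
          return ProtoByteUtils.getRowToProtoBytesFromSchema(schema, messageName);
        }
        throw new IllegalArgumentException(
            "Either a descriptorPath or a PROTO schema is required.");
      }
    }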