This is an automated email from the ASF dual-hosted git repository.

austin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bdbbef9b54d [YAML] - PubSubLite proto (#30129)
bdbbef9b54d is described below

commit bdbbef9b54dea5f76a2836047f5c56645b6f0d10
Author: Ferran Fernández Garrido <ffernandez....@gmail.com>
AuthorDate: Thu Feb 1 19:07:37 2024 +0100

    [YAML] - PubSubLite proto (#30129)
    
    * [YAML] - PubSubLite proto
    
    * [YAML] - PubSubLite proto
---
 sdks/java/extensions/protobuf/build.gradle         |  1 +
 .../sdk/extensions/protobuf/ProtoByteUtils.java    | 79 +++++++++++++++++--
 .../extensions/protobuf/ProtoByteUtilsTest.java    | 72 +++++++++++++++++
 .../PubsubLiteReadSchemaTransformProvider.java     | 84 ++++++++++++++++----
 .../PubsubLiteWriteSchemaTransformProvider.java    | 51 ++++++++++++-
 .../gcp/pubsublite/internal/PubsubLiteDlqTest.java | 89 ++++++++++++++++++++++
 .../internal/PubsubLiteWriteDlqTest.java           | 31 ++++++++
 sdks/python/apache_beam/yaml/standard_io.yaml      |  8 ++
 8 files changed, 392 insertions(+), 23 deletions(-)

diff --git a/sdks/java/extensions/protobuf/build.gradle 
b/sdks/java/extensions/protobuf/build.gradle
index 1582492c293..088f5aca63c 100644
--- a/sdks/java/extensions/protobuf/build.gradle
+++ b/sdks/java/extensions/protobuf/build.gradle
@@ -38,6 +38,7 @@ dependencies {
   implementation library.java.commons_compress
   implementation library.java.slf4j_api
   implementation project(path: ":sdks:java:core", configuration: "shadow")
+  implementation("com.google.cloud:google-cloud-storage:2.32.1")
   implementation library.java.protobuf_java
   implementation("com.squareup.wire:wire-schema-jvm:4.9.3")
   
implementation("io.apicurio:apicurio-registry-protobuf-schema-utilities:3.0.0.M2")
diff --git 
a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java
 
b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java
index 02419ec0f61..dd73739246d 100644
--- 
a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java
+++ 
b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java
@@ -20,6 +20,9 @@ package org.apache.beam.sdk.extensions.protobuf;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
 import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.DynamicMessage;
@@ -41,15 +44,15 @@ import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
 import org.apache.commons.compress.utils.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Utility class for working with Protocol Buffer (Proto) data in the context 
of Apache Beam. This
- * class provides methods to retrieve Beam Schemas from Proto messages, 
convert Proto bytes to Beam
- * Rows, and vice versa. It also includes utilities for handling Protocol 
Buffer schemas and related
- * file operations.
+ * Utility class for working with Protocol Buffer (Proto) data. This class 
provides methods to
+ * retrieve Beam Schemas from Proto messages, convert Proto bytes to Beam 
Rows, and vice versa. It
+ * also includes utilities for handling Protocol Buffer schemas and related 
file operations.
  *
  * <p>Users can utilize the methods in this class to facilitate the 
integration of Proto data
  * processing within Apache Beam pipelines, allowing for the seamless 
transformation of Proto
@@ -105,7 +108,11 @@ public class ProtoByteUtils {
     try {
       Descriptors.FileDescriptor fileDescriptor =
           FileDescriptorUtils.protoFileToFileDescriptor(result);
-      return fileDescriptor.findMessageTypeByName(messageName);
+
+      List<String> messageElements = Splitter.on('.').splitToList(messageName);
+      String messageTypeByName = messageElements.get(messageElements.size() - 
1);
+
+      return fileDescriptor.findMessageTypeByName(messageTypeByName);
     } catch (Descriptors.DescriptorValidationException e) {
       throw new RuntimeException(e);
     }
@@ -148,10 +155,12 @@ public class ProtoByteUtils {
       @Override
       public Row apply(byte[] input) {
         try {
+          List<String> messageElements = 
Splitter.on('.').splitToList(messageName);
+          String messageTypeByName = 
messageElements.get(messageElements.size() - 1);
           final Descriptors.Descriptor descriptor =
               protoDomain
                   .getFileDescriptor(dynamicProtoDomain.getFileName())
-                  .findMessageTypeByName(messageName);
+                  .findMessageTypeByName(messageTypeByName);
           DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, 
input);
           SerializableFunction<DynamicMessage, Row> res =
               protoDynamicMessageSchema.getToRowFunction();
@@ -243,6 +252,41 @@ public class ProtoByteUtils {
    * @throws RuntimeException if an error occurs while finding or opening the 
file.
    */
   private static ReadableByteChannel getFileByteChannel(String filePath) {
+    // gs:// paths are read through the Cloud Storage client; everything else goes through
+    // the FileSystems-based reader.
+    return isGcsPath(filePath) ? openGcsFile(filePath) : openLocalFile(filePath);
+  }
+
+  private static boolean isGcsPath(String filePath) {
+    return filePath.startsWith("gs://");
+  }
+
+  /**
+   * Opens a ReadableByteChannel for reading from a Google Cloud Storage (GCS) file.
+   *
+   * @param filePath The GCS file path (e.g., "gs://your-bucket-name/your-object-name").
+   * @return A ReadableByteChannel for reading from the specified GCS file.
+   * @throws IllegalArgumentException If no object exists at the given path.
+   */
+  private static ReadableByteChannel openGcsFile(String filePath) {
+    Storage storage = StorageOptions.getDefaultInstance().getService();
+    String bucketName = getBucketName(filePath);
+    String objectName = getObjectName(filePath);
+    Blob blob = storage.get(bucketName, objectName);
+    // Storage#get yields null when the object does not exist; report the offending path
+    // instead of letting blob.reader() fail with a bare NullPointerException.
+    if (blob == null) {
+      throw new IllegalArgumentException("No GCS object found at path: " + filePath);
+    }
+    return blob.reader();
+  }
+
+  /**
+   * Opens a ReadableByteChannel for reading from a local file using the 
Apache Beam FileSystems
+   * API.
+   *
+   * @param filePath The local file path.
+   * @return A ReadableByteChannel for reading from the specified local file.
+   * @throws IllegalArgumentException If no files match the specified pattern 
or if more than one
+   *     file matches.
+   */
+  private static ReadableByteChannel openLocalFile(String filePath) {
     try {
       MatchResult result = FileSystems.match(filePath);
       checkArgument(
@@ -259,6 +303,29 @@ public class ProtoByteUtils {
     }
   }
 
+  /**
+   * Extracts the bucket name from a Google Cloud Storage (GCS) file path.
+   *
+   * <p>When the path has no object part (e.g., "gs://your-bucket-name"), everything after the
+   * scheme is returned as the bucket name.
+   *
+   * @param gcsPath The GCS file path (e.g., "gs://your-bucket-name/your-object-name").
+   * @return The bucket name extracted from the GCS path.
+   */
+  private static String getBucketName(String gcsPath) {
+    int startIndex = "gs://".length();
+    int endIndex = gcsPath.indexOf('/', startIndex);
+    // indexOf is -1 when no '/' follows the bucket; substring(startIndex, -1) would throw.
+    return endIndex < 0 ? gcsPath.substring(startIndex) : gcsPath.substring(startIndex, endIndex);
+  }
+
+  /**
+   * Extracts the object name from a Google Cloud Storage (GCS) file path.
+   *
+   * @param gcsPath The GCS file path (e.g., "gs://your-bucket-name/your-object-name").
+   * @return The object name extracted from the GCS path, or an empty string when the path
+   *     contains no '/' after the bucket name.
+   */
+  private static String getObjectName(String gcsPath) {
+    int separatorIndex = gcsPath.indexOf('/', "gs://".length());
+    // With no '/' after the bucket there is no object part; return an empty name rather than
+    // substring(0), which would hand back the entire gs:// path.
+    return separatorIndex < 0 ? "" : gcsPath.substring(separatorIndex + 1);
+  }
+
   /**
    * Represents metadata associated with a Protocol Buffer schema, including 
the File Name and
    * ProtoDomain.
diff --git 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtilsTest.java
 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtilsTest.java
index 04bcde6a0fe..6105208d836 100644
--- 
a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtilsTest.java
+++ 
b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtilsTest.java
@@ -48,6 +48,26 @@ public class ProtoByteUtilsTest {
           + "  Address address = 4;\n"
           + "}";
 
+  private static final String PROTO_STRING_PACKAGE_SCHEMA =
+      "syntax = \"proto3\";\n"
+          + "package com.test.proto;"
+          + "\n"
+          + "message MyMessage {\n"
+          + "  int32 id = 1;\n"
+          + "  string name = 2;\n"
+          + "  bool active = 3;\n"
+          + "\n"
+          + "  // Nested field\n"
+          + "  message Address {\n"
+          + "    string street = 1;\n"
+          + "    string city = 2;\n"
+          + "    string state = 3;\n"
+          + "    string zip_code = 4;\n"
+          + "  }\n"
+          + "\n"
+          + "  Address address = 4;\n"
+          + "}";
+
   private static final String DESCRIPTOR_PATH =
       Objects.requireNonNull(
               ProtoByteUtilsTest.class.getResource(
@@ -84,6 +104,14 @@ public class ProtoByteUtilsTest {
     Assert.assertEquals(schema.getFieldNames(), SCHEMA.getFieldNames());
   }
 
+  // Passes a package-qualified message name ("com.test.proto.MyMessage") together with a proto
+  // schema that declares that package, and checks the resulting field names against SCHEMA.
+  @Test
+  public void testProtoSchemaWitPackageStringToBeamSchema() {
+    Schema schema =
+        ProtoByteUtils.getBeamSchemaFromProtoSchema(
+            PROTO_STRING_PACKAGE_SCHEMA, "com.test.proto.MyMessage");
+    Assert.assertEquals(schema.getFieldNames(), SCHEMA.getFieldNames());
+  }
+
   @Test
   public void testProtoBytesToRowFunctionGenerateSerializableFunction() {
     SerializableFunction<byte[], Row> protoBytesToRowFunction =
@@ -111,6 +139,22 @@ public class ProtoByteUtilsTest {
     protoBytesToRowFunction.apply(inputBytes);
   }
 
+  @Test
+  public void testProtoBytesToRowFunctionReturnsRowSuccess() {
+    // Create a proto bytes to row function
+    SerializableFunction<byte[], Row> protoBytesToRowFunction =
+        ProtoByteUtils.getProtoBytesToRowFunction(DESCRIPTOR_PATH, 
MESSAGE_NAME);
+
+    byte[] byteArray = {
+      8, -46, 9, 18, 3, 68, 111, 101, 34, 35, 10, 7, 115, 101, 97, 116, 116, 
108, 101, 18, 11, 102,
+      97, 107, 101, 32, 115, 116, 114, 101, 101, 116, 26, 2, 119, 97, 34, 7, 
84, 79, 45, 49, 50, 51,
+      52
+    };
+
+    Row row = protoBytesToRowFunction.apply(byteArray);
+    Assert.assertEquals("Doe", row.getValue("name"));
+  }
+
   @Test
   public void testRowToProtoFunction() {
     Row row =
@@ -144,4 +188,32 @@ public class ProtoByteUtilsTest {
     Assert.assertNotNull(
         ProtoByteUtils.getRowToProtoBytesFromSchema(PROTO_STRING_SCHEMA, 
"MyMessage").apply(row));
   }
+
+  @Test
+  public void testRowToProtoSchemaWithPackageFunction() {
+    Row row =
+        Row.withSchema(SCHEMA)
+            .withFieldValue("id", 1234)
+            .withFieldValue("name", "Doe")
+            .withFieldValue("active", false)
+            .withFieldValue("address.city", "seattle")
+            .withFieldValue("address.street", "fake street")
+            .withFieldValue("address.zip_code", "TO-1234")
+            .withFieldValue("address.state", "wa")
+            .build();
+
+    byte[] byteArray = {
+      8, -46, 9, 18, 3, 68, 111, 101, 34, 35, 10, 7, 115, 101, 97, 116, 116, 
108, 101, 18, 11, 102,
+      97, 107, 101, 32, 115, 116, 114, 101, 101, 116, 26, 2, 119, 97, 34, 7, 
84, 79, 45, 49, 50, 51,
+      52
+    };
+
+    byte[] resultBytes =
+        ProtoByteUtils.getRowToProtoBytesFromSchema(
+                PROTO_STRING_PACKAGE_SCHEMA, "com.test.proto.MyMessage")
+            .apply(row);
+
+    Assert.assertNotNull(resultBytes);
+    Assert.assertArrayEquals(byteArray, resultBytes);
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java
index 98c5f8a6b38..8afe730f32c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java
@@ -38,6 +38,7 @@ import java.util.Set;
 import java.util.function.Consumer;
 import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
 import org.apache.beam.sdk.io.gcp.pubsublite.internal.Uuid;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
@@ -77,7 +78,7 @@ public class PubsubLiteReadSchemaTransformProvider
   private static final Logger LOG =
       LoggerFactory.getLogger(PubsubLiteReadSchemaTransformProvider.class);
 
-  public static final String VALID_FORMATS_STR = "RAW,AVRO,JSON";
+  public static final String VALID_FORMATS_STR = "RAW,AVRO,JSON,PROTO";
   public static final Set<String> VALID_DATA_FORMATS =
       Sets.newHashSet(VALID_FORMATS_STR.split(","));
 
@@ -207,26 +208,39 @@ public class PubsubLiteReadSchemaTransformProvider
     Schema beamSchema;
 
     if (format != null && format.equals("RAW")) {
-      if (inputSchema != null) {
-        throw new IllegalArgumentException(
-            "To read from PubSubLite in RAW format, you can't provide a 
schema.");
-      }
+
       beamSchema = Schema.builder().addField("payload", 
Schema.FieldType.BYTES).build();
       valueMapper = getRawBytesToRowFunction(beamSchema);
 
+    } else if (format != null && format.equals("PROTO")) {
+      String fileDescriptorPath = configuration.getFileDescriptorPath();
+      String messageName = configuration.getMessageName();
+
+      if (fileDescriptorPath != null && messageName != null) {
+        beamSchema = ProtoByteUtils.getBeamSchemaFromProto(fileDescriptorPath, 
messageName);
+        valueMapper = 
ProtoByteUtils.getProtoBytesToRowFunction(fileDescriptorPath, messageName);
+      } else if (inputSchema != null && messageName != null) {
+        beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema, 
messageName);
+        valueMapper = 
ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName);
+      } else {
+        throw new IllegalArgumentException(
+            "To read from PubSubLite in PROTO format, either descriptorPath or 
schema must be provided.");
+      }
+
     } else {
-      if (inputSchema == null) {
+      if (inputSchema != null) {
+        beamSchema =
+            Objects.equals(configuration.getFormat(), "JSON")
+                ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
+                : AvroUtils.toBeamSchema(new 
org.apache.avro.Schema.Parser().parse(inputSchema));
+        valueMapper =
+            Objects.equals(configuration.getFormat(), "JSON")
+                ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
+                : AvroUtils.getAvroBytesToRowFunction(beamSchema);
+      } else {
         throw new IllegalArgumentException(
-            "To read from PubSubLite in JSON or AVRO format, you must provide 
a schema.");
+            "To read from Pubsub Lite in JSON or AVRO format, you must provide 
a schema.");
       }
-      beamSchema =
-          Objects.equals(configuration.getFormat(), "JSON")
-              ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
-              : AvroUtils.toBeamSchema(new 
org.apache.avro.Schema.Parser().parse(inputSchema));
-      valueMapper =
-          Objects.equals(configuration.getFormat(), "JSON")
-              ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
-              : AvroUtils.getAvroBytesToRowFunction(beamSchema);
     }
     return new SchemaTransform() {
       @Override
@@ -404,13 +418,33 @@ public class PubsubLiteReadSchemaTransformProvider
   @AutoValue
   @DefaultSchema(AutoValueSchema.class)
   public abstract static class PubsubLiteReadSchemaTransformConfiguration {
+
+    /**
+     * Checks the format-related fields of this configuration.
+     *
+     * <p>All checks are Java {@code assert} statements, so they are only evaluated when
+     * assertions are enabled on the JVM. The format, when set, must be one of {@code
+     * VALID_DATA_FORMATS}; RAW must not be combined with a schema; PROTO requires a message name.
+     */
+    public void validate() {
+      final String dataFormat = this.getFormat();
+      assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
+          : "Valid data formats are " + VALID_DATA_FORMATS;
+
+      final String inputSchema = this.getSchema();
+      final String messageName = this.getMessageName();
+
+      if (dataFormat != null && dataFormat.equals("RAW")) {
+        assert inputSchema == null
+            : "To read from Pubsub Lite in RAW format, you can't provide a 
schema.";
+      }
+
+      if (dataFormat != null && dataFormat.equals("PROTO")) {
+        assert messageName != null
+            : "To read from Pubsub Lite in PROTO format, messageName must be 
provided.";
+      }
+    }
+
     @SchemaFieldDescription(
         "The encoding format for the data stored in Pubsub Lite. Valid options 
are: "
             + VALID_FORMATS_STR)
     public abstract String getFormat();
 
     @SchemaFieldDescription(
-        "The schema in which the data is encoded in the Kafka topic. "
+        "The schema in which the data is encoded in the Pubsub Lite topic. "
             + "For AVRO data, this is a schema defined with AVRO schema syntax 
"
             + "(https://avro.apache.org/docs/1.10.2/spec.html#schemas). "
             + "For JSON data, this is a schema defined with JSON-schema syntax 
(https://json-schema.org/).")
@@ -459,6 +493,18 @@ public class PubsubLiteReadSchemaTransformProvider
             + "case, deduplication of the stream will be strictly best 
effort.")
     public abstract @Nullable String getAttributeId();
 
+    @SchemaFieldDescription(
+        "The path to the Protocol Buffer File Descriptor Set file. This file 
is used for schema"
+            + " definition and message serialization.")
+    @Nullable
+    public abstract String getFileDescriptorPath();
+
+    @SchemaFieldDescription(
+        "The name of the Protocol Buffer message to be used for schema"
+            + " extraction and data conversion.")
+    @Nullable
+    public abstract String getMessageName();
+
     public static Builder builder() {
       return new 
AutoValue_PubsubLiteReadSchemaTransformProvider_PubsubLiteReadSchemaTransformConfiguration
           .Builder();
@@ -486,6 +532,12 @@ public class PubsubLiteReadSchemaTransformProvider
       @SuppressWarnings("unused")
       public abstract Builder setAttributeId(String attributeId);
 
+      @SuppressWarnings("unused")
+      public abstract Builder setFileDescriptorPath(String fileDescriptorPath);
+
+      @SuppressWarnings("unused")
+      public abstract Builder setMessageName(String messageName);
+
       public abstract PubsubLiteReadSchemaTransformConfiguration build();
     }
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java
index c669f84f330..8ba8176035d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
 import org.apache.beam.sdk.io.gcp.pubsublite.internal.Uuid;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
@@ -71,7 +72,7 @@ public class PubsubLiteWriteSchemaTransformProvider
     extends TypedSchemaTransformProvider<
         
PubsubLiteWriteSchemaTransformProvider.PubsubLiteWriteSchemaTransformConfiguration>
 {
 
-  public static final String SUPPORTED_FORMATS_STR = "RAW,JSON,AVRO";
+  public static final String SUPPORTED_FORMATS_STR = "RAW,JSON,AVRO,PROTO";
   public static final Set<String> SUPPORTED_FORMATS =
       Sets.newHashSet(SUPPORTED_FORMATS_STR.split(","));
   public static final TupleTag<PubSubMessage> OUTPUT_TAG = new 
TupleTag<PubSubMessage>() {};
@@ -211,6 +212,19 @@ public class PubsubLiteWriteSchemaTransformProvider
                 "The input schema must have exactly one field of type byte.");
           }
           toBytesFn = 
getRowToRawBytesFunction(inputSchema.getField(0).getName());
+        } else if (configuration.getFormat().equals("PROTO")) {
+          String descriptorPath = configuration.getFileDescriptorPath();
+          String schema = configuration.getSchema();
+          String messageName = configuration.getMessageName();
+
+          if (descriptorPath != null && messageName != null) {
+            toBytesFn = ProtoByteUtils.getRowToProtoBytes(descriptorPath, 
messageName);
+          } else if (schema != null && messageName != null) {
+            toBytesFn = ProtoByteUtils.getRowToProtoBytesFromSchema(schema, 
messageName);
+          } else {
+            throw new IllegalArgumentException(
+                "At least a descriptorPath or a PROTO schema is required.");
+          }
         } else if (configuration.getFormat().equals("JSON")) {
           toBytesFn = JsonUtils.getRowToJsonBytesFunction(inputSchema);
         } else {
@@ -322,6 +336,20 @@ public class PubsubLiteWriteSchemaTransformProvider
   @AutoValue
   @DefaultSchema(AutoValueSchema.class)
   public abstract static class PubsubLiteWriteSchemaTransformConfiguration {
+
+    /**
+     * Checks the PROTO-specific settings of this configuration.
+     *
+     * <p>All checks are Java {@code assert} statements, so they are only evaluated when
+     * assertions are enabled on the JVM. For the PROTO format a message name is required,
+     * together with exactly one of a file descriptor path or an inline proto schema.
+     */
+    public void validate() {
+      final String dataFormat = this.getFormat();
+      final String inputSchema = this.getSchema();
+      final String messageName = this.getMessageName();
+      final String descriptorPath = this.getFileDescriptorPath();
+
+      if (dataFormat != null && dataFormat.equals("PROTO")) {
+        assert messageName != null : "Expecting messageName to be non-null.";
+        // Exactly one source of the proto definition must be set. Requiring both to be
+        // non-null would contradict the message and reject configurations that set only one.
+        assert (descriptorPath != null) != (inputSchema != null)
+            : "You must include a descriptorPath or a PROTO schema but not both.";
+      }
+    }
+
     @SchemaFieldDescription(
         "The GCP project where the Pubsub Lite reservation resides. This can 
be a "
             + "project number of a project ID.")
@@ -358,6 +386,18 @@ public class PubsubLiteWriteSchemaTransformProvider
             + "in a ReadFromPubSubLite PTransform to deduplicate messages.")
     public abstract @Nullable String getAttributeId();
 
+    @SchemaFieldDescription(
+        "The path to the Protocol Buffer File Descriptor Set file. This file 
is used for schema"
+            + " definition and message serialization.")
+    public abstract @Nullable String getFileDescriptorPath();
+
+    @SchemaFieldDescription(
+        "The name of the Protocol Buffer message to be used for schema"
+            + " extraction and data conversion.")
+    public abstract @Nullable String getMessageName();
+
+    public abstract @Nullable String getSchema();
+
     public static Builder builder() {
       return new 
AutoValue_PubsubLiteWriteSchemaTransformProvider_PubsubLiteWriteSchemaTransformConfiguration
           .Builder();
@@ -380,6 +420,15 @@ public class PubsubLiteWriteSchemaTransformProvider
       @SuppressWarnings("unused")
       public abstract Builder setAttributeId(String attributeId);
 
+      @SuppressWarnings("unused")
+      public abstract Builder setFileDescriptorPath(String fileDescriptorPath);
+
+      @SuppressWarnings("unused")
+      public abstract Builder setMessageName(String messageName);
+
+      @SuppressWarnings("unused")
+      public abstract Builder setSchema(String schema);
+
       public abstract PubsubLiteWriteSchemaTransformConfiguration build();
     }
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java
index e23536d800e..4acf0a1149e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java
@@ -31,6 +31,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
 import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
 import 
org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider;
 import 
org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider.ErrorFn;
@@ -252,6 +253,67 @@ public class PubsubLiteDlqTest {
                       .build())
               .build());
 
+  private static final String PROTO_STRING_SCHEMA =
+      "syntax = \"proto3\";\n"
+          + "package com.test.proto;"
+          + "\n"
+          + "message MyMessage {\n"
+          + "  int32 id = 1;\n"
+          + "  string name = 2;\n"
+          + "  bool active = 3;\n"
+          + "\n"
+          + "  // Nested field\n"
+          + "  message Address {\n"
+          + "    string street = 1;\n"
+          + "    string city = 2;\n"
+          + "    string state = 3;\n"
+          + "    string zip_code = 4;\n"
+          + "  }\n"
+          + "\n"
+          + "  Address address = 4;\n"
+          + "}";
+
+  private static final Schema BEAM_PROTO_SCHEMA =
+      Schema.builder()
+          .addField("id", Schema.FieldType.INT32)
+          .addField("name", Schema.FieldType.STRING)
+          .addField("active", Schema.FieldType.BOOLEAN)
+          .addField(
+              "address",
+              Schema.FieldType.row(
+                  Schema.builder()
+                      .addField("city", Schema.FieldType.STRING)
+                      .addField("street", Schema.FieldType.STRING)
+                      .addField("state", Schema.FieldType.STRING)
+                      .addField("zip_code", Schema.FieldType.STRING)
+                      .build()))
+          .build();
+
+  private static final Row INPUT_ROW =
+      Row.withSchema(BEAM_PROTO_SCHEMA)
+          .withFieldValue("id", 1234)
+          .withFieldValue("name", "Doe")
+          .withFieldValue("active", false)
+          .withFieldValue("address.city", "seattle")
+          .withFieldValue("address.street", "fake street")
+          .withFieldValue("address.zip_code", "TO-1234")
+          .withFieldValue("address.state", "wa")
+          .build();
+  private static final SerializableFunction<Row, byte[]> INPUT_MAPPER =
+      ProtoByteUtils.getRowToProtoBytesFromSchema(PROTO_STRING_SCHEMA, 
"com.test.proto.MyMessage");
+
+  private static final byte[] INPUT_SOURCE = INPUT_MAPPER.apply(INPUT_ROW);
+
+  private static final List<SequencedMessage> INPUT_MESSAGES =
+      Collections.singletonList(
+          SequencedMessage.newBuilder()
+              .setMessage(
+                  PubSubMessage.newBuilder()
+                      .setData(ByteString.copyFrom(INPUT_SOURCE))
+                      .putAllAttributes(ATTRIBUTE_VALUES_MAP)
+                      .build())
+              .build());
+
   final SerializableFunction<byte[], Row> valueMapper =
       JsonUtils.getJsonBytesToRowFunction(BEAM_SCHEMA);
 
@@ -471,4 +533,31 @@ public class PubsubLiteDlqTest {
 
     p.run().waitUntilFinish();
   }
+
+  @Test
+  public void testPubSubLiteErrorFnReadProto() {
+    Schema errorSchema = ErrorHandling.errorSchemaBytes();
+
+    List<String> attributes = new ArrayList<>();
+    String attributesMap = "";
+    Schema beamAttributeSchema =
+        PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
+            BEAM_PROTO_SCHEMA, attributes, attributesMap);
+
+    SerializableFunction<byte[], Row> protoValueMapper =
+        ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(
+            PROTO_STRING_SCHEMA, "com.test.proto.MyMessage");
+
+    PCollection<SequencedMessage> input = p.apply(Create.of(INPUT_MESSAGES));
+    PCollectionTuple output =
+        input.apply(
+            ParDo.of(new ErrorFn("Read-Error-Counter", protoValueMapper, 
errorSchema, Boolean.TRUE))
+                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
+
+    output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema);
+    output.get(ERROR_TAG).setRowSchema(errorSchema);
+
+    PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(INPUT_ROW);
+    p.run().waitUntilFinish();
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteWriteDlqTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteWriteDlqTest.java
index eac0e4264c1..5afa4b7e516 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteWriteDlqTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteWriteDlqTest.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
 import 
org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteWriteSchemaTransformProvider;
 import 
org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteWriteSchemaTransformProvider.ErrorCounterFn;
 import org.apache.beam.sdk.schemas.Schema;
@@ -111,6 +112,14 @@ public class PubsubLiteWriteDlqTest {
               .withFieldValue("key2", "second_key")
               .build());
 
+  private static final String PROTO_STRING_SCHEMA =
+      "syntax = \"proto3\";\n"
+          + "package com.test.proto;"
+          + "\n"
+          + "message MyMessage {\n"
+          + " string name = 1;\n"
+          + "}";
+
   private static final Map<String, AttributeValues> ATTRIBUTE_VALUES_MAP = new 
HashMap<>();
 
   static {
@@ -239,4 +248,26 @@ public class PubsubLiteWriteDlqTest {
     PAssert.that(count).containsInAnyOrder(Collections.singletonList(3L));
     p.run().waitUntilFinish();
   }
+
+  // Builds a Row-to-proto-bytes mapper from the inline proto schema, runs ROWS through
+  // ErrorCounterFn with that mapper, and expects a global count of 3 on the main output.
+  @Test
+  public void testPubsubLiteErrorFnSuccessProto() {
+    Schema errorSchema = ErrorHandling.errorSchemaBytes();
+
+    SerializableFunction<Row, byte[]> valueMapperProto =
+        ProtoByteUtils.getRowToProtoBytesFromSchema(
+            PROTO_STRING_SCHEMA, "com.test.proto.MyMessage");
+
+    PCollection<Row> input = p.apply(Create.of(ROWS));
+    PCollectionTuple output =
+        input.apply(
+            ParDo.of(
+                    new ErrorCounterFn("ErrorCounter", valueMapperProto, 
errorSchema, Boolean.TRUE))
+                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
+
+    // The error output still needs a schema set even though only the main output is asserted.
+    output.get(ERROR_TAG).setRowSchema(errorSchema);
+
+    PAssert.that(output.get(OUTPUT_TAG).apply(Count.globally()))
+        .containsInAnyOrder(Collections.singletonList(3L));
+    p.run().waitUntilFinish();
+  }
 }
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml 
b/sdks/python/apache_beam/yaml/standard_io.yaml
index 4d26ce96b67..d63729f1676 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -71,6 +71,7 @@
         'topic': 'topic'
         'bootstrap_servers': 'bootstrapServers'
         'producer_config_updates': 'producerConfigUpdates'
+        'error_handling': 'errorHandling'
         'file_descriptor_path': 'fileDescriptorPath'
         'message_name': 'messageName'
         'schema': 'schema'
@@ -97,6 +98,9 @@
         'attributes': 'attributes'
         'attribute_map': 'attributeMap'
         'attribute_id': 'attributeId'
+        'error_handling': 'errorHandling'
+        'file_descriptor_path': 'fileDescriptorPath'
+        'message_name': 'messageName'
       'WriteToPubSubLite':
         'project': 'project'
         'format': 'format'
@@ -104,6 +108,10 @@
         'location': 'location'
         'attributes': 'attributes'
         'attribute_id': 'attributeId'
+        'error_handling': 'errorHandling'
+        'file_descriptor_path': 'fileDescriptorPath'
+        'message_name': 'messageName'
+        'schema': 'schema'
     underlying_provider:
       type: beamJar
       transforms:

Reply via email to