This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c490304040a Moving Pubsub Transforms to beam (#27366)
c490304040a is described below

commit c490304040a3cf5782bda412a8395dda0fb63554
Author: Dip Patel <37777672+dippate...@users.noreply.github.com>
AuthorDate: Tue Jul 18 08:59:41 2023 -0500

    Moving Pubsub Transforms to beam (#27366)
---
 .../PubsubReadSchemaTransformConfiguration.java    | 160 ++---
 .../pubsub/PubsubReadSchemaTransformProvider.java  | 329 ++++-----
 .../PubsubSchemaTransformMessageToRowFactory.java  | 179 -----
 .../PubsubWriteSchemaTransformConfiguration.java   | 174 +----
 .../pubsub/PubsubWriteSchemaTransformProvider.java | 447 +++---------
 .../PubsubReadSchemaTransformProviderTest.java     | 419 ++++-------
 ...bsubSchemaTransformMessageToRowFactoryTest.java | 337 ---------
 .../PubsubWriteSchemaTransformProviderIT.java      | 180 -----
 .../PubsubWriteSchemaTransformProviderTest.java    | 786 ---------------------
 9 files changed, 459 insertions(+), 2552 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformConfiguration.java
index f663f60f09b..befb22ca6dc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformConfiguration.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformConfiguration.java
@@ -17,11 +17,13 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsub;
 
+import com.google.api.client.util.Clock;
 import com.google.auto.value.AutoValue;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 
 /**
  * Configuration for reading from Pub/Sub.
@@ -33,137 +35,57 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 @DefaultSchema(AutoValueSchema.class)
 @AutoValue
 public abstract class PubsubReadSchemaTransformConfiguration {
+  @SchemaFieldDescription(
+      "The name of the topic to consume data from. If a topic is specified, "
+          + "a new subscription will be created for that topic, and consumption will start from that point. "
+          + "Either a topic or a subscription must be provided. "
+          + "Format: projects/${PROJECT}/topics/${TOPIC}")
+  public abstract @Nullable String getTopic();
+
+  @SchemaFieldDescription(
+      "The name of the subscription to consume data from. "
+          + "Either a topic or a subscription must be provided. "
+          + "Format: projects/${PROJECT}/subscriptions/${SUBSCRIPTION}")
+  public abstract @Nullable String getSubscription();
+
+  @SchemaFieldDescription(
+      "The encoding format for the data stored in Pubsub. Valid options are: "
+          + PubsubReadSchemaTransformProvider.VALID_FORMATS_STR)
+  public abstract String getFormat(); // AVRO, JSON
+
+  @SchemaFieldDescription(
+      "The schema in which the data is encoded in the Pubsub topic. "
+          + "For AVRO data, this is a schema defined with AVRO schema syntax "
+          + "(https://avro.apache.org/docs/1.10.2/spec.html#schemas). "
+          + "For JSON data, this is a schema defined with JSON-schema syntax 
(https://json-schema.org/).")
+  public abstract String getSchema();
+
+  // Used for testing only.
+  public abstract @Nullable PubsubTestClientFactory getClientFactory();
+
+  // Used for testing only.
+  public abstract @Nullable Clock getClock();
 
-  /** Instantiates a {@link PubsubReadSchemaTransformConfiguration.Builder}. */
   public static Builder builder() {
     return new AutoValue_PubsubReadSchemaTransformConfiguration.Builder();
   }
 
-  /** The expected schema of the Pub/Sub message. */
-  public abstract Schema getDataSchema();
-
-  /**
-   * The Pub/Sub topic path to write failures.
-   *
-   * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the dead
-   * letter queue topic string.
-   */
-  @Nullable
-  public abstract String getDeadLetterQueue();
-
-  /**
-   * The expected format of the Pub/Sub message.
-   *
-   * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} from
-   * {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}.
-   */
-  @Nullable
-  public abstract String getFormat();
-
-  /** Used by the ProtoPayloadSerializerProvider when serializing from a Pub/Sub message. */
-  @Nullable
-  public abstract String getProtoClass();
-
-  /**
-   * The subscription from which to read Pub/Sub messages.
-   *
-   * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format of
-   * the subscription string.
-   */
-  @Nullable
-  public abstract String getSubscription();
-
-  /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
-  @Nullable
-  public abstract String getThriftClass();
-
-  /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
-  @Nullable
-  public abstract String getThriftProtocolFactoryClass();
-
-  /**
-   * When reading from Cloud Pub/Sub where record timestamps are provided as Pub/Sub message
-   * attributes, specifies the name of the attribute that contains the timestamp.
-   */
-  @Nullable
-  public abstract String getTimestampAttribute();
-
-  /**
-   * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub message
-   * attributes, specifies the name of the attribute containing the unique identifier.
-   */
-  @Nullable
-  public abstract String getIdAttribute();
-
-  /**
-   * The topic from which to read Pub/Sub messages.
-   *
-   * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
-   * topic string.
-   */
-  @Nullable
-  public abstract String getTopic();
-
   @AutoValue.Builder
   public abstract static class Builder {
+    public abstract Builder setTopic(@Nullable String topic);
 
-    /** The expected schema of the Pub/Sub message. */
-    public abstract Builder setDataSchema(Schema value);
-
-    /**
-     * The Pub/Sub topic path to write failures.
-     *
-     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
-     * dead letter queue topic string.
-     */
-    public abstract Builder setDeadLetterQueue(String value);
-
-    /**
-     * The expected format of the Pub/Sub message.
-     *
-     * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer}
-     * from {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}.
-     */
-    public abstract Builder setFormat(String value);
-
-    /** Used by the ProtoPayloadSerializerProvider when serializing from a Pub/Sub message. */
-    public abstract Builder setProtoClass(String value);
-
-    /**
-     * The subscription from which to read Pub/Sub messages.
-     *
-     * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format of
-     * the subscription string.
-     */
-    public abstract Builder setSubscription(String value);
-
-    /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
-    public abstract Builder setThriftClass(String value);
+    public abstract Builder setSubscription(@Nullable String subscription);
 
-    /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
-    public abstract Builder setThriftProtocolFactoryClass(String value);
+    public abstract Builder setFormat(String format);
 
-    /**
-     * When reading from Cloud Pub/Sub where record timestamps are provided as Pub/Sub message
-     * attributes, specifies the name of the attribute that contains the timestamp.
-     */
-    public abstract Builder setTimestampAttribute(String value);
+    public abstract Builder setSchema(String schema);
 
-    /**
-     * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub
-     * message attributes, specifies the name of the attribute containing the unique identifier.
-     */
-    public abstract Builder setIdAttribute(String value);
+    // Used for testing only.
+    public abstract Builder setClientFactory(@Nullable PubsubTestClientFactory clientFactory);
 
-    /**
-     * The topic from which to read Pub/Sub messages.
-     *
-     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
-     * topic string.
-     */
-    public abstract Builder setTopic(String value);
+    // Used for testing only.
+    public abstract Builder setClock(@Nullable Clock clock);
 
-    /** Builds a {@link PubsubReadSchemaTransformConfiguration} instance. */
     public abstract PubsubReadSchemaTransformConfiguration build();
   }
 }
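
For reference, a minimal sketch of building the new read configuration. The project, topic, and JSON schema string below are illustrative, not taken from this commit:

    // Exactly one of setTopic/setSubscription should be set, per the field
    // descriptions above (hypothetical topic name).
    PubsubReadSchemaTransformConfiguration config =
        PubsubReadSchemaTransformConfiguration.builder()
            .setTopic("projects/my-project/topics/my-topic")
            .setFormat("JSON")
            // A JSON-schema string describing the payload (hypothetical).
            .setSchema("{\"type\": \"object\", \"properties\": {\"name\": {\"type\": \"string\"}}}")
            .build();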
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java
index cec07dafef4..c0e8880d028 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java
@@ -17,23 +17,39 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsub;
 
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.DLQ_TAG;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.MAIN_TAG;
-
 import com.google.api.client.util.Clock;
 import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.utils.JsonUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
 
 /**
  * An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads configured using
@@ -43,196 +59,191 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
  * repository.
  */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-@Internal
 @AutoService(SchemaTransformProvider.class)
 public class PubsubReadSchemaTransformProvider
     extends TypedSchemaTransformProvider<PubsubReadSchemaTransformConfiguration> {
-  static final String OUTPUT_TAG = "OUTPUT";
 
-  /** Returns the expected class of the configuration. */
-  @Override
-  protected Class<PubsubReadSchemaTransformConfiguration> configurationClass() {
-    return PubsubReadSchemaTransformConfiguration.class;
-  }
+  public static final String VALID_FORMATS_STR = "AVRO,JSON";
+  public static final Set<String> VALID_DATA_FORMATS =
+      Sets.newHashSet(VALID_FORMATS_STR.split(","));
 
-  /** Returns the expected {@link SchemaTransform} of the configuration. */
-  @Override
-  protected SchemaTransform from(PubsubReadSchemaTransformConfiguration configuration) {
-    PubsubMessageToRow toRowTransform =
-        PubsubSchemaTransformMessageToRowFactory.from(configuration).buildMessageToRow();
-    return new PubsubReadSchemaTransform(configuration, toRowTransform);
-  }
+  public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
+  public static final Schema ERROR_SCHEMA =
+      Schema.builder().addStringField("error").addNullableByteArrayField("row").build();
 
-  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
   @Override
-  public String identifier() {
-    return "beam:schematransform:org.apache.beam:pubsub_read:v1";
-  }
-
-  /**
-   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
-   * no input is expected, this returns an empty list.
-   */
-  @Override
-  public List<String> inputCollectionNames() {
-    return Collections.emptyList();
+  public Class<PubsubReadSchemaTransformConfiguration> configurationClass() {
+    return PubsubReadSchemaTransformConfiguration.class;
   }
 
-  /**
-   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
-   * a single output is expected, this returns a list with a single name.
-   */
   @Override
-  public List<String> outputCollectionNames() {
-    return Collections.singletonList(OUTPUT_TAG);
-  }
-
-  /**
-   * An implementation of {@link SchemaTransform} for Pub/Sub reads configured using {@link
-   * PubsubReadSchemaTransformConfiguration}.
-   */
-  static class PubsubReadSchemaTransform extends SchemaTransform {
+  public SchemaTransform from(PubsubReadSchemaTransformConfiguration configuration) {
+    if (configuration.getSubscription() == null && configuration.getTopic() == null) {
+      throw new IllegalArgumentException(
+          "To read from Pubsub, a subscription name or a topic name must be provided");
+    }
 
-    private final PubsubReadSchemaTransformConfiguration configuration;
-    private final PubsubMessageToRow pubsubMessageToRow;
+    if (configuration.getSubscription() != null && configuration.getTopic() != null) {
+      throw new IllegalArgumentException(
+          "To read from Pubsub, either a subscription name or a topic name must be provided, but not both.");
+    }
 
-    private PubsubClient.PubsubClientFactory clientFactory;
+    if ((Strings.isNullOrEmpty(configuration.getSchema())
+            && !Strings.isNullOrEmpty(configuration.getFormat()))
+        || (!Strings.isNullOrEmpty(configuration.getSchema())
+            && Strings.isNullOrEmpty(configuration.getFormat()))) {
+      throw new IllegalArgumentException(
+          "A schema was provided without a data format (or viceversa). Please 
provide "
+              + "both of these parameters to read from Pubsub, or if you would 
like to use the Pubsub schema service,"
+              + " please leave both of these blank.");
+    }
 
-    private Clock clock;
+    Schema beamSchema;
+    SerializableFunction<byte[], Row> valueMapper;
 
-    private PubsubReadSchemaTransform(
-        PubsubReadSchemaTransformConfiguration configuration,
-        PubsubMessageToRow pubsubMessageToRow) {
-      this.configuration = configuration;
-      this.pubsubMessageToRow = pubsubMessageToRow;
+    if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Format %s not supported. Only supported formats are %s",
+              configuration.getFormat(), VALID_FORMATS_STR));
     }
-
-    /**
-     * Sets the {@link PubsubClient.PubsubClientFactory}.
-     *
-     * <p>Used for testing.
-     */
-    void setClientFactory(PubsubClient.PubsubClientFactory value) {
-      this.clientFactory = value;
+    beamSchema =
+        Objects.equals(configuration.getFormat(), "JSON")
+            ? JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema())
+            : AvroUtils.toBeamSchema(
+                new org.apache.avro.Schema.Parser().parse(configuration.getSchema()));
+    valueMapper =
+        Objects.equals(configuration.getFormat(), "JSON")
+            ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
+            : AvroUtils.getAvroBytesToRowFunction(beamSchema);
+
+    PubsubReadSchemaTransform transform =
+        new PubsubReadSchemaTransform(
+            configuration.getTopic(), configuration.getSubscription(), beamSchema, valueMapper);
+
+    if (configuration.getClientFactory() != null) {
+      transform.setClientFactory(configuration.getClientFactory());
+    }
+    if (configuration.getClock() != null) {
+      transform.setClock(configuration.getClock());
     }
 
-    /**
-     * Sets the {@link Clock}.
-     *
-     * <p>Used for testing.
-     */
-    void setClock(Clock clock) {
-      this.clock = clock;
+    return transform;
+  }
+
+  private static class PubsubReadSchemaTransform extends SchemaTransform implements Serializable {
+    final Schema beamSchema;
+    final SerializableFunction<byte[], Row> valueMapper;
+    final @Nullable String topic;
+    final @Nullable String subscription;
+    @Nullable PubsubTestClientFactory clientFactory;
+    @Nullable Clock clock;
+
+    PubsubReadSchemaTransform(
+        @Nullable String topic,
+        @Nullable String subscription,
+        Schema beamSchema,
+        SerializableFunction<byte[], Row> valueMapper) {
+      this.topic = topic;
+      this.subscription = subscription;
+      this.beamSchema = beamSchema;
+      this.valueMapper = valueMapper;
     }
 
-    /** Validates the {@link PubsubReadSchemaTransformConfiguration}. */
-    @Override
-    public void validate(@Nullable PipelineOptions options) {
-      if (configuration.getSubscription() == null && configuration.getTopic() == null) {
-        throw new IllegalArgumentException(
-            String.format(
-                "%s needs to set either the topic or the subscription",
-                PubsubReadSchemaTransformConfiguration.class));
-      }
+    private static class ErrorCounterFn extends DoFn<PubsubMessage, Row> {
+      private Counter pubsubErrorCounter;
+      private Long errorsInBundle = 0L;
+      private SerializableFunction<byte[], Row> valueMapper;
 
-      if (configuration.getSubscription() != null && configuration.getTopic() != null) {
-        throw new IllegalArgumentException(
-            String.format(
-                "%s should not set both the topic or the subscription",
-                PubsubReadSchemaTransformConfiguration.class));
+      ErrorCounterFn(String name, SerializableFunction<byte[], Row> valueMapper) {
+        this.pubsubErrorCounter = Metrics.counter(PubsubReadSchemaTransformProvider.class, name);
+        this.valueMapper = valueMapper;
       }
 
-      try {
-        PayloadSerializers.getSerializer(
-            configuration.getFormat(), configuration.getDataSchema(), new HashMap<>());
-      } catch (IllegalArgumentException e) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Invalid %s, no serializer provider exists for format `%s`",
-                PubsubReadSchemaTransformConfiguration.class, configuration.getFormat()));
+      @ProcessElement
+      public void process(@DoFn.Element PubsubMessage message, MultiOutputReceiver receiver) {
+
+        try {
+          receiver.get(OUTPUT_TAG).output(valueMapper.apply(message.getPayload()));
+        } catch (Exception e) {
+          errorsInBundle += 1;
+          receiver
+              .get(ERROR_TAG)
+              .output(
+                  Row.withSchema(ERROR_SCHEMA)
+                      .addValues(e.toString(), message.getPayload())
+                      .build());
+        }
       }
-    }
 
-    /** Reads from Pub/Sub according to {@link PubsubReadSchemaTransformConfiguration}. */
-    @Override
-    public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      if (!input.getAll().isEmpty()) {
-        throw new IllegalArgumentException(
-            String.format(
-                "%s %s input is expected to be empty",
-                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      @FinishBundle
+      public void finish(FinishBundleContext c) {
+        pubsubErrorCounter.inc(errorsInBundle);
+        errorsInBundle = 0L;
       }
-
-      PCollectionTuple rowsWithDlq =
-          input
-              .getPipeline()
-              .apply("ReadFromPubsub", buildPubsubRead())
-              .apply("PubsubMessageToRow", pubsubMessageToRow);
-
-      writeToDeadLetterQueue(rowsWithDlq);
-
-      return PCollectionRowTuple.of(OUTPUT_TAG, rowsWithDlq.get(MAIN_TAG));
     }
 
-    private void writeToDeadLetterQueue(PCollectionTuple rowsWithDlq) {
-      PubsubIO.Write<PubsubMessage> deadLetterQueue = buildDeadLetterQueueWrite();
-      if (deadLetterQueue == null) {
-        return;
-      }
-      rowsWithDlq.get(DLQ_TAG).apply("WriteToDeadLetterQueue", deadLetterQueue);
+    void setClientFactory(@Nullable PubsubTestClientFactory factory) {
+      this.clientFactory = factory;
     }
 
-    /**
-     * Builds {@link PubsubIO.Write} dead letter queue from {@link
-     * PubsubReadSchemaTransformConfiguration}.
-     */
-    PubsubIO.Write<PubsubMessage> buildDeadLetterQueueWrite() {
-      if (configuration.getDeadLetterQueue() == null) {
-        return null;
-      }
-
-      PubsubIO.Write<PubsubMessage> writeDlq =
-          PubsubIO.writeMessages().to(configuration.getDeadLetterQueue());
-
-      if (configuration.getTimestampAttribute() != null) {
-        writeDlq = writeDlq.withTimestampAttribute(configuration.getTimestampAttribute());
-      }
-
-      return writeDlq;
+    void setClock(@Nullable Clock clock) {
+      this.clock = clock;
     }
 
-    /** Builds {@link PubsubIO.Read} from a {@link PubsubReadSchemaTransformConfiguration}. */
+    @SuppressWarnings("nullness")
     PubsubIO.Read<PubsubMessage> buildPubsubRead() {
-      PubsubIO.Read<PubsubMessage> read = PubsubIO.readMessagesWithAttributes();
-
-      if (configuration.getSubscription() != null) {
-        read = read.fromSubscription(configuration.getSubscription());
+      PubsubIO.Read<PubsubMessage> pubsubRead = PubsubIO.readMessages();
+      if (!Strings.isNullOrEmpty(topic)) {
+        pubsubRead = pubsubRead.fromTopic(topic);
+      } else {
+        pubsubRead = pubsubRead.fromSubscription(subscription);
       }
-
-      if (configuration.getTopic() != null) {
-        read = read.fromTopic(configuration.getTopic());
+      if (clientFactory != null && clock != null) {
+        pubsubRead = pubsubRead.withClientFactory(clientFactory);
+        pubsubRead = clientFactory.setClock(pubsubRead, clock);
+      } else if (clientFactory != null || clock != null) {
+        throw new IllegalArgumentException(
+            "Both PubsubTestClientFactory and Clock need to be specified for 
testing, but only one is provided");
       }
+      return pubsubRead;
+    }
 
-      if (configuration.getTimestampAttribute() != null) {
-        read = read.withTimestampAttribute(configuration.getTimestampAttribute());
-      }
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      PubsubIO.Read<PubsubMessage> pubsubRead = buildPubsubRead();
 
-      if (configuration.getIdAttribute() != null) {
-        read = read.withIdAttribute(configuration.getIdAttribute());
-      }
+      PCollectionTuple outputTuple =
+          input
+              .getPipeline()
+              .apply(pubsubRead)
+              .apply(
+                  ParDo.of(new ErrorCounterFn("PubSub-read-error-counter", valueMapper))
+                      .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
+
+      return PCollectionRowTuple.of(
+          "output",
+          outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema),
+          "errors",
+          outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
+    }
+  }
 
-      if (clientFactory != null) {
-        read = read.withClientFactory(clientFactory);
-      }
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized String identifier() {
+    return "beam:schematransform:org.apache.beam:pubsub_read:v1";
+  }
 
-      if (clock != null) {
-        read = read.withClock(clock);
-      }
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      inputCollectionNames() {
+    return Collections.emptyList();
+  }
 
-      return read;
-    }
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      outputCollectionNames() {
+    return Arrays.asList("output", "errors");
   }
 }
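
A sketch of applying the new provider end to end, assuming an existing Pipeline object and a hypothetical subscription; per the expand() above, the result carries an "output" collection of decoded Rows and an "errors" collection matching ERROR_SCHEMA:

    PubsubReadSchemaTransformConfiguration config =
        PubsubReadSchemaTransformConfiguration.builder()
            .setSubscription("projects/my-project/subscriptions/my-sub") // hypothetical
            .setFormat("AVRO")
            .setSchema(avroSchemaString) // an Avro schema definition string (assumed in scope)
            .build();
    SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config);
    PCollectionRowTuple result = PCollectionRowTuple.empty(pipeline).apply(transform);
    PCollection<Row> rows = result.get("output");   // successfully decoded messages
    PCollection<Row> errors = result.get("errors"); // decode failures, counted by ErrorCounterFn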
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java
deleted file mode 100644
index 988c593e32f..00000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.ROW;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
-
-/**
- * Builds a {@link PubsubMessageToRow} from a {@link PubsubReadSchemaTransformConfiguration}.
- *
- * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
- * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
- * repository.
- */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-@Internal
-class PubsubSchemaTransformMessageToRowFactory {
-  private static final String DEFAULT_FORMAT = "json";
-
-  private static final Schema.FieldType ATTRIBUTE_MAP_FIELD_TYPE =
-      Schema.FieldType.map(Schema.FieldType.STRING.withNullable(false), Schema.FieldType.STRING);
-  private static final Schema ATTRIBUTE_ARRAY_ENTRY_SCHEMA =
-      Schema.builder().addStringField("key").addStringField("value").build();
-  private static final Schema.FieldType ATTRIBUTE_ARRAY_FIELD_TYPE =
-      Schema.FieldType.array(Schema.FieldType.row(ATTRIBUTE_ARRAY_ENTRY_SCHEMA));
-
-  private static final String THRIFT_CLASS_KEY = "thriftClass";
-  private static final String THRIFT_PROTOCOL_FACTORY_CLASS_KEY = "thriftProtocolFactoryClass";
-  private static final String PROTO_CLASS_KEY = "protoClass";
-
-  /**
-   * Instantiate a {@link PubsubSchemaTransformMessageToRowFactory} from a {@link
-   * PubsubReadSchemaTransformConfiguration}.
-   */
-  static PubsubSchemaTransformMessageToRowFactory from(
-      PubsubReadSchemaTransformConfiguration configuration) {
-    return new PubsubSchemaTransformMessageToRowFactory(configuration);
-  }
-
-  /** Build the {@link PubsubMessageToRow}. */
-  PubsubMessageToRow buildMessageToRow() {
-    PubsubMessageToRow.Builder builder =
-        PubsubMessageToRow.builder()
-            .messageSchema(configuration.getDataSchema())
-            .useDlq(
-                configuration.getDeadLetterQueue() != null
-                    && !configuration.getDeadLetterQueue().isEmpty())
-            .useFlatSchema(!shouldUseNestedSchema());
-
-    if (needsSerializer()) {
-      builder = builder.serializerProvider(serializer());
-    }
-
-    return builder.build();
-  }
-
-  private final PubsubReadSchemaTransformConfiguration configuration;
-
-  private PubsubSchemaTransformMessageToRowFactory(
-      PubsubReadSchemaTransformConfiguration configuration) {
-    this.configuration = configuration;
-  }
-
-  private PayloadSerializer payloadSerializer() {
-    Schema schema = configuration.getDataSchema();
-    String format = DEFAULT_FORMAT;
-
-    if (configuration.getFormat() != null && !configuration.getFormat().isEmpty()) {
-      format = configuration.getFormat();
-    }
-
-    Map<String, Object> params = new HashMap<>();
-
-    if (configuration.getThriftClass() != null && !configuration.getThriftClass().isEmpty()) {
-      params.put(THRIFT_CLASS_KEY, configuration.getThriftClass());
-    }
-
-    if (configuration.getThriftProtocolFactoryClass() != null
-        && !configuration.getThriftProtocolFactoryClass().isEmpty()) {
-      params.put(THRIFT_PROTOCOL_FACTORY_CLASS_KEY, configuration.getThriftProtocolFactoryClass());
-    }
-
-    if (configuration.getProtoClass() != null && !configuration.getProtoClass().isEmpty()) {
-      params.put(PROTO_CLASS_KEY, configuration.getProtoClass());
-    }
-
-    return PayloadSerializers.getSerializer(format, schema, params);
-  }
-
-  PubsubMessageToRow.SerializerProvider serializer() {
-    return input -> payloadSerializer();
-  }
-
-  /**
-   * Determines whether the {@link PubsubMessageToRow} needs a {@link
-   * PubsubMessageToRow.SerializerProvider}.
-   *
-   * <p>The determination is based on {@link #shouldUseNestedSchema()} is false or if the {@link
-   * PubsubMessageToRow#PAYLOAD_FIELD} is not present.
-   */
-  boolean needsSerializer() {
-    return !shouldUseNestedSchema() || !fieldPresent(PAYLOAD_FIELD, Schema.FieldType.BYTES);
-  }
-
-  /**
-   * Determines whether a nested schema should be used for {@link
-   * PubsubReadSchemaTransformConfiguration#getDataSchema()}.
-   *
-   * <p>The determination is based on {@link #schemaHasValidPayloadField()} and {@link
-   * #schemaHasValidAttributesField()}}
-   */
-  boolean shouldUseNestedSchema() {
-    return schemaHasValidPayloadField() && schemaHasValidAttributesField();
-  }
-
-  /**
-   * Determines whether {@link PubsubReadSchemaTransformConfiguration#getDataSchema()} has a valid
-   * {@link PubsubMessageToRow#PAYLOAD_FIELD}.
-   */
-  boolean schemaHasValidPayloadField() {
-    Schema schema = configuration.getDataSchema();
-    if (!schema.hasField(PAYLOAD_FIELD)) {
-      return false;
-    }
-    if (fieldPresent(PAYLOAD_FIELD, Schema.FieldType.BYTES)) {
-      return true;
-    }
-    return schema.getField(PAYLOAD_FIELD).getType().getTypeName().equals(ROW);
-  }
-
-  /**
-   * Determines whether {@link PubsubReadSchemaTransformConfiguration#getDataSchema()} has a valid
-   * {@link PubsubMessageToRow#ATTRIBUTES_FIELD} field.
-   *
-   * <p>The determination is based on whether {@link #fieldPresent(String, Schema.FieldType)} for
-   * {@link PubsubMessageToRow#ATTRIBUTES_FIELD} is true for either {@link
-   * #ATTRIBUTE_MAP_FIELD_TYPE} or {@link #ATTRIBUTE_ARRAY_FIELD_TYPE} {@link Schema.FieldType}s.
-   */
-  boolean schemaHasValidAttributesField() {
-    return fieldPresent(ATTRIBUTES_FIELD, ATTRIBUTE_MAP_FIELD_TYPE)
-        || fieldPresent(ATTRIBUTES_FIELD, ATTRIBUTE_ARRAY_FIELD_TYPE);
-  }
-
-  /**
-   * Determines whether {@link PubsubReadSchemaTransformConfiguration#getDataSchema()} contains the
-   * field and whether that field is an expectedType {@link Schema.FieldType}.
-   */
-  boolean fieldPresent(String field, Schema.FieldType expectedType) {
-    Schema schema = configuration.getDataSchema();
-    return schema.hasField(field)
-        && expectedType.equivalent(
-            schema.getField(field).getType(), Schema.EquivalenceNullablePolicy.IGNORE);
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java
index acaf04cdfc6..57620c968c5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java
@@ -18,9 +18,9 @@
 package org.apache.beam.sdk.io.gcp.pubsub;
 
 import com.google.auto.value.AutoValue;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 
 /**
  * Configuration for writing to Pub/Sub.
@@ -32,179 +32,25 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 @DefaultSchema(AutoValueSchema.class)
 @AutoValue
 public abstract class PubsubWriteSchemaTransformConfiguration {
+  @SchemaFieldDescription(
+      "The encoding format for the data stored in Pubsub. Valid options are: "
+          + PubsubWriteSchemaTransformProvider.VALID_FORMATS_STR)
+  public abstract String getFormat();
 
-  public static final String DEFAULT_TIMESTAMP_ATTRIBUTE = "event_timestamp";
+  @SchemaFieldDescription(
+      "The name of the topic to write data to. " + "Format: 
projects/${PROJECT}/topics/${TOPIC}")
+  public abstract String getTopic();
 
   public static Builder builder() {
     return new AutoValue_PubsubWriteSchemaTransformConfiguration.Builder();
   }
 
-  public static TargetConfiguration.Builder targetConfigurationBuilder() {
-    return new AutoValue_PubsubWriteSchemaTransformConfiguration_TargetConfiguration.Builder()
-        .setTimestampAttributeKey(DEFAULT_TIMESTAMP_ATTRIBUTE);
-  }
-
-  public static SourceConfiguration.Builder sourceConfigurationBuilder() {
-    return new AutoValue_PubsubWriteSchemaTransformConfiguration_SourceConfiguration.Builder();
-  }
-
-  /**
-   * Configuration details of the source {@link org.apache.beam.sdk.values.Row} {@link
-   * org.apache.beam.sdk.schemas.Schema}.
-   */
-  @Nullable
-  public abstract SourceConfiguration getSource();
-
-  /** Configuration details of the target {@link PubsubMessage}. */
-  public abstract TargetConfiguration getTarget();
-
-  /**
-   * The topic to which to write Pub/Sub messages.
-   *
-   * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
-   * topic string.
-   */
-  public abstract String getTopic();
-
-  /**
-   * The expected format of the Pub/Sub message.
-   *
-   * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} from
-   * {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}. See list of supported
-   * values by invoking {@link org.apache.beam.sdk.schemas.io.Providers#loadProviders(Class)}.
-   *
-   * <pre>{@code Providers.loadProviders(PayloadSerializer.class).keySet()}</pre>
-   */
-  @Nullable
-  public abstract String getFormat();
-
-  /**
-   * When writing to Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub message
-   * attributes, specifies the name of the attribute containing the unique identifier.
-   */
-  @Nullable
-  public abstract String getIdAttribute();
-
-  /** Builder for {@link PubsubWriteSchemaTransformConfiguration}. */
   @AutoValue.Builder
   public abstract static class Builder {
+    public abstract Builder setFormat(String format);
 
-    /**
-     * Configuration details of the source {@link org.apache.beam.sdk.values.Row} {@link
-     * org.apache.beam.sdk.schemas.Schema}.
-     */
-    public abstract Builder setSource(SourceConfiguration value);
-
-    /** Configuration details of the target {@link PubsubMessage}. */
-    public abstract Builder setTarget(TargetConfiguration value);
-
-    /**
-     * The topic to which to write Pub/Sub messages.
-     *
-     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
-     * topic string.
-     */
-    public abstract Builder setTopic(String value);
-
-    /**
-     * The expected format of the Pub/Sub message.
-     *
-     * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer}
-     * from {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}. See list of
-     * supported values by invoking {@link
-     * org.apache.beam.sdk.schemas.io.Providers#loadProviders(Class)}.
-     *
-     * <pre>{@code Providers.loadProviders(PayloadSerializer.class).keySet()}</pre>
-     */
-    public abstract Builder setFormat(String value);
-
-    /**
-     * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub
-     * message attributes, specifies the name of the attribute containing the unique identifier.
-     */
-    public abstract Builder setIdAttribute(String value);
+    public abstract Builder setTopic(String topic);
 
     public abstract PubsubWriteSchemaTransformConfiguration build();
   }
-
-  @DefaultSchema(AutoValueSchema.class)
-  @AutoValue
-  public abstract static class SourceConfiguration {
-    /**
-     * The attributes field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-     * org.apache.beam.sdk.schemas.Schema.FieldType} must be a <code>Map&lt;String, String&gt;
-     * </code>
-     */
-    @Nullable
-    public abstract String getAttributesFieldName();
-
-    /**
-     * The timestamp field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-     * org.apache.beam.sdk.schemas.Schema.FieldType} must be a {@link
-     * org.apache.beam.sdk.schemas.Schema.FieldType#DATETIME}.
-     */
-    @Nullable
-    public abstract String getTimestampFieldName();
-
-    /**
-     * The payload field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-     * org.apache.beam.sdk.schemas.Schema.FieldType} must be either {@link
-     * org.apache.beam.sdk.schemas.Schema.FieldType#BYTES} or a {@link
-     * org.apache.beam.sdk.values.Row}. If null, payload serialized from user fields other than
-     * attributes. Not compatible with other payload intended fields.
-     */
-    @Nullable
-    public abstract String getPayloadFieldName();
-
-    @AutoValue.Builder
-    public abstract static class Builder {
-      /**
-       * The attributes field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-       * org.apache.beam.sdk.schemas.Schema.FieldType} must be a <code>Map&lt;String, String&gt;
-       * </code>
-       */
-      public abstract Builder setAttributesFieldName(String value);
-
-      /**
-       * The timestamp field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-       * org.apache.beam.sdk.schemas.Schema.FieldType} must be a {@link
-       * org.apache.beam.sdk.schemas.Schema.FieldType#DATETIME}.
-       */
-      public abstract Builder setTimestampFieldName(String value);
-
-      /**
-       * The payload field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-       * org.apache.beam.sdk.schemas.Schema.FieldType} must be either {@link
-       * org.apache.beam.sdk.schemas.Schema.FieldType#BYTES} or a {@link
-       * org.apache.beam.sdk.values.Row}. If null, payload serialized from user fields other than
-       * attributes. Not compatible with other payload intended fields.
-       */
-      public abstract Builder setPayloadFieldName(String value);
-
-      public abstract SourceConfiguration build();
-    }
-  }
-
-  @DefaultSchema(AutoValueSchema.class)
-  @AutoValue
-  public abstract static class TargetConfiguration {
-
-    /**
-     * The attribute key to assign the {@link PubsubMessage} stringified timestamp value. {@link
-     * #builder()} method defaults value to {@link #DEFAULT_TIMESTAMP_ATTRIBUTE}.
-     */
-    public abstract String getTimestampAttributeKey();
-
-    @AutoValue.Builder
-    public abstract static class Builder {
-
-      /**
-       * The attribute key to assign the {@link PubsubMessage} stringified timestamp value. Defaults
-       * to {@link #DEFAULT_TIMESTAMP_ATTRIBUTE}.
-       */
-      public abstract Builder setTimestampAttributeKey(String value);
-
-      public abstract TargetConfiguration build();
-    }
-  }
 }
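
A corresponding sketch for the trimmed-down write configuration (the topic name is illustrative):

    PubsubWriteSchemaTransformConfiguration writeConfig =
        PubsubWriteSchemaTransformConfiguration.builder()
            .setTopic("projects/my-project/topics/my-topic") // hypothetical
            .setFormat("JSON")
            .build();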
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java
index 7f3f6f2c702..8e8b804801b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java
@@ -17,56 +17,29 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsub;
 
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_ATTRIBUTES_KEY_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_EVENT_TIMESTAMP_KEY_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_PAYLOAD_KEY_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ERROR;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.OUTPUT;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.PAYLOAD_BYTES_TYPE_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.PAYLOAD_ROW_TYPE_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.removeFields;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
-
-import com.google.api.client.util.Clock;
 import com.google.auto.service.AutoService;
-import java.io.IOException;
-import java.util.ArrayList;
+import java.io.Serializable;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Stream;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.FieldMatcher;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.SchemaReflection;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformConfiguration.SourceConfiguration;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.Field;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
-import org.apache.beam.sdk.schemas.io.Providers;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializerProvider;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.utils.JsonUtils;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.joda.time.Instant;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
 
 /**
  * An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads configured using
@@ -76,360 +49,102 @@ import org.joda.time.Instant;
  * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
  * repository.
  */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-@Internal
 @AutoService(SchemaTransformProvider.class)
 public class PubsubWriteSchemaTransformProvider
     extends TypedSchemaTransformProvider<PubsubWriteSchemaTransformConfiguration> {
-  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:pubsub_write:v1";
-  static final String INPUT_TAG = "input";
-  static final String ERROR_TAG = "error";
 
-  /** Returns the expected class of the configuration. */
-  @Override
-  protected Class<PubsubWriteSchemaTransformConfiguration> configurationClass() {
-    return PubsubWriteSchemaTransformConfiguration.class;
-  }
+  public static final TupleTag<PubsubMessage> OUTPUT_TAG = new TupleTag<PubsubMessage>() {};
+  public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
 
-  /** Returns the expected {@link SchemaTransform} of the configuration. */
-  @Override
-  public SchemaTransform from(PubsubWriteSchemaTransformConfiguration configuration) {
-    return new PubsubWriteSchemaTransform(configuration);
-  }
-
-  /** Implementation of the {@link SchemaTransformProvider} identifier method. */
-  @Override
-  public String identifier() {
-    return IDENTIFIER;
-  }
-
-  /**
-   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
-   * single input is expected, this returns a list with a single name.
-   */
-  @Override
-  public List<String> inputCollectionNames() {
-    return Collections.singletonList(INPUT_TAG);
-  }
+  public static final String VALID_FORMATS_STR = "AVRO,JSON";
+  public static final Set<String> VALID_DATA_FORMATS =
+      Sets.newHashSet(VALID_FORMATS_STR.split(","));
 
-  /**
-   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. The
-   * only expected output is the {@link #ERROR_TAG}.
-   */
   @Override
-  public List<String> outputCollectionNames() {
-    return Collections.singletonList(ERROR_TAG);
+  public Class<PubsubWriteSchemaTransformConfiguration> configurationClass() {
+    return PubsubWriteSchemaTransformConfiguration.class;
   }
 
-  /**
-   * An implementation of {@link SchemaTransform} for Pub/Sub writes configured using {@link
-   * PubsubWriteSchemaTransformConfiguration}.
-   */
-  static class PubsubWriteSchemaTransform extends SchemaTransform {
-
-    private final PubsubWriteSchemaTransformConfiguration configuration;
+  public static class ErrorFn extends DoFn<Row, PubsubMessage> {
+    private SerializableFunction<Row, byte[]> valueMapper;
+    private Schema errorSchema;
 
-    private PubsubClient.PubsubClientFactory pubsubClientFactory;
-
-    PubsubWriteSchemaTransform(PubsubWriteSchemaTransformConfiguration configuration) {
-      this.configuration = configuration;
+    ErrorFn(SerializableFunction<Row, byte[]> valueMapper, Schema errorSchema) {
+      this.valueMapper = valueMapper;
+      this.errorSchema = errorSchema;
     }
 
-    PubsubWriteSchemaTransform withPubsubClientFactory(PubsubClient.PubsubClientFactory factory) {
-      this.pubsubClientFactory = factory;
-      return this;
-    }
-
-    @Override
-    public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      if (input.getAll().size() != 1 || !input.has(INPUT_TAG)) {
-        throw new IllegalArgumentException(
-            String.format(
-                "%s %s input is expected to contain a single %s tagged 
PCollection<Row>",
-                input.getClass().getSimpleName(), getClass().getSimpleName(), 
INPUT_TAG));
-      }
-
-      PCollection<Row> rows = input.get(INPUT_TAG);
-      if (rows.getSchema().getFieldCount() == 0) {
-        throw new IllegalArgumentException(String.format("empty Schema for 
%s", INPUT_TAG));
-      }
-
-      Schema targetSchema = buildTargetSchema(rows.getSchema());
-
-      rows =
-          rows.apply(
-                  ConvertForRowToMessage.class.getSimpleName(),
-                  convertForRowToMessage(targetSchema))
-              .setRowSchema(targetSchema);
-
-      Schema schema = rows.getSchema();
-
-      Schema serializableSchema =
-          removeFields(schema, DEFAULT_ATTRIBUTES_KEY_NAME, DEFAULT_EVENT_TIMESTAMP_KEY_NAME);
-      FieldMatcher payloadRowMatcher = FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, TypeName.ROW);
-      if (payloadRowMatcher.match(serializableSchema)) {
-        serializableSchema =
-            serializableSchema.getField(DEFAULT_PAYLOAD_KEY_NAME).getType().getRowSchema();
-      }
-
-      validateTargetSchemaAgainstPubsubSchema(serializableSchema, input.getPipeline().getOptions());
-
-      PCollectionTuple pct =
-          rows.apply(
-              PubsubRowToMessage.class.getSimpleName(),
-              buildPubsubRowToMessage(serializableSchema));
-
-      PCollection<PubsubMessage> messages = pct.get(OUTPUT);
-      messages.apply(PubsubIO.Write.class.getSimpleName(), buildPubsubWrite());
-      return PCollectionRowTuple.of(ERROR_TAG, pct.get(ERROR));
-    }
-
-    PayloadSerializer getPayloadSerializer(Schema schema) {
-      if (configuration.getFormat() == null) {
-        return null;
-      }
-      String format = configuration.getFormat();
-      Set<String> availableFormats =
-          Providers.loadProviders(PayloadSerializerProvider.class).keySet();
-      if (!availableFormats.contains(format)) {
-        String availableFormatsString = String.join(",", availableFormats);
-        throw new IllegalArgumentException(
-            String.format(
-                "%s is not among the valid formats: [%s]", format, 
availableFormatsString));
-      }
-      return PayloadSerializers.getSerializer(configuration.getFormat(), schema, ImmutableMap.of());
-    }
-
-    PubsubRowToMessage buildPubsubRowToMessage(Schema schema) {
-      PubsubRowToMessage.Builder builder =
-          PubsubRowToMessage.builder().setPayloadSerializer(getPayloadSerializer(schema));
-
-      if (configuration.getTarget() != null) {
-        builder =
-            builder.setTargetTimestampAttributeName(
-                configuration.getTarget().getTimestampAttributeKey());
-      }
-
-      return builder.build();
-    }
-
-    PubsubIO.Write<PubsubMessage> buildPubsubWrite() {
-      PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages().to(configuration.getTopic());
-
-      if (configuration.getIdAttribute() != null) {
-        write = write.withIdAttribute(configuration.getIdAttribute());
-      }
-
-      if (pubsubClientFactory != null) {
-        write = write.withClientFactory(pubsubClientFactory);
+    @ProcessElement
+    public void processElement(@Element Row row, MultiOutputReceiver receiver) {
+      try {
+        receiver.get(OUTPUT_TAG).output(new PubsubMessage(valueMapper.apply(row), null));
+      } catch (Exception e) {
+        receiver
+            .get(ERROR_TAG)
+            .output(Row.withSchema(errorSchema).addValues(e.toString(), row).build());
       }
-
-      return write;
     }
+  }
 
-    void validateSourceSchemaAgainstConfiguration(Schema sourceSchema) {
-      if (sourceSchema.getFieldCount() == 0) {
-        throw new IllegalArgumentException(String.format("empty Schema for 
%s", INPUT_TAG));
-      }
-
-      if (configuration.getSource() == null) {
-        return;
-      }
-
-      SourceConfiguration source = configuration.getSource();
-
-      if (source.getAttributesFieldName() != null) {
-        String fieldName = source.getAttributesFieldName();
-        FieldType fieldType = ATTRIBUTES_FIELD_TYPE;
-        FieldMatcher fieldMatcher = FieldMatcher.of(fieldName, fieldType);
-        checkArgument(
-            fieldMatcher.match(sourceSchema),
-            String.format("schema missing field: %s for type %s: ", fieldName, 
fieldType));
-      }
-
-      if (source.getTimestampFieldName() != null) {
-        String fieldName = source.getTimestampFieldName();
-        FieldType fieldType = EVENT_TIMESTAMP_FIELD_TYPE;
-        FieldMatcher fieldMatcher = FieldMatcher.of(fieldName, fieldType);
-        checkArgument(
-            fieldMatcher.match(sourceSchema),
-            String.format("schema missing field: %s for type: %s", fieldName, 
fieldType));
-      }
-
-      if (source.getPayloadFieldName() == null) {
-        return;
-      }
-
-      String fieldName = source.getPayloadFieldName();
-      FieldMatcher bytesFieldMatcher = FieldMatcher.of(fieldName, PAYLOAD_BYTES_TYPE_NAME);
-      FieldMatcher rowFieldMatcher = FieldMatcher.of(fieldName, PAYLOAD_ROW_TYPE_NAME);
-      SchemaReflection schemaReflection = SchemaReflection.of(sourceSchema);
-      checkArgument(
-          schemaReflection.matchesAny(bytesFieldMatcher, rowFieldMatcher),
+  @Override
+  public SchemaTransform from(PubsubWriteSchemaTransformConfiguration configuration) {
+    if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
+      throw new IllegalArgumentException(
           String.format(
-              "schema missing field: %s for types %s or %s",
-              fieldName, PAYLOAD_BYTES_TYPE_NAME, PAYLOAD_ROW_TYPE_NAME));
-
-      String[] fieldsToExclude =
-          Stream.of(
-                  source.getAttributesFieldName(),
-                  source.getTimestampFieldName(),
-                  source.getPayloadFieldName())
-              .filter(Objects::nonNull)
-              .toArray(String[]::new);
-
-      Schema userFieldsSchema = removeFields(sourceSchema, fieldsToExclude);
-
-      if (userFieldsSchema.getFieldCount() > 0) {
-        throw new IllegalArgumentException(
-            String.format("user fields incompatible with %s field", 
source.getPayloadFieldName()));
-      }
-    }
-
-    void validateTargetSchemaAgainstPubsubSchema(Schema targetSchema, PipelineOptions options) {
-      checkArgument(options != null);
-
-      try (PubsubClient pubsubClient = getPubsubClient(options.as(PubsubOptions.class))) {
-        PubsubClient.TopicPath topicPath = PubsubClient.topicPathFromPath(configuration.getTopic());
-        PubsubClient.SchemaPath schemaPath = pubsubClient.getSchemaPath(topicPath);
-        if (schemaPath == null || schemaPath.equals(SchemaPath.DELETED_SCHEMA)) {
-          return;
-        }
-        Schema expectedSchema = pubsubClient.getSchema(schemaPath);
-        checkState(
-            targetSchema.equals(expectedSchema),
-            String.format(
-                "input schema mismatch with expected schema at path: %s\ninput schema: %s\nPub/Sub schema: %s",
-                schemaPath, targetSchema, expectedSchema));
-      } catch (IOException e) {
-        throw new IllegalStateException(e.getMessage());
-      }
-    }
-
-    Schema buildTargetSchema(Schema sourceSchema) {
-      validateSourceSchemaAgainstConfiguration(sourceSchema);
-      FieldType payloadFieldType = null;
-
-      List<String> fieldsToRemove = new ArrayList<>();
-
-      if (configuration.getSource() != null) {
-        SourceConfiguration source = configuration.getSource();
-
-        if (source.getAttributesFieldName() != null) {
-          fieldsToRemove.add(source.getAttributesFieldName());
-        }
-
-        if (source.getTimestampFieldName() != null) {
-          fieldsToRemove.add(source.getTimestampFieldName());
-        }
-
-        if (source.getPayloadFieldName() != null) {
-          String fieldName = source.getPayloadFieldName();
-          Field field = sourceSchema.getField(fieldName);
-          payloadFieldType = field.getType();
-          fieldsToRemove.add(fieldName);
-        }
-      }
-
-      Schema targetSchema =
-          PubsubRowToMessage.builder()
-              .build()
-              .inputSchemaFactory(payloadFieldType)
-              .buildSchema(sourceSchema.getFields().toArray(new Field[0]));
-
-      return removeFields(targetSchema, fieldsToRemove.toArray(new String[0]));
+              "Format %s not supported. Only supported formats are %s",
+              configuration.getFormat(), VALID_FORMATS_STR));
     }
+    return new PubsubWriteSchemaTransform(configuration.getTopic(), configuration.getFormat());
+  }
 
-    private PubsubClient.PubsubClientFactory getPubsubClientFactory() {
-      if (pubsubClientFactory != null) {
-        return pubsubClientFactory;
-      }
-      return PubsubGrpcClient.FACTORY;
-    }
+  private static class PubsubWriteSchemaTransform extends SchemaTransform implements Serializable {
+    final String topic;
+    final String format;
 
-    private PubsubClient getPubsubClient(PubsubOptions options) throws IOException {
-      return getPubsubClientFactory()
-          .newClient(
-              configuration.getTarget().getTimestampAttributeKey(),
-              configuration.getIdAttribute(),
-              options);
+    PubsubWriteSchemaTransform(String topic, String format) {
+      this.topic = topic;
+      this.format = format;
     }
 
-    ParDo.SingleOutput<Row, Row> convertForRowToMessage(Schema targetSchema) {
-      return convertForRowToMessage(targetSchema, null);
-    }
-
-    ParDo.SingleOutput<Row, Row> convertForRowToMessage(
-        Schema targetSchema, @Nullable Clock clock) {
-      String attributesName = null;
-      String timestampName = null;
-      String payloadName = null;
-      SourceConfiguration source = configuration.getSource();
-      if (source != null) {
-        attributesName = source.getAttributesFieldName();
-        timestampName = source.getTimestampFieldName();
-        payloadName = source.getPayloadFieldName();
-      }
-      return ParDo.of(
-          new ConvertForRowToMessage(
-              targetSchema, clock, attributesName, timestampName, payloadName));
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      final Schema errorSchema =
+          Schema.builder()
+              .addStringField("error")
+              .addNullableRowField("row", input.get("input").getSchema())
+              .build();
+      SerializableFunction<Row, byte[]> fn =
+          format.equals("AVRO")
+              ? AvroUtils.getRowToAvroBytesFunction(input.get("input").getSchema())
+              : JsonUtils.getRowToJsonBytesFunction(input.get("input").getSchema());
+
+      PCollectionTuple outputTuple =
+          input
+              .get("input")
+              .apply(
+                  ParDo.of(new ErrorFn(fn, errorSchema))
+                      .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
+
+      outputTuple.get(OUTPUT_TAG).apply(PubsubIO.writeMessages().to(topic));
+
+      return PCollectionRowTuple.of("errors", outputTuple.get(ERROR_TAG).setRowSchema(errorSchema));
     }
   }
 
-  private static class ConvertForRowToMessage extends DoFn<Row, Row> {
-    private final Schema targetSchema;
-    @Nullable private final Clock clock;
-    @Nullable private final String attributesFieldName;
-    @Nullable private final String timestampFieldName;
-    @Nullable private final String payloadFieldName;
-
-    ConvertForRowToMessage(
-        Schema targetSchema,
-        @Nullable Clock clock,
-        @Nullable String attributesFieldName,
-        @Nullable String timestampFieldName,
-        @Nullable String payloadFieldName) {
-      this.targetSchema = targetSchema;
-      this.clock = clock;
-      this.attributesFieldName = attributesFieldName;
-      this.timestampFieldName = timestampFieldName;
-      this.payloadFieldName = payloadFieldName;
-    }
-
-    @ProcessElement
-    public void process(@Element Row row, OutputReceiver<Row> receiver) {
-      Instant now = Instant.now();
-      if (clock != null) {
-        now = Instant.ofEpochMilli(clock.currentTimeMillis());
-      }
-      Map<String, Object> values = new HashMap<>();
-
-      // Default attributes value
-      checkState(targetSchema.hasField(DEFAULT_ATTRIBUTES_KEY_NAME));
-      values.put(DEFAULT_ATTRIBUTES_KEY_NAME, ImmutableMap.of());
-
-      // Default timestamp value
-      checkState(targetSchema.hasField(DEFAULT_EVENT_TIMESTAMP_KEY_NAME));
-      values.put(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, now);
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized String identifier() {
+    return "beam:schematransform:org.apache.beam:pubsub_write:v1";
+  }
 
-      for (String fieldName : row.getSchema().getFieldNames()) {
-        if (targetSchema.hasField(fieldName)) {
-          values.put(fieldName, row.getValue(fieldName));
-        }
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      inputCollectionNames() {
+    return Collections.singletonList("input");
+  }
 
-        if (attributesFieldName != null) {
-          values.put(DEFAULT_ATTRIBUTES_KEY_NAME, row.getValue(attributesFieldName));
-        }
-        if (timestampFieldName != null) {
-          values.put(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, row.getValue(timestampFieldName));
-        }
-        if (payloadFieldName != null) {
-          values.put(DEFAULT_PAYLOAD_KEY_NAME, row.getValue(payloadFieldName));
-        }
-      }
-      receiver.output(Row.withSchema(targetSchema).withFieldValues(values).build());
-    }
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      outputCollectionNames() {
+    return Collections.singletonList("errors");
   }
 }
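
Note on usage: the rewritten provider above follows the generic SchemaTransform contract: rows tagged "input" go in, serialization failures come back tagged "errors", and the payload format is AVRO or JSON. A minimal sketch of the wiring, assuming the AutoValue builder setters that pair with getTopic()/getFormat(); the topic path and the `rows` collection are illustrative placeholders, not values from this commit:

    // Hedged sketch only: configure and apply the new write transform.
    PubsubWriteSchemaTransformConfiguration config =
        PubsubWriteSchemaTransformConfiguration.builder()
            .setTopic("projects/my-project/topics/my-topic") // placeholder topic
            .setFormat("AVRO")
            .build();
    SchemaTransform write = new PubsubWriteSchemaTransformProvider().from(config);
    // Rows that serialize cleanly are published via PubsubIO.writeMessages();
    // rows that throw during serialization surface under the "errors" tag.
    PCollectionRowTuple errors = PCollectionRowTuple.of("input", rows).apply(write);
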
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java
index 848549f1929..0de998f1112 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java
@@ -18,300 +18,237 @@
 package org.apache.beam.sdk.io.gcp.pubsub;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 
 import com.google.api.client.util.Clock;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Timestamp;
 import java.io.IOException;
 import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/** Tests for {@link PubsubReadSchemaTransformProvider}. */
+/** Tests for {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubReadSchemaTransformProvider}. */
 @RunWith(JUnit4.class)
 public class PubsubReadSchemaTransformProviderTest {
 
-  private static final Schema SCHEMA =
+  private static final Schema BEAM_SCHEMA =
       Schema.of(
           Schema.Field.of("name", Schema.FieldType.STRING),
           Schema.Field.of("number", Schema.FieldType.INT64));
-
+  private static final Schema BEAM_SCHEMA_WITH_ERROR =
+      Schema.of(Schema.Field.of("error", Schema.FieldType.STRING));
+  private static final String SCHEMA = AvroUtils.toAvroSchema(BEAM_SCHEMA).toString();
   private static final String SUBSCRIPTION = "projects/project/subscriptions/subscription";
   private static final String TOPIC = "projects/project/topics/topic";
 
-  private static final List<TestCase> cases =
-      Arrays.asList(
-          testCase(
-                  "no configured topic or subscription",
-                  PubsubReadSchemaTransformConfiguration.builder().setDataSchema(SCHEMA).build())
-              .expectInvalidConfiguration(),
-          testCase(
-                  "both topic and subscription configured",
-                  PubsubReadSchemaTransformConfiguration.builder()
-                      .setSubscription(SUBSCRIPTION)
-                      .setSubscription(TOPIC)
-                      .setDataSchema(SCHEMA)
-                      .build())
-              .expectInvalidConfiguration(),
-          testCase(
-                  "invalid format configured",
-                  PubsubReadSchemaTransformConfiguration.builder()
-                      .setSubscription(SUBSCRIPTION)
-                      .setDataSchema(SCHEMA)
-                      .setFormat("invalidformat")
-                      .build())
-              .expectInvalidConfiguration(),
-          testCase(
-                  "configuration with subscription",
-                  PubsubReadSchemaTransformConfiguration.builder()
-                      .setSubscription(SUBSCRIPTION)
-                      .setDataSchema(SCHEMA)
-                      .build())
-              .withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION)),
-          testCase(
-                  "configuration with topic",
-                  PubsubReadSchemaTransformConfiguration.builder()
-                      .setTopic(TOPIC)
-                      .setDataSchema(SCHEMA)
-                      .build())
-              .withExpectedPubsubRead(PubsubIO.readMessages().fromTopic(TOPIC)),
-          testCase(
-                  "configuration with subscription, timestamp and id attributes",
-                  PubsubReadSchemaTransformConfiguration.builder()
-                      .setSubscription(SUBSCRIPTION)
-                      .setTimestampAttribute("timestampAttribute")
-                      .setIdAttribute("idAttribute")
-                      .setDataSchema(SCHEMA)
-                      .build())
-              .withExpectedPubsubRead(
-                  PubsubIO.readMessages()
-                      .fromSubscription(SUBSCRIPTION)
-                      .withTimestampAttribute("timestampAttribute")
-                      .withIdAttribute("idAttribute")),
-          testCase(
-                  "configuration with subscription and dead letter queue",
-                  PubsubReadSchemaTransformConfiguration.builder()
-                      .setSubscription(SUBSCRIPTION)
-                      .setDataSchema(SCHEMA)
-                      .setDeadLetterQueue(TOPIC)
-                      .build())
-              .withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION))
-              .withExpectedDeadLetterQueue(PubsubIO.writeMessages().to(TOPIC)),
-          testCase(
-                  "configuration with subscription, timestamp attribute, and dead letter queue",
-                  PubsubReadSchemaTransformConfiguration.builder()
-                      .setSubscription(SUBSCRIPTION)
-                      .setTimestampAttribute("timestampAttribute")
-                      .setDataSchema(SCHEMA)
-                      .setDeadLetterQueue(TOPIC)
-                      .build())
-              .withExpectedPubsubRead(
-                  PubsubIO.readMessages()
-                      .fromSubscription(SUBSCRIPTION)
-                      .withTimestampAttribute("timestampAttribute"))
-              .withExpectedDeadLetterQueue(
-                  PubsubIO.writeMessages().to(TOPIC).withTimestampAttribute("timestampAttribute")));
-
-  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
-  private static final TypeDescriptor<PubsubReadSchemaTransformConfiguration> TYPE_DESCRIPTOR =
-      TypeDescriptor.of(PubsubReadSchemaTransformConfiguration.class);
-  private static final SerializableFunction<PubsubReadSchemaTransformConfiguration, Row>
-      ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
-
   private static final List<Row> ROWS =
       Arrays.asList(
-          Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 100L).build(),
-          Row.withSchema(SCHEMA).withFieldValue("name", "b").withFieldValue("number", 200L).build(),
-          Row.withSchema(SCHEMA)
+          Row.withSchema(BEAM_SCHEMA)
+              .withFieldValue("name", "a")
+              .withFieldValue("number", 100L)
+              .build(),
+          Row.withSchema(BEAM_SCHEMA)
+              .withFieldValue("name", "b")
+              .withFieldValue("number", 200L)
+              .build(),
+          Row.withSchema(BEAM_SCHEMA)
               .withFieldValue("name", "c")
               .withFieldValue("number", 300L)
               .build());
 
-  private static final Clock CLOCK = (Clock & Serializable) () -> 1656788475425L;
+  private static final List<Row> ROWSWITHERROR =
+      Arrays.asList(
+          Row.withSchema(BEAM_SCHEMA_WITH_ERROR).withFieldValue("error", "a").build(),
+          Row.withSchema(BEAM_SCHEMA_WITH_ERROR).withFieldValue("error", "b").build(),
+          Row.withSchema(BEAM_SCHEMA_WITH_ERROR).withFieldValue("error", "c").build());
+
+  private static final Clock CLOCK = (Clock & Serializable) () -> 1678988970000L;
 
   private static final AvroPayloadSerializerProvider AVRO_PAYLOAD_SERIALIZER_PROVIDER =
       new AvroPayloadSerializerProvider();
   private static final PayloadSerializer AVRO_PAYLOAD_SERIALIZER =
-      AVRO_PAYLOAD_SERIALIZER_PROVIDER.getSerializer(SCHEMA, new HashMap<>());
+      AVRO_PAYLOAD_SERIALIZER_PROVIDER.getSerializer(BEAM_SCHEMA, new HashMap<>());
+  private static final PayloadSerializer AVRO_PAYLOAD_SERIALIZER_WITH_ERROR =
+      AVRO_PAYLOAD_SERIALIZER_PROVIDER.getSerializer(BEAM_SCHEMA_WITH_ERROR, new HashMap<>());
 
-  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+  @Rule public transient TestPipeline p = TestPipeline.create();
 
   @Test
-  public void testBuildDeadLetterQueueWrite() {
-    for (TestCase testCase : cases) {
-      PubsubIO.Write<PubsubMessage> dlq =
-          testCase.pubsubReadSchemaTransform().buildDeadLetterQueueWrite();
-
-      if (testCase.expectedDeadLetterQueue == null) {
-        assertNull(testCase.name, dlq);
-        return;
-      }
-
-      Map<DisplayData.Identifier, DisplayData.Item> actual = DisplayData.from(dlq).asMap();
-      Map<DisplayData.Identifier, DisplayData.Item> expected = testCase.expectedDeadLetterQueue;
-
-      assertEquals(testCase.name, expected, actual);
-    }
+  public void testInvalidConfigNoTopicOrSubscription() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            new PubsubReadSchemaTransformProvider()
+                .from(
+                    PubsubReadSchemaTransformConfiguration.builder()
+                        .setSchema(SCHEMA)
+                        .setFormat("AVRO")
+                        .build()));
   }
 
   @Test
-  public void testReadAvro() throws IOException {
+  public void testInvalidConfigBothTopicAndSubscription() {
     PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
-    PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform transform =
-        schemaTransformWithClock("avro");
-    PubsubTestClient.PubsubTestClientFactory clientFactory =
-        clientFactory(incomingAvroMessagesOf(CLOCK.currentTimeMillis()));
-    transform.setClientFactory(clientFactory);
-    PCollectionRowTuple reads = begin.apply(transform);
-
-    PAssert.that(reads.get(PubsubReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(ROWS);
-
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            begin.apply(
+                new PubsubReadSchemaTransformProvider()
+                    .from(
+                        PubsubReadSchemaTransformConfiguration.builder()
+                            .setSchema(SCHEMA)
+                            .setFormat("AVRO")
+                            .setTopic(TOPIC)
+                            .setSubscription(SUBSCRIPTION)
+                            .build())));
     p.run().waitUntilFinish();
-    clientFactory.close();
   }
 
   @Test
-  public void testReadJson() throws IOException {
+  public void testInvalidConfigInvalidFormat() {
     PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
-    PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform transform =
-        schemaTransformWithClock("json");
-    PubsubTestClient.PubsubTestClientFactory clientFactory =
-        clientFactory(incomingJsonMessagesOf(CLOCK.currentTimeMillis()));
-    transform.setClientFactory(clientFactory);
-    PCollectionRowTuple reads = begin.apply(transform);
-
-    PAssert.that(reads.get(PubsubReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(ROWS);
-
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            begin.apply(
+                new PubsubReadSchemaTransformProvider()
+                    .from(
+                        PubsubReadSchemaTransformConfiguration.builder()
+                            .setSchema(SCHEMA)
+                            .setFormat("BadFormat")
+                            .setSubscription(SUBSCRIPTION)
+                            .build())));
     p.run().waitUntilFinish();
-
-    clientFactory.close();
-  }
-
-  @Test
-  public void testBuildPubSubRead() {
-    for (TestCase testCase : cases) {
-      if (testCase.invalidConfigurationExpected) {
-        continue;
-      }
-      Map<DisplayData.Identifier, DisplayData.Item> actual =
-          DisplayData.from(testCase.pubsubReadSchemaTransform().buildPubsubRead()).asMap();
-
-      Map<DisplayData.Identifier, DisplayData.Item> expected = testCase.expectedPubsubRead;
-
-      assertEquals(testCase.name, expected, actual);
-    }
-  }
-
-  @Test
-  public void testInvalidConfiguration() {
-    for (TestCase testCase : cases) {
-      PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
-      if (testCase.invalidConfigurationExpected) {
-        assertThrows(
-            testCase.name,
-            RuntimeException.class,
-            () -> begin.apply(testCase.pubsubReadSchemaTransform()));
-      }
-    }
   }
 
   @Test
-  public void testInvalidInput() {
-    PCollectionRowTuple begin = PCollectionRowTuple.of("BadInput", p.apply(Create.of(ROWS)));
+  public void testNoSchema() {
+    PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
     assertThrows(
-        IllegalArgumentException.class,
+        IllegalStateException.class,
         () ->
             begin.apply(
                 new PubsubReadSchemaTransformProvider()
                     .from(
                         PubsubReadSchemaTransformConfiguration.builder()
-                            .setDataSchema(SCHEMA)
+                            .setSubscription(SUBSCRIPTION)
+                            .setFormat("AVRO")
                             .build())));
+    p.run().waitUntilFinish();
   }
 
-  private PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform schemaTransformWithClock(
-      String format) {
-    PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform transform =
-        (PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform)
-            new PubsubReadSchemaTransformProvider()
-                .from(
-                    PubsubReadSchemaTransformConfiguration.builder()
-                        .setDataSchema(SCHEMA)
-                        .setSubscription(SUBSCRIPTION)
-                        .setFormat(format)
-                        .build());
-
-    transform.setClock(CLOCK);
+  @Test
+  public void testReadAvro() throws IOException {
+    PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
 
-    return transform;
+    try (PubsubTestClientFactory clientFactory = clientFactory(beamRowToMessage())) {
+      PubsubReadSchemaTransformConfiguration config =
+          PubsubReadSchemaTransformConfiguration.builder()
+              .setFormat("AVRO")
+              .setSchema(SCHEMA)
+              .setSubscription(SUBSCRIPTION)
+              .setClientFactory(clientFactory)
+              .setClock(CLOCK)
+              .build();
+      SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config);
+      PCollectionRowTuple reads = begin.apply(transform);
+
+      PAssert.that(reads.get("output")).containsInAnyOrder(ROWS);
+
+      p.run().waitUntilFinish();
+    } catch (Exception e) {
+      throw e;
+    }
   }
 
-  private static PubsubTestClient.PubsubTestClientFactory clientFactory(
-      List<PubsubClient.IncomingMessage> messages) {
-    return PubsubTestClient.createFactoryForPull(
-        CLOCK, PubsubClient.subscriptionPathFromPath(SUBSCRIPTION), 60, messages);
-  }
+  @Test
+  public void testReadAvroWithError() throws IOException {
+    PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
 
-  private static List<PubsubClient.IncomingMessage> incomingAvroMessagesOf(long millisSinceEpoch) {
-    return ROWS.stream()
-        .map(row -> incomingAvroMessageOf(row, millisSinceEpoch))
-        .collect(Collectors.toList());
-  }
+    try (PubsubTestClientFactory clientFactory = clientFactory(beamRowToMessageWithError())) {
+      PubsubReadSchemaTransformConfiguration config =
+          PubsubReadSchemaTransformConfiguration.builder()
+              .setFormat("AVRO")
+              .setSchema(SCHEMA)
+              .setSubscription(SUBSCRIPTION)
+              .setClientFactory(clientFactory)
+              .setClock(CLOCK)
+              .build();
+      SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config);
+      PCollectionRowTuple reads = begin.apply(transform);
+
+      PAssert.that(reads.get("output")).empty();
+
+      PipelineResult result = p.run();
+      result.waitUntilFinish();
+
+      MetricResults metrics = result.metrics();
+      MetricQueryResults metricResults =
+          metrics.queryMetrics(
+              MetricsFilter.builder()
+                  .addNameFilter(
+                      MetricNameFilter.named(
+                          PubsubReadSchemaTransformProvider.class, "PubSub-read-error-counter"))
+                  .build());
+
+      Iterable<MetricResult<Long>> counters = metricResults.getCounters();
+      if (!counters.iterator().hasNext()) {
+        throw new RuntimeException("no counters available ");
+      }
 
-  private static PubsubClient.IncomingMessage incomingAvroMessageOf(
-      Row row, long millisSinceEpoch) {
-    byte[] bytes = AVRO_PAYLOAD_SERIALIZER.serialize(row);
-    return incomingMessageOf(bytes, millisSinceEpoch);
+      Long expectedCount = 3L;
+      for (MetricResult<Long> count : counters) {
+        assertEquals(expectedCount, count.getAttempted());
+      }
+    } catch (Exception e) {
+      throw e;
+    }
   }
 
-  private static List<PubsubClient.IncomingMessage> incomingJsonMessagesOf(long millisSinceEpoch) {
-    return PubsubReadSchemaTransformProviderTest.ROWS.stream()
-        .map(row -> incomingJsonMessageOf(row, millisSinceEpoch))
+  private static List<PubsubClient.IncomingMessage> beamRowToMessage() {
+    long timestamp = CLOCK.currentTimeMillis();
+    return ROWS.stream()
+        .map(
+            row -> {
+              byte[] bytes = AVRO_PAYLOAD_SERIALIZER.serialize(row);
+              return incomingMessageOf(bytes, timestamp);
+            })
         .collect(Collectors.toList());
   }
 
-  private static PubsubClient.IncomingMessage incomingJsonMessageOf(
-      Row row, long millisSinceEpoch) {
-    String name = Objects.requireNonNull(row.getString("name"));
-    long number = Objects.requireNonNull(row.getInt64("number"));
-    return incomingJsonMessageOf(name, number, millisSinceEpoch);
-  }
-
-  private static PubsubClient.IncomingMessage incomingJsonMessageOf(
-      String name, long number, long millisSinceEpoch) {
-    Gson gson = new Gson();
-    JsonObject obj = new JsonObject();
-    obj.add("name", new JsonPrimitive(name));
-    obj.add("number", new JsonPrimitive(number));
-    byte[] bytes = gson.toJson(obj).getBytes(StandardCharsets.UTF_8);
-    return incomingMessageOf(bytes, millisSinceEpoch);
+  private static List<PubsubClient.IncomingMessage> beamRowToMessageWithError() {
+    long timestamp = CLOCK.currentTimeMillis();
+    return ROWSWITHERROR.stream()
+        .map(
+            row -> {
+              byte[] bytes = AVRO_PAYLOAD_SERIALIZER_WITH_ERROR.serialize(row);
+              return incomingMessageOf(bytes, timestamp);
+            })
+        .collect(Collectors.toList());
   }
 
   private static PubsubClient.IncomingMessage incomingMessageOf(
@@ -329,51 +266,9 @@ public class PubsubReadSchemaTransformProviderTest {
         UUID.randomUUID().toString());
   }
 
-  static TestCase testCase(String name, PubsubReadSchemaTransformConfiguration configuration) {
-    return new TestCase(name, configuration);
-  }
-
-  private static class TestCase {
-
-    private final String name;
-    private final PubsubReadSchemaTransformConfiguration configuration;
-
-    private Map<DisplayData.Identifier, DisplayData.Item> expectedDeadLetterQueue;
-
-    private Map<DisplayData.Identifier, DisplayData.Item> expectedPubsubRead =
-        DisplayData.from(PubsubIO.readMessages()).asMap();
-
-    private boolean invalidConfigurationExpected = false;
-
-    TestCase(String name, PubsubReadSchemaTransformConfiguration configuration) {
-      this.name = name;
-      this.configuration = configuration;
-    }
-
-    PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform pubsubReadSchemaTransform() {
-      PubsubReadSchemaTransformProvider provider = new PubsubReadSchemaTransformProvider();
-      Row configurationRow = toBeamRow();
-      return (PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform)
-          provider.from(configurationRow);
-    }
-
-    private Row toBeamRow() {
-      return ROW_SERIALIZABLE_FUNCTION.apply(configuration);
-    }
-
-    TestCase withExpectedDeadLetterQueue(PubsubIO.Write<PubsubMessage> value) {
-      this.expectedDeadLetterQueue = DisplayData.from(value).asMap();
-      return this;
-    }
-
-    TestCase withExpectedPubsubRead(PubsubIO.Read<PubsubMessage> value) {
-      this.expectedPubsubRead = DisplayData.from(value).asMap();
-      return this;
-    }
-
-    TestCase expectInvalidConfiguration() {
-      this.invalidConfigurationExpected = true;
-      return this;
-    }
+  private static PubsubTestClient.PubsubTestClientFactory clientFactory(
+      List<PubsubClient.IncomingMessage> messages) {
+    return PubsubTestClient.createFactoryForPull(
+        CLOCK, PubsubClient.subscriptionPathFromPath(SUBSCRIPTION), 60, messages);
   }
 }
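
The reworked tests above double as usage documentation: the read provider is now configured with a format, an Avro schema string, and a topic or subscription, and emits decoded rows under the "output" tag. A condensed sketch of that wiring, with placeholder names for anything not shown in the diff:

    // Hedged sketch only: configuring the read transform the way the tests do.
    Schema beamSchema = Schema.of(Schema.Field.of("name", Schema.FieldType.STRING));
    String avroSchema = AvroUtils.toAvroSchema(beamSchema).toString();
    PubsubReadSchemaTransformConfiguration config =
        PubsubReadSchemaTransformConfiguration.builder()
            .setFormat("AVRO")
            .setSchema(avroSchema)
            .setSubscription("projects/my-project/subscriptions/my-sub") // placeholder
            .build();
    SchemaTransform read = new PubsubReadSchemaTransformProvider().from(config);
    // Decoded rows arrive under the "output" tag of the resulting row tuple.
    PCollectionRowTuple rows = PCollectionRowTuple.empty(pipeline).apply(read);
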
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java
deleted file mode 100644
index 709fc35e02a..00000000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializerProvider;
-import org.apache.beam.sdk.values.Row;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Test for {@link PubsubSchemaTransformMessageToRowFactory}. */
-@RunWith(JUnit4.class)
-public class PubsubSchemaTransformMessageToRowFactoryTest {
-
-  List<TestCase> cases =
-      Arrays.asList(
-          testCase(PubsubReadSchemaTransformConfiguration.builder().setDataSchema(SCHEMA))
-              .expectPayloadSerializerProvider(JSON_PAYLOAD_SERIALIZER_PROVIDER)
-              .withSerializerInput(),
-          testCase(PubsubReadSchemaTransformConfiguration.builder().setDataSchema(SCHEMA))
-              .expectPubsubToRow(
-                  PubsubMessageToRow.builder()
-                      .messageSchema(SCHEMA)
-                      .useFlatSchema(true)
-                      .useDlq(false)),
-          testCase(
-                  PubsubReadSchemaTransformConfiguration.builder()
-                      .setDataSchema(SCHEMA)
-                      .setDeadLetterQueue("projects/project/topics/topic"))
-              .expectPubsubToRow(
-                  PubsubMessageToRow.builder()
-                      .messageSchema(SCHEMA)
-                      .useFlatSchema(true)
-                      .useDlq(true)),
-          testCase(
-                  PubsubReadSchemaTransformConfiguration.builder()
-                      .setDataSchema(SCHEMA)
-                      .setFormat("avro"))
-              .expectPayloadSerializerProvider(AVRO_PAYLOAD_SERIALIZER_PROVIDER)
-              .withSerializerInput(),
-          testCase(
-                  PubsubReadSchemaTransformConfiguration.builder()
-                      .setDataSchema(Schema.of(ATTRIBUTES_FIELD_ARRAY)))
-              .schemaShouldHaveValidAttributesField()
-              .fieldShouldBePresent(
-                  ATTRIBUTES_FIELD_ARRAY.getName(), ATTRIBUTES_FIELD_ARRAY.getType()),
-          testCase(
-                  PubsubReadSchemaTransformConfiguration.builder()
-                      .setDataSchema(Schema.of(ATTRIBUTES_FIELD_MAP)))
-              .schemaShouldHaveValidAttributesField()
-              .fieldShouldBePresent(ATTRIBUTES_FIELD_MAP.getName(), ATTRIBUTES_FIELD_MAP.getType()),
-          testCase(
-              PubsubReadSchemaTransformConfiguration.builder()
-                  .setDataSchema(Schema.of(ATTRIBUTES_FIELD_SHOULD_NOT_MATCH))),
-          testCase(
-              PubsubReadSchemaTransformConfiguration.builder()
-                  .setDataSchema(Schema.of(PAYLOAD_FIELD_SHOULD_NOT_MATCH))),
-          testCase(
-                  PubsubReadSchemaTransformConfiguration.builder()
-                      .setDataSchema(Schema.of(PAYLOAD_FIELD_BYTES)))
-              .schemaShouldHaveValidPayloadField()
-              .fieldShouldBePresent(PAYLOAD_FIELD_BYTES.getName(), PAYLOAD_FIELD_BYTES.getType()),
-          testCase(
-                  PubsubReadSchemaTransformConfiguration.builder()
-                      .setDataSchema(Schema.of(PAYLOAD_FIELD_ROW)))
-              .schemaShouldHaveValidPayloadField()
-              .fieldShouldBePresent(PAYLOAD_FIELD_ROW.getName(), PAYLOAD_FIELD_ROW.getType()),
-          testCase(
-                  PubsubReadSchemaTransformConfiguration.builder()
-                      .setDataSchema(Schema.of(ATTRIBUTES_FIELD_ARRAY, PAYLOAD_FIELD_BYTES)))
-              .schemaShouldHaveValidAttributesField()
-              .schemaShouldHaveValidPayloadField()
-              .shouldUseNestedSchema()
-              .shouldNotNeedSerializer()
-              .expectPubsubToRow(
-                  PubsubMessageToRow.builder()
-                      .messageSchema(Schema.of(ATTRIBUTES_FIELD_ARRAY, PAYLOAD_FIELD_BYTES))
-                      .useFlatSchema(false)
-                      .useDlq(false)));
-
-  static final Schema.FieldType ATTRIBUTE_MAP_FIELD_TYPE =
-      Schema.FieldType.map(Schema.FieldType.STRING.withNullable(false), Schema.FieldType.STRING);
-  static final Schema ATTRIBUTE_ARRAY_ENTRY_SCHEMA =
-      Schema.builder().addStringField("key").addStringField("value").build();
-
-  static final Schema.FieldType ATTRIBUTE_ARRAY_FIELD_TYPE =
-      Schema.FieldType.array(Schema.FieldType.row(ATTRIBUTE_ARRAY_ENTRY_SCHEMA));
-
-  private static final Schema.Field ATTRIBUTES_FIELD_SHOULD_NOT_MATCH =
-      Schema.Field.of(ATTRIBUTES_FIELD, Schema.FieldType.STRING);
-
-  private static final Schema.Field ATTRIBUTES_FIELD_MAP =
-      Schema.Field.of(ATTRIBUTES_FIELD, ATTRIBUTE_MAP_FIELD_TYPE);
-
-  private static final Schema.Field ATTRIBUTES_FIELD_ARRAY =
-      Schema.Field.of(ATTRIBUTES_FIELD, ATTRIBUTE_ARRAY_FIELD_TYPE);
-
-  private static final Schema.Field PAYLOAD_FIELD_SHOULD_NOT_MATCH =
-      Schema.Field.of(PAYLOAD_FIELD, Schema.FieldType.STRING);
-
-  private static final Schema.Field PAYLOAD_FIELD_BYTES =
-      Schema.Field.of(PAYLOAD_FIELD, Schema.FieldType.BYTES);
-
-  private static final Schema.Field PAYLOAD_FIELD_ROW =
-      Schema.Field.of(PAYLOAD_FIELD, Schema.FieldType.row(Schema.of()));
-
-  private static final PayloadSerializerProvider JSON_PAYLOAD_SERIALIZER_PROVIDER =
-      new JsonPayloadSerializerProvider();
-
-  private static final AvroPayloadSerializerProvider AVRO_PAYLOAD_SERIALIZER_PROVIDER =
-      new AvroPayloadSerializerProvider();
-
-  private static final Schema SCHEMA =
-      Schema.of(
-          Schema.Field.of("name", Schema.FieldType.STRING),
-          Schema.Field.of("number", Schema.FieldType.INT64));
-
-  private static final Row ROW =
-      Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build();
-
-  @Test
-  public void testBuildMessageToRow() {
-    for (TestCase testCase : cases) {
-      if (testCase.expectPubsubToRow == null) {
-        continue;
-      }
-
-      PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
-
-      PubsubMessageToRow expected = testCase.expectPubsubToRow;
-      PubsubMessageToRow actual = factory.buildMessageToRow();
-
-      assertEquals("messageSchema", expected.messageSchema(), actual.messageSchema());
-      assertEquals("useFlatSchema", expected.useFlatSchema(), actual.useFlatSchema());
-      assertEquals("useDlq", expected.useDlq(), actual.useDlq());
-    }
-  }
-
-  @Test
-  public void serializer() {
-    for (TestCase testCase : cases) {
-      PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
-
-      if (testCase.expectPayloadSerializerProvider == null) {
-        continue;
-      }
-
-      Row serializerInput = testCase.serializerInput;
-
-      byte[] expectedBytes =
-          testCase
-              .expectSerializerProvider()
-              .apply(testCase.dataSchema())
-              .serialize(serializerInput);
-
-      byte[] actualBytes =
-          factory.serializer().apply(testCase.dataSchema()).serialize(serializerInput);
-
-      String expected = new String(expectedBytes, StandardCharsets.UTF_8);
-      String actual = new String(actualBytes, StandardCharsets.UTF_8);
-
-      assertEquals(expected, actual);
-    }
-  }
-
-  @Test
-  public void needsSerializer() {
-    for (TestCase testCase : cases) {
-      PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
-
-      boolean expected = testCase.shouldNeedSerializer;
-      boolean actual = factory.needsSerializer();
-
-      assertEquals(expected, actual);
-    }
-  }
-
-  @Test
-  public void shouldUseNestedSchema() {
-    for (TestCase testCase : cases) {
-      PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
-
-      boolean expected = testCase.shouldUseNestedSchema;
-      boolean actual = factory.shouldUseNestedSchema();
-
-      assertEquals(expected, actual);
-    }
-  }
-
-  @Test
-  public void schemaHasValidPayloadField() {
-    for (TestCase testCase : cases) {
-      PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
-
-      boolean expected = testCase.shouldSchemaHaveValidPayloadField;
-      boolean actual = factory.schemaHasValidPayloadField();
-
-      assertEquals(expected, actual);
-    }
-  }
-
-  @Test
-  public void schemaHasValidAttributesField() {
-    for (TestCase testCase : cases) {
-      PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
-
-      boolean expected = testCase.shouldSchemaHaveValidAttributesField;
-      boolean actual = factory.schemaHasValidAttributesField();
-
-      assertEquals(expected, actual);
-    }
-  }
-
-  @Test
-  public void fieldPresent() {
-    for (TestCase testCase : cases) {
-      PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
-      for (Entry<String, FieldType> entry : testCase.shouldFieldPresent.entrySet()) {
-
-        boolean actual = factory.fieldPresent(entry.getKey(), entry.getValue());
-
-        assertTrue(actual);
-      }
-    }
-  }
-
-  static TestCase testCase(PubsubReadSchemaTransformConfiguration.Builder configurationBuilder) {
-    return new TestCase(configurationBuilder);
-  }
-
-  private static class TestCase {
-    private final PubsubReadSchemaTransformConfiguration configuration;
-
-    private PubsubMessageToRow expectPubsubToRow;
-
-    private PayloadSerializerProvider expectPayloadSerializerProvider;
-
-    private boolean shouldUseNestedSchema = false;
-    private boolean shouldNeedSerializer = true;
-    private boolean shouldSchemaHaveValidPayloadField = false;
-    private boolean shouldSchemaHaveValidAttributesField = false;
-    private final Map<String, FieldType> shouldFieldPresent = new HashMap<>();
-
-    private Row serializerInput;
-
-    TestCase(PubsubReadSchemaTransformConfiguration.Builder configurationBuilder) {
-      this.configuration = configurationBuilder.build();
-    }
-
-    PubsubSchemaTransformMessageToRowFactory factory() {
-      return PubsubSchemaTransformMessageToRowFactory.from(configuration);
-    }
-
-    Schema dataSchema() {
-      return configuration.getDataSchema();
-    }
-
-    TestCase expectPubsubToRow(PubsubMessageToRow.Builder pubsubMessageToRowBuilder) {
-      this.expectPubsubToRow = pubsubMessageToRowBuilder.build();
-      return this;
-    }
-
-    TestCase withSerializerInput() {
-      this.serializerInput = PubsubSchemaTransformMessageToRowFactoryTest.ROW;
-      return this;
-    }
-
-    TestCase expectPayloadSerializerProvider(PayloadSerializerProvider value) {
-      this.expectPayloadSerializerProvider = value;
-      return this;
-    }
-
-    PubsubMessageToRow.SerializerProvider expectSerializerProvider() {
-      Map<String, Object> params = new HashMap<>();
-      PayloadSerializer payloadSerializer =
-          expectPayloadSerializerProvider.getSerializer(configuration.getDataSchema(), params);
-
-      return (input -> payloadSerializer);
-    }
-
-    TestCase shouldUseNestedSchema() {
-      this.shouldUseNestedSchema = true;
-      return this;
-    }
-
-    TestCase shouldNotNeedSerializer() {
-      this.shouldNeedSerializer = false;
-      return this;
-    }
-
-    TestCase schemaShouldHaveValidPayloadField() {
-      this.shouldSchemaHaveValidPayloadField = true;
-      return this;
-    }
-
-    TestCase schemaShouldHaveValidAttributesField() {
-      this.shouldSchemaHaveValidAttributesField = true;
-      return this;
-    }
-
-    TestCase fieldShouldBePresent(String name, Schema.FieldType expectedType) {
-      this.shouldFieldPresent.put(name, expectedType);
-      return this;
-    }
-  }
-}
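
The factory deleted above selected a PayloadSerializer from the read configuration. In the new providers that responsibility reduces to a direct, format-driven function lookup; a hedged sketch of the equivalent mapping, reusing the utilities the new write transform calls earlier in this commit (`format` and `schema` stand in for the configured values):

    // Hedged sketch only: the format string selects the row-to-bytes function.
    SerializableFunction<Row, byte[]> toBytes =
        "AVRO".equals(format)
            ? AvroUtils.getRowToAvroBytesFunction(schema) // schema: the Beam Schema
            : JsonUtils.getRowToJsonBytesFunction(schema);
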
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java
deleted file mode 100644
index cb0e6ec03cc..00000000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformConfiguration.DEFAULT_TIMESTAMP_ATTRIBUTE;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformProvider.INPUT_TAG;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.Field;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang3.tuple.Pair;
-import org.joda.time.Instant;
-import org.joda.time.format.ISODateTimeFormat;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/** Integration tests for {@link PubsubWriteSchemaTransformProvider}. */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-public class PubsubWriteSchemaTransformProviderIT {
-
-  @Rule public transient TestPipeline pipeline = TestPipeline.create();
-
-  private static final TestPubsubOptions TEST_PUBSUB_OPTIONS =
-      TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class);
-
-  static {
-    TEST_PUBSUB_OPTIONS.setBlockOnRun(false);
-  }
-
-  private static final String HAS_NO_SCHEMA = "has-no-schema";
-
-  private static PubsubClient pubsubClient;
-
-  private static PubsubClient.TopicPath hasNoSchemaTopic;
-
-  private static PubsubClient.SubscriptionPath hasNoSchemaSubscription;
-
-  private static final Instant TIMESTAMP = Instant.now();
-
-  private static final String RESOURCE_NAME_POSTFIX = "-" + TIMESTAMP.getMillis();
-
-  private static final int ACK_DEADLINE_SECONDS = 60;
-
-  private static final int AWAIT_TERMINATED_SECONDS = 30;
-
-  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
-
-  private static final TypeDescriptor<PubsubWriteSchemaTransformConfiguration>
-      CONFIGURATION_TYPE_DESCRIPTOR =
-          TypeDescriptor.of(PubsubWriteSchemaTransformConfiguration.class);
-
-  private static final SerializableFunction<PubsubWriteSchemaTransformConfiguration, Row>
-      TO_ROW_FN = AUTO_VALUE_SCHEMA.toRowFunction(CONFIGURATION_TYPE_DESCRIPTOR);
-
-  private final Field timestampField = Field.of("timestamp", FieldType.DATETIME);
-
-  private final Field payloadBytesField = Field.of("payload", FieldType.BYTES);
-
-  @BeforeClass
-  public static void setUp() throws IOException {
-    String project = TEST_PUBSUB_OPTIONS.as(PubsubOptions.class).getProject();
-    pubsubClient = PubsubGrpcClient.FACTORY.newClient(null, null, TEST_PUBSUB_OPTIONS);
-    hasNoSchemaTopic =
-        PubsubClient.topicPathFromName(project, HAS_NO_SCHEMA + RESOURCE_NAME_POSTFIX);
-    hasNoSchemaSubscription =
-        PubsubClient.subscriptionPathFromName(project, HAS_NO_SCHEMA + RESOURCE_NAME_POSTFIX);
-
-    pubsubClient.createTopic(hasNoSchemaTopic);
-    pubsubClient.createSubscription(
-        hasNoSchemaTopic, hasNoSchemaSubscription, ACK_DEADLINE_SECONDS);
-  }
-
-  @AfterClass
-  public static void tearDown() throws IOException {
-    pubsubClient.deleteSubscription(hasNoSchemaSubscription);
-    pubsubClient.deleteTopic(hasNoSchemaTopic);
-
-    pubsubClient.close();
-  }
-
-  @Test
-  public void testWritePayloadBytes() throws IOException {
-    Instant timestamp = Instant.ofEpochMilli(100000L);
-    Schema schema = Schema.of(payloadBytesField, timestampField);
-    List<Row> input =
-        Collections.singletonList(
-            Row.withSchema(schema).attachValues("aaa".getBytes(StandardCharsets.UTF_8), timestamp));
-    Row configuration =
-        TO_ROW_FN.apply(
-            PubsubWriteSchemaTransformConfiguration.builder()
-                .setSource(
-                    PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                        .setPayloadFieldName(payloadBytesField.getName())
-                        .setTimestampFieldName(timestampField.getName())
-                        .build())
-                .setTopic(hasNoSchemaTopic.getPath())
-                .setTarget(
-                    PubsubWriteSchemaTransformConfiguration.targetConfigurationBuilder().build())
-                .build());
-
-    PCollectionRowTuple.of(INPUT_TAG, pipeline.apply(Create.of(input).withRowSchema(schema)))
-        .apply(new PubsubWriteSchemaTransformProvider().from(configuration));
-
-    PipelineResult job = pipeline.run(TEST_PUBSUB_OPTIONS);
-    Instant now = Instant.now();
-    Instant stop = Instant.ofEpochMilli(now.getMillis() + AWAIT_TERMINATED_SECONDS * 1000);
-    List<Pair<String, Map<String, String>>> actualList = new ArrayList<>();
-    while (now.isBefore(stop)) {
-      List<IncomingMessage> received = pubsubClient.pull(0, hasNoSchemaSubscription, 1, true);
-      for (IncomingMessage incoming : received) {
-        actualList.add(
-            Pair.of(
-                incoming.message().getData().toStringUtf8(),
-                ImmutableMap.of(
-                    DEFAULT_TIMESTAMP_ATTRIBUTE,
-                    incoming
-                        .message()
-                        .getAttributesMap()
-                        .getOrDefault(DEFAULT_TIMESTAMP_ATTRIBUTE, ""))));
-      }
-      if (actualList.size() == input.size()) {
-        break;
-      }
-      now = Instant.now();
-    }
-    job.cancel();
-    assertFalse(
-        String.format(
-            "messages pulled from %s should not be empty", hasNoSchemaSubscription.getPath()),
-        actualList.isEmpty());
-    Pair<String, Map<String, String>> actual = actualList.get(0);
-    Row expected = input.get(0);
-    String payload =
-        new String(
-            Objects.requireNonNull(expected.getBytes(payloadBytesField.getName())),
-            StandardCharsets.UTF_8);
-    assertEquals(payload, actual.getLeft());
-    assertEquals(
-        ISODateTimeFormat.dateTime().print(timestamp),
-        actual.getRight().get(DEFAULT_TIMESTAMP_ATTRIBUTE));
-  }
-}
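
With the integration test above removed, write-side failures are covered through the dead-letter path instead: rows that cannot be serialized are diverted rather than failing the pipeline. A condensed sketch of the error-row shape, mirroring the ErrorFn and schema built in the new provider earlier in this commit (`inputSchema`, `exception`, and `failedRow` are stand-ins):

    // Hedged sketch only: the error-row schema the new write transform emits.
    Schema errorSchema =
        Schema.builder()
            .addStringField("error")
            .addNullableRowField("row", inputSchema)
            .build();
    Row errorRow =
        Row.withSchema(errorSchema).addValues(exception.toString(), failedRow).build();
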
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderTest.java
deleted file mode 100644
index 98939f7ddc6..00000000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderTest.java
+++ /dev/null
@@ -1,786 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_ATTRIBUTES_KEY_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_EVENT_TIMESTAMP_KEY_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_PAYLOAD_KEY_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessageTest.ALL_DATA_TYPES_SCHEMA;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessageTest.NON_USER_WITH_BYTES_PAYLOAD;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessageTest.rowWithAllDataTypes;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformProvider.INPUT_TAG;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
-
-import com.google.api.client.util.Clock;
-import java.io.IOException;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import org.apache.avro.SchemaParseException;
-import org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformProvider.PubsubWriteSchemaTransform;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.Field;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.RowJson.UnsupportedRowJsonException;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link PubsubWriteSchemaTransformProvider}. */
-@RunWith(JUnit4.class)
-public class PubsubWriteSchemaTransformProviderTest {
-
-  private static final String ID_ATTRIBUTE = "id_attribute";
-  private static final String TOPIC = "projects/project/topics/topic";
-  private static final MockClock CLOCK = new MockClock(Instant.now());
-  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
-  private static final TypeDescriptor<PubsubWriteSchemaTransformConfiguration> TYPE_DESCRIPTOR =
-      TypeDescriptor.of(PubsubWriteSchemaTransformConfiguration.class);
-  private static final SerializableFunction<PubsubWriteSchemaTransformConfiguration, Row> TO_ROW =
-      AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
-
-  private static final PipelineOptions OPTIONS = PipelineOptionsFactory.create();
-
-  static {
-    OPTIONS.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
-  }
-
-  @Rule public transient TestPipeline pipeline = TestPipeline.create();
-
-  @Test
-  public void testBuildPubsubWrite() {
-    assertEquals(
-        "default configuration should yield a topic Pub/Sub write",
-        pubsubWrite(),
-        transform(configurationBuilder()).buildPubsubWrite());
-
-    assertEquals(
-        "idAttribute in configuration should yield a idAttribute set Pub/Sub 
write",
-        pubsubWrite().withIdAttribute(ID_ATTRIBUTE),
-        
transform(configurationBuilder().setIdAttribute(ID_ATTRIBUTE)).buildPubsubWrite());
-  }
-
-  @Test
-  public void testBuildPubsubRowToMessage() {
-    assertEquals(
-        "override timestamp attribute on configuration should yield a 
PubsubRowToMessage with target timestamp",
-        
rowToMessageBuilder().setTargetTimestampAttributeName("custom_timestamp_attribute").build(),
-        transform(
-                configurationBuilder()
-                    .setTarget(
-                        
PubsubWriteSchemaTransformConfiguration.targetConfigurationBuilder()
-                            
.setTimestampAttributeKey("custom_timestamp_attribute")
-                            .build()))
-            .buildPubsubRowToMessage(NON_USER_WITH_BYTES_PAYLOAD));
-
-    assertNull(
-        "failing to set format should yield a null payload serializer",
-        transform(configurationBuilder())
-            .buildPubsubRowToMessage(ALL_DATA_TYPES_SCHEMA)
-            .getPayloadSerializer());
-
-    assertThrows(
-        "setting 'json' format for a unsupported field containing Schema 
should throw an Exception",
-        UnsupportedRowJsonException.class,
-        () ->
-            transform(configurationBuilder().setFormat("json"))
-                .buildPubsubRowToMessage(
-                    Schema.of(Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE))));
-
-    assertThrows(
-        "setting 'avro' format for a unsupported field containing Schema 
should throw an Exception",
-        SchemaParseException.class,
-        () ->
-            transform(configurationBuilder().setFormat("avro"))
-                .buildPubsubRowToMessage(
-                    Schema.of(Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE))));
-
-    assertNotNull(
-        "setting 'json' format for valid schema should yield 
PayloadSerializer",
-        transform(configurationBuilder().setFormat("json"))
-            .buildPubsubRowToMessage(ALL_DATA_TYPES_SCHEMA)
-            .getPayloadSerializer());
-
-    assertNotNull(
-        "setting 'avro' format for valid schema should yield 
PayloadSerializer",
-        transform(configurationBuilder().setFormat("avro"))
-            .buildPubsubRowToMessage(ALL_DATA_TYPES_SCHEMA)
-            .getPayloadSerializer());
-  }
-
-  @Test
-  public void testInvalidTaggedInput() {
-    Row withAllDataTypes =
-        rowWithAllDataTypes(
-            true,
-            (byte) 0,
-            Instant.now().toDateTime(),
-            BigDecimal.valueOf(1L),
-            3.12345,
-            4.1f,
-            (short) 5,
-            2,
-            7L,
-            "asdfjkl;");
-
-    PCollection<Row> rows =
-        pipeline.apply(Create.of(withAllDataTypes)).setRowSchema(ALL_DATA_TYPES_SCHEMA);
-
-    assertThrows(
-        "empty input should not be allowed",
-        IllegalArgumentException.class,
-        () -> transform(configurationBuilder()).expand(PCollectionRowTuple.empty(pipeline)));
-
-    assertThrows(
-        "input with >1 tagged rows should not be allowed",
-        IllegalArgumentException.class,
-        () ->
-            transform(configurationBuilder())
-                .expand(PCollectionRowTuple.of(INPUT_TAG, rows).and("somethingelse", rows)));
-
-    assertThrows(
-        "input missing INPUT tag should not be allowed",
-        IllegalArgumentException.class,
-        () ->
-            transform(configurationBuilder())
-                .expand(PCollectionRowTuple.of("somethingelse", rows)));
-
-    pipeline.run(OPTIONS);
-  }
-
-  @Test
-  public void testValidateSourceSchemaAgainstConfiguration() {
-    // A schema containing only user fields and no configuration details should be valid
-    transform(configurationBuilder())
-        .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA);
-
-    // Matching attributes, timestamp, and payload (bytes) fields configured with expected types
-    // should be valid
-    transform(
-            configurationBuilder()
-                .setSource(
-                    PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                        .setAttributesFieldName("attributes")
-                        .setTimestampFieldName("timestamp")
-                        .setPayloadFieldName("payload")
-                        .build()))
-        .validateSourceSchemaAgainstConfiguration(
-            Schema.of(
-                Field.of("attributes", ATTRIBUTES_FIELD_TYPE),
-                Field.of("timestamp", EVENT_TIMESTAMP_FIELD_TYPE),
-                Field.of("payload", Schema.FieldType.BYTES)));
-
-    // Matching attributes, timestamp, and payload (ROW) fields configured with expected types
-    // should be valid
-    transform(
-            configurationBuilder()
-                .setSource(
-                    PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                        .setAttributesFieldName("attributes")
-                        .setTimestampFieldName("timestamp")
-                        .setPayloadFieldName("payload")
-                        .build()))
-        .validateSourceSchemaAgainstConfiguration(
-            Schema.of(
-                Field.of("attributes", ATTRIBUTES_FIELD_TYPE),
-                Field.of("timestamp", EVENT_TIMESTAMP_FIELD_TYPE),
-                Field.of("payload", 
Schema.FieldType.row(ALL_DATA_TYPES_SCHEMA))));
-
-    assertThrows(
-        "empty Schema should be invalid",
-        IllegalArgumentException.class,
-        () ->
-            transform(configurationBuilder())
-                .validateSourceSchemaAgainstConfiguration(Schema.of()));
-
-    assertThrows(
-        "attributes field in configuration but not in schema should be 
invalid",
-        IllegalArgumentException.class,
-        () ->
-            transform(
-                    configurationBuilder()
-                        .setSource(
-                            PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                                .setAttributesFieldName("attributes")
-                                .build()))
-                .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA));
-
-    assertThrows(
-        "timestamp field in configuration but not in schema should be invalid",
-        IllegalArgumentException.class,
-        () ->
-            transform(
-                    configurationBuilder()
-                        .setSource(
-                            PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                                .setTimestampFieldName("timestamp")
-                                .build()))
-                .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA));
-
-    assertThrows(
-        "payload field in configuration but not in schema should be invalid",
-        IllegalArgumentException.class,
-        () ->
-            transform(
-                    configurationBuilder()
-                        .setSource(
-                            PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                                .setPayloadFieldName("payload")
-                                .build()))
-                .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA));
-
-    assertThrows(
-        "attributes field in configuration but mismatching attributes type 
should be invalid",
-        IllegalArgumentException.class,
-        () ->
-            transform(
-                    configurationBuilder()
-                        .setSource(
-                            PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                                .setAttributesFieldName("attributes")
-                                .build()))
-                .validateSourceSchemaAgainstConfiguration(
-                    // should be FieldType.map(FieldType.STRING, FieldType.STRING)
-                    Schema.of(
-                        Field.of("attributes", FieldType.map(FieldType.BYTES, FieldType.STRING)))));
-
-    assertThrows(
-        "timestamp field in configuration but mismatching timestamp type 
should be invalid",
-        IllegalArgumentException.class,
-        () ->
-            transform(
-                    configurationBuilder()
-                        .setSource(
-                            
PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                                .setAttributesFieldName("timestamp")
-                                .build()))
-                .validateSourceSchemaAgainstConfiguration(
-                    // should be FieldType.DATETIME
-                    Schema.of(Field.of("timestamp", FieldType.STRING))));
-
-    assertThrows(
-        "payload field in configuration but mismatching payload type should be invalid",
-        IllegalArgumentException.class,
-        () ->
-            transform(
-                    configurationBuilder()
-                        .setSource(
-                            PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                                .setPayloadFieldName("payload")
-                                .build()))
-                .validateSourceSchemaAgainstConfiguration(
-                    // should be FieldType.BYTES or FieldType.row(...)
-                    Schema.of(Field.of("payload", FieldType.STRING))));
-  }
-
-  @Test
-  public void testValidateTargetSchemaAgainstPubsubSchema() throws IOException {
-    TopicPath topicPath = PubsubClient.topicPathFromPath(TOPIC);
-    PubsubTestClientFactory noSchemaFactory =
-        PubsubTestClient.createFactoryForGetSchema(topicPath, null, null);
-
-    PubsubTestClientFactory schemaDeletedFactory =
-        PubsubTestClient.createFactoryForGetSchema(topicPath, SchemaPath.DELETED_SCHEMA, null);
-
-    PubsubTestClientFactory mismatchingSchemaFactory =
-        PubsubTestClient.createFactoryForGetSchema(
-            topicPath,
-            PubsubClient.schemaPathFromId("testProject", "misMatch"),
-            Schema.of(Field.of("StringField", FieldType.STRING)));
-
-    PubsubTestClientFactory matchingSchemaFactory =
-        PubsubTestClient.createFactoryForGetSchema(
-            topicPath,
-            PubsubClient.schemaPathFromId("testProject", "match"),
-            ALL_DATA_TYPES_SCHEMA);
-
-    // Should pass validation if the Pub/Sub topic lacks a schema
-    transform(configurationBuilder())
-        .withPubsubClientFactory(noSchemaFactory)
-        .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS);
-    noSchemaFactory.close();
-
-    // Should pass validation if Pub/Sub topic schema deleted
-    transform(configurationBuilder())
-        .withPubsubClientFactory(schemaDeletedFactory)
-        .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS);
-    schemaDeletedFactory.close();
-
-    assertThrows(
-        "mismatched schema should be detected from Pub/Sub topic",
-        IllegalStateException.class,
-        () ->
-            transform(configurationBuilder())
-                .withPubsubClientFactory(mismatchingSchemaFactory)
-                .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS));
-    mismatchingSchemaFactory.close();
-
-    // Should pass validation if Pub/Sub topic schema matches
-    transform(configurationBuilder())
-        .withPubsubClientFactory(matchingSchemaFactory)
-        .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS);
-    matchingSchemaFactory.close();
-  }
-
-  @Test
-  public void testBuildTargetSchema() {
-
-    Field sourceAttributesField = Field.of("attributes", ATTRIBUTES_FIELD_TYPE);
-    Field sourceTimestampField = Field.of("timestamp", EVENT_TIMESTAMP_FIELD_TYPE);
-    Field sourcePayloadBytesField = Field.of("payload", FieldType.BYTES);
-    Field sourcePayloadRowField = Field.of("payload", FieldType.row(ALL_DATA_TYPES_SCHEMA));
-
-    Field targetAttributesField = Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE);
-    Field targetTimestampField =
-        Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE);
-    Field targetPayloadBytesField = Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES);
-    Field targetPayloadRowField =
-        Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.row(ALL_DATA_TYPES_SCHEMA));
-
-    assertEquals(
-        "attributes and timestamp field should append to user fields",
-        Schema.builder()
-            .addField(targetAttributesField)
-            .addField(targetTimestampField)
-            .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
-            .build(),
-        transform(
-                configurationBuilder()
-                    .setSource(
-                        PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                            .build()))
-            .buildTargetSchema(ALL_DATA_TYPES_SCHEMA));
-
-    assertEquals(
-        "timestamp field should append to user fields; attributes field name 
changed",
-        Schema.builder()
-            .addField(targetAttributesField)
-            .addField(targetTimestampField)
-            .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
-            .build(),
-        transform(
-                configurationBuilder()
-                    .setSource(
-                        PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                            .setAttributesFieldName("attributes")
-                            .build()))
-            .buildTargetSchema(
-                Schema.builder()
-                    .addField(sourceAttributesField)
-                    .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
-                    .build()));
-
-    assertEquals(
-        "attributes field should append to user fields; timestamp field name 
changed",
-        Schema.builder()
-            .addField(targetAttributesField)
-            .addField(targetTimestampField)
-            .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
-            .build(),
-        transform(
-                configurationBuilder()
-                    .setSource(
-                        PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                            .setTimestampFieldName("timestamp")
-                            .build()))
-            .buildTargetSchema(
-                Schema.builder()
-                    .addField(sourceTimestampField)
-                    .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
-                    .build()));
-
-    assertEquals(
-        "attributes and timestamp field appended to user payload bytes field; 
payload field name changed",
-        Schema.builder()
-            .addField(targetAttributesField)
-            .addField(targetTimestampField)
-            .addField(targetPayloadBytesField)
-            .build(),
-        transform(
-                configurationBuilder()
-                    .setSource(
-                        PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                            .setPayloadFieldName("payload")
-                            .build()))
-            .buildTargetSchema(Schema.builder().addField(sourcePayloadBytesField).build()));
-
-    assertEquals(
-        "attributes and timestamp field appended to user payload row field; 
payload field name changed",
-        Schema.builder()
-            .addField(targetAttributesField)
-            .addField(targetTimestampField)
-            .addField(targetPayloadRowField)
-            .build(),
-        transform(
-                configurationBuilder()
-                    .setSource(
-                        PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                            .setPayloadFieldName("payload")
-                            .build()))
-            .buildTargetSchema(Schema.builder().addField(sourcePayloadRowField).build()));
-
-    assertEquals(
-        "attributes and timestamp fields name changed",
-        Schema.builder()
-            .addField(targetAttributesField)
-            .addField(targetTimestampField)
-            .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
-            .build(),
-        transform(
-                configurationBuilder()
-                    .setSource(
-                        PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                            .setAttributesFieldName("attributes")
-                            .setTimestampFieldName("timestamp")
-                            .build()))
-            .buildTargetSchema(
-                Schema.builder()
-                    .addField(sourceAttributesField)
-                    .addField(sourceTimestampField)
-                    .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
-                    .build()));
-
-    assertEquals(
-        "attributes, timestamp, payload bytes fields name changed",
-        Schema.builder()
-            .addField(targetAttributesField)
-            .addField(targetTimestampField)
-            .addFields(targetPayloadBytesField)
-            .build(),
-        transform(
-                configurationBuilder()
-                    .setSource(
-                        PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                            .setAttributesFieldName("attributes")
-                            .setTimestampFieldName("timestamp")
-                            .setPayloadFieldName("payload")
-                            .build()))
-            .buildTargetSchema(
-                Schema.builder()
-                    .addField(sourceAttributesField)
-                    .addField(sourceTimestampField)
-                    .addField(sourcePayloadBytesField)
-                    .build()));
-
-    assertEquals(
-        "attributes, timestamp, payload row fields name changed",
-        Schema.builder()
-            .addField(targetAttributesField)
-            .addField(targetTimestampField)
-            .addFields(targetPayloadRowField)
-            .build(),
-        transform(
-                configurationBuilder()
-                    .setSource(
-                        PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
-                            .setAttributesFieldName("attributes")
-                            .setTimestampFieldName("timestamp")
-                            .setPayloadFieldName("payload")
-                            .build()))
-            .buildTargetSchema(
-                Schema.builder()
-                    .addField(sourceAttributesField)
-                    .addField(sourceTimestampField)
-                    .addField(sourcePayloadRowField)
-                    .build()));
-  }
-
-  @Test
-  public void testConvertForRowToMessageTransform() {
-    Row userRow =
-        rowWithAllDataTypes(
-            false,
-            (byte) 0,
-            Instant.ofEpochMilli(CLOCK.currentTimeMillis()).toDateTime(),
-            BigDecimal.valueOf(1L),
-            1.12345,
-            1.1f,
-            (short) 1,
-            1,
-            1L,
-            "吃葡萄不吐葡萄皮,不吃葡萄倒吐葡萄皮");
-
-    Field sourceAttributes = Field.of("attributes", ATTRIBUTES_FIELD_TYPE);
-    Field targetAttributes = Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE);
-
-    Field sourceTimestamp = Field.of("timestamp", EVENT_TIMESTAMP_FIELD_TYPE);
-    Field targetTimestamp = Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE);
-
-    Field sourcePayloadBytes = Field.of("payload", FieldType.BYTES);
-    Field targetPayloadBytes = Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES);
-
-    Field sourcePayloadRow = Field.of("payload", FieldType.row(ALL_DATA_TYPES_SCHEMA));
-    Field targetPayloadRow =
-        Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.row(ALL_DATA_TYPES_SCHEMA));
-
-    Map<String, String> attributes = ImmutableMap.of("a", "1");
-    Instant generatedTimestamp = Instant.ofEpochMilli(CLOCK.currentTimeMillis());
-    Instant timestampFromSource = Instant.ofEpochMilli(CLOCK.currentTimeMillis() + 10000L);
-    byte[] payloadBytes = "吃葡萄不吐葡萄皮,不吃葡萄倒吐葡萄皮".getBytes(StandardCharsets.UTF_8);
-
-    PAssert.that(
-            "attributes only source yields attributes + timestamp target",
-            pipeline
-                .apply(
-                    Create.of(Row.withSchema(Schema.of(sourceAttributes)).attachValues(attributes)))
-                .setRowSchema(Schema.of(sourceAttributes))
-                .apply(
-                    transform(
-                            configurationBuilder()
-                                .setSource(
-                                    PubsubWriteSchemaTransformConfiguration
-                                        .sourceConfigurationBuilder()
-                                        .setAttributesFieldName(sourceAttributes.getName())
-                                        .build()))
-                        .convertForRowToMessage(
-                            Schema.of(targetAttributes, targetTimestamp), CLOCK))
-                .setRowSchema(Schema.of(targetAttributes, targetTimestamp)))
-        .containsInAnyOrder(
-            Row.withSchema(Schema.of(targetAttributes, targetTimestamp))
-                .attachValues(attributes, generatedTimestamp));
-
-    PAssert.that(
-            "timestamp only source yields attributes + timestamp target",
-            pipeline
-                .apply(
-                    Create.of(
-                        Row.withSchema(Schema.of(sourceTimestamp))
-                            .attachValues(timestampFromSource)))
-                .setRowSchema(Schema.of(sourceTimestamp))
-                .apply(
-                    transform(
-                            configurationBuilder()
-                                .setSource(
-                                    PubsubWriteSchemaTransformConfiguration
-                                        .sourceConfigurationBuilder()
-                                        .setTimestampFieldName(sourceTimestamp.getName())
-                                        .build()))
-                        .convertForRowToMessage(
-                            Schema.of(targetAttributes, targetTimestamp), CLOCK))
-                .setRowSchema(Schema.of(targetAttributes, targetTimestamp)))
-        .containsInAnyOrder(
-            Row.withSchema(Schema.of(targetAttributes, targetTimestamp))
-                .attachValues(ImmutableMap.of(), timestampFromSource));
-
-    PAssert.that(
-            "timestamp and attributes source yields renamed fields in target",
-            pipeline
-                .apply(
-                    Create.of(
-                        Row.withSchema(Schema.of(sourceAttributes, 
sourceTimestamp))
-                            .attachValues(attributes, timestampFromSource)))
-                .setRowSchema(Schema.of(sourceAttributes, sourceTimestamp))
-                .apply(
-                    transform(
-                            configurationBuilder()
-                                .setSource(
-                                    PubsubWriteSchemaTransformConfiguration
-                                        .sourceConfigurationBuilder()
-                                        .setAttributesFieldName(sourceAttributes.getName())
-                                        .setTimestampFieldName(sourceTimestamp.getName())
-                                        .build()))
-                        .convertForRowToMessage(
-                            Schema.of(targetAttributes, targetTimestamp), CLOCK))
-                .setRowSchema(Schema.of(targetAttributes, targetTimestamp)))
-        .containsInAnyOrder(
-            Row.withSchema(Schema.of(targetAttributes, targetTimestamp))
-                .attachValues(attributes, timestampFromSource));
-
-    PAssert.that(
-            "bytes payload only source yields attributes + timestamp + renamed 
bytes payload target",
-            pipeline
-                .apply(
-                    Create.of(
-                        Row.withSchema(Schema.of(sourcePayloadBytes))
-                            .withFieldValue(sourcePayloadBytes.getName(), payloadBytes)
-                            .build()))
-                .setRowSchema(Schema.of(sourcePayloadBytes))
-                .apply(
-                    transform(
-                            configurationBuilder()
-                                .setSource(
-                                    PubsubWriteSchemaTransformConfiguration
-                                        .sourceConfigurationBuilder()
-                                        .setPayloadFieldName(sourcePayloadBytes.getName())
-                                        .build()))
-                        .convertForRowToMessage(
-                            Schema.of(targetAttributes, targetTimestamp, targetPayloadBytes),
-                            CLOCK))
-                .setRowSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadBytes)))
-        .containsInAnyOrder(
-            Row.withSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadBytes))
-                .attachValues(ImmutableMap.of(), generatedTimestamp, payloadBytes));
-
-    PAssert.that(
-            "row payload only source yields attributes + timestamp + renamed 
row payload target",
-            pipeline
-                .apply(Create.of(Row.withSchema(Schema.of(sourcePayloadRow)).attachValues(userRow)))
-                .setRowSchema(Schema.of(sourcePayloadRow))
-                .apply(
-                    transform(
-                            configurationBuilder()
-                                .setSource(
-                                    PubsubWriteSchemaTransformConfiguration
-                                        .sourceConfigurationBuilder()
-                                        .setPayloadFieldName(sourcePayloadRow.getName())
-                                        .build()))
-                        .convertForRowToMessage(
-                            Schema.of(targetAttributes, targetTimestamp, targetPayloadRow), CLOCK))
-                .setRowSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadRow)))
-        .containsInAnyOrder(
-            Row.withSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadRow))
-                .attachValues(ImmutableMap.of(), generatedTimestamp, userRow));
-
-    PAssert.that(
-            "user only fields source yields attributes + timestamp + user 
fields target",
-            pipeline
-                .apply(Create.of(userRow))
-                .setRowSchema(ALL_DATA_TYPES_SCHEMA)
-                .apply(
-                    transform(configurationBuilder())
-                        .convertForRowToMessage(
-                            Schema.builder()
-                                .addField(targetAttributes)
-                                .addField(targetTimestamp)
-                                .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
-                                .build(),
-                            CLOCK))
-                .setRowSchema(
-                    Schema.builder()
-                        .addField(targetAttributes)
-                        .addField(targetTimestamp)
-                        .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
-                        .build()))
-        .containsInAnyOrder(
-            Row.withSchema(
-                    Schema.builder()
-                        .addField(targetAttributes)
-                        .addField(targetTimestamp)
-                        .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
-                        .build())
-                .addValue(ImmutableMap.of())
-                .addValue(generatedTimestamp)
-                .addValues(userRow.getValues())
-                .build());
-
-    pipeline.run(OPTIONS);
-  }
-
-  @Test
-  public void testGetPayloadSerializer() {
-    Row withAllDataTypes =
-        rowWithAllDataTypes(
-            false,
-            (byte) 0,
-            Instant.now().toDateTime(),
-            BigDecimal.valueOf(-1L),
-            -3.12345,
-            -4.1f,
-            (short) -5,
-            -2,
-            -7L,
-            "吃葡萄不吐葡萄皮,不吃葡萄倒吐葡萄皮");
-
-    PayloadSerializer jsonPayloadSerializer =
-        new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
-    byte[] expectedJson = jsonPayloadSerializer.serialize(withAllDataTypes);
-    byte[] actualJson =
-        transform(configurationBuilder().setFormat("json"))
-            .getPayloadSerializer(ALL_DATA_TYPES_SCHEMA)
-            .serialize(withAllDataTypes);
-
-    PayloadSerializer avroPayloadSerializer =
-        new AvroPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
-    byte[] expectedAvro = avroPayloadSerializer.serialize(withAllDataTypes);
-    byte[] actualAvro =
-        transform(configurationBuilder().setFormat("avro"))
-            .getPayloadSerializer(ALL_DATA_TYPES_SCHEMA)
-            .serialize(withAllDataTypes);
-
-    assertArrayEquals(
-        "configuration with json format should yield JSON PayloadSerializer",
-        expectedJson,
-        actualJson);
-
-    assertArrayEquals(
-        "configuration with avro format should yield Avro PayloadSerializer",
-        expectedAvro,
-        actualAvro);
-  }
-
-  private static PubsubWriteSchemaTransformConfiguration.Builder configurationBuilder() {
-    return PubsubWriteSchemaTransformConfiguration.builder()
-        .setTopic(TOPIC)
-        .setTarget(PubsubWriteSchemaTransformConfiguration.targetConfigurationBuilder().build());
-  }
-
-  private static PubsubRowToMessage.Builder rowToMessageBuilder() {
-    return PubsubRowToMessage.builder();
-  }
-
-  private static PubsubIO.Write<PubsubMessage> pubsubWrite() {
-    return PubsubIO.writeMessages().to(TOPIC);
-  }
-
-  private static PubsubWriteSchemaTransformProvider.PubsubWriteSchemaTransform transform(
-      PubsubWriteSchemaTransformConfiguration.Builder configurationBuilder) {
-    Row configurationRow = TO_ROW.apply(configurationBuilder.build());
-    PubsubWriteSchemaTransformProvider provider = new PubsubWriteSchemaTransformProvider();
-    return (PubsubWriteSchemaTransform) provider.from(configurationRow);
-  }
-
-  private static class MockClock implements Clock, Serializable {
-    private final Long millis;
-
-    private MockClock(Instant timestamp) {
-      this.millis = timestamp.getMillis();
-    }
-
-    @Override
-    public long currentTimeMillis() {
-      return millis;
-    }
-  }
-}

