This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b50a0992b95 Remove Read and Write PubsubLite for Yaml (#37383)
b50a0992b95 is described below

commit b50a0992b959a12253656bc03a280ddfac2b3a02
Author: Derrick Williams <[email protected]>
AuthorDate: Thu Jan 22 18:54:43 2026 -0500

    Remove Read and Write PubsubLite for Yaml (#37383)
    
    * remove ReadWrite PubsubLite Yaml
    
    * add back pubsub oops
    
    * and the configuration
---
 .../assets/symbols/java.g.yaml                     |  39 --
 .../PubsubLiteReadSchemaTransformProvider.java     | 544 --------------------
 .../PubsubLiteWriteSchemaTransformProvider.java    | 466 -----------------
 .../beam/sdk/io/gcp/pubsublite/ReadWriteIT.java    | 379 --------------
 .../gcp/pubsublite/internal/PubsubLiteDlqTest.java | 565 ---------------------
 .../internal/PubsubLiteWriteDlqTest.java           | 273 ----------
 sdks/python/apache_beam/yaml/standard_io.yaml      |  38 --
 7 files changed, 2304 deletions(-)

diff --git a/playground/frontend/playground_components/assets/symbols/java.g.yaml b/playground/frontend/playground_components/assets/symbols/java.g.yaml
index 345e11071b9..e0f2b1d15d1 100644
--- a/playground/frontend/playground_components/assets/symbols/java.g.yaml
+++ b/playground/frontend/playground_components/assets/symbols/java.g.yaml
@@ -9031,26 +9031,6 @@ PubsubLiteIO:
   - expand
   - read
   - write
-PubsubLiteReadSchemaTransformProvider: 
-  methods: 
-  - build
-  - buildTransform
-  - builder
-  - expand
-  - from
-  - getDataFormat
-  - getLocation
-  - getProject
-  - getSchema
-  - getSubscriptionName
-  - identifier
-  - inputCollectionNames
-  - outputCollectionNames
-  - setDataFormat
-  - setLocation
-  - setProject
-  - setSchema
-  - setSubscriptionName
 PubsubLiteSink: 
   methods: 
   - finishBundle
@@ -9060,25 +9040,6 @@ PubsubLiteTableProvider:
   methods: 
   - buildBeamSqlTable
   - getTableType
-PubsubLiteWriteSchemaTransformProvider: 
-  methods: 
-  - build
-  - buildTransform
-  - builder
-  - expand
-  - getFormat
-  - getLocation
-  - getProject
-  - getTopicName
-  - identifier
-  - inputCollectionNames
-  - outputCollectionNames
-  - setFormat
-  - setLocation
-  - setProject
-  - setTopicName
-  properties: 
-  - SUPPORTED_FORMATS
 PubsubMessage: 
   methods: 
   - equals
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java
deleted file mode 100644
index 61b94aeee44..00000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import com.google.auto.service.AutoService;
-import com.google.auto.value.AutoValue;
-import com.google.cloud.pubsublite.CloudRegionOrZone;
-import com.google.cloud.pubsublite.ProjectId;
-import com.google.cloud.pubsublite.SubscriptionName;
-import com.google.cloud.pubsublite.SubscriptionPath;
-import com.google.cloud.pubsublite.proto.AttributeValues;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import com.google.cloud.pubsublite.proto.SequencedMessage;
-import com.google.protobuf.ByteString;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.function.Consumer;
-import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
-import org.apache.beam.sdk.io.gcp.pubsublite.internal.Uuid;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
-import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
-import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
-import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
-import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
-import org.apache.beam.sdk.schemas.utils.JsonUtils;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@AutoService(SchemaTransformProvider.class)
-public class PubsubLiteReadSchemaTransformProvider
-    extends TypedSchemaTransformProvider<
-        PubsubLiteReadSchemaTransformProvider.PubsubLiteReadSchemaTransformConfiguration> {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(PubsubLiteReadSchemaTransformProvider.class);
-
-  public static final String VALID_FORMATS_STR = "RAW,AVRO,JSON,PROTO";
-  public static final Set<String> VALID_DATA_FORMATS =
-      Sets.newHashSet(VALID_FORMATS_STR.split(","));
-
-  public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {};
-  public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
-
-  @Override
-  protected Class<PubsubLiteReadSchemaTransformConfiguration> 
configurationClass() {
-    return PubsubLiteReadSchemaTransformConfiguration.class;
-  }
-
-  @Override
-  public String description() {
-    return "Performs a read from Google Pub/Sub Lite.\n"
-        + "\n"
-        + "**Note**: This provider is deprecated. See Pub/Sub Lite <a href=\"https://cloud.google.com/pubsub/lite/docs\">documentation</a> for more information.";
-  }
-
-  public static class ErrorFn extends DoFn<SequencedMessage, Row> {
-    private final SerializableFunction<byte[], Row> valueMapper;
-    private final Counter errorCounter;
-    private Long errorsInBundle = 0L;
-    private final boolean handleErrors;
-
-    private final List<String> attributes;
-
-    private final String attributeMap;
-
-    private final Schema errorSchema;
-
-    private final Schema attributeSchema;
-
-    public ErrorFn(
-        String name,
-        SerializableFunction<byte[], Row> valueMapper,
-        Schema errorSchema,
-        boolean handleErrors) {
-      this.errorCounter = 
Metrics.counter(PubsubLiteReadSchemaTransformProvider.class, name);
-      this.valueMapper = valueMapper;
-      this.errorSchema = errorSchema;
-      this.handleErrors = handleErrors;
-      this.attributes = new ArrayList<>();
-      this.attributeMap = "";
-      this.attributeSchema = Schema.builder().build();
-    }
-
-    public ErrorFn(
-        String name,
-        SerializableFunction<byte[], Row> valueMapper,
-        Schema errorSchema,
-        List<String> attributes,
-        String attributeMap,
-        Schema attributeSchema,
-        boolean handleErrors) {
-      this.errorCounter = 
Metrics.counter(PubsubLiteReadSchemaTransformProvider.class, name);
-      this.valueMapper = valueMapper;
-      this.errorSchema = errorSchema;
-      this.handleErrors = handleErrors;
-      this.attributes = attributes;
-      this.attributeMap = attributeMap;
-      this.attributeSchema = attributeSchema;
-    }
-
-    @ProcessElement
-    public void process(@DoFn.Element SequencedMessage seqMessage, 
MultiOutputReceiver receiver) {
-      Row mappedRow = null;
-      try {
-        if (attributes.isEmpty()
-            && attributeSchema.getFields().isEmpty()
-            && attributeMap.isEmpty()) {
-          mappedRow = 
valueMapper.apply(seqMessage.getMessage().getData().toByteArray());
-        } else {
-          PubSubMessage message = seqMessage.getMessage();
-          Row row = valueMapper.apply(message.getData().toByteArray());
-          Row.Builder rowBuilder = 
Row.withSchema(attributeSchema).addValues(row.getValues());
-          Map<String, String> stringAttributeMap = new HashMap<>();
-          message
-              .getAttributesMap()
-              .forEach(
-                  (attributeName, attributeValues) -> {
-                    if (attributes.contains(attributeName)) {
-                      processAttribute(attributeValues, rowBuilder::addValue);
-                    }
-
-                    if (!attributeMap.isEmpty()) {
-                      processAttribute(
-                          attributeValues, value -> 
stringAttributeMap.put(attributeName, value));
-                    }
-                  });
-          if (!attributeMap.isEmpty() && !stringAttributeMap.isEmpty()) {
-            rowBuilder.addValue(stringAttributeMap);
-          }
-          mappedRow = rowBuilder.build();
-        }
-      } catch (Exception e) {
-        if (!handleErrors) {
-          throw new RuntimeException(e);
-        }
-        errorsInBundle += 1;
-        LOG.warn("Error while parsing the element", e);
-        receiver
-            .get(ERROR_TAG)
-            .output(
-                ErrorHandling.errorRecord(
-                    errorSchema, 
seqMessage.getMessage().getData().toByteArray(), e));
-      }
-      if (mappedRow != null) {
-        receiver.get(OUTPUT_TAG).output(mappedRow);
-      }
-    }
-
-    @FinishBundle
-    public void finish(FinishBundleContext c) {
-      errorCounter.inc(errorsInBundle);
-      errorsInBundle = 0L;
-    }
-  }
-
-  @Override
-  public SchemaTransform from(PubsubLiteReadSchemaTransformConfiguration 
configuration) {
-    if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
-      throw new IllegalArgumentException(
-          String.format(
-              "Format %s not supported. Only supported formats are %s",
-              configuration.getFormat(), VALID_FORMATS_STR));
-    }
-    boolean handleErrors = 
ErrorHandling.hasOutput(configuration.getErrorHandling());
-    String format = configuration.getFormat();
-    String inputSchema = configuration.getSchema();
-    List<String> attributes = configuration.getAttributes();
-    SerializableFunction<byte[], Row> valueMapper;
-    Schema beamSchema;
-
-    if (format != null && format.equals("RAW")) {
-
-      beamSchema = Schema.builder().addField("payload", 
Schema.FieldType.BYTES).build();
-      valueMapper = getRawBytesToRowFunction(beamSchema);
-
-    } else if (format != null && format.equals("PROTO")) {
-      String fileDescriptorPath = configuration.getFileDescriptorPath();
-      String messageName = configuration.getMessageName();
-
-      if (fileDescriptorPath != null && messageName != null) {
-        beamSchema = ProtoByteUtils.getBeamSchemaFromProto(fileDescriptorPath, 
messageName);
-        valueMapper = 
ProtoByteUtils.getProtoBytesToRowFunction(fileDescriptorPath, messageName);
-      } else if (inputSchema != null && messageName != null) {
-        beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema, 
messageName);
-        valueMapper = 
ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName);
-      } else {
-        throw new IllegalArgumentException(
-            "To read from PubSubLite in PROTO format, either descriptorPath or 
schema must be provided.");
-      }
-
-    } else {
-      if (inputSchema != null) {
-        beamSchema =
-            Objects.equals(configuration.getFormat(), "JSON")
-                ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
-                : AvroUtils.toBeamSchema(new 
org.apache.avro.Schema.Parser().parse(inputSchema));
-        valueMapper =
-            Objects.equals(configuration.getFormat(), "JSON")
-                ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
-                : AvroUtils.getAvroBytesToRowFunction(beamSchema);
-      } else {
-        throw new IllegalArgumentException(
-            "To read from Pubsub Lite in JSON or AVRO format, you must provide 
a schema.");
-      }
-    }
-    return new SchemaTransform() {
-      @Override
-      public PCollectionRowTuple expand(PCollectionRowTuple input) {
-        String project = configuration.getProject();
-        if (Strings.isNullOrEmpty(project)) {
-          project = 
input.getPipeline().getOptions().as(GcpOptions.class).getProject();
-        }
-        if (project == null) {
-          throw new IllegalArgumentException(
-              "Unable to infer the project to read from Pubsub Lite. Please 
provide a project.");
-        }
-        Schema errorSchema = ErrorHandling.errorSchemaBytes();
-        List<String> attributeList = new ArrayList<>();
-        if (attributes != null) {
-          attributeList = attributes;
-        }
-        String attributeMapValue = configuration.getAttributeMap();
-        String attributeMap = attributeMapValue == null ? "" : 
attributeMapValue;
-        Schema resultingBeamSchema =
-            buildSchemaWithAttributes(beamSchema, attributeList, attributeMap);
-        PCollection<SequencedMessage> readPubsubLite =
-            input
-                .getPipeline()
-                .apply(
-                    PubsubLiteIO.read(
-                        SubscriberOptions.newBuilder()
-                            .setSubscriptionPath(
-                                SubscriptionPath.newBuilder()
-                                    .setLocation(
-                                        
CloudRegionOrZone.parse(configuration.getLocation()))
-                                    .setProject(ProjectId.of(project))
-                                    .setName(
-                                        
SubscriptionName.of(configuration.getSubscriptionName()))
-                                    .build())
-                            .build()));
-
-        String attributeId = configuration.getAttributeId();
-        PCollectionTuple outputTuple;
-        PCollection<SequencedMessage> transformSequencedMessage;
-        if (attributeId != null && !attributeId.isEmpty()) {
-          UuidDeduplicationOptions.Builder uuidExtractor =
-              UuidDeduplicationOptions.newBuilder()
-                  .setUuidExtractor(getUuidFromMessage(attributeId));
-          transformSequencedMessage =
-              
readPubsubLite.apply(PubsubLiteIO.deduplicate(uuidExtractor.build()));
-        } else {
-          transformSequencedMessage = readPubsubLite;
-        }
-
-        outputTuple =
-            transformSequencedMessage.apply(
-                ParDo.of(
-                        new ErrorFn(
-                            "PubsubLite-read-error-counter",
-                            valueMapper,
-                            errorSchema,
-                            attributeList,
-                            attributeMap,
-                            resultingBeamSchema,
-                            handleErrors))
-                    .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-        return PCollectionRowTuple.of(
-            "output",
-            outputTuple.get(OUTPUT_TAG).setRowSchema(resultingBeamSchema),
-            "errors",
-            outputTuple.get(ERROR_TAG).setRowSchema(errorSchema));
-      }
-    };
-  }
-
-  /**
-   * Builds a new {@link Schema} by adding additional optional attributes and 
map field to the
-   * provided schema.
-   *
-   * @param schema The base schema to which additional attributes and map 
field will be added.
-   * @param attributes A list of optional attribute names to be added as 
STRING fields to the
-   *     schema.
-   * @param attributesMap The name of the optional map field to be added to 
the schema. If empty, no
-   *     map field will be added.
-   * @return A new {@link Schema} with the specified attributes and an 
optional map field.
-   * @throws IllegalArgumentException if the schema is null or if any 
attribute name in the
-   *     attributes list is null or empty.
-   */
-  public static Schema buildSchemaWithAttributes(
-      Schema schema, List<String> attributes, String attributesMap) {
-    Schema.Builder schemaBuilder = Schema.builder();
-    // Copy fields from the original schema
-    schema.getFields().forEach(field -> 
schemaBuilder.addField(field.getName(), field.getType()));
-
-    // Add optional additional attributes as STRING fields
-    attributes.forEach(
-        attribute -> {
-          if (attribute == null || attribute.isEmpty()) {
-            throw new IllegalArgumentException(
-                "Attribute names in the attributes list must not be null or 
empty.");
-          }
-          schemaBuilder.addField(attribute, Schema.FieldType.STRING);
-        });
-
-    // Add an optional map field if attributesMap is not empty
-    if (!attributesMap.isEmpty()) {
-      schemaBuilder
-          .addMapField(attributesMap, Schema.FieldType.STRING, 
Schema.FieldType.STRING)
-          .build();
-    }
-    return schemaBuilder.build();
-  }
-
-  /**
-   * Processes the attribute values, invoking the specified consumer with the 
processed value. If
-   * the attribute values are null or contain multiple values, an exception is 
thrown.
-   *
-   * @param attributeValues The attribute values to be processed. If null, the 
method does nothing.
-   * @param valueConsumer The consumer to accept the processed value.
-   * @throws RuntimeException if attributeValues is not null and contains 
multiple values.
-   */
-  private static void processAttribute(
-      @Nullable AttributeValues attributeValues, Consumer<String> 
valueConsumer) {
-    if (attributeValues != null) {
-      List<ByteString> valueList = attributeValues.getValuesList();
-      if (valueList.size() != 1) {
-        throw new RuntimeException(
-            "Received an unparseable message with multiple values for an 
attribute.");
-      }
-      valueConsumer.accept(valueList.get(0).toStringUtf8());
-    }
-  }
-
-  public static SerializableFunction<byte[], Row> 
getRawBytesToRowFunction(Schema rawSchema) {
-    return new SimpleFunction<byte[], Row>() {
-      @Override
-      public Row apply(byte[] input) {
-        return Row.withSchema(rawSchema).addValue(input).build();
-      }
-    };
-  }
-
-  public static SerializableFunction<SequencedMessage, Uuid> 
getUuidFromMessage(
-      String attributeId) {
-    return new SimpleFunction<SequencedMessage, Uuid>() {
-      @Override
-      public Uuid apply(SequencedMessage input) {
-        AttributeValues attribute = 
input.getMessage().getAttributesMap().get(attributeId);
-        if (attribute != null) {
-          if (attribute.getValuesCount() != 1) {
-            throw new RuntimeException(
-                "Received an unparseable message with multiple values for an 
attribute.");
-          }
-          return Uuid.of(attribute.getValues(0));
-        } else {
-          throw new RuntimeException("Uuid attribute missing.");
-        }
-      }
-    };
-  }
-
-  @Override
-  public String identifier() {
-    return "beam:schematransform:org.apache.beam:pubsublite_read:v1";
-  }
-
-  @Override
-  public List<String> inputCollectionNames() {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public List<String> outputCollectionNames() {
-    return Arrays.asList("output", "errors");
-  }
-
-  @AutoValue
-  @DefaultSchema(AutoValueSchema.class)
-  public abstract static class PubsubLiteReadSchemaTransformConfiguration {
-
-    public void validate() {
-      final String dataFormat = this.getFormat();
-      assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
-          : "Valid data formats are " + VALID_DATA_FORMATS;
-
-      final String inputSchema = this.getSchema();
-      final String messageName = this.getMessageName();
-
-      if (dataFormat != null && dataFormat.equals("RAW")) {
-        assert inputSchema == null
-            : "To read from Pubsub Lite in RAW format, you can't provide a 
schema.";
-      }
-
-      if (dataFormat != null && dataFormat.equals("PROTO")) {
-        assert messageName != null
-            : "To read from Pubsub Lite in PROTO format, messageName must be 
provided.";
-      }
-    }
-
-    @SchemaFieldDescription(
-        "The encoding format for the data stored in Pubsub Lite. Valid options 
are: "
-            + VALID_FORMATS_STR)
-    public abstract String getFormat();
-
-    @SchemaFieldDescription(
-        "The schema in which the data is encoded in the Pubsub Lite topic. "
-            + "For AVRO data, this is a schema defined with AVRO schema syntax 
"
-            + "(https://avro.apache.org/docs/1.10.2/spec.html#schemas). "
-            + "For JSON data, this is a schema defined with JSON-schema syntax 
(https://json-schema.org/).")
-    public abstract @Nullable String getSchema();
-
-    @SchemaFieldDescription(
-        "The GCP project where the Pubsub Lite reservation resides. This can 
be a "
-            + "project number of a project ID.")
-    public abstract @Nullable String getProject();
-
-    @SchemaFieldDescription(
-        "The name of the subscription to consume data. This will be 
concatenated with "
-            + "the project and location parameters to build a full 
subscription path.")
-    public abstract String getSubscriptionName();
-
-    @SchemaFieldDescription("The region or zone where the Pubsub Lite 
reservation resides.")
-    public abstract String getLocation();
-
-    @SchemaFieldDescription("This option specifies whether and where to output 
unwritable rows.")
-    public abstract @Nullable ErrorHandling getErrorHandling();
-
-    @SchemaFieldDescription(
-        "List of attribute keys whose values will be flattened into the "
-            + "output message as additional fields.  For example, if the 
format is `RAW` "
-            + "and attributes is `[\"a\", \"b\"]` then this read will produce 
elements of "
-            + "the form `Row(payload=..., a=..., b=...)`")
-    public abstract @Nullable List<String> getAttributes();
-
-    @SchemaFieldDescription(
-        "Name of a field in which to store the full set of attributes "
-            + "associated with this message.  For example, if the format is 
`RAW` and "
-            + "`attribute_map` is set to `\"attrs\"` then this read will 
produce elements "
-            + "of the form `Row(payload=..., attrs=...)` where `attrs` is a 
Map type "
-            + "of string to string. "
-            + "If both `attributes` and `attribute_map` are set, the 
overlapping "
-            + "attribute values will be present in both the flattened 
structure and the "
-            + "attribute map.")
-    public abstract @Nullable String getAttributeMap();
-
-    @SchemaFieldDescription(
-        "The attribute on incoming Pubsub Lite messages to use as a unique "
-            + "record identifier. When specified, the value of this attribute 
(which "
-            + "can be any string that uniquely identifies the record) will be 
used for "
-            + "deduplication of messages. If not provided, we cannot guarantee 
"
-            + "that no duplicate data will be delivered on the Pub/Sub stream. 
In this "
-            + "case, deduplication of the stream will be strictly best 
effort.")
-    public abstract @Nullable String getAttributeId();
-
-    @SchemaFieldDescription(
-        "The path to the Protocol Buffer File Descriptor Set file. This file 
is used for schema"
-            + " definition and message serialization.")
-    @Nullable
-    public abstract String getFileDescriptorPath();
-
-    @SchemaFieldDescription(
-        "The name of the Protocol Buffer message to be used for schema"
-            + " extraction and data conversion.")
-    @Nullable
-    public abstract String getMessageName();
-
-    public static Builder builder() {
-      return new 
AutoValue_PubsubLiteReadSchemaTransformProvider_PubsubLiteReadSchemaTransformConfiguration
-          .Builder();
-    }
-
-    @AutoValue.Builder
-    public abstract static class Builder {
-      public abstract Builder setFormat(String format);
-
-      public abstract Builder setSchema(String schema);
-
-      public abstract Builder setProject(String project);
-
-      public abstract Builder setSubscriptionName(String subscriptionName);
-
-      public abstract Builder setLocation(String location);
-
-      public abstract Builder setErrorHandling(ErrorHandling errorHandling);
-
-      public abstract Builder setAttributes(List<String> attributes);
-
-      @SuppressWarnings("unused")
-      public abstract Builder setAttributeMap(String attributeMap);
-
-      @SuppressWarnings("unused")
-      public abstract Builder setAttributeId(String attributeId);
-
-      @SuppressWarnings("unused")
-      public abstract Builder setFileDescriptorPath(String fileDescriptorPath);
-
-      @SuppressWarnings("unused")
-      public abstract Builder setMessageName(String messageName);
-
-      public abstract PubsubLiteReadSchemaTransformConfiguration build();
-    }
-  }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java
deleted file mode 100644
index 54ed7ac495d..00000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java
+++ /dev/null
@@ -1,466 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import com.google.auto.service.AutoService;
-import com.google.auto.value.AutoValue;
-import com.google.cloud.pubsublite.CloudRegionOrZone;
-import com.google.cloud.pubsublite.ProjectId;
-import com.google.cloud.pubsublite.TopicName;
-import com.google.cloud.pubsublite.TopicPath;
-import com.google.cloud.pubsublite.proto.AttributeValues;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import com.google.protobuf.ByteString;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
-import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
-import org.apache.beam.sdk.io.gcp.pubsublite.internal.Uuid;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
-import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
-import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
-import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
-import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
-import org.apache.beam.sdk.schemas.utils.JsonUtils;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@AutoService(SchemaTransformProvider.class)
-public class PubsubLiteWriteSchemaTransformProvider
-    extends TypedSchemaTransformProvider<
-        
PubsubLiteWriteSchemaTransformProvider.PubsubLiteWriteSchemaTransformConfiguration>
 {
-
-  public static final String SUPPORTED_FORMATS_STR = "RAW,JSON,AVRO,PROTO";
-  public static final Set<String> SUPPORTED_FORMATS =
-      Sets.newHashSet(SUPPORTED_FORMATS_STR.split(","));
-  public static final TupleTag<PubSubMessage> OUTPUT_TAG = new 
TupleTag<PubSubMessage>() {};
-  public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
-  private static final Logger LOG =
-      LoggerFactory.getLogger(PubsubLiteWriteSchemaTransformProvider.class);
-
-  @Override
-  protected Class<PubsubLiteWriteSchemaTransformConfiguration> 
configurationClass() {
-    return PubsubLiteWriteSchemaTransformConfiguration.class;
-  }
-
-  @Override
-  public String description() {
-    return "Performs a write to Google Pub/Sub Lite.\n"
-        + "\n"
-        + "**Note**: This provider is deprecated. See Pub/Sub Lite <a 
href=\"https://cloud.google.com/pubsub/lite/docs\";>documentation</a> for more 
information.";
-  }
-
-  public static class ErrorCounterFn extends DoFn<Row, PubSubMessage> {
-    private final SerializableFunction<Row, byte[]> toBytesFn;
-    private final Counter errorCounter;
-    private long errorsInBundle = 0L;
-
-    private final Schema errorSchema;
-
-    private final boolean handleErrors;
-
-    private final List<String> attributes;
-
-    private final Schema schemaWithoutAttributes;
-
-    public ErrorCounterFn(
-        String name,
-        SerializableFunction<Row, byte[]> toBytesFn,
-        Schema errorSchema,
-        boolean handleErrors) {
-      this.toBytesFn = toBytesFn;
-      errorCounter = 
Metrics.counter(PubsubLiteWriteSchemaTransformProvider.class, name);
-      this.errorSchema = errorSchema;
-      this.handleErrors = handleErrors;
-      this.attributes = new ArrayList<>();
-      this.schemaWithoutAttributes = Schema.builder().build();
-    }
-
-    public ErrorCounterFn(
-        String name,
-        SerializableFunction<Row, byte[]> toBytesFn,
-        Schema errorSchema,
-        boolean handleErrors,
-        List<String> attributes,
-        Schema schemaWithoutAttributes) {
-      this.toBytesFn = toBytesFn;
-      errorCounter = 
Metrics.counter(PubsubLiteWriteSchemaTransformProvider.class, name);
-      this.errorSchema = errorSchema;
-      this.handleErrors = handleErrors;
-      this.attributes = attributes;
-      this.schemaWithoutAttributes = schemaWithoutAttributes;
-    }
-
-    @ProcessElement
-    public void process(@DoFn.Element Row row, MultiOutputReceiver receiver) {
-      try {
-        PubSubMessage message;
-        if (attributes.isEmpty()) {
-          message =
-              PubSubMessage.newBuilder()
-                  
.setData(ByteString.copyFrom(Objects.requireNonNull(toBytesFn.apply(row))))
-                  .build();
-        } else {
-          Row.Builder builder = Row.withSchema(schemaWithoutAttributes);
-          schemaWithoutAttributes
-              .getFields()
-              .forEach(field -> 
builder.addValue(row.getValue(field.getName())));
-
-          Row resultingRow = builder.build();
-          Map<String, AttributeValues> attributeValuesHashMap =
-              getStringAttributeValuesMap(row, attributes);
-          message =
-              PubSubMessage.newBuilder()
-                  .setData(
-                      
ByteString.copyFrom(Objects.requireNonNull(toBytesFn.apply(resultingRow))))
-                  .putAllAttributes(attributeValuesHashMap)
-                  .build();
-        }
-
-        receiver.get(OUTPUT_TAG).output(message);
-      } catch (Exception e) {
-        if (!handleErrors) {
-          throw new RuntimeException(e);
-        }
-        errorsInBundle += 1;
-        LOG.warn("Error while processing the element", e);
-        receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, 
row, e));
-      }
-    }
-
-    @FinishBundle
-    public void finish() {
-      errorCounter.inc(errorsInBundle);
-      errorsInBundle = 0L;
-    }
-  }
-
-  @Override
-  public SchemaTransform from(PubsubLiteWriteSchemaTransformConfiguration 
configuration) {
-
-    if (!SUPPORTED_FORMATS.contains(configuration.getFormat())) {
-      throw new IllegalArgumentException(
-          "Format "
-              + configuration.getFormat()
-              + " is not supported. "
-              + "Supported formats are: "
-              + String.join(", ", SUPPORTED_FORMATS));
-    }
-
-    return new SchemaTransform() {
-      @Override
-      public PCollectionRowTuple expand(PCollectionRowTuple input) {
-        List<String> attributesConfigValue = configuration.getAttributes();
-        String attributeId = configuration.getAttributeId();
-        List<String> attributes =
-            attributesConfigValue != null ? attributesConfigValue : new 
ArrayList<>();
-        Schema inputSchema;
-        if (!attributes.isEmpty()) {
-          inputSchema = 
getSchemaWithoutAttributes(input.get("input").getSchema(), attributes);
-        } else {
-          inputSchema = input.get("input").getSchema();
-        }
-        ErrorHandling errorHandling = configuration.getErrorHandling();
-        boolean handleErrors = ErrorHandling.hasOutput(errorHandling);
-        Schema errorSchema = ErrorHandling.errorSchema(inputSchema);
-
-        final SerializableFunction<Row, byte[]> toBytesFn;
-        if (configuration.getFormat().equals("RAW")) {
-          int numFields = inputSchema.getFields().size();
-          if (numFields != 1) {
-            throw new IllegalArgumentException("Expecting exactly one field, 
found " + numFields);
-          }
-          if 
(!inputSchema.getField(0).getType().equals(Schema.FieldType.BYTES)) {
-            throw new IllegalArgumentException(
-                "The input schema must have exactly one field of type byte.");
-          }
-          toBytesFn = 
getRowToRawBytesFunction(inputSchema.getField(0).getName());
-        } else if (configuration.getFormat().equals("PROTO")) {
-          String descriptorPath = configuration.getFileDescriptorPath();
-          String schema = configuration.getSchema();
-          String messageName = configuration.getMessageName();
-
-          if (descriptorPath != null && messageName != null) {
-            toBytesFn = ProtoByteUtils.getRowToProtoBytes(descriptorPath, 
messageName);
-          } else if (schema != null && messageName != null) {
-            toBytesFn = ProtoByteUtils.getRowToProtoBytesFromSchema(schema, 
messageName);
-          } else {
-            throw new IllegalArgumentException(
-                "At least a descriptorPath or a PROTO schema is required.");
-          }
-        } else if (configuration.getFormat().equals("JSON")) {
-          toBytesFn = JsonUtils.getRowToJsonBytesFunction(inputSchema);
-        } else {
-          toBytesFn = AvroUtils.getRowToAvroBytesFunction(inputSchema);
-        }
-
-        PCollectionTuple outputTuple =
-            input
-                .get("input")
-                .apply(
-                    "Map Rows to PubSubMessages",
-                    ParDo.of(
-                            new ErrorCounterFn(
-                                "PubSubLite-write-error-counter",
-                                toBytesFn,
-                                errorSchema,
-                                handleErrors,
-                                attributes,
-                                inputSchema))
-                        .withOutputTags(OUTPUT_TAG, 
TupleTagList.of(ERROR_TAG)));
-
-        outputTuple
-            .get(OUTPUT_TAG)
-            .apply(
-                "Add UUIDs",
-                (attributeId != null && !attributeId.isEmpty())
-                    ? new SetUuidFromPubSubMessage(attributeId)
-                    : PubsubLiteIO.addUuids())
-            .apply(
-                "Write to PS Lite",
-                PubsubLiteIO.write(
-                    PublisherOptions.newBuilder()
-                        .setTopicPath(
-                            TopicPath.newBuilder()
-                                
.setProject(ProjectId.of(configuration.getProject()))
-                                
.setName(TopicName.of(configuration.getTopicName()))
-                                
.setLocation(CloudRegionOrZone.parse(configuration.getLocation()))
-                                .build())
-                        .build()));
-
-        PCollection<Row> errorOutput =
-            
outputTuple.get(ERROR_TAG).setRowSchema(ErrorHandling.errorSchema(errorSchema));
-
-        String outputString = errorHandling != null ? 
errorHandling.getOutput() : "errors";
-        return PCollectionRowTuple.of(handleErrors ? outputString : "errors", 
errorOutput);
-      }
-    };
-  }
-
-  public static Schema getSchemaWithoutAttributes(Schema inputSchema, 
List<String> attributes) {
-    Schema.Builder schemaBuilder = Schema.builder();
-
-    inputSchema
-        .getFields()
-        .forEach(
-            field -> {
-              if (!attributes.contains(field.getName())) {
-                schemaBuilder.addField(field.getName(), field.getType());
-              }
-            });
-    return schemaBuilder.build();
-  }
-
-  private static Map<String, AttributeValues> getStringAttributeValuesMap(
-      Row row, List<String> attributes) {
-    Map<String, AttributeValues> attributeValuesHashMap = new HashMap<>();
-    attributes.forEach(
-        attribute -> {
-          String value = row.getValue(attribute);
-          if (value != null) {
-            attributeValuesHashMap.put(
-                attribute,
-                
AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8(value)).build());
-          }
-        });
-    return attributeValuesHashMap;
-  }
-
-  public static SerializableFunction<Row, byte[]> 
getRowToRawBytesFunction(String rowFieldName) {
-    return new SimpleFunction<Row, byte[]>() {
-      @Override
-      public byte[] apply(Row input) {
-        byte[] rawBytes = input.getBytes(rowFieldName);
-        if (rawBytes == null) {
-          throw new NullPointerException();
-        }
-        return rawBytes;
-      }
-    };
-  }
-
-  @Override
-  public String identifier() {
-    return "beam:schematransform:org.apache.beam:pubsublite_write:v1";
-  }
-
-  @Override
-  public List<String> inputCollectionNames() {
-    return Collections.singletonList("input");
-  }
-
-  @Override
-  public List<String> outputCollectionNames() {
-    return Collections.singletonList("errors");
-  }
-
-  @AutoValue
-  @DefaultSchema(AutoValueSchema.class)
-  public abstract static class PubsubLiteWriteSchemaTransformConfiguration {
-
-    public void validate() {
-      final String dataFormat = this.getFormat();
-      final String inputSchema = this.getSchema();
-      final String messageName = this.getMessageName();
-      final String descriptorPath = this.getFileDescriptorPath();
-
-      if (dataFormat != null && dataFormat.equals("PROTO")) {
-        assert messageName != null : "Expecting messageName to be non-null.";
-        assert descriptorPath != null && inputSchema != null
-            : "You must include a descriptorPath or a PROTO schema but not 
both.";
-      }
-    }
-
-    @SchemaFieldDescription(
-        "The GCP project where the Pubsub Lite reservation resides. This can 
be a "
-            + "project number of a project ID.")
-    public abstract String getProject();
-
-    @SchemaFieldDescription("The region or zone where the Pubsub Lite 
reservation resides.")
-    public abstract String getLocation();
-
-    @SchemaFieldDescription(
-        "The name of the topic to publish data into. This will be concatenated 
with "
-            + "the project and location parameters to build a full topic 
path.")
-    public abstract String getTopicName();
-
-    @SchemaFieldDescription(
-        "The encoding format for the data stored in Pubsub Lite. Valid options 
are: "
-            + SUPPORTED_FORMATS_STR)
-    public abstract String getFormat();
-
-    @SchemaFieldDescription("This option specifies whether and where to output 
unwritable rows.")
-    public abstract @Nullable ErrorHandling getErrorHandling();
-
-    @SchemaFieldDescription(
-        "List of attribute keys whose values will be pulled out as "
-            + "Pubsub Lite message attributes.  For example, if the format is 
`JSON` "
-            + "and attributes is `[\"a\", \"b\"]` then elements of the form "
-            + "`Row(any_field=..., a=..., b=...)` will result in Pubsub Lite 
messages whose "
-            + "payload has the contents of any_field and whose attribute will 
be "
-            + "populated with the values of `a` and `b`.")
-    public abstract @Nullable List<String> getAttributes();
-
-    @SchemaFieldDescription(
-        "If set, will set an attribute for each Pubsub Lite message "
-            + "with the given name and a unique value. This attribute can then 
be used "
-            + "in a ReadFromPubSubLite PTransform to deduplicate messages.")
-    public abstract @Nullable String getAttributeId();
-
-    @SchemaFieldDescription(
-        "The path to the Protocol Buffer File Descriptor Set file. This file 
is used for schema"
-            + " definition and message serialization.")
-    public abstract @Nullable String getFileDescriptorPath();
-
-    @SchemaFieldDescription(
-        "The name of the Protocol Buffer message to be used for schema"
-            + " extraction and data conversion.")
-    public abstract @Nullable String getMessageName();
-
-    public abstract @Nullable String getSchema();
-
-    public static Builder builder() {
-      return new 
AutoValue_PubsubLiteWriteSchemaTransformProvider_PubsubLiteWriteSchemaTransformConfiguration
-          .Builder();
-    }
-
-    @AutoValue.Builder
-    public abstract static class Builder {
-      public abstract Builder setProject(String project);
-
-      public abstract Builder setLocation(String location);
-
-      public abstract Builder setTopicName(String topicName);
-
-      public abstract Builder setFormat(String format);
-
-      public abstract Builder setErrorHandling(ErrorHandling errorHandling);
-
-      public abstract Builder setAttributes(List<String> attributes);
-
-      @SuppressWarnings("unused")
-      public abstract Builder setAttributeId(String attributeId);
-
-      @SuppressWarnings("unused")
-      public abstract Builder setFileDescriptorPath(String fileDescriptorPath);
-
-      @SuppressWarnings("unused")
-      public abstract Builder setMessageName(String messageName);
-
-      @SuppressWarnings("unused")
-      public abstract Builder setSchema(String schema);
-
-      public abstract PubsubLiteWriteSchemaTransformConfiguration build();
-    }
-  }
-
-  public static class SetUuidFromPubSubMessage
-      extends PTransform<PCollection<PubSubMessage>, 
PCollection<PubSubMessage>> {
-    private final String attributeId;
-
-    public SetUuidFromPubSubMessage(String attributeId) {
-      this.attributeId = attributeId;
-    }
-
-    @Override
-    public PCollection<PubSubMessage> expand(PCollection<PubSubMessage> input) 
{
-      return input.apply("SetUuidFromPubSubMessage", ParDo.of(new 
SetUuidFn(attributeId)));
-    }
-
-    public static class SetUuidFn extends DoFn<PubSubMessage, PubSubMessage> {
-      private final String attributeId;
-
-      public SetUuidFn(String attributeId) {
-        this.attributeId = attributeId;
-      }
-
-      @ProcessElement
-      public void processElement(
-          @Element PubSubMessage input, OutputReceiver<PubSubMessage> 
outputReceiver) {
-        PubSubMessage.Builder builder = input.toBuilder();
-        builder.putAttributes(
-            attributeId, 
AttributeValues.newBuilder().addValues(Uuid.random().value()).build());
-        outputReceiver.output(builder.build());
-      }
-    }
-  }
-}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
deleted file mode 100644
index 1f2819940ff..00000000000
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-
-import com.google.cloud.pubsublite.AdminClient;
-import com.google.cloud.pubsublite.AdminClientSettings;
-import com.google.cloud.pubsublite.BacklogLocation;
-import com.google.cloud.pubsublite.CloudZone;
-import com.google.cloud.pubsublite.Message;
-import com.google.cloud.pubsublite.ProjectId;
-import com.google.cloud.pubsublite.SubscriptionName;
-import com.google.cloud.pubsublite.SubscriptionPath;
-import com.google.cloud.pubsublite.TopicName;
-import com.google.cloud.pubsublite.TopicPath;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import com.google.cloud.pubsublite.proto.SequencedMessage;
-import com.google.cloud.pubsublite.proto.Subscription;
-import 
com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement;
-import com.google.cloud.pubsublite.proto.Topic;
-import com.google.cloud.pubsublite.proto.Topic.PartitionConfig.Capacity;
-import com.google.protobuf.ByteString;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.FlatMapElements;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptors;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
-import org.joda.time.Duration;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(JUnit4.class)
-public class ReadWriteIT {
-  private static final Logger LOG = LoggerFactory.getLogger(ReadWriteIT.class);
-  private static final CloudZone ZONE = CloudZone.parse("us-central1-b");
-  private static final int MESSAGE_COUNT = 90;
-  private static final Schema SAMPLE_BEAM_SCHEMA =
-      
Schema.builder().addStringField("numberInString").addInt32Field("numberInInt").build();
-
-  @Rule public transient TestPubsubSignal signal = TestPubsubSignal.create();
-  @Rule public transient TestPipeline pipeline = TestPipeline.create();
-
-  private static ProjectId getProject(PipelineOptions options) {
-    return 
ProjectId.of(checkArgumentNotNull(options.as(GcpOptions.class).getProject()));
-  }
-
-  private static String randomName() {
-    return "beam_it_resource_" + ThreadLocalRandom.current().nextLong();
-  }
-
-  private static AdminClient newAdminClient() {
-    return 
AdminClient.create(AdminClientSettings.newBuilder().setRegion(ZONE.region()).build());
-  }
-
-  private final Deque<Runnable> cleanupActions = new ArrayDeque<>();
-
-  private TopicPath createTopic(ProjectId id) throws Exception {
-    TopicPath toReturn =
-        TopicPath.newBuilder()
-            .setProject(id)
-            .setLocation(ZONE)
-            .setName(TopicName.of(randomName()))
-            .build();
-    Topic.Builder topic = Topic.newBuilder().setName(toReturn.toString());
-    topic
-        .getPartitionConfigBuilder()
-        .setCount(2)
-        
.setCapacity(Capacity.newBuilder().setPublishMibPerSec(4).setSubscribeMibPerSec(4));
-    topic.getRetentionConfigBuilder().setPerPartitionBytes(30 * (1L << 30));
-    cleanupActions.addLast(
-        () -> {
-          try (AdminClient client = newAdminClient()) {
-            client.deleteTopic(toReturn).get();
-          } catch (Throwable t) {
-            LOG.error("Failed to clean up topic.", t);
-          }
-        });
-    LOG.info("Creating topic named {}", toReturn);
-    try (AdminClient client = newAdminClient()) {
-      client.createTopic(topic.build()).get();
-    }
-    return toReturn;
-  }
-
-  private SubscriptionPath createSubscription(TopicPath topic) throws 
Exception {
-    SubscriptionPath toReturn =
-        SubscriptionPath.newBuilder()
-            .setProject(topic.project())
-            .setLocation(ZONE)
-            .setName(SubscriptionName.of(randomName()))
-            .build();
-    Subscription.Builder subscription = 
Subscription.newBuilder().setName(toReturn.toString());
-    subscription
-        .getDeliveryConfigBuilder()
-        .setDeliveryRequirement(DeliveryRequirement.DELIVER_IMMEDIATELY);
-    subscription.setTopic(topic.toString());
-    cleanupActions.addLast(
-        () -> {
-          try (AdminClient client = newAdminClient()) {
-            client.deleteSubscription(toReturn).get();
-          } catch (Throwable t) {
-            LOG.error("Failed to clean up subscription.", t);
-          }
-        });
-    LOG.info("Creating subscription named {} from topic {}", toReturn, topic);
-    try (AdminClient client = newAdminClient()) {
-      client.createSubscription(subscription.build(), 
BacklogLocation.BEGINNING).get();
-    }
-    return toReturn;
-  }
-
-  @After
-  public void tearDown() {
-    while (!cleanupActions.isEmpty()) {
-      cleanupActions.removeLast().run();
-    }
-  }
-
-  // Workaround for https://github.com/apache/beam/issues/21257
-  // TODO(https://github.com/apache/beam/issues/21257): Remove this.
-  private static class CustomCreate extends PTransform<PCollection<Void>, 
PCollection<Integer>> {
-    @Override
-    public PCollection<Integer> expand(PCollection<Void> input) {
-      return input.apply(
-          "createIndexes",
-          FlatMapElements.via(
-              new SimpleFunction<Void, Iterable<Integer>>() {
-                @Override
-                public Iterable<Integer> apply(Void input) {
-                  return IntStream.range(0, 
MESSAGE_COUNT).boxed().collect(Collectors.toList());
-                }
-              }));
-    }
-  }
-
-  public static void writeJsonMessages(TopicPath topicPath, Pipeline pipeline) 
{
-    PCollectionRowTuple.of(
-            "input",
-            pipeline
-                .apply(Create.of((Void) null))
-                .apply("createIndexes", new CustomCreate())
-                .apply(
-                    "format to rows",
-                    MapElements.via(
-                        new SimpleFunction<Integer, Row>(
-                            index ->
-                                Row.withSchema(SAMPLE_BEAM_SCHEMA)
-                                    
.addValue(Objects.requireNonNull(index).toString())
-                                    .addValue(index)
-                                    .build()) {}))
-                .setRowSchema(SAMPLE_BEAM_SCHEMA))
-        .apply(
-            "write to pslite",
-            new PubsubLiteWriteSchemaTransformProvider()
-                .from(
-                    PubsubLiteWriteSchemaTransformProvider
-                        .PubsubLiteWriteSchemaTransformConfiguration.builder()
-                        .setFormat("JSON")
-                        .setLocation(ZONE.toString())
-                        .setTopicName(topicPath.name().value())
-                        .setProject(topicPath.project().name().value())
-                        .build()));
-  }
-
-  public static void writeMessages(TopicPath topicPath, Pipeline pipeline) {
-    PCollection<Void> trigger = pipeline.apply(Create.of((Void) null));
-    PCollection<Integer> indexes = trigger.apply("createIndexes", new 
CustomCreate());
-    PCollection<PubSubMessage> messages =
-        indexes.apply(
-            "createMessages",
-            MapElements.via(
-                new SimpleFunction<Integer, PubSubMessage>(
-                    index ->
-                        Message.builder()
-                            .setData(ByteString.copyFromUtf8(index.toString()))
-                            .build()
-                            .toProto()) {}));
-    // Add UUIDs to messages for later deduplication.
-    messages = messages.apply("addUuids", PubsubLiteIO.addUuids());
-    messages.apply(
-        "writeMessages",
-        
PubsubLiteIO.write(PublisherOptions.newBuilder().setTopicPath(topicPath).build()));
-  }
-
-  public static PCollection<SequencedMessage> readMessages(
-      SubscriptionPath subscriptionPath, Pipeline pipeline) {
-    PCollection<SequencedMessage> messages =
-        pipeline.apply(
-            "readMessages",
-            PubsubLiteIO.read(
-                
SubscriberOptions.newBuilder().setSubscriptionPath(subscriptionPath).build()));
-    return messages;
-    // TODO(https://github.com/apache/beam/issues/21157): Fix and re-enable
-    // Deduplicate messages based on the uuids added in 
PubsubLiteIO.addUuids() when writing.
-    // return messages.apply(
-    //   "dedupeMessages", 
PubsubLiteIO.deduplicate(UuidDeduplicationOptions.newBuilder().build()));
-  }
-
-  public static SimpleFunction<SequencedMessage, Integer> extractIds() {
-    return new SimpleFunction<SequencedMessage, Integer>() {
-      @Override
-      public Integer apply(SequencedMessage input) {
-        return Integer.parseInt(input.getMessage().getData().toStringUtf8());
-      }
-    };
-  }
-
-  public static SerializableFunction<Set<Integer>, Boolean> testIds() {
-    return ids -> {
-      LOG.debug("Ids are: {}", ids);
-      Set<Integer> target = IntStream.range(0, 
MESSAGE_COUNT).boxed().collect(Collectors.toSet());
-      return target.equals(ids);
-    };
-  }
-
-  @Test
-  public void testPubsubLiteWriteReadWithSchemaTransform() throws Exception {
-    pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
-    pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
-
-    TopicPath topic = createTopic(getProject(pipeline.getOptions()));
-    SubscriptionPath subscription = null;
-    Exception lastException = null;
-    for (int i = 0; i < 30; ++i) {
-      // Sleep for topic creation to propagate.
-      Thread.sleep(1000);
-      try {
-        subscription = createSubscription(topic);
-        break;
-      } catch (Exception e) {
-        lastException = e;
-        LOG.info("Retrying exception on subscription creation.", e);
-      }
-    }
-    if (subscription == null) {
-      throw lastException;
-    }
-
-    // Publish some messages
-    writeJsonMessages(topic, pipeline);
-
-    // Read some messages. They should be deduplicated by the time we see 
them, so there should be
-    // exactly numMessages, one for every index in [0,MESSAGE_COUNT).
-    PCollection<Row> messages =
-        PCollectionRowTuple.empty(pipeline)
-            .apply(
-                "read from pslite",
-                new PubsubLiteReadSchemaTransformProvider()
-                    .from(
-                        PubsubLiteReadSchemaTransformProvider
-                            
.PubsubLiteReadSchemaTransformConfiguration.builder()
-                            .setFormat("JSON")
-                            .setSchema(
-                                "{\n"
-                                    + "  \"properties\": {\n"
-                                    + "    \"numberInString\": {\n"
-                                    + "      \"type\": \"string\"\n"
-                                    + "    },\n"
-                                    + "    \"numberInInt\": {\n"
-                                    + "      \"type\": \"integer\"\n"
-                                    + "    }\n"
-                                    + "  }\n"
-                                    + "}")
-                            .setSubscriptionName(subscription.name().value())
-                            .setLocation(subscription.location().toString())
-                            .build()))
-            .get("output");
-    PCollection<Integer> ids =
-        messages.apply(
-            "get ints",
-            MapElements.into(TypeDescriptors.integers())
-                .via(
-                    row -> {
-                      return 
Objects.requireNonNull(row.getInt64("numberInInt")).intValue();
-                    }));
-    ids.apply("PubsubSignalTest", 
signal.signalSuccessWhen(BigEndianIntegerCoder.of(), testIds()));
-    Supplier<Void> start = signal.waitForStart(Duration.standardMinutes(8));
-    pipeline.apply("start signal", signal.signalStart());
-    PipelineResult job = pipeline.run();
-    start.get();
-    LOG.info("Running!");
-    signal.waitForSuccess(Duration.standardMinutes(5));
-    // A runner may not support cancel
-    try {
-      job.cancel();
-    } catch (UnsupportedOperationException exc) {
-      // noop
-    }
-  }
-
-  @Test
-  public void testReadWrite() throws Exception {
-    pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
-    pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
-
-    TopicPath topic = createTopic(getProject(pipeline.getOptions()));
-    SubscriptionPath subscription = null;
-    Exception lastException = null;
-    for (int i = 0; i < 30; ++i) {
-      // Sleep for topic creation to propagate.
-      Thread.sleep(1000);
-      try {
-        subscription = createSubscription(topic);
-        break;
-      } catch (Exception e) {
-        lastException = e;
-        LOG.info("Retrying exception on subscription creation.", e);
-      }
-    }
-    if (subscription == null) {
-      throw lastException;
-    }
-
-    // Publish some messages
-    writeMessages(topic, pipeline);
-
-    // Read some messages. They should be deduplicated by the time we see 
them, so there should be
-    // exactly numMessages, one for every index in [0,MESSAGE_COUNT).
-    PCollection<SequencedMessage> messages = readMessages(subscription, 
pipeline);
-    PCollection<Integer> ids = messages.apply(MapElements.via(extractIds()));
-    ids.apply("PubsubSignalTest", 
signal.signalSuccessWhen(BigEndianIntegerCoder.of(), testIds()));
-    Supplier<Void> start = signal.waitForStart(Duration.standardMinutes(8));
-    pipeline.apply(signal.signalStart());
-    PipelineResult job = pipeline.run();
-    start.get();
-    LOG.info("Running!");
-    signal.waitForSuccess(Duration.standardMinutes(5));
-    // A runner may not support cancel
-    try {
-      job.cancel();
-    } catch (UnsupportedOperationException exc) {
-      // noop
-    }
-  }
-}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java
deleted file mode 100644
index 3d0ba336eee..00000000000
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java
+++ /dev/null
@@ -1,565 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite.internal;
-
-import static 
org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider.getRawBytesToRowFunction;
-import static 
org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider.getUuidFromMessage;
-
-import com.google.cloud.pubsublite.proto.AttributeValues;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import com.google.cloud.pubsublite.proto.SequencedMessage;
-import com.google.protobuf.ByteString;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
-import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
-import 
org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider;
-import 
org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider.ErrorFn;
-import org.apache.beam.sdk.io.gcp.pubsublite.UuidDeduplicationOptions;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
-import org.apache.beam.sdk.schemas.utils.JsonUtils;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class PubsubLiteDlqTest {
-
-  private static final TupleTag<Row> OUTPUT_TAG = 
PubsubLiteReadSchemaTransformProvider.OUTPUT_TAG;
-  private static final TupleTag<Row> ERROR_TAG = 
PubsubLiteReadSchemaTransformProvider.ERROR_TAG;
-
-  private static final Schema BEAM_RAW_SCHEMA =
-      Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
-  private static final Schema BEAM_SCHEMA =
-      Schema.of(Schema.Field.of("name", Schema.FieldType.STRING));
-
-  private static final Schema BEAM_SCHEMA_ATTRIBUTES =
-      Schema.of(
-          Schema.Field.of("name", Schema.FieldType.STRING),
-          Schema.Field.of("key1", Schema.FieldType.STRING),
-          Schema.Field.of("key2", Schema.FieldType.STRING));
-
-  private static final Schema BEAM_SCHEMA_ATTRIBUTES_AND_MAP =
-      Schema.of(
-          Schema.Field.of("name", Schema.FieldType.STRING),
-          Schema.Field.of("key1", Schema.FieldType.STRING),
-          Schema.Field.of("key2", Schema.FieldType.STRING),
-          Schema.Field.of(
-              "attrs", Schema.FieldType.map(Schema.FieldType.STRING, 
Schema.FieldType.STRING)));
-
-  private static final Schema BEAM_SCHEMA_ATTRIBUTES_MAP =
-      Schema.of(
-          Schema.Field.of("name", Schema.FieldType.STRING),
-          Schema.Field.of(
-              "attrs", Schema.FieldType.map(Schema.FieldType.STRING, 
Schema.FieldType.STRING)));
-
-  private static final Map<String, String> STATIC_MAP;
-
-  static {
-    Map<String, String> tempMap = new HashMap<>();
-    tempMap.put("key1", "first_key");
-    tempMap.put("key2", "second_key");
-    STATIC_MAP = Collections.unmodifiableMap(tempMap);
-  }
-
-  private static final List<Row> RAW_ROWS;
-
-  static {
-    try {
-      RAW_ROWS =
-          Arrays.asList(
-              Row.withSchema(BEAM_RAW_SCHEMA)
-                  .withFieldValue("payload", "a".getBytes("UTF-8"))
-                  .build(),
-              Row.withSchema(BEAM_RAW_SCHEMA)
-                  .withFieldValue("payload", "b".getBytes("UTF-8"))
-                  .build(),
-              Row.withSchema(BEAM_RAW_SCHEMA)
-                  .withFieldValue("payload", "c".getBytes("UTF-8"))
-                  .build());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private static final List<Row> ROWS_WITH_ATTRIBUTES =
-      Arrays.asList(
-          Row.withSchema(BEAM_SCHEMA_ATTRIBUTES)
-              .withFieldValue("name", "a")
-              .withFieldValue("key1", "first_key")
-              .withFieldValue("key2", "second_key")
-              .build(),
-          Row.withSchema(BEAM_SCHEMA_ATTRIBUTES)
-              .withFieldValue("name", "b")
-              .withFieldValue("key1", "first_key")
-              .withFieldValue("key2", "second_key")
-              .build(),
-          Row.withSchema(BEAM_SCHEMA_ATTRIBUTES)
-              .withFieldValue("name", "c")
-              .withFieldValue("key1", "first_key")
-              .withFieldValue("key2", "second_key")
-              .build());
-  private static final List<Row> ROWS_WITH_ATTRIBUTES_MAP =
-      Arrays.asList(
-          Row.withSchema(BEAM_SCHEMA_ATTRIBUTES_MAP)
-              .withFieldValue("name", "a")
-              .withFieldValue("attrs", STATIC_MAP)
-              .build(),
-          Row.withSchema(BEAM_SCHEMA_ATTRIBUTES_MAP)
-              .withFieldValue("name", "b")
-              .withFieldValue("attrs", STATIC_MAP)
-              .build(),
-          Row.withSchema(BEAM_SCHEMA_ATTRIBUTES_MAP)
-              .withFieldValue("name", "c")
-              .withFieldValue("attrs", STATIC_MAP)
-              .build());
-  private static final List<Row> ROWS_WITH_ATTRIBUTES_AND_MAP =
-      Arrays.asList(
-          Row.withSchema(BEAM_SCHEMA_ATTRIBUTES_AND_MAP)
-              .withFieldValue("name", "a")
-              .withFieldValue("key1", "first_key")
-              .withFieldValue("key2", "second_key")
-              .withFieldValue("attrs", STATIC_MAP)
-              .build(),
-          Row.withSchema(BEAM_SCHEMA_ATTRIBUTES_AND_MAP)
-              .withFieldValue("name", "b")
-              .withFieldValue("key1", "first_key")
-              .withFieldValue("key2", "second_key")
-              .withFieldValue("attrs", STATIC_MAP)
-              .build(),
-          Row.withSchema(BEAM_SCHEMA_ATTRIBUTES_AND_MAP)
-              .withFieldValue("name", "c")
-              .withFieldValue("key1", "first_key")
-              .withFieldValue("key2", "second_key")
-              .withFieldValue("attrs", STATIC_MAP)
-              .build());
-
-  private static final List<Row> ROWS =
-      Arrays.asList(
-          Row.withSchema(BEAM_SCHEMA).withFieldValue("name", "a").build(),
-          Row.withSchema(BEAM_SCHEMA).withFieldValue("name", "b").build(),
-          Row.withSchema(BEAM_SCHEMA).withFieldValue("name", "c").build());
-
-  private static final Map<String, AttributeValues> ATTRIBUTE_VALUES_MAP = new 
HashMap<>();
-
-  static {
-    ATTRIBUTE_VALUES_MAP.put(
-        "key1",
-        
AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8("first_key")).build());
-    ATTRIBUTE_VALUES_MAP.put(
-        "key2",
-        
AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8("second_key")).build());
-  }
-
-  private static final List<SequencedMessage> MESSAGES =
-      Arrays.asList(
-          SequencedMessage.newBuilder()
-              .setMessage(
-                  PubSubMessage.newBuilder()
-                      .setData(ByteString.copyFromUtf8("{\"name\":\"a\"}"))
-                      .putAllAttributes(ATTRIBUTE_VALUES_MAP)
-                      .build())
-              .build(),
-          SequencedMessage.newBuilder()
-              .setMessage(
-                  PubSubMessage.newBuilder()
-                      .setData(ByteString.copyFromUtf8("{\"name\":\"b\"}"))
-                      .putAllAttributes(ATTRIBUTE_VALUES_MAP)
-                      .build())
-              .build(),
-          SequencedMessage.newBuilder()
-              .setMessage(
-                  PubSubMessage.newBuilder()
-                      .setData(ByteString.copyFromUtf8("{\"name\":\"c\"}"))
-                      .putAllAttributes(ATTRIBUTE_VALUES_MAP)
-                      .build())
-              .build());
-
-  private static final List<SequencedMessage> RAW_MESSAGES =
-      Arrays.asList(
-          SequencedMessage.newBuilder()
-              .setMessage(
-                  PubSubMessage.newBuilder()
-                      .setData(ByteString.copyFromUtf8("a"))
-                      .putAllAttributes(ATTRIBUTE_VALUES_MAP)
-                      .build())
-              .build(),
-          SequencedMessage.newBuilder()
-              .setMessage(
-                  PubSubMessage.newBuilder()
-                      .setData(ByteString.copyFromUtf8("b"))
-                      .putAllAttributes(ATTRIBUTE_VALUES_MAP)
-                      .build())
-              .build(),
-          SequencedMessage.newBuilder()
-              .setMessage(
-                  PubSubMessage.newBuilder()
-                      .setData(ByteString.copyFromUtf8("c"))
-                      .putAllAttributes(ATTRIBUTE_VALUES_MAP)
-                      .build())
-              .build());
-
-  private static final List<SequencedMessage> MESSAGESWITHERROR =
-      Arrays.asList(
-          SequencedMessage.newBuilder()
-              .setMessage(
-                  PubSubMessage.newBuilder()
-                      .setData(ByteString.copyFromUtf8("{\"error\":\"a\"}"))
-                      .build())
-              .build(),
-          SequencedMessage.newBuilder()
-              .setMessage(
-                  PubSubMessage.newBuilder()
-                      .setData(ByteString.copyFromUtf8("{\"error\":\"b\"}"))
-                      .build())
-              .build(),
-          SequencedMessage.newBuilder()
-              .setMessage(
-                  PubSubMessage.newBuilder()
-                      .setData(ByteString.copyFromUtf8("{\"error\":\"c\"}"))
-                      .build())
-              .build());
-
-  private static final String PROTO_STRING_SCHEMA =
-      "syntax = \"proto3\";\n"
-          + "package com.test.proto;"
-          + "\n"
-          + "message MyMessage {\n"
-          + "  int32 id = 1;\n"
-          + "  string name = 2;\n"
-          + "  bool active = 3;\n"
-          + "\n"
-          + "  // Nested field\n"
-          + "  message Address {\n"
-          + "    string street = 1;\n"
-          + "    string city = 2;\n"
-          + "    string state = 3;\n"
-          + "    string zip_code = 4;\n"
-          + "  }\n"
-          + "\n"
-          + "  Address address = 4;\n"
-          + "}";
-
-  private static final Schema BEAM_PROTO_SCHEMA =
-      Schema.builder()
-          .addField("id", Schema.FieldType.INT32)
-          .addField("name", Schema.FieldType.STRING)
-          .addField("active", Schema.FieldType.BOOLEAN)
-          .addField(
-              "address",
-              Schema.FieldType.row(
-                  Schema.builder()
-                      .addField("street", Schema.FieldType.STRING)
-                      .addField("city", Schema.FieldType.STRING)
-                      .addField("state", Schema.FieldType.STRING)
-                      .addField("zip_code", Schema.FieldType.STRING)
-                      .build()))
-          .build();
-
-  private static final Row INPUT_ROW =
-      Row.withSchema(BEAM_PROTO_SCHEMA)
-          .withFieldValue("id", 1234)
-          .withFieldValue("name", "Doe")
-          .withFieldValue("active", false)
-          .withFieldValue("address.city", "seattle")
-          .withFieldValue("address.street", "fake street")
-          .withFieldValue("address.zip_code", "TO-1234")
-          .withFieldValue("address.state", "wa")
-          .build();
-  private static final SerializableFunction<Row, byte[]> INPUT_MAPPER =
-      ProtoByteUtils.getRowToProtoBytesFromSchema(PROTO_STRING_SCHEMA, 
"com.test.proto.MyMessage");
-
-  private static final byte[] INPUT_SOURCE = INPUT_MAPPER.apply(INPUT_ROW);
-
-  private static final List<SequencedMessage> INPUT_MESSAGES =
-      Collections.singletonList(
-          SequencedMessage.newBuilder()
-              .setMessage(
-                  PubSubMessage.newBuilder()
-                      .setData(ByteString.copyFrom(INPUT_SOURCE))
-                      .putAllAttributes(ATTRIBUTE_VALUES_MAP)
-                      .build())
-              .build());
-
-  final SerializableFunction<byte[], Row> valueMapper =
-      JsonUtils.getJsonBytesToRowFunction(BEAM_SCHEMA);
-
-  @Rule public transient TestPipeline p = TestPipeline.create();
-
-  @Test
-  public void testPubsubLiteErrorFnSuccess() {
-    Schema errorSchema = ErrorHandling.errorSchemaBytes();
-    PCollection<SequencedMessage> input = p.apply(Create.of(MESSAGES));
-    PCollectionTuple output =
-        input.apply(
-            ParDo.of(new ErrorFn("Read-Error-Counter", valueMapper, 
errorSchema, Boolean.TRUE))
-                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-    output.get(OUTPUT_TAG).setRowSchema(BEAM_SCHEMA);
-    output.get(ERROR_TAG).setRowSchema(errorSchema);
-
-    PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(ROWS);
-    p.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testPubsubLiteErrorFnFailure() {
-    Schema errorSchema = ErrorHandling.errorSchemaBytes();
-    PCollection<SequencedMessage> input = 
p.apply(Create.of(MESSAGESWITHERROR));
-    PCollectionTuple output =
-        input.apply(
-            ParDo.of(new ErrorFn("Read-Error-Counter", valueMapper, 
errorSchema, Boolean.TRUE))
-                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-    output.get(OUTPUT_TAG).setRowSchema(BEAM_SCHEMA);
-    output.get(ERROR_TAG).setRowSchema(errorSchema);
-
-    PCollection<Long> count = output.get(ERROR_TAG).apply("error_count", 
Count.globally());
-
-    PAssert.that(count).containsInAnyOrder(Collections.singletonList(3L));
-
-    p.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testPubsubLiteErrorFnRawSuccess() {
-    Schema errorSchema = ErrorHandling.errorSchemaBytes();
-
-    List<String> attributes = new ArrayList<>();
-    String attributesMap = "";
-    Schema beamAttributeSchema =
-        PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
-            BEAM_RAW_SCHEMA, attributes, attributesMap);
-    SerializableFunction<byte[], Row> rawValueMapper = 
getRawBytesToRowFunction(BEAM_RAW_SCHEMA);
-    PCollection<SequencedMessage> input = p.apply(Create.of(RAW_MESSAGES));
-    PCollectionTuple output =
-        input.apply(
-            ParDo.of(new ErrorFn("Read-Error-Counter", rawValueMapper, 
errorSchema, Boolean.TRUE))
-                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-    output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema);
-    output.get(ERROR_TAG).setRowSchema(errorSchema);
-
-    PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(RAW_ROWS);
-    p.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testPubsubLiteErrorFnWithAttributesSuccess() {
-    Schema errorSchema = ErrorHandling.errorSchemaBytes();
-    List<String> attributes = new ArrayList<>();
-    attributes.add("key1");
-    attributes.add("key2");
-    String attributeMap = "";
-    Schema beamAttributeSchema =
-        PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
-            BEAM_SCHEMA, attributes, attributeMap);
-
-    PCollection<SequencedMessage> input = p.apply(Create.of(MESSAGES));
-    PCollectionTuple output =
-        input.apply(
-            ParDo.of(
-                    new ErrorFn(
-                        "Read-Error-Counter",
-                        valueMapper,
-                        errorSchema,
-                        attributes,
-                        attributeMap,
-                        beamAttributeSchema,
-                        Boolean.TRUE))
-                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-    output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema);
-    output.get(ERROR_TAG).setRowSchema(errorSchema);
-
-    
PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(ROWS_WITH_ATTRIBUTES);
-    p.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testPubsubLiteErrorFnWithAttributeMapSuccess() {
-    Schema errorSchema = ErrorHandling.errorSchemaBytes();
-    // empty list of attributes
-    List<String> attributes = new ArrayList<>();
-    String attributeMap = "attrs";
-    Schema beamAttributeSchema =
-        PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
-            BEAM_SCHEMA, attributes, attributeMap);
-
-    PCollection<SequencedMessage> input = p.apply(Create.of(MESSAGES));
-    PCollectionTuple output =
-        input.apply(
-            ParDo.of(
-                    new ErrorFn(
-                        "Read-Error-Counter",
-                        valueMapper,
-                        errorSchema,
-                        attributes,
-                        attributeMap,
-                        beamAttributeSchema,
-                        Boolean.TRUE))
-                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-    output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema);
-    output.get(ERROR_TAG).setRowSchema(errorSchema);
-
-    output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema);
-    
PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(ROWS_WITH_ATTRIBUTES_MAP);
-    p.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testPubsubLiteErrorFnWithAttributesAndAttributeMapSuccess() {
-    Schema errorSchema = ErrorHandling.errorSchemaBytes();
-    List<String> attributes = new ArrayList<>();
-    attributes.add("key1");
-    attributes.add("key2");
-    String attributeMap = "attrs";
-    Schema beamAttributeSchema =
-        PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
-            BEAM_SCHEMA, attributes, attributeMap);
-
-    PCollection<SequencedMessage> input = p.apply(Create.of(MESSAGES));
-    PCollectionTuple output =
-        input.apply(
-            ParDo.of(
-                    new ErrorFn(
-                        "Read-Error-Counter",
-                        valueMapper,
-                        errorSchema,
-                        attributes,
-                        attributeMap,
-                        beamAttributeSchema,
-                        Boolean.TRUE))
-                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-    output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema);
-    output.get(ERROR_TAG).setRowSchema(errorSchema);
-
-    output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema);
-    
PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(ROWS_WITH_ATTRIBUTES_AND_MAP);
-    p.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testPubsubLiteErrorFnWithAttributesFailure() {
-    Schema errorSchema = ErrorHandling.errorSchemaBytes();
-    List<String> attributes = new ArrayList<>();
-    attributes.add("randomKey1");
-    attributes.add("randomKey2");
-    String attributeMap = "";
-    Schema beamAttributeSchema =
-        PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
-            BEAM_SCHEMA, attributes, attributeMap);
-
-    PCollection<SequencedMessage> input = p.apply(Create.of(MESSAGES));
-    PCollectionTuple output =
-        input.apply(
-            ParDo.of(
-                    new ErrorFn(
-                        "Read-Error-Counter",
-                        valueMapper,
-                        errorSchema,
-                        attributes,
-                        attributeMap,
-                        beamAttributeSchema,
-                        Boolean.TRUE))
-                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-    output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema);
-    output.get(ERROR_TAG).setRowSchema(errorSchema);
-
-    PCollection<Long> count = output.get(ERROR_TAG).apply("error_count", 
Count.globally());
-
-    PAssert.that(count).containsInAnyOrder(Collections.singletonList(3L));
-
-    p.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testPubsubLiteErrorFnWithDedupingSuccess() {
-    Schema errorSchema = ErrorHandling.errorSchemaBytes();
-
-    PCollection<SequencedMessage> input = p.apply(Create.of(MESSAGES));
-    UuidDeduplicationOptions.Builder uuidExtractor =
-        
UuidDeduplicationOptions.newBuilder().setUuidExtractor(getUuidFromMessage("key1"));
-    PCollectionTuple output =
-        input
-            .apply(PubsubLiteIO.deduplicate(uuidExtractor.build()))
-            .apply(
-                ParDo.of(new ErrorFn("Read-Error-Counter", valueMapper, 
errorSchema, Boolean.TRUE))
-                    .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-    output.get(OUTPUT_TAG).setRowSchema(BEAM_SCHEMA);
-    output.get(ERROR_TAG).setRowSchema(errorSchema);
-
-    PCollection<Long> count = output.get(OUTPUT_TAG).apply("error_count", 
Count.globally());
-
-    // We are deduping so we should only have 1 value
-    PAssert.that(count).containsInAnyOrder(Collections.singletonList(1L));
-
-    p.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testPubSubLiteErrorFnReadProto() {
-    Schema errorSchema = ErrorHandling.errorSchemaBytes();
-
-    List<String> attributes = new ArrayList<>();
-    String attributesMap = "";
-    Schema beamAttributeSchema =
-        PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
-            BEAM_PROTO_SCHEMA, attributes, attributesMap);
-
-    SerializableFunction<byte[], Row> protoValueMapper =
-        ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(
-            PROTO_STRING_SCHEMA, "com.test.proto.MyMessage");
-
-    PCollection<SequencedMessage> input = p.apply(Create.of(INPUT_MESSAGES));
-    PCollectionTuple output =
-        input.apply(
-            ParDo.of(new ErrorFn("Read-Error-Counter", protoValueMapper, 
errorSchema, Boolean.TRUE))
-                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-    // Unexpected behaviors occur if the PCollection schem differs from the 
schema generated in the
-    // conversion from Proto to Row.
-    output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema);
-    output.get(ERROR_TAG).setRowSchema(errorSchema);
-
-    PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(INPUT_ROW);
-    p.run().waitUntilFinish();
-  }
-}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteWriteDlqTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteWriteDlqTest.java
deleted file mode 100644
index 5afa4b7e516..00000000000
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteWriteDlqTest.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite.internal;
-
-import com.google.cloud.pubsublite.proto.AttributeValues;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import com.google.protobuf.ByteString;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
-import 
org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteWriteSchemaTransformProvider;
-import 
org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteWriteSchemaTransformProvider.ErrorCounterFn;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
-import org.apache.beam.sdk.schemas.utils.JsonUtils;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class PubsubLiteWriteDlqTest {
-
-  private static final TupleTag<PubSubMessage> OUTPUT_TAG =
-      PubsubLiteWriteSchemaTransformProvider.OUTPUT_TAG;
-  private static final TupleTag<Row> ERROR_TAG = 
PubsubLiteWriteSchemaTransformProvider.ERROR_TAG;
-
-  private static final Schema BEAM_SCHEMA =
-      Schema.of(Schema.Field.of("name", Schema.FieldType.STRING));
-
-  private static final Schema BEAM_RAW_SCHEMA =
-      Schema.of(Schema.Field.of("payload", Schema.FieldType.BYTES));
-
-  private static final Schema BEAM_SCHEMA_ATTRIBUTES =
-      Schema.of(
-          Schema.Field.of("name", Schema.FieldType.STRING),
-          Schema.Field.of("key1", Schema.FieldType.STRING),
-          Schema.Field.of("key2", Schema.FieldType.STRING));
-
-  private static final List<Row> RAW_ROWS;
-
-  static {
-    try {
-      RAW_ROWS =
-          Arrays.asList(
-              Row.withSchema(BEAM_RAW_SCHEMA)
-                  .withFieldValue("payload", "a".getBytes("UTF8"))
-                  .build(),
-              Row.withSchema(BEAM_RAW_SCHEMA)
-                  .withFieldValue("payload", "b".getBytes("UTF8"))
-                  .build(),
-              Row.withSchema(BEAM_RAW_SCHEMA)
-                  .withFieldValue("payload", "c".getBytes("UTF8"))
-                  .build());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private static final List<Row> ROWS =
-      Arrays.asList(
-          Row.withSchema(BEAM_SCHEMA).withFieldValue("name", "a").build(),
-          Row.withSchema(BEAM_SCHEMA).withFieldValue("name", "b").build(),
-          Row.withSchema(BEAM_SCHEMA).withFieldValue("name", "c").build());
-
-  private static final List<Row> ROWSATTRIBUTES =
-      Arrays.asList(
-          Row.withSchema(BEAM_SCHEMA_ATTRIBUTES)
-              .withFieldValue("name", "a")
-              .withFieldValue("key1", "first_key")
-              .withFieldValue("key2", "second_key")
-              .build(),
-          Row.withSchema(BEAM_SCHEMA_ATTRIBUTES)
-              .withFieldValue("name", "b")
-              .withFieldValue("key1", "first_key")
-              .withFieldValue("key2", "second_key")
-              .build(),
-          Row.withSchema(BEAM_SCHEMA_ATTRIBUTES)
-              .withFieldValue("name", "c")
-              .withFieldValue("key1", "first_key")
-              .withFieldValue("key2", "second_key")
-              .build());
-
-  private static final String PROTO_STRING_SCHEMA =
-      "syntax = \"proto3\";\n"
-          + "package com.test.proto;"
-          + "\n"
-          + "message MyMessage {\n"
-          + " string name = 1;\n"
-          + "}";
-
-  private static final Map<String, AttributeValues> ATTRIBUTE_VALUES_MAP = new 
HashMap<>();
-
-  static {
-    ATTRIBUTE_VALUES_MAP.put(
-        "key1",
-        
AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8("first_key")).build());
-    ATTRIBUTE_VALUES_MAP.put(
-        "key2",
-        
AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8("second_key")).build());
-  }
-
-  private static final List<PubSubMessage> MESSAGES_RAW =
-      Arrays.asList(
-          
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("a")).build(),
-          
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("b")).build(),
-          
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("c")).build());
-
-  private static final List<PubSubMessage> MESSAGES =
-      Arrays.asList(
-          
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("{\"name\":\"a\"}")).build(),
-          
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("{\"name\":\"b\"}")).build(),
-          
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("{\"name\":\"c\"}")).build());
-  private static final List<PubSubMessage> MESSAGES_WITH_ATTRIBUTES =
-      Arrays.asList(
-          PubSubMessage.newBuilder()
-              .setData(ByteString.copyFromUtf8("{\"name\":\"a\"}"))
-              .putAllAttributes(ATTRIBUTE_VALUES_MAP)
-              .build(),
-          PubSubMessage.newBuilder()
-              .setData(ByteString.copyFromUtf8("{\"name\":\"b\"}"))
-              .putAllAttributes(ATTRIBUTE_VALUES_MAP)
-              .build(),
-          PubSubMessage.newBuilder()
-              .setData(ByteString.copyFromUtf8("{\"name\":\"c\"}"))
-              .putAllAttributes(ATTRIBUTE_VALUES_MAP)
-              .build());
-
-  final SerializableFunction<Row, byte[]> valueMapper =
-      JsonUtils.getRowToJsonBytesFunction(BEAM_SCHEMA);
-
-  final SerializableFunction<Row, byte[]> valueMapperRaw =
-      
PubsubLiteWriteSchemaTransformProvider.getRowToRawBytesFunction("payload");
-
-  @Rule public transient TestPipeline p = TestPipeline.create();
-
-  @Test
-  public void testPubsubLiteErrorFnSuccess() {
-    Schema errorSchema = ErrorHandling.errorSchemaBytes();
-    PCollection<Row> input = p.apply(Create.of(ROWS));
-    PCollectionTuple output =
-        input.apply(
-            ParDo.of(new ErrorCounterFn("ErrorCounter", valueMapper, 
errorSchema, Boolean.TRUE))
-                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-    output.get(ERROR_TAG).setRowSchema(errorSchema);
-
-    PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(MESSAGES);
-    p.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testPubsubLiteErrorFnSuccessRawEvents() {
-    Schema errorSchema = ErrorHandling.errorSchemaBytes();
-    PCollection<Row> input = p.apply(Create.of(RAW_ROWS));
-    PCollectionTuple output =
-        input.apply(
-            ParDo.of(new ErrorCounterFn("ErrorCounter", valueMapperRaw, 
errorSchema, Boolean.TRUE))
-                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-    output.get(ERROR_TAG).setRowSchema(errorSchema);
-
-    PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(MESSAGES_RAW);
-    p.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testPubsubLiteErrorFnSuccessWithAttributes() {
-    Schema errorSchema = ErrorHandling.errorSchemaBytes();
-    List<String> attributes = new ArrayList<>();
-    attributes.add("key1");
-    attributes.add("key2");
-    Schema schema =
-        PubsubLiteWriteSchemaTransformProvider.getSchemaWithoutAttributes(
-            BEAM_SCHEMA_ATTRIBUTES, attributes);
-    PCollection<Row> input = p.apply(Create.of(ROWSATTRIBUTES));
-    PCollectionTuple output =
-        input.apply(
-            ParDo.of(
-                    new ErrorCounterFn(
-                        "ErrorCounter", valueMapper, errorSchema, 
Boolean.TRUE, attributes, schema))
-                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-    output.get(ERROR_TAG).setRowSchema(errorSchema);
-
-    
PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(MESSAGES_WITH_ATTRIBUTES);
-    p.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testPubsubLiteErrorFnSuccessWithAttributesAndDedupingSuccess() {
-    Schema errorSchema = ErrorHandling.errorSchemaBytes();
-    List<String> attributes = new ArrayList<>();
-    attributes.add("key1");
-    attributes.add("key2");
-    Schema schema =
-        PubsubLiteWriteSchemaTransformProvider.getSchemaWithoutAttributes(
-            BEAM_SCHEMA_ATTRIBUTES, attributes);
-    PCollection<Row> input = p.apply(Create.of(ROWSATTRIBUTES));
-    PCollectionTuple output =
-        input.apply(
-            ParDo.of(
-                    new ErrorCounterFn(
-                        "ErrorCounter", valueMapper, errorSchema, 
Boolean.TRUE, attributes, schema))
-                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-    output.get(ERROR_TAG).setRowSchema(errorSchema);
-
-    PCollection<Long> count =
-        output
-            .get(OUTPUT_TAG)
-            .apply(
-                ParDo.of(
-                    new 
PubsubLiteWriteSchemaTransformProvider.SetUuidFromPubSubMessage.SetUuidFn(
-                        "unique_key")))
-            .apply("error_count", Count.globally());
-    PAssert.that(count).containsInAnyOrder(Collections.singletonList(3L));
-    p.run().waitUntilFinish();
-  }
-
-  @Test
-  public void testPubsubLiteErrorFnSuccessProto() {
-    Schema errorSchema = ErrorHandling.errorSchemaBytes();
-
-    SerializableFunction<Row, byte[]> valueMapperProto =
-        ProtoByteUtils.getRowToProtoBytesFromSchema(
-            PROTO_STRING_SCHEMA, "com.test.proto.MyMessage");
-
-    PCollection<Row> input = p.apply(Create.of(ROWS));
-    PCollectionTuple output =
-        input.apply(
-            ParDo.of(
-                    new ErrorCounterFn("ErrorCounter", valueMapperProto, 
errorSchema, Boolean.TRUE))
-                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-    output.get(ERROR_TAG).setRowSchema(errorSchema);
-
-    PAssert.that(output.get(OUTPUT_TAG).apply(Count.globally()))
-        .containsInAnyOrder(Collections.singletonList(3L));
-    p.run().waitUntilFinish();
-  }
-}
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml 
b/sdks/python/apache_beam/yaml/standard_io.yaml
index 458d3d63e43..e62b3a562c3 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -97,44 +97,6 @@
           'ReadFromKafka': '2.65.0'
           'WriteToKafka': '2.65.0'
 
-# PubSub
-- type: renaming
-  transforms:
-    'ReadFromPubSubLite': 'ReadFromPubSubLite'
-    'WriteToPubSubLite': 'WriteToPubSubLite'
-  config:
-    mappings:
-      'ReadFromPubSubLite':
-        'project': 'project'
-        'schema': 'schema'
-        'format': 'format'
-        'subscription_name': 'subscription_name'
-        'location': 'location'
-        'attributes': 'attributes'
-        'attribute_map': 'attribute_map'
-        'attribute_id': 'attribute_id'
-        'error_handling': 'error_handling'
-        'file_descriptor_path': 'file_descriptor_path'
-        'message_name': 'message_name'
-      'WriteToPubSubLite':
-        'project': 'project'
-        'format': 'format'
-        'topic_name': 'topic_name'
-        'location': 'location'
-        'attributes': 'attributes'
-        'attribute_id': 'attribute_id'
-        'error_handling': 'error_handling'
-        'file_descriptor_path': 'file_descriptor_path'
-        'message_name': 'message_name'
-        'schema': 'schema'
-    underlying_provider:
-      type: beamJar
-      transforms:
-        'ReadFromPubSubLite': 
'beam:schematransform:org.apache.beam:pubsublite_read:v1'
-        'WriteToPubSubLite': 
'beam:schematransform:org.apache.beam:pubsublite_write:v1'
-      config:
-        gradle_target: 
'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
-
 # TODO(yaml): Tests are assuming python providers are before java ones, hence
 # the order below.  This should be fixed in the future.
 

Reply via email to