This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b50a0992b95 Remove Read and Write PubsubLite for Yaml (#37383)
b50a0992b95 is described below
commit b50a0992b959a12253656bc03a280ddfac2b3a02
Author: Derrick Williams <[email protected]>
AuthorDate: Thu Jan 22 18:54:43 2026 -0500
Remove Read and Write PubsubLite for Yaml (#37383)
* remove ReadWrite PubsubLite Yaml
* add back pubsub oops
* and the configuration
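
    For context: these transforms were exposed to Beam YAML through the
    sdks/python/apache_beam/yaml/standard_io.yaml mapping removed below. A
    pipeline that relied on them looked roughly like the following sketch
    (the ReadFromPubSubLite type name appears in the removed provider code;
    the config keys mirror the deleted configuration class in snake_case,
    and the project, location, and subscription values are hypothetical
    placeholders):

        pipeline:
          transforms:
            - type: ReadFromPubSubLite
              config:
                format: JSON
                schema: '{"properties": {"name": {"type": "string"}}}'
                project: my-project                  # hypothetical
                location: us-central1-a              # hypothetical
                subscription_name: my-subscription   # hypothetical
            - type: LogForTesting
              input: ReadFromPubSubLite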
---
.../assets/symbols/java.g.yaml | 39 --
.../PubsubLiteReadSchemaTransformProvider.java | 544 --------------------
.../PubsubLiteWriteSchemaTransformProvider.java | 466 -----------------
.../beam/sdk/io/gcp/pubsublite/ReadWriteIT.java | 379 --------------
.../gcp/pubsublite/internal/PubsubLiteDlqTest.java | 565 ---------------------
.../internal/PubsubLiteWriteDlqTest.java | 273 ----------
sdks/python/apache_beam/yaml/standard_io.yaml | 38 --
7 files changed, 2304 deletions(-)
diff --git a/playground/frontend/playground_components/assets/symbols/java.g.yaml b/playground/frontend/playground_components/assets/symbols/java.g.yaml
index 345e11071b9..e0f2b1d15d1 100644
--- a/playground/frontend/playground_components/assets/symbols/java.g.yaml
+++ b/playground/frontend/playground_components/assets/symbols/java.g.yaml
@@ -9031,26 +9031,6 @@ PubsubLiteIO:
- expand
- read
- write
-PubsubLiteReadSchemaTransformProvider:
- methods:
- - build
- - buildTransform
- - builder
- - expand
- - from
- - getDataFormat
- - getLocation
- - getProject
- - getSchema
- - getSubscriptionName
- - identifier
- - inputCollectionNames
- - outputCollectionNames
- - setDataFormat
- - setLocation
- - setProject
- - setSchema
- - setSubscriptionName
PubsubLiteSink:
methods:
- finishBundle
@@ -9060,25 +9040,6 @@ PubsubLiteTableProvider:
methods:
- buildBeamSqlTable
- getTableType
-PubsubLiteWriteSchemaTransformProvider:
- methods:
- - build
- - buildTransform
- - builder
- - expand
- - getFormat
- - getLocation
- - getProject
- - getTopicName
- - identifier
- - inputCollectionNames
- - outputCollectionNames
- - setFormat
- - setLocation
- - setProject
- - setTopicName
- properties:
- - SUPPORTED_FORMATS
PubsubMessage:
methods:
- equals
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java
deleted file mode 100644
index 61b94aeee44..00000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import com.google.auto.service.AutoService;
-import com.google.auto.value.AutoValue;
-import com.google.cloud.pubsublite.CloudRegionOrZone;
-import com.google.cloud.pubsublite.ProjectId;
-import com.google.cloud.pubsublite.SubscriptionName;
-import com.google.cloud.pubsublite.SubscriptionPath;
-import com.google.cloud.pubsublite.proto.AttributeValues;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import com.google.cloud.pubsublite.proto.SequencedMessage;
-import com.google.protobuf.ByteString;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.function.Consumer;
-import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
-import org.apache.beam.sdk.io.gcp.pubsublite.internal.Uuid;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
-import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
-import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
-import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
-import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
-import org.apache.beam.sdk.schemas.utils.JsonUtils;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@AutoService(SchemaTransformProvider.class)
-public class PubsubLiteReadSchemaTransformProvider
- extends TypedSchemaTransformProvider<
-        PubsubLiteReadSchemaTransformProvider.PubsubLiteReadSchemaTransformConfiguration> {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(PubsubLiteReadSchemaTransformProvider.class);
-
- public static final String VALID_FORMATS_STR = "RAW,AVRO,JSON,PROTO";
- public static final Set<String> VALID_DATA_FORMATS =
- Sets.newHashSet(VALID_FORMATS_STR.split(","));
-
- public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {};
- public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
-
- @Override
-  protected Class<PubsubLiteReadSchemaTransformConfiguration> configurationClass() {
- return PubsubLiteReadSchemaTransformConfiguration.class;
- }
-
- @Override
- public String description() {
- return "Performs a read from Google Pub/Sub Lite.\n"
- + "\n"
- + "**Note**: This provider is deprecated. See Pub/Sub Lite <a
href=\"https://cloud.google.com/pubsub/lite/docs\">documentation</a> for more
information.";
- }
-
- public static class ErrorFn extends DoFn<SequencedMessage, Row> {
- private final SerializableFunction<byte[], Row> valueMapper;
- private final Counter errorCounter;
- private Long errorsInBundle = 0L;
- private final boolean handleErrors;
-
- private final List<String> attributes;
-
- private final String attributeMap;
-
- private final Schema errorSchema;
-
- private final Schema attributeSchema;
-
- public ErrorFn(
- String name,
- SerializableFunction<byte[], Row> valueMapper,
- Schema errorSchema,
- boolean handleErrors) {
-      this.errorCounter = Metrics.counter(PubsubLiteReadSchemaTransformProvider.class, name);
- this.valueMapper = valueMapper;
- this.errorSchema = errorSchema;
- this.handleErrors = handleErrors;
- this.attributes = new ArrayList<>();
- this.attributeMap = "";
- this.attributeSchema = Schema.builder().build();
- }
-
- public ErrorFn(
- String name,
- SerializableFunction<byte[], Row> valueMapper,
- Schema errorSchema,
- List<String> attributes,
- String attributeMap,
- Schema attributeSchema,
- boolean handleErrors) {
-      this.errorCounter = Metrics.counter(PubsubLiteReadSchemaTransformProvider.class, name);
- this.valueMapper = valueMapper;
- this.errorSchema = errorSchema;
- this.handleErrors = handleErrors;
- this.attributes = attributes;
- this.attributeMap = attributeMap;
- this.attributeSchema = attributeSchema;
- }
-
- @ProcessElement
-    public void process(@DoFn.Element SequencedMessage seqMessage, MultiOutputReceiver receiver) {
- Row mappedRow = null;
- try {
- if (attributes.isEmpty()
- && attributeSchema.getFields().isEmpty()
- && attributeMap.isEmpty()) {
-          mappedRow = valueMapper.apply(seqMessage.getMessage().getData().toByteArray());
- } else {
- PubSubMessage message = seqMessage.getMessage();
- Row row = valueMapper.apply(message.getData().toByteArray());
-          Row.Builder rowBuilder = Row.withSchema(attributeSchema).addValues(row.getValues());
- Map<String, String> stringAttributeMap = new HashMap<>();
- message
- .getAttributesMap()
- .forEach(
- (attributeName, attributeValues) -> {
- if (attributes.contains(attributeName)) {
- processAttribute(attributeValues, rowBuilder::addValue);
- }
-
- if (!attributeMap.isEmpty()) {
- processAttribute(
-                          attributeValues, value -> stringAttributeMap.put(attributeName, value));
- }
- });
- if (!attributeMap.isEmpty() && !stringAttributeMap.isEmpty()) {
- rowBuilder.addValue(stringAttributeMap);
- }
- mappedRow = rowBuilder.build();
- }
- } catch (Exception e) {
- if (!handleErrors) {
- throw new RuntimeException(e);
- }
- errorsInBundle += 1;
- LOG.warn("Error while parsing the element", e);
- receiver
- .get(ERROR_TAG)
- .output(
- ErrorHandling.errorRecord(
-                    errorSchema, seqMessage.getMessage().getData().toByteArray(), e));
- }
- if (mappedRow != null) {
- receiver.get(OUTPUT_TAG).output(mappedRow);
- }
- }
-
- @FinishBundle
- public void finish(FinishBundleContext c) {
- errorCounter.inc(errorsInBundle);
- errorsInBundle = 0L;
- }
- }
-
- @Override
-  public SchemaTransform from(PubsubLiteReadSchemaTransformConfiguration configuration) {
- if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
- throw new IllegalArgumentException(
- String.format(
- "Format %s not supported. Only supported formats are %s",
- configuration.getFormat(), VALID_FORMATS_STR));
- }
-    boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
- String format = configuration.getFormat();
- String inputSchema = configuration.getSchema();
- List<String> attributes = configuration.getAttributes();
- SerializableFunction<byte[], Row> valueMapper;
- Schema beamSchema;
-
- if (format != null && format.equals("RAW")) {
-
-      beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
- valueMapper = getRawBytesToRowFunction(beamSchema);
-
- } else if (format != null && format.equals("PROTO")) {
- String fileDescriptorPath = configuration.getFileDescriptorPath();
- String messageName = configuration.getMessageName();
-
- if (fileDescriptorPath != null && messageName != null) {
-        beamSchema = ProtoByteUtils.getBeamSchemaFromProto(fileDescriptorPath, messageName);
-        valueMapper = ProtoByteUtils.getProtoBytesToRowFunction(fileDescriptorPath, messageName);
- } else if (inputSchema != null && messageName != null) {
-        beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema, messageName);
-        valueMapper = ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName);
- } else {
- throw new IllegalArgumentException(
- "To read from PubSubLite in PROTO format, either descriptorPath or
schema must be provided.");
- }
-
- } else {
- if (inputSchema != null) {
- beamSchema =
- Objects.equals(configuration.getFormat(), "JSON")
- ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
-                : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
- valueMapper =
- Objects.equals(configuration.getFormat(), "JSON")
- ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
- : AvroUtils.getAvroBytesToRowFunction(beamSchema);
- } else {
- throw new IllegalArgumentException(
- "To read from Pubsub Lite in JSON or AVRO format, you must provide
a schema.");
- }
- }
- return new SchemaTransform() {
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- String project = configuration.getProject();
- if (Strings.isNullOrEmpty(project)) {
-          project = input.getPipeline().getOptions().as(GcpOptions.class).getProject();
- }
- if (project == null) {
- throw new IllegalArgumentException(
- "Unable to infer the project to read from Pubsub Lite. Please
provide a project.");
- }
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
- List<String> attributeList = new ArrayList<>();
- if (attributes != null) {
- attributeList = attributes;
- }
- String attributeMapValue = configuration.getAttributeMap();
-        String attributeMap = attributeMapValue == null ? "" : attributeMapValue;
- Schema resultingBeamSchema =
- buildSchemaWithAttributes(beamSchema, attributeList, attributeMap);
- PCollection<SequencedMessage> readPubsubLite =
- input
- .getPipeline()
- .apply(
- PubsubLiteIO.read(
- SubscriberOptions.newBuilder()
- .setSubscriptionPath(
- SubscriptionPath.newBuilder()
- .setLocation(
-                                    CloudRegionOrZone.parse(configuration.getLocation()))
- .setProject(ProjectId.of(project))
- .setName(
-                                    SubscriptionName.of(configuration.getSubscriptionName()))
- .build())
- .build()));
-
- String attributeId = configuration.getAttributeId();
- PCollectionTuple outputTuple;
- PCollection<SequencedMessage> transformSequencedMessage;
- if (attributeId != null && !attributeId.isEmpty()) {
- UuidDeduplicationOptions.Builder uuidExtractor =
- UuidDeduplicationOptions.newBuilder()
- .setUuidExtractor(getUuidFromMessage(attributeId));
- transformSequencedMessage =
-              readPubsubLite.apply(PubsubLiteIO.deduplicate(uuidExtractor.build()));
- } else {
- transformSequencedMessage = readPubsubLite;
- }
-
- outputTuple =
- transformSequencedMessage.apply(
- ParDo.of(
- new ErrorFn(
- "PubsubLite-read-error-counter",
- valueMapper,
- errorSchema,
- attributeList,
- attributeMap,
- resultingBeamSchema,
- handleErrors))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
- return PCollectionRowTuple.of(
- "output",
- outputTuple.get(OUTPUT_TAG).setRowSchema(resultingBeamSchema),
- "errors",
- outputTuple.get(ERROR_TAG).setRowSchema(errorSchema));
- }
- };
- }
-
- /**
-   * Builds a new {@link Schema} by adding additional optional attributes and map field to the
- * provided schema.
- *
-   * @param schema The base schema to which additional attributes and map field will be added.
-   * @param attributes A list of optional attribute names to be added as STRING fields to the
-   *     schema.
-   * @param attributesMap The name of the optional map field to be added to the schema. If empty,
-   *     no map field will be added.
-   * @return A new {@link Schema} with the specified attributes and an optional map field.
-   * @throws IllegalArgumentException if the schema is null or if any attribute name in the
-   *     attributes list is null or empty.
- */
- public static Schema buildSchemaWithAttributes(
- Schema schema, List<String> attributes, String attributesMap) {
- Schema.Builder schemaBuilder = Schema.builder();
- // Copy fields from the original schema
-    schema.getFields().forEach(field -> schemaBuilder.addField(field.getName(), field.getType()));
-
- // Add optional additional attributes as STRING fields
- attributes.forEach(
- attribute -> {
- if (attribute == null || attribute.isEmpty()) {
- throw new IllegalArgumentException(
- "Attribute names in the attributes list must not be null or
empty.");
- }
- schemaBuilder.addField(attribute, Schema.FieldType.STRING);
- });
-
- // Add an optional map field if attributesMap is not empty
- if (!attributesMap.isEmpty()) {
- schemaBuilder
-          .addMapField(attributesMap, Schema.FieldType.STRING, Schema.FieldType.STRING)
- .build();
- }
- return schemaBuilder.build();
- }
-
- /**
-   * Processes the attribute values, invoking the specified consumer with the processed value. If
-   * the attribute values are null or contain multiple values, an exception is thrown.
-   *
-   * @param attributeValues The attribute values to be processed. If null, the method does nothing.
-   * @param valueConsumer The consumer to accept the processed value.
-   * @throws RuntimeException if attributeValues is not null and contains multiple values.
- */
- private static void processAttribute(
-      @Nullable AttributeValues attributeValues, Consumer<String> valueConsumer) {
- if (attributeValues != null) {
- List<ByteString> valueList = attributeValues.getValuesList();
- if (valueList.size() != 1) {
- throw new RuntimeException(
- "Received an unparseable message with multiple values for an
attribute.");
- }
- valueConsumer.accept(valueList.get(0).toStringUtf8());
- }
- }
-
-  public static SerializableFunction<byte[], Row> getRawBytesToRowFunction(Schema rawSchema) {
- return new SimpleFunction<byte[], Row>() {
- @Override
- public Row apply(byte[] input) {
- return Row.withSchema(rawSchema).addValue(input).build();
- }
- };
- }
-
-  public static SerializableFunction<SequencedMessage, Uuid> getUuidFromMessage(
-      String attributeId) {
- return new SimpleFunction<SequencedMessage, Uuid>() {
- @Override
- public Uuid apply(SequencedMessage input) {
-        AttributeValues attribute = input.getMessage().getAttributesMap().get(attributeId);
- if (attribute != null) {
- if (attribute.getValuesCount() != 1) {
- throw new RuntimeException(
- "Received an unparseable message with multiple values for an
attribute.");
- }
- return Uuid.of(attribute.getValues(0));
- } else {
- throw new RuntimeException("Uuid attribute missing.");
- }
- }
- };
- }
-
- @Override
- public String identifier() {
- return "beam:schematransform:org.apache.beam:pubsublite_read:v1";
- }
-
- @Override
- public List<String> inputCollectionNames() {
- return Collections.emptyList();
- }
-
- @Override
- public List<String> outputCollectionNames() {
- return Arrays.asList("output", "errors");
- }
-
- @AutoValue
- @DefaultSchema(AutoValueSchema.class)
- public abstract static class PubsubLiteReadSchemaTransformConfiguration {
-
- public void validate() {
- final String dataFormat = this.getFormat();
-      assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
-          : "Valid data formats are " + VALID_DATA_FORMATS;
-
- final String inputSchema = this.getSchema();
- final String messageName = this.getMessageName();
-
- if (dataFormat != null && dataFormat.equals("RAW")) {
- assert inputSchema == null
- : "To read from Pubsub Lite in RAW format, you can't provide a
schema.";
- }
-
- if (dataFormat != null && dataFormat.equals("PROTO")) {
- assert messageName != null
- : "To read from Pubsub Lite in PROTO format, messageName must be
provided.";
- }
- }
-
- @SchemaFieldDescription(
- "The encoding format for the data stored in Pubsub Lite. Valid options
are: "
- + VALID_FORMATS_STR)
- public abstract String getFormat();
-
- @SchemaFieldDescription(
- "The schema in which the data is encoded in the Pubsub Lite topic. "
- + "For AVRO data, this is a schema defined with AVRO schema syntax
"
- + "(https://avro.apache.org/docs/1.10.2/spec.html#schemas). "
- + "For JSON data, this is a schema defined with JSON-schema syntax
(https://json-schema.org/).")
- public abstract @Nullable String getSchema();
-
- @SchemaFieldDescription(
- "The GCP project where the Pubsub Lite reservation resides. This can
be a "
- + "project number of a project ID.")
- public abstract @Nullable String getProject();
-
- @SchemaFieldDescription(
- "The name of the subscription to consume data. This will be
concatenated with "
- + "the project and location parameters to build a full
subscription path.")
- public abstract String getSubscriptionName();
-
- @SchemaFieldDescription("The region or zone where the Pubsub Lite
reservation resides.")
- public abstract String getLocation();
-
- @SchemaFieldDescription("This option specifies whether and where to output
unwritable rows.")
- public abstract @Nullable ErrorHandling getErrorHandling();
-
- @SchemaFieldDescription(
- "List of attribute keys whose values will be flattened into the "
- + "output message as additional fields. For example, if the
format is `RAW` "
- + "and attributes is `[\"a\", \"b\"]` then this read will produce
elements of "
- + "the form `Row(payload=..., a=..., b=...)`")
- public abstract @Nullable List<String> getAttributes();
-
- @SchemaFieldDescription(
- "Name of a field in which to store the full set of attributes "
- + "associated with this message. For example, if the format is
`RAW` and "
- + "`attribute_map` is set to `\"attrs\"` then this read will
produce elements "
- + "of the form `Row(payload=..., attrs=...)` where `attrs` is a
Map type "
- + "of string to string. "
- + "If both `attributes` and `attribute_map` are set, the
overlapping "
- + "attribute values will be present in both the flattened
structure and the "
- + "attribute map.")
- public abstract @Nullable String getAttributeMap();
-
- @SchemaFieldDescription(
- "The attribute on incoming Pubsub Lite messages to use as a unique "
- + "record identifier. When specified, the value of this attribute
(which "
- + "can be any string that uniquely identifies the record) will be
used for "
- + "deduplication of messages. If not provided, we cannot guarantee
"
- + "that no duplicate data will be delivered on the Pub/Sub stream.
In this "
- + "case, deduplication of the stream will be strictly best
effort.")
- public abstract @Nullable String getAttributeId();
-
- @SchemaFieldDescription(
- "The path to the Protocol Buffer File Descriptor Set file. This file
is used for schema"
- + " definition and message serialization.")
- @Nullable
- public abstract String getFileDescriptorPath();
-
- @SchemaFieldDescription(
- "The name of the Protocol Buffer message to be used for schema"
- + " extraction and data conversion.")
- @Nullable
- public abstract String getMessageName();
-
- public static Builder builder() {
-      return new AutoValue_PubsubLiteReadSchemaTransformProvider_PubsubLiteReadSchemaTransformConfiguration
-          .Builder();
- }
-
- @AutoValue.Builder
- public abstract static class Builder {
- public abstract Builder setFormat(String format);
-
- public abstract Builder setSchema(String schema);
-
- public abstract Builder setProject(String project);
-
- public abstract Builder setSubscriptionName(String subscriptionName);
-
- public abstract Builder setLocation(String location);
-
- public abstract Builder setErrorHandling(ErrorHandling errorHandling);
-
- public abstract Builder setAttributes(List<String> attributes);
-
- @SuppressWarnings("unused")
- public abstract Builder setAttributeMap(String attributeMap);
-
- @SuppressWarnings("unused")
- public abstract Builder setAttributeId(String attributeId);
-
- @SuppressWarnings("unused")
- public abstract Builder setFileDescriptorPath(String fileDescriptorPath);
-
- @SuppressWarnings("unused")
- public abstract Builder setMessageName(String messageName);
-
- public abstract PubsubLiteReadSchemaTransformConfiguration build();
- }
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java
deleted file mode 100644
index 54ed7ac495d..00000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java
+++ /dev/null
@@ -1,466 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import com.google.auto.service.AutoService;
-import com.google.auto.value.AutoValue;
-import com.google.cloud.pubsublite.CloudRegionOrZone;
-import com.google.cloud.pubsublite.ProjectId;
-import com.google.cloud.pubsublite.TopicName;
-import com.google.cloud.pubsublite.TopicPath;
-import com.google.cloud.pubsublite.proto.AttributeValues;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import com.google.protobuf.ByteString;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
-import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
-import org.apache.beam.sdk.io.gcp.pubsublite.internal.Uuid;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
-import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
-import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
-import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
-import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
-import org.apache.beam.sdk.schemas.utils.JsonUtils;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@AutoService(SchemaTransformProvider.class)
-public class PubsubLiteWriteSchemaTransformProvider
- extends TypedSchemaTransformProvider<
-        PubsubLiteWriteSchemaTransformProvider.PubsubLiteWriteSchemaTransformConfiguration> {
-
- public static final String SUPPORTED_FORMATS_STR = "RAW,JSON,AVRO,PROTO";
- public static final Set<String> SUPPORTED_FORMATS =
- Sets.newHashSet(SUPPORTED_FORMATS_STR.split(","));
-  public static final TupleTag<PubSubMessage> OUTPUT_TAG = new TupleTag<PubSubMessage>() {};
- public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
- private static final Logger LOG =
- LoggerFactory.getLogger(PubsubLiteWriteSchemaTransformProvider.class);
-
- @Override
-  protected Class<PubsubLiteWriteSchemaTransformConfiguration> configurationClass() {
- return PubsubLiteWriteSchemaTransformConfiguration.class;
- }
-
- @Override
- public String description() {
- return "Performs a write to Google Pub/Sub Lite.\n"
- + "\n"
- + "**Note**: This provider is deprecated. See Pub/Sub Lite <a
href=\"https://cloud.google.com/pubsub/lite/docs\">documentation</a> for more
information.";
- }
-
- public static class ErrorCounterFn extends DoFn<Row, PubSubMessage> {
- private final SerializableFunction<Row, byte[]> toBytesFn;
- private final Counter errorCounter;
- private long errorsInBundle = 0L;
-
- private final Schema errorSchema;
-
- private final boolean handleErrors;
-
- private final List<String> attributes;
-
- private final Schema schemaWithoutAttributes;
-
- public ErrorCounterFn(
- String name,
- SerializableFunction<Row, byte[]> toBytesFn,
- Schema errorSchema,
- boolean handleErrors) {
- this.toBytesFn = toBytesFn;
-      errorCounter = Metrics.counter(PubsubLiteWriteSchemaTransformProvider.class, name);
- this.errorSchema = errorSchema;
- this.handleErrors = handleErrors;
- this.attributes = new ArrayList<>();
- this.schemaWithoutAttributes = Schema.builder().build();
- }
-
- public ErrorCounterFn(
- String name,
- SerializableFunction<Row, byte[]> toBytesFn,
- Schema errorSchema,
- boolean handleErrors,
- List<String> attributes,
- Schema schemaWithoutAttributes) {
- this.toBytesFn = toBytesFn;
-      errorCounter = Metrics.counter(PubsubLiteWriteSchemaTransformProvider.class, name);
- this.errorSchema = errorSchema;
- this.handleErrors = handleErrors;
- this.attributes = attributes;
- this.schemaWithoutAttributes = schemaWithoutAttributes;
- }
-
- @ProcessElement
- public void process(@DoFn.Element Row row, MultiOutputReceiver receiver) {
- try {
- PubSubMessage message;
- if (attributes.isEmpty()) {
- message =
- PubSubMessage.newBuilder()
-                .setData(ByteString.copyFrom(Objects.requireNonNull(toBytesFn.apply(row))))
-                .build();
- } else {
- Row.Builder builder = Row.withSchema(schemaWithoutAttributes);
- schemaWithoutAttributes
- .getFields()
-              .forEach(field -> builder.addValue(row.getValue(field.getName())));
-
- Row resultingRow = builder.build();
- Map<String, AttributeValues> attributeValuesHashMap =
- getStringAttributeValuesMap(row, attributes);
- message =
- PubSubMessage.newBuilder()
- .setData(
-                      ByteString.copyFrom(Objects.requireNonNull(toBytesFn.apply(resultingRow))))
- .putAllAttributes(attributeValuesHashMap)
- .build();
- }
-
- receiver.get(OUTPUT_TAG).output(message);
- } catch (Exception e) {
- if (!handleErrors) {
- throw new RuntimeException(e);
- }
- errorsInBundle += 1;
- LOG.warn("Error while processing the element", e);
-        receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, row, e));
- }
- }
-
- @FinishBundle
- public void finish() {
- errorCounter.inc(errorsInBundle);
- errorsInBundle = 0L;
- }
- }
-
- @Override
-  public SchemaTransform from(PubsubLiteWriteSchemaTransformConfiguration configuration) {
-
- if (!SUPPORTED_FORMATS.contains(configuration.getFormat())) {
- throw new IllegalArgumentException(
- "Format "
- + configuration.getFormat()
- + " is not supported. "
- + "Supported formats are: "
- + String.join(", ", SUPPORTED_FORMATS));
- }
-
- return new SchemaTransform() {
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- List<String> attributesConfigValue = configuration.getAttributes();
- String attributeId = configuration.getAttributeId();
- List<String> attributes =
-            attributesConfigValue != null ? attributesConfigValue : new ArrayList<>();
- Schema inputSchema;
- if (!attributes.isEmpty()) {
-          inputSchema = getSchemaWithoutAttributes(input.get("input").getSchema(), attributes);
- } else {
- inputSchema = input.get("input").getSchema();
- }
- ErrorHandling errorHandling = configuration.getErrorHandling();
- boolean handleErrors = ErrorHandling.hasOutput(errorHandling);
- Schema errorSchema = ErrorHandling.errorSchema(inputSchema);
-
- final SerializableFunction<Row, byte[]> toBytesFn;
- if (configuration.getFormat().equals("RAW")) {
- int numFields = inputSchema.getFields().size();
- if (numFields != 1) {
- throw new IllegalArgumentException("Expecting exactly one field,
found " + numFields);
- }
-          if (!inputSchema.getField(0).getType().equals(Schema.FieldType.BYTES)) {
- throw new IllegalArgumentException(
- "The input schema must have exactly one field of type byte.");
- }
-          toBytesFn = getRowToRawBytesFunction(inputSchema.getField(0).getName());
- } else if (configuration.getFormat().equals("PROTO")) {
- String descriptorPath = configuration.getFileDescriptorPath();
- String schema = configuration.getSchema();
- String messageName = configuration.getMessageName();
-
- if (descriptorPath != null && messageName != null) {
-            toBytesFn = ProtoByteUtils.getRowToProtoBytes(descriptorPath, messageName);
- } else if (schema != null && messageName != null) {
-            toBytesFn = ProtoByteUtils.getRowToProtoBytesFromSchema(schema, messageName);
- } else {
- throw new IllegalArgumentException(
- "At least a descriptorPath or a PROTO schema is required.");
- }
- } else if (configuration.getFormat().equals("JSON")) {
- toBytesFn = JsonUtils.getRowToJsonBytesFunction(inputSchema);
- } else {
- toBytesFn = AvroUtils.getRowToAvroBytesFunction(inputSchema);
- }
-
- PCollectionTuple outputTuple =
- input
- .get("input")
- .apply(
- "Map Rows to PubSubMessages",
- ParDo.of(
- new ErrorCounterFn(
- "PubSubLite-write-error-counter",
- toBytesFn,
- errorSchema,
- handleErrors,
- attributes,
- inputSchema))
-                        .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- outputTuple
- .get(OUTPUT_TAG)
- .apply(
- "Add UUIDs",
- (attributeId != null && !attributeId.isEmpty())
- ? new SetUuidFromPubSubMessage(attributeId)
- : PubsubLiteIO.addUuids())
- .apply(
- "Write to PS Lite",
- PubsubLiteIO.write(
- PublisherOptions.newBuilder()
- .setTopicPath(
- TopicPath.newBuilder()
-                            .setProject(ProjectId.of(configuration.getProject()))
-                            .setName(TopicName.of(configuration.getTopicName()))
-                            .setLocation(CloudRegionOrZone.parse(configuration.getLocation()))
- .build())
- .build()));
-
- PCollection<Row> errorOutput =
-            outputTuple.get(ERROR_TAG).setRowSchema(ErrorHandling.errorSchema(errorSchema));
-
-        String outputString = errorHandling != null ? errorHandling.getOutput() : "errors";
-        return PCollectionRowTuple.of(handleErrors ? outputString : "errors", errorOutput);
- }
- };
- }
-
-  public static Schema getSchemaWithoutAttributes(Schema inputSchema, List<String> attributes) {
- Schema.Builder schemaBuilder = Schema.builder();
-
- inputSchema
- .getFields()
- .forEach(
- field -> {
- if (!attributes.contains(field.getName())) {
- schemaBuilder.addField(field.getName(), field.getType());
- }
- });
- return schemaBuilder.build();
- }
-
- private static Map<String, AttributeValues> getStringAttributeValuesMap(
- Row row, List<String> attributes) {
- Map<String, AttributeValues> attributeValuesHashMap = new HashMap<>();
- attributes.forEach(
- attribute -> {
- String value = row.getValue(attribute);
- if (value != null) {
- attributeValuesHashMap.put(
- attribute,
-              AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8(value)).build());
- }
- });
- return attributeValuesHashMap;
- }
-
-  public static SerializableFunction<Row, byte[]> getRowToRawBytesFunction(String rowFieldName) {
- return new SimpleFunction<Row, byte[]>() {
- @Override
- public byte[] apply(Row input) {
- byte[] rawBytes = input.getBytes(rowFieldName);
- if (rawBytes == null) {
- throw new NullPointerException();
- }
- return rawBytes;
- }
- };
- }
-
- @Override
- public String identifier() {
- return "beam:schematransform:org.apache.beam:pubsublite_write:v1";
- }
-
- @Override
- public List<String> inputCollectionNames() {
- return Collections.singletonList("input");
- }
-
- @Override
- public List<String> outputCollectionNames() {
- return Collections.singletonList("errors");
- }
-
- @AutoValue
- @DefaultSchema(AutoValueSchema.class)
- public abstract static class PubsubLiteWriteSchemaTransformConfiguration {
-
- public void validate() {
- final String dataFormat = this.getFormat();
- final String inputSchema = this.getSchema();
- final String messageName = this.getMessageName();
- final String descriptorPath = this.getFileDescriptorPath();
-
- if (dataFormat != null && dataFormat.equals("PROTO")) {
- assert messageName != null : "Expecting messageName to be non-null.";
- assert descriptorPath != null && inputSchema != null
- : "You must include a descriptorPath or a PROTO schema but not
both.";
- }
- }
-
- @SchemaFieldDescription(
- "The GCP project where the Pubsub Lite reservation resides. This can
be a "
- + "project number of a project ID.")
- public abstract String getProject();
-
- @SchemaFieldDescription("The region or zone where the Pubsub Lite
reservation resides.")
- public abstract String getLocation();
-
- @SchemaFieldDescription(
- "The name of the topic to publish data into. This will be concatenated
with "
- + "the project and location parameters to build a full topic
path.")
- public abstract String getTopicName();
-
- @SchemaFieldDescription(
- "The encoding format for the data stored in Pubsub Lite. Valid options
are: "
- + SUPPORTED_FORMATS_STR)
- public abstract String getFormat();
-
- @SchemaFieldDescription("This option specifies whether and where to output
unwritable rows.")
- public abstract @Nullable ErrorHandling getErrorHandling();
-
- @SchemaFieldDescription(
- "List of attribute keys whose values will be pulled out as "
- + "Pubsub Lite message attributes. For example, if the format is
`JSON` "
- + "and attributes is `[\"a\", \"b\"]` then elements of the form "
- + "`Row(any_field=..., a=..., b=...)` will result in Pubsub Lite
messages whose "
- + "payload has the contents of any_field and whose attribute will
be "
- + "populated with the values of `a` and `b`.")
- public abstract @Nullable List<String> getAttributes();
-
- @SchemaFieldDescription(
- "If set, will set an attribute for each Pubsub Lite message "
- + "with the given name and a unique value. This attribute can then
be used "
- + "in a ReadFromPubSubLite PTransform to deduplicate messages.")
- public abstract @Nullable String getAttributeId();
-
- @SchemaFieldDescription(
- "The path to the Protocol Buffer File Descriptor Set file. This file
is used for schema"
- + " definition and message serialization.")
- public abstract @Nullable String getFileDescriptorPath();
-
- @SchemaFieldDescription(
- "The name of the Protocol Buffer message to be used for schema"
- + " extraction and data conversion.")
- public abstract @Nullable String getMessageName();
-
- public abstract @Nullable String getSchema();
-
- public static Builder builder() {
-      return new AutoValue_PubsubLiteWriteSchemaTransformProvider_PubsubLiteWriteSchemaTransformConfiguration
-          .Builder();
- }
-
- @AutoValue.Builder
- public abstract static class Builder {
- public abstract Builder setProject(String project);
-
- public abstract Builder setLocation(String location);
-
- public abstract Builder setTopicName(String topicName);
-
- public abstract Builder setFormat(String format);
-
- public abstract Builder setErrorHandling(ErrorHandling errorHandling);
-
- public abstract Builder setAttributes(List<String> attributes);
-
- @SuppressWarnings("unused")
- public abstract Builder setAttributeId(String attributeId);
-
- @SuppressWarnings("unused")
- public abstract Builder setFileDescriptorPath(String fileDescriptorPath);
-
- @SuppressWarnings("unused")
- public abstract Builder setMessageName(String messageName);
-
- @SuppressWarnings("unused")
- public abstract Builder setSchema(String schema);
-
- public abstract PubsubLiteWriteSchemaTransformConfiguration build();
- }
- }
-
- public static class SetUuidFromPubSubMessage
-      extends PTransform<PCollection<PubSubMessage>, PCollection<PubSubMessage>> {
- private final String attributeId;
-
- public SetUuidFromPubSubMessage(String attributeId) {
- this.attributeId = attributeId;
- }
-
- @Override
-    public PCollection<PubSubMessage> expand(PCollection<PubSubMessage> input) {
- return input.apply("SetUuidFromPubSubMessage", ParDo.of(new
SetUuidFn(attributeId)));
- }
-
- public static class SetUuidFn extends DoFn<PubSubMessage, PubSubMessage> {
- private final String attributeId;
-
- public SetUuidFn(String attributeId) {
- this.attributeId = attributeId;
- }
-
- @ProcessElement
- public void processElement(
-          @Element PubSubMessage input, OutputReceiver<PubSubMessage> outputReceiver) {
- PubSubMessage.Builder builder = input.toBuilder();
- builder.putAttributes(
-            attributeId, AttributeValues.newBuilder().addValues(Uuid.random().value()).build());
- outputReceiver.output(builder.build());
- }
- }
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
deleted file mode 100644
index 1f2819940ff..00000000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite;
-
-import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-
-import com.google.cloud.pubsublite.AdminClient;
-import com.google.cloud.pubsublite.AdminClientSettings;
-import com.google.cloud.pubsublite.BacklogLocation;
-import com.google.cloud.pubsublite.CloudZone;
-import com.google.cloud.pubsublite.Message;
-import com.google.cloud.pubsublite.ProjectId;
-import com.google.cloud.pubsublite.SubscriptionName;
-import com.google.cloud.pubsublite.SubscriptionPath;
-import com.google.cloud.pubsublite.TopicName;
-import com.google.cloud.pubsublite.TopicPath;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import com.google.cloud.pubsublite.proto.SequencedMessage;
-import com.google.cloud.pubsublite.proto.Subscription;
-import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement;
-import com.google.cloud.pubsublite.proto.Topic;
-import com.google.cloud.pubsublite.proto.Topic.PartitionConfig.Capacity;
-import com.google.protobuf.ByteString;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.FlatMapElements;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptors;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
-import org.joda.time.Duration;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(JUnit4.class)
-public class ReadWriteIT {
- private static final Logger LOG = LoggerFactory.getLogger(ReadWriteIT.class);
- private static final CloudZone ZONE = CloudZone.parse("us-central1-b");
- private static final int MESSAGE_COUNT = 90;
- private static final Schema SAMPLE_BEAM_SCHEMA =
-      Schema.builder().addStringField("numberInString").addInt32Field("numberInInt").build();
-
- @Rule public transient TestPubsubSignal signal = TestPubsubSignal.create();
- @Rule public transient TestPipeline pipeline = TestPipeline.create();
-
- private static ProjectId getProject(PipelineOptions options) {
-    return ProjectId.of(checkArgumentNotNull(options.as(GcpOptions.class).getProject()));
- }
-
- private static String randomName() {
- return "beam_it_resource_" + ThreadLocalRandom.current().nextLong();
- }
-
- private static AdminClient newAdminClient() {
-    return AdminClient.create(AdminClientSettings.newBuilder().setRegion(ZONE.region()).build());
- }
-
- private final Deque<Runnable> cleanupActions = new ArrayDeque<>();
-
- private TopicPath createTopic(ProjectId id) throws Exception {
- TopicPath toReturn =
- TopicPath.newBuilder()
- .setProject(id)
- .setLocation(ZONE)
- .setName(TopicName.of(randomName()))
- .build();
- Topic.Builder topic = Topic.newBuilder().setName(toReturn.toString());
- topic
- .getPartitionConfigBuilder()
- .setCount(2)
-        .setCapacity(Capacity.newBuilder().setPublishMibPerSec(4).setSubscribeMibPerSec(4));
- topic.getRetentionConfigBuilder().setPerPartitionBytes(30 * (1L << 30));
- cleanupActions.addLast(
- () -> {
- try (AdminClient client = newAdminClient()) {
- client.deleteTopic(toReturn).get();
- } catch (Throwable t) {
- LOG.error("Failed to clean up topic.", t);
- }
- });
- LOG.info("Creating topic named {}", toReturn);
- try (AdminClient client = newAdminClient()) {
- client.createTopic(topic.build()).get();
- }
- return toReturn;
- }
-
-  private SubscriptionPath createSubscription(TopicPath topic) throws Exception {
- SubscriptionPath toReturn =
- SubscriptionPath.newBuilder()
- .setProject(topic.project())
- .setLocation(ZONE)
- .setName(SubscriptionName.of(randomName()))
- .build();
-    Subscription.Builder subscription = Subscription.newBuilder().setName(toReturn.toString());
- subscription
- .getDeliveryConfigBuilder()
- .setDeliveryRequirement(DeliveryRequirement.DELIVER_IMMEDIATELY);
- subscription.setTopic(topic.toString());
- cleanupActions.addLast(
- () -> {
- try (AdminClient client = newAdminClient()) {
- client.deleteSubscription(toReturn).get();
- } catch (Throwable t) {
- LOG.error("Failed to clean up subscription.", t);
- }
- });
- LOG.info("Creating subscription named {} from topic {}", toReturn, topic);
- try (AdminClient client = newAdminClient()) {
-      client.createSubscription(subscription.build(), BacklogLocation.BEGINNING).get();
- }
- return toReturn;
- }
-
- @After
- public void tearDown() {
- while (!cleanupActions.isEmpty()) {
- cleanupActions.removeLast().run();
- }
- }
-
- // Workaround for https://github.com/apache/beam/issues/21257
- // TODO(https://github.com/apache/beam/issues/21257): Remove this.
-  private static class CustomCreate extends PTransform<PCollection<Void>, PCollection<Integer>> {
- @Override
- public PCollection<Integer> expand(PCollection<Void> input) {
- return input.apply(
- "createIndexes",
- FlatMapElements.via(
- new SimpleFunction<Void, Iterable<Integer>>() {
- @Override
- public Iterable<Integer> apply(Void input) {
-              return IntStream.range(0, MESSAGE_COUNT).boxed().collect(Collectors.toList());
- }
- }));
- }
- }
-
-  public static void writeJsonMessages(TopicPath topicPath, Pipeline pipeline) {
- PCollectionRowTuple.of(
- "input",
- pipeline
- .apply(Create.of((Void) null))
- .apply("createIndexes", new CustomCreate())
- .apply(
- "format to rows",
- MapElements.via(
- new SimpleFunction<Integer, Row>(
- index ->
- Row.withSchema(SAMPLE_BEAM_SCHEMA)
-                            .addValue(Objects.requireNonNull(index).toString())
- .addValue(index)
- .build()) {}))
- .setRowSchema(SAMPLE_BEAM_SCHEMA))
- .apply(
- "write to pslite",
- new PubsubLiteWriteSchemaTransformProvider()
- .from(
- PubsubLiteWriteSchemaTransformProvider
- .PubsubLiteWriteSchemaTransformConfiguration.builder()
- .setFormat("JSON")
- .setLocation(ZONE.toString())
- .setTopicName(topicPath.name().value())
- .setProject(topicPath.project().name().value())
- .build()));
- }
-
- public static void writeMessages(TopicPath topicPath, Pipeline pipeline) {
- PCollection<Void> trigger = pipeline.apply(Create.of((Void) null));
-    PCollection<Integer> indexes = trigger.apply("createIndexes", new CustomCreate());
- PCollection<PubSubMessage> messages =
- indexes.apply(
- "createMessages",
- MapElements.via(
- new SimpleFunction<Integer, PubSubMessage>(
- index ->
- Message.builder()
- .setData(ByteString.copyFromUtf8(index.toString()))
- .build()
- .toProto()) {}));
- // Add UUIDs to messages for later deduplication.
- messages = messages.apply("addUuids", PubsubLiteIO.addUuids());
- messages.apply(
- "writeMessages",
-        PubsubLiteIO.write(PublisherOptions.newBuilder().setTopicPath(topicPath).build()));
- }
-
- public static PCollection<SequencedMessage> readMessages(
- SubscriptionPath subscriptionPath, Pipeline pipeline) {
- PCollection<SequencedMessage> messages =
- pipeline.apply(
- "readMessages",
- PubsubLiteIO.read(
-                SubscriberOptions.newBuilder().setSubscriptionPath(subscriptionPath).build()));
- return messages;
- // TODO(https://github.com/apache/beam/issues/21157): Fix and re-enable
-    // Deduplicate messages based on the uuids added in PubsubLiteIO.addUuids() when writing.
-    // return messages.apply(
-    //     "dedupeMessages", PubsubLiteIO.deduplicate(UuidDeduplicationOptions.newBuilder().build()));
- }
-
- public static SimpleFunction<SequencedMessage, Integer> extractIds() {
- return new SimpleFunction<SequencedMessage, Integer>() {
- @Override
- public Integer apply(SequencedMessage input) {
- return Integer.parseInt(input.getMessage().getData().toStringUtf8());
- }
- };
- }
-
- public static SerializableFunction<Set<Integer>, Boolean> testIds() {
- return ids -> {
- LOG.debug("Ids are: {}", ids);
-      Set<Integer> target = IntStream.range(0, MESSAGE_COUNT).boxed().collect(Collectors.toSet());
- return target.equals(ids);
- };
- }
-
- @Test
- public void testPubsubLiteWriteReadWithSchemaTransform() throws Exception {
- pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
- pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
-
- TopicPath topic = createTopic(getProject(pipeline.getOptions()));
- SubscriptionPath subscription = null;
- Exception lastException = null;
- for (int i = 0; i < 30; ++i) {
- // Sleep for topic creation to propagate.
- Thread.sleep(1000);
- try {
- subscription = createSubscription(topic);
- break;
- } catch (Exception e) {
- lastException = e;
- LOG.info("Retrying exception on subscription creation.", e);
- }
- }
- if (subscription == null) {
- throw lastException;
- }
-
- // Publish some messages
- writeJsonMessages(topic, pipeline);
-
-    // Read some messages. They should be deduplicated by the time we see them, so there should be
-    // exactly numMessages, one for every index in [0,MESSAGE_COUNT).
- PCollection<Row> messages =
- PCollectionRowTuple.empty(pipeline)
- .apply(
- "read from pslite",
- new PubsubLiteReadSchemaTransformProvider()
- .from(
- PubsubLiteReadSchemaTransformProvider
-                        .PubsubLiteReadSchemaTransformConfiguration.builder()
- .setFormat("JSON")
- .setSchema(
- "{\n"
- + " \"properties\": {\n"
- + " \"numberInString\": {\n"
- + " \"type\": \"string\"\n"
- + " },\n"
- + " \"numberInInt\": {\n"
- + " \"type\": \"integer\"\n"
- + " }\n"
- + " }\n"
- + "}")
- .setSubscriptionName(subscription.name().value())
- .setLocation(subscription.location().toString())
- .build()))
- .get("output");
- PCollection<Integer> ids =
- messages.apply(
- "get ints",
- MapElements.into(TypeDescriptors.integers())
- .via(
- row -> {
-                      return Objects.requireNonNull(row.getInt64("numberInInt")).intValue();
- }));
- ids.apply("PubsubSignalTest",
signal.signalSuccessWhen(BigEndianIntegerCoder.of(), testIds()));
- Supplier<Void> start = signal.waitForStart(Duration.standardMinutes(8));
- pipeline.apply("start signal", signal.signalStart());
- PipelineResult job = pipeline.run();
- start.get();
- LOG.info("Running!");
- signal.waitForSuccess(Duration.standardMinutes(5));
- // A runner may not support cancel
- try {
- job.cancel();
- } catch (UnsupportedOperationException exc) {
- // noop
- }
- }
-
- @Test
- public void testReadWrite() throws Exception {
- pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
- pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
-
- TopicPath topic = createTopic(getProject(pipeline.getOptions()));
- SubscriptionPath subscription = null;
- Exception lastException = null;
- for (int i = 0; i < 30; ++i) {
- // Sleep for topic creation to propagate.
- Thread.sleep(1000);
- try {
- subscription = createSubscription(topic);
- break;
- } catch (Exception e) {
- lastException = e;
- LOG.info("Retrying exception on subscription creation.", e);
- }
- }
- if (subscription == null) {
- throw lastException;
- }
-
- // Publish some messages
- writeMessages(topic, pipeline);
-
-    // Read some messages. They should be deduplicated by the time we see them, so there should be
-    // exactly numMessages, one for every index in [0,MESSAGE_COUNT).
-    PCollection<SequencedMessage> messages = readMessages(subscription, pipeline);
- PCollection<Integer> ids = messages.apply(MapElements.via(extractIds()));
- ids.apply("PubsubSignalTest",
signal.signalSuccessWhen(BigEndianIntegerCoder.of(), testIds()));
- Supplier<Void> start = signal.waitForStart(Duration.standardMinutes(8));
- pipeline.apply(signal.signalStart());
- PipelineResult job = pipeline.run();
- start.get();
- LOG.info("Running!");
- signal.waitForSuccess(Duration.standardMinutes(5));
- // A runner may not support cancel
- try {
- job.cancel();
- } catch (UnsupportedOperationException exc) {
- // noop
- }
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java
deleted file mode 100644
index 3d0ba336eee..00000000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.java
+++ /dev/null
@@ -1,565 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite.internal;
-
-import static org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider.getRawBytesToRowFunction;
-import static org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider.getUuidFromMessage;
-
-import com.google.cloud.pubsublite.proto.AttributeValues;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import com.google.cloud.pubsublite.proto.SequencedMessage;
-import com.google.protobuf.ByteString;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
-import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
-import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider;
-import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider.ErrorFn;
-import org.apache.beam.sdk.io.gcp.pubsublite.UuidDeduplicationOptions;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
-import org.apache.beam.sdk.schemas.utils.JsonUtils;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class PubsubLiteDlqTest {
-
- private static final TupleTag<Row> OUTPUT_TAG = PubsubLiteReadSchemaTransformProvider.OUTPUT_TAG;
- private static final TupleTag<Row> ERROR_TAG = PubsubLiteReadSchemaTransformProvider.ERROR_TAG;
-
- private static final Schema BEAM_RAW_SCHEMA =
- Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
- private static final Schema BEAM_SCHEMA =
- Schema.of(Schema.Field.of("name", Schema.FieldType.STRING));
-
- private static final Schema BEAM_SCHEMA_ATTRIBUTES =
- Schema.of(
- Schema.Field.of("name", Schema.FieldType.STRING),
- Schema.Field.of("key1", Schema.FieldType.STRING),
- Schema.Field.of("key2", Schema.FieldType.STRING));
-
- private static final Schema BEAM_SCHEMA_ATTRIBUTES_AND_MAP =
- Schema.of(
- Schema.Field.of("name", Schema.FieldType.STRING),
- Schema.Field.of("key1", Schema.FieldType.STRING),
- Schema.Field.of("key2", Schema.FieldType.STRING),
- Schema.Field.of(
- "attrs", Schema.FieldType.map(Schema.FieldType.STRING,
Schema.FieldType.STRING)));
-
- private static final Schema BEAM_SCHEMA_ATTRIBUTES_MAP =
- Schema.of(
- Schema.Field.of("name", Schema.FieldType.STRING),
- Schema.Field.of(
- "attrs", Schema.FieldType.map(Schema.FieldType.STRING,
Schema.FieldType.STRING)));
-
- private static final Map<String, String> STATIC_MAP;
-
- static {
- Map<String, String> tempMap = new HashMap<>();
- tempMap.put("key1", "first_key");
- tempMap.put("key2", "second_key");
- STATIC_MAP = Collections.unmodifiableMap(tempMap);
- }
-
- private static final List<Row> RAW_ROWS;
-
- static {
- try {
- RAW_ROWS =
- Arrays.asList(
- Row.withSchema(BEAM_RAW_SCHEMA)
- .withFieldValue("payload", "a".getBytes("UTF-8"))
- .build(),
- Row.withSchema(BEAM_RAW_SCHEMA)
- .withFieldValue("payload", "b".getBytes("UTF-8"))
- .build(),
- Row.withSchema(BEAM_RAW_SCHEMA)
- .withFieldValue("payload", "c".getBytes("UTF-8"))
- .build());
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static final List<Row> ROWS_WITH_ATTRIBUTES =
- Arrays.asList(
- Row.withSchema(BEAM_SCHEMA_ATTRIBUTES)
- .withFieldValue("name", "a")
- .withFieldValue("key1", "first_key")
- .withFieldValue("key2", "second_key")
- .build(),
- Row.withSchema(BEAM_SCHEMA_ATTRIBUTES)
- .withFieldValue("name", "b")
- .withFieldValue("key1", "first_key")
- .withFieldValue("key2", "second_key")
- .build(),
- Row.withSchema(BEAM_SCHEMA_ATTRIBUTES)
- .withFieldValue("name", "c")
- .withFieldValue("key1", "first_key")
- .withFieldValue("key2", "second_key")
- .build());
- private static final List<Row> ROWS_WITH_ATTRIBUTES_MAP =
- Arrays.asList(
- Row.withSchema(BEAM_SCHEMA_ATTRIBUTES_MAP)
- .withFieldValue("name", "a")
- .withFieldValue("attrs", STATIC_MAP)
- .build(),
- Row.withSchema(BEAM_SCHEMA_ATTRIBUTES_MAP)
- .withFieldValue("name", "b")
- .withFieldValue("attrs", STATIC_MAP)
- .build(),
- Row.withSchema(BEAM_SCHEMA_ATTRIBUTES_MAP)
- .withFieldValue("name", "c")
- .withFieldValue("attrs", STATIC_MAP)
- .build());
- private static final List<Row> ROWS_WITH_ATTRIBUTES_AND_MAP =
- Arrays.asList(
- Row.withSchema(BEAM_SCHEMA_ATTRIBUTES_AND_MAP)
- .withFieldValue("name", "a")
- .withFieldValue("key1", "first_key")
- .withFieldValue("key2", "second_key")
- .withFieldValue("attrs", STATIC_MAP)
- .build(),
- Row.withSchema(BEAM_SCHEMA_ATTRIBUTES_AND_MAP)
- .withFieldValue("name", "b")
- .withFieldValue("key1", "first_key")
- .withFieldValue("key2", "second_key")
- .withFieldValue("attrs", STATIC_MAP)
- .build(),
- Row.withSchema(BEAM_SCHEMA_ATTRIBUTES_AND_MAP)
- .withFieldValue("name", "c")
- .withFieldValue("key1", "first_key")
- .withFieldValue("key2", "second_key")
- .withFieldValue("attrs", STATIC_MAP)
- .build());
-
- private static final List<Row> ROWS =
- Arrays.asList(
- Row.withSchema(BEAM_SCHEMA).withFieldValue("name", "a").build(),
- Row.withSchema(BEAM_SCHEMA).withFieldValue("name", "b").build(),
- Row.withSchema(BEAM_SCHEMA).withFieldValue("name", "c").build());
-
- private static final Map<String, AttributeValues> ATTRIBUTE_VALUES_MAP = new HashMap<>();
-
- static {
- ATTRIBUTE_VALUES_MAP.put(
- "key1",
- AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8("first_key")).build());
- ATTRIBUTE_VALUES_MAP.put(
- "key2",
- AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8("second_key")).build());
- }
-
- private static final List<SequencedMessage> MESSAGES =
- Arrays.asList(
- SequencedMessage.newBuilder()
- .setMessage(
- PubSubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8("{\"name\":\"a\"}"))
- .putAllAttributes(ATTRIBUTE_VALUES_MAP)
- .build())
- .build(),
- SequencedMessage.newBuilder()
- .setMessage(
- PubSubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8("{\"name\":\"b\"}"))
- .putAllAttributes(ATTRIBUTE_VALUES_MAP)
- .build())
- .build(),
- SequencedMessage.newBuilder()
- .setMessage(
- PubSubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8("{\"name\":\"c\"}"))
- .putAllAttributes(ATTRIBUTE_VALUES_MAP)
- .build())
- .build());
-
- private static final List<SequencedMessage> RAW_MESSAGES =
- Arrays.asList(
- SequencedMessage.newBuilder()
- .setMessage(
- PubSubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8("a"))
- .putAllAttributes(ATTRIBUTE_VALUES_MAP)
- .build())
- .build(),
- SequencedMessage.newBuilder()
- .setMessage(
- PubSubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8("b"))
- .putAllAttributes(ATTRIBUTE_VALUES_MAP)
- .build())
- .build(),
- SequencedMessage.newBuilder()
- .setMessage(
- PubSubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8("c"))
- .putAllAttributes(ATTRIBUTE_VALUES_MAP)
- .build())
- .build());
-
- private static final List<SequencedMessage> MESSAGESWITHERROR =
- Arrays.asList(
- SequencedMessage.newBuilder()
- .setMessage(
- PubSubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8("{\"error\":\"a\"}"))
- .build())
- .build(),
- SequencedMessage.newBuilder()
- .setMessage(
- PubSubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8("{\"error\":\"b\"}"))
- .build())
- .build(),
- SequencedMessage.newBuilder()
- .setMessage(
- PubSubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8("{\"error\":\"c\"}"))
- .build())
- .build());
-
- private static final String PROTO_STRING_SCHEMA =
- "syntax = \"proto3\";\n"
- + "package com.test.proto;"
- + "\n"
- + "message MyMessage {\n"
- + " int32 id = 1;\n"
- + " string name = 2;\n"
- + " bool active = 3;\n"
- + "\n"
- + " // Nested field\n"
- + " message Address {\n"
- + " string street = 1;\n"
- + " string city = 2;\n"
- + " string state = 3;\n"
- + " string zip_code = 4;\n"
- + " }\n"
- + "\n"
- + " Address address = 4;\n"
- + "}";
-
- private static final Schema BEAM_PROTO_SCHEMA =
- Schema.builder()
- .addField("id", Schema.FieldType.INT32)
- .addField("name", Schema.FieldType.STRING)
- .addField("active", Schema.FieldType.BOOLEAN)
- .addField(
- "address",
- Schema.FieldType.row(
- Schema.builder()
- .addField("street", Schema.FieldType.STRING)
- .addField("city", Schema.FieldType.STRING)
- .addField("state", Schema.FieldType.STRING)
- .addField("zip_code", Schema.FieldType.STRING)
- .build()))
- .build();
-
- private static final Row INPUT_ROW =
- Row.withSchema(BEAM_PROTO_SCHEMA)
- .withFieldValue("id", 1234)
- .withFieldValue("name", "Doe")
- .withFieldValue("active", false)
- .withFieldValue("address.city", "seattle")
- .withFieldValue("address.street", "fake street")
- .withFieldValue("address.zip_code", "TO-1234")
- .withFieldValue("address.state", "wa")
- .build();
- private static final SerializableFunction<Row, byte[]> INPUT_MAPPER =
- ProtoByteUtils.getRowToProtoBytesFromSchema(PROTO_STRING_SCHEMA, "com.test.proto.MyMessage");
-
- private static final byte[] INPUT_SOURCE = INPUT_MAPPER.apply(INPUT_ROW);
-
- private static final List<SequencedMessage> INPUT_MESSAGES =
- Collections.singletonList(
- SequencedMessage.newBuilder()
- .setMessage(
- PubSubMessage.newBuilder()
- .setData(ByteString.copyFrom(INPUT_SOURCE))
- .putAllAttributes(ATTRIBUTE_VALUES_MAP)
- .build())
- .build());
-
- final SerializableFunction<byte[], Row> valueMapper =
- JsonUtils.getJsonBytesToRowFunction(BEAM_SCHEMA);
-
- @Rule public transient TestPipeline p = TestPipeline.create();
-
- @Test
- public void testPubsubLiteErrorFnSuccess() {
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
- PCollection<SequencedMessage> input = p.apply(Create.of(MESSAGES));
- PCollectionTuple output =
- input.apply(
- ParDo.of(new ErrorFn("Read-Error-Counter", valueMapper, errorSchema, Boolean.TRUE))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- output.get(OUTPUT_TAG).setRowSchema(BEAM_SCHEMA);
- output.get(ERROR_TAG).setRowSchema(errorSchema);
-
- PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(ROWS);
- p.run().waitUntilFinish();
- }
-
- @Test
- public void testPubsubLiteErrorFnFailure() {
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
- PCollection<SequencedMessage> input = p.apply(Create.of(MESSAGESWITHERROR));
- PCollectionTuple output =
- input.apply(
- ParDo.of(new ErrorFn("Read-Error-Counter", valueMapper, errorSchema, Boolean.TRUE))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- output.get(OUTPUT_TAG).setRowSchema(BEAM_SCHEMA);
- output.get(ERROR_TAG).setRowSchema(errorSchema);
-
- PCollection<Long> count = output.get(ERROR_TAG).apply("error_count", Count.globally());
-
- PAssert.that(count).containsInAnyOrder(Collections.singletonList(3L));
-
- p.run().waitUntilFinish();
- }
-
- @Test
- public void testPubsubLiteErrorFnRawSuccess() {
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
-
- List<String> attributes = new ArrayList<>();
- String attributesMap = "";
- Schema beamAttributeSchema =
- PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
- BEAM_RAW_SCHEMA, attributes, attributesMap);
- SerializableFunction<byte[], Row> rawValueMapper = getRawBytesToRowFunction(BEAM_RAW_SCHEMA);
- PCollection<SequencedMessage> input = p.apply(Create.of(RAW_MESSAGES));
- PCollectionTuple output =
- input.apply(
- ParDo.of(new ErrorFn("Read-Error-Counter", rawValueMapper,
errorSchema, Boolean.TRUE))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema);
- output.get(ERROR_TAG).setRowSchema(errorSchema);
-
- PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(RAW_ROWS);
- p.run().waitUntilFinish();
- }
-
- @Test
- public void testPubsubLiteErrorFnWithAttributesSuccess() {
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
- List<String> attributes = new ArrayList<>();
- attributes.add("key1");
- attributes.add("key2");
- String attributeMap = "";
- Schema beamAttributeSchema =
- PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
- BEAM_SCHEMA, attributes, attributeMap);
-
- PCollection<SequencedMessage> input = p.apply(Create.of(MESSAGES));
- PCollectionTuple output =
- input.apply(
- ParDo.of(
- new ErrorFn(
- "Read-Error-Counter",
- valueMapper,
- errorSchema,
- attributes,
- attributeMap,
- beamAttributeSchema,
- Boolean.TRUE))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema);
- output.get(ERROR_TAG).setRowSchema(errorSchema);
-
- PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(ROWS_WITH_ATTRIBUTES);
- p.run().waitUntilFinish();
- }
-
- @Test
- public void testPubsubLiteErrorFnWithAttributeMapSuccess() {
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
- // empty list of attributes
- List<String> attributes = new ArrayList<>();
- String attributeMap = "attrs";
- Schema beamAttributeSchema =
- PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
- BEAM_SCHEMA, attributes, attributeMap);
-
- PCollection<SequencedMessage> input = p.apply(Create.of(MESSAGES));
- PCollectionTuple output =
- input.apply(
- ParDo.of(
- new ErrorFn(
- "Read-Error-Counter",
- valueMapper,
- errorSchema,
- attributes,
- attributeMap,
- beamAttributeSchema,
- Boolean.TRUE))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema);
- output.get(ERROR_TAG).setRowSchema(errorSchema);
-
- PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(ROWS_WITH_ATTRIBUTES_MAP);
- p.run().waitUntilFinish();
- }
-
- @Test
- public void testPubsubLiteErrorFnWithAttributesAndAttributeMapSuccess() {
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
- List<String> attributes = new ArrayList<>();
- attributes.add("key1");
- attributes.add("key2");
- String attributeMap = "attrs";
- Schema beamAttributeSchema =
- PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
- BEAM_SCHEMA, attributes, attributeMap);
-
- PCollection<SequencedMessage> input = p.apply(Create.of(MESSAGES));
- PCollectionTuple output =
- input.apply(
- ParDo.of(
- new ErrorFn(
- "Read-Error-Counter",
- valueMapper,
- errorSchema,
- attributes,
- attributeMap,
- beamAttributeSchema,
- Boolean.TRUE))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema);
- output.get(ERROR_TAG).setRowSchema(errorSchema);
-
- PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(ROWS_WITH_ATTRIBUTES_AND_MAP);
- p.run().waitUntilFinish();
- }
-
- @Test
- public void testPubsubLiteErrorFnWithAttributesFailure() {
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
- List<String> attributes = new ArrayList<>();
- attributes.add("randomKey1");
- attributes.add("randomKey2");
- String attributeMap = "";
- Schema beamAttributeSchema =
- PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
- BEAM_SCHEMA, attributes, attributeMap);
-
- PCollection<SequencedMessage> input = p.apply(Create.of(MESSAGES));
- PCollectionTuple output =
- input.apply(
- ParDo.of(
- new ErrorFn(
- "Read-Error-Counter",
- valueMapper,
- errorSchema,
- attributes,
- attributeMap,
- beamAttributeSchema,
- Boolean.TRUE))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema);
- output.get(ERROR_TAG).setRowSchema(errorSchema);
-
- PCollection<Long> count = output.get(ERROR_TAG).apply("error_count", Count.globally());
-
- PAssert.that(count).containsInAnyOrder(Collections.singletonList(3L));
-
- p.run().waitUntilFinish();
- }
-
- @Test
- public void testPubsubLiteErrorFnWithDedupingSuccess() {
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
-
- PCollection<SequencedMessage> input = p.apply(Create.of(MESSAGES));
- UuidDeduplicationOptions.Builder uuidExtractor =
- UuidDeduplicationOptions.newBuilder().setUuidExtractor(getUuidFromMessage("key1"));
- PCollectionTuple output =
- input
- .apply(PubsubLiteIO.deduplicate(uuidExtractor.build()))
- .apply(
- ParDo.of(new ErrorFn("Read-Error-Counter", valueMapper, errorSchema, Boolean.TRUE))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- output.get(OUTPUT_TAG).setRowSchema(BEAM_SCHEMA);
- output.get(ERROR_TAG).setRowSchema(errorSchema);
-
- PCollection<Long> count = output.get(OUTPUT_TAG).apply("error_count", Count.globally());
-
- // We are deduping so we should only have 1 value
- PAssert.that(count).containsInAnyOrder(Collections.singletonList(1L));
-
- p.run().waitUntilFinish();
- }
-
- @Test
- public void testPubSubLiteErrorFnReadProto() {
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
-
- List<String> attributes = new ArrayList<>();
- String attributesMap = "";
- Schema beamAttributeSchema =
- PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
- BEAM_PROTO_SCHEMA, attributes, attributesMap);
-
- SerializableFunction<byte[], Row> protoValueMapper =
- ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(
- PROTO_STRING_SCHEMA, "com.test.proto.MyMessage");
-
- PCollection<SequencedMessage> input = p.apply(Create.of(INPUT_MESSAGES));
- PCollectionTuple output =
- input.apply(
- ParDo.of(new ErrorFn("Read-Error-Counter", protoValueMapper,
errorSchema, Boolean.TRUE))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- // Unexpected behaviors occur if the PCollection schema differs from the schema generated in the
- // conversion from Proto to Row.
- output.get(OUTPUT_TAG).setRowSchema(beamAttributeSchema);
- output.get(ERROR_TAG).setRowSchema(errorSchema);
-
- PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(INPUT_ROW);
- p.run().waitUntilFinish();
- }
-}
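The tests removed above exercised the provider's dead-letter routing: a DoFn that emits successfully parsed rows to a main output tag and failures to an error tag. That shape is expressible with core Beam multi-output APIs alone. Below is a self-contained sketch in which the anonymous DoFn and its toy "parser" are hypothetical stand-ins for the deleted ErrorFn, not a reconstruction of it:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    public class DlqSketch {
      // Main-output and error-output tags, mirroring the OUTPUT_TAG/ERROR_TAG pair above.
      static final TupleTag<String> MAIN = new TupleTag<String>() {};
      static final TupleTag<String> ERROR = new TupleTag<String>() {};

      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        PCollectionTuple out =
            p.apply(Create.of("{\"name\":\"a\"}", "not-json"))
                .apply(
                    ParDo.of(
                            new DoFn<String, String>() {
                              @ProcessElement
                              public void process(
                                  @Element String payload, MultiOutputReceiver r) {
                                // Toy "parse": anything that does not look like JSON goes
                                // to the error output instead of failing the bundle.
                                if (payload.startsWith("{")) {
                                  r.get(MAIN).output(payload);
                                } else {
                                  r.get(ERROR).output(payload);
                                }
                              }
                            })
                        .withOutputTags(MAIN, TupleTagList.of(ERROR)));
        out.get(ERROR); // e.g. route this PCollection to a dead-letter sink
        p.run().waitUntilFinish();
      }
    }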
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteWriteDlqTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteWriteDlqTest.java
deleted file mode 100644
index 5afa4b7e516..00000000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteWriteDlqTest.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsublite.internal;
-
-import com.google.cloud.pubsublite.proto.AttributeValues;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import com.google.protobuf.ByteString;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
-import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteWriteSchemaTransformProvider;
-import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteWriteSchemaTransformProvider.ErrorCounterFn;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
-import org.apache.beam.sdk.schemas.utils.JsonUtils;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class PubsubLiteWriteDlqTest {
-
- private static final TupleTag<PubSubMessage> OUTPUT_TAG =
- PubsubLiteWriteSchemaTransformProvider.OUTPUT_TAG;
- private static final TupleTag<Row> ERROR_TAG = PubsubLiteWriteSchemaTransformProvider.ERROR_TAG;
-
- private static final Schema BEAM_SCHEMA =
- Schema.of(Schema.Field.of("name", Schema.FieldType.STRING));
-
- private static final Schema BEAM_RAW_SCHEMA =
- Schema.of(Schema.Field.of("payload", Schema.FieldType.BYTES));
-
- private static final Schema BEAM_SCHEMA_ATTRIBUTES =
- Schema.of(
- Schema.Field.of("name", Schema.FieldType.STRING),
- Schema.Field.of("key1", Schema.FieldType.STRING),
- Schema.Field.of("key2", Schema.FieldType.STRING));
-
- private static final List<Row> RAW_ROWS;
-
- static {
- try {
- RAW_ROWS =
- Arrays.asList(
- Row.withSchema(BEAM_RAW_SCHEMA)
- .withFieldValue("payload", "a".getBytes("UTF8"))
- .build(),
- Row.withSchema(BEAM_RAW_SCHEMA)
- .withFieldValue("payload", "b".getBytes("UTF8"))
- .build(),
- Row.withSchema(BEAM_RAW_SCHEMA)
- .withFieldValue("payload", "c".getBytes("UTF8"))
- .build());
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static final List<Row> ROWS =
- Arrays.asList(
- Row.withSchema(BEAM_SCHEMA).withFieldValue("name", "a").build(),
- Row.withSchema(BEAM_SCHEMA).withFieldValue("name", "b").build(),
- Row.withSchema(BEAM_SCHEMA).withFieldValue("name", "c").build());
-
- private static final List<Row> ROWSATTRIBUTES =
- Arrays.asList(
- Row.withSchema(BEAM_SCHEMA_ATTRIBUTES)
- .withFieldValue("name", "a")
- .withFieldValue("key1", "first_key")
- .withFieldValue("key2", "second_key")
- .build(),
- Row.withSchema(BEAM_SCHEMA_ATTRIBUTES)
- .withFieldValue("name", "b")
- .withFieldValue("key1", "first_key")
- .withFieldValue("key2", "second_key")
- .build(),
- Row.withSchema(BEAM_SCHEMA_ATTRIBUTES)
- .withFieldValue("name", "c")
- .withFieldValue("key1", "first_key")
- .withFieldValue("key2", "second_key")
- .build());
-
- private static final String PROTO_STRING_SCHEMA =
- "syntax = \"proto3\";\n"
- + "package com.test.proto;"
- + "\n"
- + "message MyMessage {\n"
- + " string name = 1;\n"
- + "}";
-
- private static final Map<String, AttributeValues> ATTRIBUTE_VALUES_MAP = new HashMap<>();
-
- static {
- ATTRIBUTE_VALUES_MAP.put(
- "key1",
- AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8("first_key")).build());
- ATTRIBUTE_VALUES_MAP.put(
- "key2",
- AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8("second_key")).build());
- }
-
- private static final List<PubSubMessage> MESSAGES_RAW =
- Arrays.asList(
- PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("a")).build(),
- PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("b")).build(),
- PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("c")).build());
-
- private static final List<PubSubMessage> MESSAGES =
- Arrays.asList(
- PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("{\"name\":\"a\"}")).build(),
- PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("{\"name\":\"b\"}")).build(),
- PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("{\"name\":\"c\"}")).build());
- private static final List<PubSubMessage> MESSAGES_WITH_ATTRIBUTES =
- Arrays.asList(
- PubSubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8("{\"name\":\"a\"}"))
- .putAllAttributes(ATTRIBUTE_VALUES_MAP)
- .build(),
- PubSubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8("{\"name\":\"b\"}"))
- .putAllAttributes(ATTRIBUTE_VALUES_MAP)
- .build(),
- PubSubMessage.newBuilder()
- .setData(ByteString.copyFromUtf8("{\"name\":\"c\"}"))
- .putAllAttributes(ATTRIBUTE_VALUES_MAP)
- .build());
-
- final SerializableFunction<Row, byte[]> valueMapper =
- JsonUtils.getRowToJsonBytesFunction(BEAM_SCHEMA);
-
- final SerializableFunction<Row, byte[]> valueMapperRaw =
- PubsubLiteWriteSchemaTransformProvider.getRowToRawBytesFunction("payload");
-
- @Rule public transient TestPipeline p = TestPipeline.create();
-
- @Test
- public void testPubsubLiteErrorFnSuccess() {
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
- PCollection<Row> input = p.apply(Create.of(ROWS));
- PCollectionTuple output =
- input.apply(
- ParDo.of(new ErrorCounterFn("ErrorCounter", valueMapper,
errorSchema, Boolean.TRUE))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- output.get(ERROR_TAG).setRowSchema(errorSchema);
-
- PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(MESSAGES);
- p.run().waitUntilFinish();
- }
-
- @Test
- public void testPubsubLiteErrorFnSuccessRawEvents() {
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
- PCollection<Row> input = p.apply(Create.of(RAW_ROWS));
- PCollectionTuple output =
- input.apply(
- ParDo.of(new ErrorCounterFn("ErrorCounter", valueMapperRaw,
errorSchema, Boolean.TRUE))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- output.get(ERROR_TAG).setRowSchema(errorSchema);
-
- PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(MESSAGES_RAW);
- p.run().waitUntilFinish();
- }
-
- @Test
- public void testPubsubLiteErrorFnSuccessWithAttributes() {
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
- List<String> attributes = new ArrayList<>();
- attributes.add("key1");
- attributes.add("key2");
- Schema schema =
- PubsubLiteWriteSchemaTransformProvider.getSchemaWithoutAttributes(
- BEAM_SCHEMA_ATTRIBUTES, attributes);
- PCollection<Row> input = p.apply(Create.of(ROWSATTRIBUTES));
- PCollectionTuple output =
- input.apply(
- ParDo.of(
- new ErrorCounterFn(
- "ErrorCounter", valueMapper, errorSchema,
Boolean.TRUE, attributes, schema))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- output.get(ERROR_TAG).setRowSchema(errorSchema);
-
- PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(MESSAGES_WITH_ATTRIBUTES);
- p.run().waitUntilFinish();
- }
-
- @Test
- public void testPubsubLiteErrorFnSuccessWithAttributesAndDedupingSuccess() {
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
- List<String> attributes = new ArrayList<>();
- attributes.add("key1");
- attributes.add("key2");
- Schema schema =
- PubsubLiteWriteSchemaTransformProvider.getSchemaWithoutAttributes(
- BEAM_SCHEMA_ATTRIBUTES, attributes);
- PCollection<Row> input = p.apply(Create.of(ROWSATTRIBUTES));
- PCollectionTuple output =
- input.apply(
- ParDo.of(
- new ErrorCounterFn(
- "ErrorCounter", valueMapper, errorSchema,
Boolean.TRUE, attributes, schema))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- output.get(ERROR_TAG).setRowSchema(errorSchema);
-
- PCollection<Long> count =
- output
- .get(OUTPUT_TAG)
- .apply(
- ParDo.of(
- new PubsubLiteWriteSchemaTransformProvider.SetUuidFromPubSubMessage.SetUuidFn(
- "unique_key")))
- .apply("error_count", Count.globally());
- PAssert.that(count).containsInAnyOrder(Collections.singletonList(3L));
- p.run().waitUntilFinish();
- }
-
- @Test
- public void testPubsubLiteErrorFnSuccessProto() {
- Schema errorSchema = ErrorHandling.errorSchemaBytes();
-
- SerializableFunction<Row, byte[]> valueMapperProto =
- ProtoByteUtils.getRowToProtoBytesFromSchema(
- PROTO_STRING_SCHEMA, "com.test.proto.MyMessage");
-
- PCollection<Row> input = p.apply(Create.of(ROWS));
- PCollectionTuple output =
- input.apply(
- ParDo.of(
- new ErrorCounterFn("ErrorCounter", valueMapperProto,
errorSchema, Boolean.TRUE))
- .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
- output.get(ERROR_TAG).setRowSchema(errorSchema);
-
- PAssert.that(output.get(OUTPUT_TAG).apply(Count.globally()))
- .containsInAnyOrder(Collections.singletonList(3L));
- p.run().waitUntilFinish();
- }
-}
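On the write side, PubsubLiteIO.write still accepts a PCollection of PubSubMessage protos, so pipelines that used the removed write provider can publish directly. A hypothetical sketch with a placeholder topic path follows; it assumes the protobuf extension is on the classpath so ProtoCoder can be set explicitly, and the PublisherOptions builder calls reflect the API as currently published:

    import com.google.cloud.pubsublite.TopicPath;
    import com.google.cloud.pubsublite.proto.PubSubMessage;
    import com.google.protobuf.ByteString;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
    import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
    import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
    import org.apache.beam.sdk.transforms.Create;

    public class WriteSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply(
                Create.of(
                        PubSubMessage.newBuilder()
                            .setData(ByteString.copyFromUtf8("hello"))
                            .build())
                    // Pin the coder so Create does not have to infer one for protos.
                    .withCoder(ProtoCoder.of(PubSubMessage.class)))
            .apply(
                PubsubLiteIO.write(
                    PublisherOptions.newBuilder()
                        // Placeholder topic path; substitute a real one.
                        .setTopicPath(
                            TopicPath.parse(
                                "projects/my-project/locations/us-central1-a/topics/my-topic"))
                        .build()));
        p.run().waitUntilFinish();
      }
    }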
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index 458d3d63e43..e62b3a562c3 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -97,44 +97,6 @@
'ReadFromKafka': '2.65.0'
'WriteToKafka': '2.65.0'
-# PubSub
-- type: renaming
- transforms:
- 'ReadFromPubSubLite': 'ReadFromPubSubLite'
- 'WriteToPubSubLite': 'WriteToPubSubLite'
- config:
- mappings:
- 'ReadFromPubSubLite':
- 'project': 'project'
- 'schema': 'schema'
- 'format': 'format'
- 'subscription_name': 'subscription_name'
- 'location': 'location'
- 'attributes': 'attributes'
- 'attribute_map': 'attribute_map'
- 'attribute_id': 'attribute_id'
- 'error_handling': 'error_handling'
- 'file_descriptor_path': 'file_descriptor_path'
- 'message_name': 'message_name'
- 'WriteToPubSubLite':
- 'project': 'project'
- 'format': 'format'
- 'topic_name': 'topic_name'
- 'location': 'location'
- 'attributes': 'attributes'
- 'attribute_id': 'attribute_id'
- 'error_handling': 'error_handling'
- 'file_descriptor_path': 'file_descriptor_path'
- 'message_name': 'message_name'
- 'schema': 'schema'
- underlying_provider:
- type: beamJar
- transforms:
- 'ReadFromPubSubLite': 'beam:schematransform:org.apache.beam:pubsublite_read:v1'
- 'WriteToPubSubLite': 'beam:schematransform:org.apache.beam:pubsublite_write:v1'
- config:
- gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
-
# TODO(yaml): Tests are assuming python providers are before java ones, hence
# the order below. This should be fixed in the future.