This is an automated email from the ASF dual-hosted git repository. johncasey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new c490304040a Moving Pubsub Transforms to beam (#27366) c490304040a is described below commit c490304040a3cf5782bda412a8395dda0fb63554 Author: Dip Patel <37777672+dippate...@users.noreply.github.com> AuthorDate: Tue Jul 18 08:59:41 2023 -0500 Moving Pubsub Transforms to beam (#27366) --- .../PubsubReadSchemaTransformConfiguration.java | 160 ++--- .../pubsub/PubsubReadSchemaTransformProvider.java | 329 ++++----- .../PubsubSchemaTransformMessageToRowFactory.java | 179 ----- .../PubsubWriteSchemaTransformConfiguration.java | 174 +---- .../pubsub/PubsubWriteSchemaTransformProvider.java | 447 +++--------- .../PubsubReadSchemaTransformProviderTest.java | 419 ++++------- ...bsubSchemaTransformMessageToRowFactoryTest.java | 337 --------- .../PubsubWriteSchemaTransformProviderIT.java | 180 ----- .../PubsubWriteSchemaTransformProviderTest.java | 786 --------------------- 9 files changed, 459 insertions(+), 2552 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformConfiguration.java index f663f60f09b..befb22ca6dc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformConfiguration.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformConfiguration.java @@ -17,11 +17,13 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; +import com.google.api.client.util.Clock; import com.google.auto.value.AutoValue; import javax.annotation.Nullable; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; import org.apache.beam.sdk.schemas.AutoValueSchema; -import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; /** * Configuration for reading from Pub/Sub. @@ -33,137 +35,57 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema; @DefaultSchema(AutoValueSchema.class) @AutoValue public abstract class PubsubReadSchemaTransformConfiguration { + @SchemaFieldDescription( + "The name of the topic to consume data from. If a topic is specified, " + + " will create a new subscription for that topic and start consuming from that point. " + + "Either a topic or a subscription must be provided. " + + "Format: projects/${PROJECT}/topics/${TOPIC}") + public abstract @Nullable String getTopic(); + + @SchemaFieldDescription( + "The name of the subscription to consume data. " + + "Either a topic or subscription must be provided. " + + "Format: projects/${PROJECT}/subscriptions/${SUBSCRIPTION}") + public abstract @Nullable String getSubscription(); + + @SchemaFieldDescription( + "The encoding format for the data stored in Pubsub. Valid options are: " + + PubsubReadSchemaTransformProvider.VALID_FORMATS_STR) + public abstract String getFormat(); // AVRO, JSON + + @SchemaFieldDescription( + "The schema in which the data is encoded in the Pubsub topic. " + + "For AVRO data, this is a schema defined with AVRO schema syntax " + + "(https://avro.apache.org/docs/1.10.2/spec.html#schemas). " + + "For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/).") + public abstract String getSchema(); + + // Used for testing only. + public abstract @Nullable PubsubTestClientFactory getClientFactory(); + + // Used for testing only. + public abstract @Nullable Clock getClock(); - /** Instantiates a {@link PubsubReadSchemaTransformConfiguration.Builder}. */ public static Builder builder() { return new AutoValue_PubsubReadSchemaTransformConfiguration.Builder(); } - /** The expected schema of the Pub/Sub message. */ - public abstract Schema getDataSchema(); - - /** - * The Pub/Sub topic path to write failures. - * - * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the dead - * letter queue topic string. - */ - @Nullable - public abstract String getDeadLetterQueue(); - - /** - * The expected format of the Pub/Sub message. - * - * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} from - * {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}. - */ - @Nullable - public abstract String getFormat(); - - /** Used by the ProtoPayloadSerializerProvider when serializing from a Pub/Sub message. */ - @Nullable - public abstract String getProtoClass(); - - /** - * The subscription from which to read Pub/Sub messages. - * - * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format of - * the subscription string. - */ - @Nullable - public abstract String getSubscription(); - - /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */ - @Nullable - public abstract String getThriftClass(); - - /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */ - @Nullable - public abstract String getThriftProtocolFactoryClass(); - - /** - * When reading from Cloud Pub/Sub where record timestamps are provided as Pub/Sub message - * attributes, specifies the name of the attribute that contains the timestamp. - */ - @Nullable - public abstract String getTimestampAttribute(); - - /** - * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub message - * attributes, specifies the name of the attribute containing the unique identifier. - */ - @Nullable - public abstract String getIdAttribute(); - - /** - * The topic from which to read Pub/Sub messages. - * - * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the - * topic string. - */ - @Nullable - public abstract String getTopic(); - @AutoValue.Builder public abstract static class Builder { + public abstract Builder setTopic(@Nullable String topic); - /** The expected schema of the Pub/Sub message. */ - public abstract Builder setDataSchema(Schema value); - - /** - * The Pub/Sub topic path to write failures. - * - * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the - * dead letter queue topic string. - */ - public abstract Builder setDeadLetterQueue(String value); - - /** - * The expected format of the Pub/Sub message. - * - * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} - * from {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}. - */ - public abstract Builder setFormat(String value); - - /** Used by the ProtoPayloadSerializerProvider when serializing from a Pub/Sub message. */ - public abstract Builder setProtoClass(String value); - - /** - * The subscription from which to read Pub/Sub messages. - * - * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format of - * the subscription string. - */ - public abstract Builder setSubscription(String value); - - /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */ - public abstract Builder setThriftClass(String value); + public abstract Builder setSubscription(@Nullable String subscription); - /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */ - public abstract Builder setThriftProtocolFactoryClass(String value); + public abstract Builder setFormat(String format); - /** - * When reading from Cloud Pub/Sub where record timestamps are provided as Pub/Sub message - * attributes, specifies the name of the attribute that contains the timestamp. - */ - public abstract Builder setTimestampAttribute(String value); + public abstract Builder setSchema(String schema); - /** - * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub - * message attributes, specifies the name of the attribute containing the unique identifier. - */ - public abstract Builder setIdAttribute(String value); + // Used for testing only. + public abstract Builder setClientFactory(@Nullable PubsubTestClientFactory clientFactory); - /** - * The topic from which to read Pub/Sub messages. - * - * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the - * topic string. - */ - public abstract Builder setTopic(String value); + // Used for testing only. + public abstract Builder setClock(@Nullable Clock clock); - /** Builds a {@link PubsubReadSchemaTransformConfiguration} instance. */ public abstract PubsubReadSchemaTransformConfiguration build(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java index cec07dafef4..c0e8880d028 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java @@ -17,23 +17,39 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.DLQ_TAG; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.MAIN_TAG; - import com.google.api.client.util.Clock; import com.google.auto.service.AutoService; +import java.io.Serializable; +import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers; +import java.util.Objects; +import java.util.Set; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.schemas.utils.JsonUtils; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.FinishBundle; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; /** * An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads configured using @@ -43,196 +59,191 @@ import org.checkerframework.checker.nullness.qual.Nullable; * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam * repository. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) -@Internal @AutoService(SchemaTransformProvider.class) public class PubsubReadSchemaTransformProvider extends TypedSchemaTransformProvider<PubsubReadSchemaTransformConfiguration> { - static final String OUTPUT_TAG = "OUTPUT"; - /** Returns the expected class of the configuration. */ - @Override - protected Class<PubsubReadSchemaTransformConfiguration> configurationClass() { - return PubsubReadSchemaTransformConfiguration.class; - } + public static final String VALID_FORMATS_STR = "AVRO,JSON"; + public static final Set<String> VALID_DATA_FORMATS = + Sets.newHashSet(VALID_FORMATS_STR.split(",")); - /** Returns the expected {@link SchemaTransform} of the configuration. */ - @Override - protected SchemaTransform from(PubsubReadSchemaTransformConfiguration configuration) { - PubsubMessageToRow toRowTransform = - PubsubSchemaTransformMessageToRowFactory.from(configuration).buildMessageToRow(); - return new PubsubReadSchemaTransform(configuration, toRowTransform); - } + public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {}; + public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {}; + public static final Schema ERROR_SCHEMA = + Schema.builder().addStringField("error").addNullableByteArrayField("row").build(); - /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */ @Override - public String identifier() { - return "beam:schematransform:org.apache.beam:pubsub_read:v1"; - } - - /** - * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since - * no input is expected, this returns an empty list. - */ - @Override - public List<String> inputCollectionNames() { - return Collections.emptyList(); + public Class<PubsubReadSchemaTransformConfiguration> configurationClass() { + return PubsubReadSchemaTransformConfiguration.class; } - /** - * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since - * a single output is expected, this returns a list with a single name. - */ @Override - public List<String> outputCollectionNames() { - return Collections.singletonList(OUTPUT_TAG); - } - - /** - * An implementation of {@link SchemaTransform} for Pub/Sub reads configured using {@link - * PubsubReadSchemaTransformConfiguration}. - */ - static class PubsubReadSchemaTransform extends SchemaTransform { + public SchemaTransform from(PubsubReadSchemaTransformConfiguration configuration) { + if (configuration.getSubscription() == null && configuration.getTopic() == null) { + throw new IllegalArgumentException( + "To read from Pubsub, a subscription name or a topic name must be provided"); + } - private final PubsubReadSchemaTransformConfiguration configuration; - private final PubsubMessageToRow pubsubMessageToRow; + if (configuration.getSubscription() != null && configuration.getTopic() != null) { + throw new IllegalArgumentException( + "To read from Pubsub, a subscription name or a topic name must be provided. Not both."); + } - private PubsubClient.PubsubClientFactory clientFactory; + if ((Strings.isNullOrEmpty(configuration.getSchema()) + && !Strings.isNullOrEmpty(configuration.getFormat())) + || (!Strings.isNullOrEmpty(configuration.getSchema()) + && Strings.isNullOrEmpty(configuration.getFormat()))) { + throw new IllegalArgumentException( + "A schema was provided without a data format (or viceversa). Please provide " + + "both of these parameters to read from Pubsub, or if you would like to use the Pubsub schema service," + + " please leave both of these blank."); + } - private Clock clock; + Schema beamSchema; + SerializableFunction<byte[], Row> valueMapper; - private PubsubReadSchemaTransform( - PubsubReadSchemaTransformConfiguration configuration, - PubsubMessageToRow pubsubMessageToRow) { - this.configuration = configuration; - this.pubsubMessageToRow = pubsubMessageToRow; + if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) { + throw new IllegalArgumentException( + String.format( + "Format %s not supported. Only supported formats are %s", + configuration.getFormat(), VALID_FORMATS_STR)); } - - /** - * Sets the {@link PubsubClient.PubsubClientFactory}. - * - * <p>Used for testing. - */ - void setClientFactory(PubsubClient.PubsubClientFactory value) { - this.clientFactory = value; + beamSchema = + Objects.equals(configuration.getFormat(), "JSON") + ? JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema()) + : AvroUtils.toBeamSchema( + new org.apache.avro.Schema.Parser().parse(configuration.getSchema())); + valueMapper = + Objects.equals(configuration.getFormat(), "JSON") + ? JsonUtils.getJsonBytesToRowFunction(beamSchema) + : AvroUtils.getAvroBytesToRowFunction(beamSchema); + + PubsubReadSchemaTransform transform = + new PubsubReadSchemaTransform( + configuration.getTopic(), configuration.getSubscription(), beamSchema, valueMapper); + + if (configuration.getClientFactory() != null) { + transform.setClientFactory(configuration.getClientFactory()); + } + if (configuration.getClock() != null) { + transform.setClock(configuration.getClock()); } - /** - * Sets the {@link Clock}. - * - * <p>Used for testing. - */ - void setClock(Clock clock) { - this.clock = clock; + return transform; + } + + private static class PubsubReadSchemaTransform extends SchemaTransform implements Serializable { + final Schema beamSchema; + final SerializableFunction<byte[], Row> valueMapper; + final @Nullable String topic; + final @Nullable String subscription; + @Nullable PubsubTestClientFactory clientFactory; + @Nullable Clock clock; + + PubsubReadSchemaTransform( + @Nullable String topic, + @Nullable String subscription, + Schema beamSchema, + SerializableFunction<byte[], Row> valueMapper) { + this.topic = topic; + this.subscription = subscription; + this.beamSchema = beamSchema; + this.valueMapper = valueMapper; } - /** Validates the {@link PubsubReadSchemaTransformConfiguration}. */ - @Override - public void validate(@Nullable PipelineOptions options) { - if (configuration.getSubscription() == null && configuration.getTopic() == null) { - throw new IllegalArgumentException( - String.format( - "%s needs to set either the topic or the subscription", - PubsubReadSchemaTransformConfiguration.class)); - } + private static class ErrorCounterFn extends DoFn<PubsubMessage, Row> { + private Counter pubsubErrorCounter; + private Long errorsInBundle = 0L; + private SerializableFunction<byte[], Row> valueMapper; - if (configuration.getSubscription() != null && configuration.getTopic() != null) { - throw new IllegalArgumentException( - String.format( - "%s should not set both the topic or the subscription", - PubsubReadSchemaTransformConfiguration.class)); + ErrorCounterFn(String name, SerializableFunction<byte[], Row> valueMapper) { + this.pubsubErrorCounter = Metrics.counter(PubsubReadSchemaTransformProvider.class, name); + this.valueMapper = valueMapper; } - try { - PayloadSerializers.getSerializer( - configuration.getFormat(), configuration.getDataSchema(), new HashMap<>()); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException( - String.format( - "Invalid %s, no serializer provider exists for format `%s`", - PubsubReadSchemaTransformConfiguration.class, configuration.getFormat())); + @ProcessElement + public void process(@DoFn.Element PubsubMessage message, MultiOutputReceiver receiver) { + + try { + receiver.get(OUTPUT_TAG).output(valueMapper.apply(message.getPayload())); + } catch (Exception e) { + errorsInBundle += 1; + receiver + .get(ERROR_TAG) + .output( + Row.withSchema(ERROR_SCHEMA) + .addValues(e.toString(), message.getPayload()) + .build()); + } } - } - /** Reads from Pub/Sub according to {@link PubsubReadSchemaTransformConfiguration}. */ - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - if (!input.getAll().isEmpty()) { - throw new IllegalArgumentException( - String.format( - "%s %s input is expected to be empty", - input.getClass().getSimpleName(), getClass().getSimpleName())); + @FinishBundle + public void finish(FinishBundleContext c) { + pubsubErrorCounter.inc(errorsInBundle); + errorsInBundle = 0L; } - - PCollectionTuple rowsWithDlq = - input - .getPipeline() - .apply("ReadFromPubsub", buildPubsubRead()) - .apply("PubsubMessageToRow", pubsubMessageToRow); - - writeToDeadLetterQueue(rowsWithDlq); - - return PCollectionRowTuple.of(OUTPUT_TAG, rowsWithDlq.get(MAIN_TAG)); } - private void writeToDeadLetterQueue(PCollectionTuple rowsWithDlq) { - PubsubIO.Write<PubsubMessage> deadLetterQueue = buildDeadLetterQueueWrite(); - if (deadLetterQueue == null) { - return; - } - rowsWithDlq.get(DLQ_TAG).apply("WriteToDeadLetterQueue", deadLetterQueue); + void setClientFactory(@Nullable PubsubTestClientFactory factory) { + this.clientFactory = factory; } - /** - * Builds {@link PubsubIO.Write} dead letter queue from {@link - * PubsubReadSchemaTransformConfiguration}. - */ - PubsubIO.Write<PubsubMessage> buildDeadLetterQueueWrite() { - if (configuration.getDeadLetterQueue() == null) { - return null; - } - - PubsubIO.Write<PubsubMessage> writeDlq = - PubsubIO.writeMessages().to(configuration.getDeadLetterQueue()); - - if (configuration.getTimestampAttribute() != null) { - writeDlq = writeDlq.withTimestampAttribute(configuration.getTimestampAttribute()); - } - - return writeDlq; + void setClock(@Nullable Clock clock) { + this.clock = clock; } - /** Builds {@link PubsubIO.Read} from a {@link PubsubReadSchemaTransformConfiguration}. */ + @SuppressWarnings("nullness") PubsubIO.Read<PubsubMessage> buildPubsubRead() { - PubsubIO.Read<PubsubMessage> read = PubsubIO.readMessagesWithAttributes(); - - if (configuration.getSubscription() != null) { - read = read.fromSubscription(configuration.getSubscription()); + PubsubIO.Read<PubsubMessage> pubsubRead = PubsubIO.readMessages(); + if (!Strings.isNullOrEmpty(topic)) { + pubsubRead = pubsubRead.fromTopic(topic); + } else { + pubsubRead = pubsubRead.fromSubscription(subscription); } - - if (configuration.getTopic() != null) { - read = read.fromTopic(configuration.getTopic()); + if (clientFactory != null && clock != null) { + pubsubRead = pubsubRead.withClientFactory(clientFactory); + pubsubRead = clientFactory.setClock(pubsubRead, clock); + } else if (clientFactory != null || clock != null) { + throw new IllegalArgumentException( + "Both PubsubTestClientFactory and Clock need to be specified for testing, but only one is provided"); } + return pubsubRead; + } - if (configuration.getTimestampAttribute() != null) { - read = read.withTimestampAttribute(configuration.getTimestampAttribute()); - } + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + PubsubIO.Read<PubsubMessage> pubsubRead = buildPubsubRead(); - if (configuration.getIdAttribute() != null) { - read = read.withIdAttribute(configuration.getIdAttribute()); - } + PCollectionTuple outputTuple = + input + .getPipeline() + .apply(pubsubRead) + .apply( + ParDo.of(new ErrorCounterFn("PubSub-read-error-counter", valueMapper)) + .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); + + return PCollectionRowTuple.of( + "output", + outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema), + "errors", + outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA)); + } + } - if (clientFactory != null) { - read = read.withClientFactory(clientFactory); - } + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:pubsub_read:v1"; + } - if (clock != null) { - read = read.withClock(clock); - } + @Override + public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> + inputCollectionNames() { + return Collections.emptyList(); + } - return read; - } + @Override + public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> + outputCollectionNames() { + return Arrays.asList("output", "errors"); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java deleted file mode 100644 index 988c593e32f..00000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsub; - -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD; -import static org.apache.beam.sdk.schemas.Schema.TypeName.ROW; - -import java.util.HashMap; -import java.util.Map; -import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer; -import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers; - -/** - * Builds a {@link PubsubMessageToRow} from a {@link PubsubReadSchemaTransformConfiguration}. - * - * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We - * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam - * repository. - */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) -@Internal -class PubsubSchemaTransformMessageToRowFactory { - private static final String DEFAULT_FORMAT = "json"; - - private static final Schema.FieldType ATTRIBUTE_MAP_FIELD_TYPE = - Schema.FieldType.map(Schema.FieldType.STRING.withNullable(false), Schema.FieldType.STRING); - private static final Schema ATTRIBUTE_ARRAY_ENTRY_SCHEMA = - Schema.builder().addStringField("key").addStringField("value").build(); - private static final Schema.FieldType ATTRIBUTE_ARRAY_FIELD_TYPE = - Schema.FieldType.array(Schema.FieldType.row(ATTRIBUTE_ARRAY_ENTRY_SCHEMA)); - - private static final String THRIFT_CLASS_KEY = "thriftClass"; - private static final String THRIFT_PROTOCOL_FACTORY_CLASS_KEY = "thriftProtocolFactoryClass"; - private static final String PROTO_CLASS_KEY = "protoClass"; - - /** - * Instantiate a {@link PubsubSchemaTransformMessageToRowFactory} from a {@link - * PubsubReadSchemaTransformConfiguration}. - */ - static PubsubSchemaTransformMessageToRowFactory from( - PubsubReadSchemaTransformConfiguration configuration) { - return new PubsubSchemaTransformMessageToRowFactory(configuration); - } - - /** Build the {@link PubsubMessageToRow}. */ - PubsubMessageToRow buildMessageToRow() { - PubsubMessageToRow.Builder builder = - PubsubMessageToRow.builder() - .messageSchema(configuration.getDataSchema()) - .useDlq( - configuration.getDeadLetterQueue() != null - && !configuration.getDeadLetterQueue().isEmpty()) - .useFlatSchema(!shouldUseNestedSchema()); - - if (needsSerializer()) { - builder = builder.serializerProvider(serializer()); - } - - return builder.build(); - } - - private final PubsubReadSchemaTransformConfiguration configuration; - - private PubsubSchemaTransformMessageToRowFactory( - PubsubReadSchemaTransformConfiguration configuration) { - this.configuration = configuration; - } - - private PayloadSerializer payloadSerializer() { - Schema schema = configuration.getDataSchema(); - String format = DEFAULT_FORMAT; - - if (configuration.getFormat() != null && !configuration.getFormat().isEmpty()) { - format = configuration.getFormat(); - } - - Map<String, Object> params = new HashMap<>(); - - if (configuration.getThriftClass() != null && !configuration.getThriftClass().isEmpty()) { - params.put(THRIFT_CLASS_KEY, configuration.getThriftClass()); - } - - if (configuration.getThriftProtocolFactoryClass() != null - && !configuration.getThriftProtocolFactoryClass().isEmpty()) { - params.put(THRIFT_PROTOCOL_FACTORY_CLASS_KEY, configuration.getThriftProtocolFactoryClass()); - } - - if (configuration.getProtoClass() != null && !configuration.getProtoClass().isEmpty()) { - params.put(PROTO_CLASS_KEY, configuration.getProtoClass()); - } - - return PayloadSerializers.getSerializer(format, schema, params); - } - - PubsubMessageToRow.SerializerProvider serializer() { - return input -> payloadSerializer(); - } - - /** - * Determines whether the {@link PubsubMessageToRow} needs a {@link - * PubsubMessageToRow.SerializerProvider}. - * - * <p>The determination is based on {@link #shouldUseNestedSchema()} is false or if the {@link - * PubsubMessageToRow#PAYLOAD_FIELD} is not present. - */ - boolean needsSerializer() { - return !shouldUseNestedSchema() || !fieldPresent(PAYLOAD_FIELD, Schema.FieldType.BYTES); - } - - /** - * Determines whether a nested schema should be used for {@link - * PubsubReadSchemaTransformConfiguration#getDataSchema()}. - * - * <p>The determination is based on {@link #schemaHasValidPayloadField()} and {@link - * #schemaHasValidAttributesField()}} - */ - boolean shouldUseNestedSchema() { - return schemaHasValidPayloadField() && schemaHasValidAttributesField(); - } - - /** - * Determines whether {@link PubsubReadSchemaTransformConfiguration#getDataSchema()} has a valid - * {@link PubsubMessageToRow#PAYLOAD_FIELD}. - */ - boolean schemaHasValidPayloadField() { - Schema schema = configuration.getDataSchema(); - if (!schema.hasField(PAYLOAD_FIELD)) { - return false; - } - if (fieldPresent(PAYLOAD_FIELD, Schema.FieldType.BYTES)) { - return true; - } - return schema.getField(PAYLOAD_FIELD).getType().getTypeName().equals(ROW); - } - - /** - * Determines whether {@link PubsubReadSchemaTransformConfiguration#getDataSchema()} has a valid - * {@link PubsubMessageToRow#ATTRIBUTES_FIELD} field. - * - * <p>The determination is based on whether {@link #fieldPresent(String, Schema.FieldType)} for - * {@link PubsubMessageToRow#ATTRIBUTES_FIELD} is true for either {@link - * #ATTRIBUTE_MAP_FIELD_TYPE} or {@link #ATTRIBUTE_ARRAY_FIELD_TYPE} {@link Schema.FieldType}s. - */ - boolean schemaHasValidAttributesField() { - return fieldPresent(ATTRIBUTES_FIELD, ATTRIBUTE_MAP_FIELD_TYPE) - || fieldPresent(ATTRIBUTES_FIELD, ATTRIBUTE_ARRAY_FIELD_TYPE); - } - - /** - * Determines whether {@link PubsubReadSchemaTransformConfiguration#getDataSchema()} contains the - * field and whether that field is an expectedType {@link Schema.FieldType}. - */ - boolean fieldPresent(String field, Schema.FieldType expectedType) { - Schema schema = configuration.getDataSchema(); - return schema.hasField(field) - && expectedType.equivalent( - schema.getField(field).getType(), Schema.EquivalenceNullablePolicy.IGNORE); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java index acaf04cdfc6..57620c968c5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java @@ -18,9 +18,9 @@ package org.apache.beam.sdk.io.gcp.pubsub; import com.google.auto.value.AutoValue; -import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; /** * Configuration for writing to Pub/Sub. @@ -32,179 +32,25 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema; @DefaultSchema(AutoValueSchema.class) @AutoValue public abstract class PubsubWriteSchemaTransformConfiguration { + @SchemaFieldDescription( + "The encoding format for the data stored in Pubsub. Valid options are: " + + PubsubWriteSchemaTransformProvider.VALID_FORMATS_STR) + public abstract String getFormat(); - public static final String DEFAULT_TIMESTAMP_ATTRIBUTE = "event_timestamp"; + @SchemaFieldDescription( + "The name of the topic to write data to. " + "Format: projects/${PROJECT}/topics/${TOPIC}") + public abstract String getTopic(); public static Builder builder() { return new AutoValue_PubsubWriteSchemaTransformConfiguration.Builder(); } - public static TargetConfiguration.Builder targetConfigurationBuilder() { - return new AutoValue_PubsubWriteSchemaTransformConfiguration_TargetConfiguration.Builder() - .setTimestampAttributeKey(DEFAULT_TIMESTAMP_ATTRIBUTE); - } - - public static SourceConfiguration.Builder sourceConfigurationBuilder() { - return new AutoValue_PubsubWriteSchemaTransformConfiguration_SourceConfiguration.Builder(); - } - - /** - * Configuration details of the source {@link org.apache.beam.sdk.values.Row} {@link - * org.apache.beam.sdk.schemas.Schema}. - */ - @Nullable - public abstract SourceConfiguration getSource(); - - /** Configuration details of the target {@link PubsubMessage}. */ - public abstract TargetConfiguration getTarget(); - - /** - * The topic to which to write Pub/Sub messages. - * - * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the - * topic string. - */ - public abstract String getTopic(); - - /** - * The expected format of the Pub/Sub message. - * - * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} from - * {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}. See list of supported - * values by invoking {@link org.apache.beam.sdk.schemas.io.Providers#loadProviders(Class)}. - * - * <pre>{@code Providers.loadProviders(PayloadSerializer.class).keySet()}</pre> - */ - @Nullable - public abstract String getFormat(); - - /** - * When writing to Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub message - * attributes, specifies the name of the attribute containing the unique identifier. - */ - @Nullable - public abstract String getIdAttribute(); - - /** Builder for {@link PubsubWriteSchemaTransformConfiguration}. */ @AutoValue.Builder public abstract static class Builder { + public abstract Builder setFormat(String format); - /** - * Configuration details of the source {@link org.apache.beam.sdk.values.Row} {@link - * org.apache.beam.sdk.schemas.Schema}. - */ - public abstract Builder setSource(SourceConfiguration value); - - /** Configuration details of the target {@link PubsubMessage}. */ - public abstract Builder setTarget(TargetConfiguration value); - - /** - * The topic to which to write Pub/Sub messages. - * - * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the - * topic string. - */ - public abstract Builder setTopic(String value); - - /** - * The expected format of the Pub/Sub message. - * - * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} - * from {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}. See list of - * supported values by invoking {@link - * org.apache.beam.sdk.schemas.io.Providers#loadProviders(Class)}. - * - * <pre>{@code Providers.loadProviders(PayloadSerializer.class).keySet()}</pre> - */ - public abstract Builder setFormat(String value); - - /** - * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub - * message attributes, specifies the name of the attribute containing the unique identifier. - */ - public abstract Builder setIdAttribute(String value); + public abstract Builder setTopic(String topic); public abstract PubsubWriteSchemaTransformConfiguration build(); } - - @DefaultSchema(AutoValueSchema.class) - @AutoValue - public abstract static class SourceConfiguration { - /** - * The attributes field name of the source {@link org.apache.beam.sdk.values.Row}. {@link - * org.apache.beam.sdk.schemas.Schema.FieldType} must be a <code>Map<String, String> - * </code> - */ - @Nullable - public abstract String getAttributesFieldName(); - - /** - * The timestamp field name of the source {@link org.apache.beam.sdk.values.Row}. {@link - * org.apache.beam.sdk.schemas.Schema.FieldType} must be a {@link - * org.apache.beam.sdk.schemas.Schema.FieldType#DATETIME}. - */ - @Nullable - public abstract String getTimestampFieldName(); - - /** - * The payload field name of the source {@link org.apache.beam.sdk.values.Row}. {@link - * org.apache.beam.sdk.schemas.Schema.FieldType} must be either {@link - * org.apache.beam.sdk.schemas.Schema.FieldType#BYTES} or a {@link - * org.apache.beam.sdk.values.Row}. If null, payload serialized from user fields other than - * attributes. Not compatible with other payload intended fields. - */ - @Nullable - public abstract String getPayloadFieldName(); - - @AutoValue.Builder - public abstract static class Builder { - /** - * The attributes field name of the source {@link org.apache.beam.sdk.values.Row}. {@link - * org.apache.beam.sdk.schemas.Schema.FieldType} must be a <code>Map<String, String> - * </code> - */ - public abstract Builder setAttributesFieldName(String value); - - /** - * The timestamp field name of the source {@link org.apache.beam.sdk.values.Row}. {@link - * org.apache.beam.sdk.schemas.Schema.FieldType} must be a {@link - * org.apache.beam.sdk.schemas.Schema.FieldType#DATETIME}. - */ - public abstract Builder setTimestampFieldName(String value); - - /** - * The payload field name of the source {@link org.apache.beam.sdk.values.Row}. {@link - * org.apache.beam.sdk.schemas.Schema.FieldType} must be either {@link - * org.apache.beam.sdk.schemas.Schema.FieldType#BYTES} or a {@link - * org.apache.beam.sdk.values.Row}. If null, payload serialized from user fields other than - * attributes. Not compatible with other payload intended fields. - */ - public abstract Builder setPayloadFieldName(String value); - - public abstract SourceConfiguration build(); - } - } - - @DefaultSchema(AutoValueSchema.class) - @AutoValue - public abstract static class TargetConfiguration { - - /** - * The attribute key to assign the {@link PubsubMessage} stringified timestamp value. {@link - * #builder()} method defaults value to {@link #DEFAULT_TIMESTAMP_ATTRIBUTE}. - */ - public abstract String getTimestampAttributeKey(); - - @AutoValue.Builder - public abstract static class Builder { - - /** - * The attribute key to assign the {@link PubsubMessage} stringified timestamp value. Defaults - * to {@link #DEFAULT_TIMESTAMP_ATTRIBUTE}. - */ - public abstract Builder setTimestampAttributeKey(String value); - - public abstract TargetConfiguration build(); - } - } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java index 7f3f6f2c702..8e8b804801b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java @@ -17,56 +17,29 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_ATTRIBUTES_KEY_NAME; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_EVENT_TIMESTAMP_KEY_NAME; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_PAYLOAD_KEY_NAME; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ERROR; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.OUTPUT; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.PAYLOAD_BYTES_TYPE_NAME; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.PAYLOAD_ROW_TYPE_NAME; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.removeFields; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; - -import com.google.api.client.util.Clock; import com.google.auto.service.AutoService; -import java.io.IOException; -import java.util.ArrayList; +import java.io.Serializable; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.Set; -import java.util.stream.Stream; -import javax.annotation.Nullable; -import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.FieldMatcher; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.SchemaReflection; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformConfiguration.SourceConfiguration; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.Schema.Field; -import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.schemas.Schema.TypeName; -import org.apache.beam.sdk.schemas.io.Providers; -import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer; -import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializerProvider; -import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.schemas.utils.JsonUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; -import org.joda.time.Instant; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; /** * An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads configured using @@ -76,360 +49,102 @@ import org.joda.time.Instant; * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam * repository. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) -@Internal @AutoService(SchemaTransformProvider.class) public class PubsubWriteSchemaTransformProvider extends TypedSchemaTransformProvider<PubsubWriteSchemaTransformConfiguration> { - private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:pubsub_write:v1"; - static final String INPUT_TAG = "input"; - static final String ERROR_TAG = "error"; - /** Returns the expected class of the configuration. */ - @Override - protected Class<PubsubWriteSchemaTransformConfiguration> configurationClass() { - return PubsubWriteSchemaTransformConfiguration.class; - } + public static final TupleTag<PubsubMessage> OUTPUT_TAG = new TupleTag<PubsubMessage>() {}; + public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {}; - /** Returns the expected {@link SchemaTransform} of the configuration. */ - @Override - public SchemaTransform from(PubsubWriteSchemaTransformConfiguration configuration) { - return new PubsubWriteSchemaTransform(configuration); - } - - /** Implementation of the {@link SchemaTransformProvider} identifier method. */ - @Override - public String identifier() { - return IDENTIFIER; - } - - /** - * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a - * single input is expected, this returns a list with a single name. - */ - @Override - public List<String> inputCollectionNames() { - return Collections.singletonList(INPUT_TAG); - } + public static final String VALID_FORMATS_STR = "AVRO,JSON"; + public static final Set<String> VALID_DATA_FORMATS = + Sets.newHashSet(VALID_FORMATS_STR.split(",")); - /** - * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. The - * only expected output is the {@link #ERROR_TAG}. - */ @Override - public List<String> outputCollectionNames() { - return Collections.singletonList(ERROR_TAG); + public Class<PubsubWriteSchemaTransformConfiguration> configurationClass() { + return PubsubWriteSchemaTransformConfiguration.class; } - /** - * An implementation of {@link SchemaTransform} for Pub/Sub writes configured using {@link - * PubsubWriteSchemaTransformConfiguration}. - */ - static class PubsubWriteSchemaTransform extends SchemaTransform { - - private final PubsubWriteSchemaTransformConfiguration configuration; + public static class ErrorFn extends DoFn<Row, PubsubMessage> { + private SerializableFunction<Row, byte[]> valueMapper; + private Schema errorSchema; - private PubsubClient.PubsubClientFactory pubsubClientFactory; - - PubsubWriteSchemaTransform(PubsubWriteSchemaTransformConfiguration configuration) { - this.configuration = configuration; + ErrorFn(SerializableFunction<Row, byte[]> valueMapper, Schema errorSchema) { + this.valueMapper = valueMapper; + this.errorSchema = errorSchema; } - PubsubWriteSchemaTransform withPubsubClientFactory(PubsubClient.PubsubClientFactory factory) { - this.pubsubClientFactory = factory; - return this; - } - - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - if (input.getAll().size() != 1 || !input.has(INPUT_TAG)) { - throw new IllegalArgumentException( - String.format( - "%s %s input is expected to contain a single %s tagged PCollection<Row>", - input.getClass().getSimpleName(), getClass().getSimpleName(), INPUT_TAG)); - } - - PCollection<Row> rows = input.get(INPUT_TAG); - if (rows.getSchema().getFieldCount() == 0) { - throw new IllegalArgumentException(String.format("empty Schema for %s", INPUT_TAG)); - } - - Schema targetSchema = buildTargetSchema(rows.getSchema()); - - rows = - rows.apply( - ConvertForRowToMessage.class.getSimpleName(), - convertForRowToMessage(targetSchema)) - .setRowSchema(targetSchema); - - Schema schema = rows.getSchema(); - - Schema serializableSchema = - removeFields(schema, DEFAULT_ATTRIBUTES_KEY_NAME, DEFAULT_EVENT_TIMESTAMP_KEY_NAME); - FieldMatcher payloadRowMatcher = FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, TypeName.ROW); - if (payloadRowMatcher.match(serializableSchema)) { - serializableSchema = - serializableSchema.getField(DEFAULT_PAYLOAD_KEY_NAME).getType().getRowSchema(); - } - - validateTargetSchemaAgainstPubsubSchema(serializableSchema, input.getPipeline().getOptions()); - - PCollectionTuple pct = - rows.apply( - PubsubRowToMessage.class.getSimpleName(), - buildPubsubRowToMessage(serializableSchema)); - - PCollection<PubsubMessage> messages = pct.get(OUTPUT); - messages.apply(PubsubIO.Write.class.getSimpleName(), buildPubsubWrite()); - return PCollectionRowTuple.of(ERROR_TAG, pct.get(ERROR)); - } - - PayloadSerializer getPayloadSerializer(Schema schema) { - if (configuration.getFormat() == null) { - return null; - } - String format = configuration.getFormat(); - Set<String> availableFormats = - Providers.loadProviders(PayloadSerializerProvider.class).keySet(); - if (!availableFormats.contains(format)) { - String availableFormatsString = String.join(",", availableFormats); - throw new IllegalArgumentException( - String.format( - "%s is not among the valid formats: [%s]", format, availableFormatsString)); - } - return PayloadSerializers.getSerializer(configuration.getFormat(), schema, ImmutableMap.of()); - } - - PubsubRowToMessage buildPubsubRowToMessage(Schema schema) { - PubsubRowToMessage.Builder builder = - PubsubRowToMessage.builder().setPayloadSerializer(getPayloadSerializer(schema)); - - if (configuration.getTarget() != null) { - builder = - builder.setTargetTimestampAttributeName( - configuration.getTarget().getTimestampAttributeKey()); - } - - return builder.build(); - } - - PubsubIO.Write<PubsubMessage> buildPubsubWrite() { - PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages().to(configuration.getTopic()); - - if (configuration.getIdAttribute() != null) { - write = write.withIdAttribute(configuration.getIdAttribute()); - } - - if (pubsubClientFactory != null) { - write = write.withClientFactory(pubsubClientFactory); + @ProcessElement + public void processElement(@Element Row row, MultiOutputReceiver receiver) { + try { + receiver.get(OUTPUT_TAG).output(new PubsubMessage(valueMapper.apply(row), null)); + } catch (Exception e) { + receiver + .get(ERROR_TAG) + .output(Row.withSchema(errorSchema).addValues(e.toString(), row).build()); } - - return write; } + } - void validateSourceSchemaAgainstConfiguration(Schema sourceSchema) { - if (sourceSchema.getFieldCount() == 0) { - throw new IllegalArgumentException(String.format("empty Schema for %s", INPUT_TAG)); - } - - if (configuration.getSource() == null) { - return; - } - - SourceConfiguration source = configuration.getSource(); - - if (source.getAttributesFieldName() != null) { - String fieldName = source.getAttributesFieldName(); - FieldType fieldType = ATTRIBUTES_FIELD_TYPE; - FieldMatcher fieldMatcher = FieldMatcher.of(fieldName, fieldType); - checkArgument( - fieldMatcher.match(sourceSchema), - String.format("schema missing field: %s for type %s: ", fieldName, fieldType)); - } - - if (source.getTimestampFieldName() != null) { - String fieldName = source.getTimestampFieldName(); - FieldType fieldType = EVENT_TIMESTAMP_FIELD_TYPE; - FieldMatcher fieldMatcher = FieldMatcher.of(fieldName, fieldType); - checkArgument( - fieldMatcher.match(sourceSchema), - String.format("schema missing field: %s for type: %s", fieldName, fieldType)); - } - - if (source.getPayloadFieldName() == null) { - return; - } - - String fieldName = source.getPayloadFieldName(); - FieldMatcher bytesFieldMatcher = FieldMatcher.of(fieldName, PAYLOAD_BYTES_TYPE_NAME); - FieldMatcher rowFieldMatcher = FieldMatcher.of(fieldName, PAYLOAD_ROW_TYPE_NAME); - SchemaReflection schemaReflection = SchemaReflection.of(sourceSchema); - checkArgument( - schemaReflection.matchesAny(bytesFieldMatcher, rowFieldMatcher), + @Override + public SchemaTransform from(PubsubWriteSchemaTransformConfiguration configuration) { + if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) { + throw new IllegalArgumentException( String.format( - "schema missing field: %s for types %s or %s", - fieldName, PAYLOAD_BYTES_TYPE_NAME, PAYLOAD_ROW_TYPE_NAME)); - - String[] fieldsToExclude = - Stream.of( - source.getAttributesFieldName(), - source.getTimestampFieldName(), - source.getPayloadFieldName()) - .filter(Objects::nonNull) - .toArray(String[]::new); - - Schema userFieldsSchema = removeFields(sourceSchema, fieldsToExclude); - - if (userFieldsSchema.getFieldCount() > 0) { - throw new IllegalArgumentException( - String.format("user fields incompatible with %s field", source.getPayloadFieldName())); - } - } - - void validateTargetSchemaAgainstPubsubSchema(Schema targetSchema, PipelineOptions options) { - checkArgument(options != null); - - try (PubsubClient pubsubClient = getPubsubClient(options.as(PubsubOptions.class))) { - PubsubClient.TopicPath topicPath = PubsubClient.topicPathFromPath(configuration.getTopic()); - PubsubClient.SchemaPath schemaPath = pubsubClient.getSchemaPath(topicPath); - if (schemaPath == null || schemaPath.equals(SchemaPath.DELETED_SCHEMA)) { - return; - } - Schema expectedSchema = pubsubClient.getSchema(schemaPath); - checkState( - targetSchema.equals(expectedSchema), - String.format( - "input schema mismatch with expected schema at path: %s\ninput schema: %s\nPub/Sub schema: %s", - schemaPath, targetSchema, expectedSchema)); - } catch (IOException e) { - throw new IllegalStateException(e.getMessage()); - } - } - - Schema buildTargetSchema(Schema sourceSchema) { - validateSourceSchemaAgainstConfiguration(sourceSchema); - FieldType payloadFieldType = null; - - List<String> fieldsToRemove = new ArrayList<>(); - - if (configuration.getSource() != null) { - SourceConfiguration source = configuration.getSource(); - - if (source.getAttributesFieldName() != null) { - fieldsToRemove.add(source.getAttributesFieldName()); - } - - if (source.getTimestampFieldName() != null) { - fieldsToRemove.add(source.getTimestampFieldName()); - } - - if (source.getPayloadFieldName() != null) { - String fieldName = source.getPayloadFieldName(); - Field field = sourceSchema.getField(fieldName); - payloadFieldType = field.getType(); - fieldsToRemove.add(fieldName); - } - } - - Schema targetSchema = - PubsubRowToMessage.builder() - .build() - .inputSchemaFactory(payloadFieldType) - .buildSchema(sourceSchema.getFields().toArray(new Field[0])); - - return removeFields(targetSchema, fieldsToRemove.toArray(new String[0])); + "Format %s not supported. Only supported formats are %s", + configuration.getFormat(), VALID_FORMATS_STR)); } + return new PubsubWriteSchemaTransform(configuration.getTopic(), configuration.getFormat()); + } - private PubsubClient.PubsubClientFactory getPubsubClientFactory() { - if (pubsubClientFactory != null) { - return pubsubClientFactory; - } - return PubsubGrpcClient.FACTORY; - } + private static class PubsubWriteSchemaTransform extends SchemaTransform implements Serializable { + final String topic; + final String format; - private PubsubClient getPubsubClient(PubsubOptions options) throws IOException { - return getPubsubClientFactory() - .newClient( - configuration.getTarget().getTimestampAttributeKey(), - configuration.getIdAttribute(), - options); + PubsubWriteSchemaTransform(String topic, String format) { + this.topic = topic; + this.format = format; } - ParDo.SingleOutput<Row, Row> convertForRowToMessage(Schema targetSchema) { - return convertForRowToMessage(targetSchema, null); - } - - ParDo.SingleOutput<Row, Row> convertForRowToMessage( - Schema targetSchema, @Nullable Clock clock) { - String attributesName = null; - String timestampName = null; - String payloadName = null; - SourceConfiguration source = configuration.getSource(); - if (source != null) { - attributesName = source.getAttributesFieldName(); - timestampName = source.getTimestampFieldName(); - payloadName = source.getPayloadFieldName(); - } - return ParDo.of( - new ConvertForRowToMessage( - targetSchema, clock, attributesName, timestampName, payloadName)); + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + final Schema errorSchema = + Schema.builder() + .addStringField("error") + .addNullableRowField("row", input.get("input").getSchema()) + .build(); + SerializableFunction<Row, byte[]> fn = + format.equals("AVRO") + ? AvroUtils.getRowToAvroBytesFunction(input.get("input").getSchema()) + : JsonUtils.getRowToJsonBytesFunction(input.get("input").getSchema()); + + PCollectionTuple outputTuple = + input + .get("input") + .apply( + ParDo.of(new ErrorFn(fn, errorSchema)) + .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); + + outputTuple.get(OUTPUT_TAG).apply(PubsubIO.writeMessages().to(topic)); + + return PCollectionRowTuple.of("errors", outputTuple.get(ERROR_TAG).setRowSchema(errorSchema)); } } - private static class ConvertForRowToMessage extends DoFn<Row, Row> { - private final Schema targetSchema; - @Nullable private final Clock clock; - @Nullable private final String attributesFieldName; - @Nullable private final String timestampFieldName; - @Nullable private final String payloadFieldName; - - ConvertForRowToMessage( - Schema targetSchema, - @Nullable Clock clock, - @Nullable String attributesFieldName, - @Nullable String timestampFieldName, - @Nullable String payloadFieldName) { - this.targetSchema = targetSchema; - this.clock = clock; - this.attributesFieldName = attributesFieldName; - this.timestampFieldName = timestampFieldName; - this.payloadFieldName = payloadFieldName; - } - - @ProcessElement - public void process(@Element Row row, OutputReceiver<Row> receiver) { - Instant now = Instant.now(); - if (clock != null) { - now = Instant.ofEpochMilli(clock.currentTimeMillis()); - } - Map<String, Object> values = new HashMap<>(); - - // Default attributes value - checkState(targetSchema.hasField(DEFAULT_ATTRIBUTES_KEY_NAME)); - values.put(DEFAULT_ATTRIBUTES_KEY_NAME, ImmutableMap.of()); - - // Default timestamp value - checkState(targetSchema.hasField(DEFAULT_EVENT_TIMESTAMP_KEY_NAME)); - values.put(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, now); + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:pubsub_write:v1"; + } - for (String fieldName : row.getSchema().getFieldNames()) { - if (targetSchema.hasField(fieldName)) { - values.put(fieldName, row.getValue(fieldName)); - } + @Override + public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> + inputCollectionNames() { + return Collections.singletonList("input"); + } - if (attributesFieldName != null) { - values.put(DEFAULT_ATTRIBUTES_KEY_NAME, row.getValue(attributesFieldName)); - } - if (timestampFieldName != null) { - values.put(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, row.getValue(timestampFieldName)); - } - if (payloadFieldName != null) { - values.put(DEFAULT_PAYLOAD_KEY_NAME, row.getValue(payloadFieldName)); - } - } - receiver.output(Row.withSchema(targetSchema).withFieldValues(values).build()); - } + @Override + public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> + outputCollectionNames() { + return Collections.singletonList("errors"); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java index 848549f1929..0de998f1112 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java @@ -18,300 +18,237 @@ package org.apache.beam.sdk.io.gcp.pubsub; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import com.google.api.client.util.Clock; -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; import java.io.IOException; import java.io.Serializable; -import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.UUID; import java.util.stream.Collectors; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider; -import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.TypeDescriptor; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** Tests for {@link PubsubReadSchemaTransformProvider}. */ +/** Tests for {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubReadSchemaTransformProvider}. */ @RunWith(JUnit4.class) public class PubsubReadSchemaTransformProviderTest { - private static final Schema SCHEMA = + private static final Schema BEAM_SCHEMA = Schema.of( Schema.Field.of("name", Schema.FieldType.STRING), Schema.Field.of("number", Schema.FieldType.INT64)); - + private static final Schema BEAM_SCHEMA_WITH_ERROR = + Schema.of(Schema.Field.of("error", Schema.FieldType.STRING)); + private static final String SCHEMA = AvroUtils.toAvroSchema(BEAM_SCHEMA).toString(); private static final String SUBSCRIPTION = "projects/project/subscriptions/subscription"; private static final String TOPIC = "projects/project/topics/topic"; - private static final List<TestCase> cases = - Arrays.asList( - testCase( - "no configured topic or subscription", - PubsubReadSchemaTransformConfiguration.builder().setDataSchema(SCHEMA).build()) - .expectInvalidConfiguration(), - testCase( - "both topic and subscription configured", - PubsubReadSchemaTransformConfiguration.builder() - .setSubscription(SUBSCRIPTION) - .setSubscription(TOPIC) - .setDataSchema(SCHEMA) - .build()) - .expectInvalidConfiguration(), - testCase( - "invalid format configured", - PubsubReadSchemaTransformConfiguration.builder() - .setSubscription(SUBSCRIPTION) - .setDataSchema(SCHEMA) - .setFormat("invalidformat") - .build()) - .expectInvalidConfiguration(), - testCase( - "configuration with subscription", - PubsubReadSchemaTransformConfiguration.builder() - .setSubscription(SUBSCRIPTION) - .setDataSchema(SCHEMA) - .build()) - .withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION)), - testCase( - "configuration with topic", - PubsubReadSchemaTransformConfiguration.builder() - .setTopic(TOPIC) - .setDataSchema(SCHEMA) - .build()) - .withExpectedPubsubRead(PubsubIO.readMessages().fromTopic(TOPIC)), - testCase( - "configuration with subscription, timestamp and id attributes", - PubsubReadSchemaTransformConfiguration.builder() - .setSubscription(SUBSCRIPTION) - .setTimestampAttribute("timestampAttribute") - .setIdAttribute("idAttribute") - .setDataSchema(SCHEMA) - .build()) - .withExpectedPubsubRead( - PubsubIO.readMessages() - .fromSubscription(SUBSCRIPTION) - .withTimestampAttribute("timestampAttribute") - .withIdAttribute("idAttribute")), - testCase( - "configuration with subscription and dead letter queue", - PubsubReadSchemaTransformConfiguration.builder() - .setSubscription(SUBSCRIPTION) - .setDataSchema(SCHEMA) - .setDeadLetterQueue(TOPIC) - .build()) - .withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION)) - .withExpectedDeadLetterQueue(PubsubIO.writeMessages().to(TOPIC)), - testCase( - "configuration with subscription, timestamp attribute, and dead letter queue", - PubsubReadSchemaTransformConfiguration.builder() - .setSubscription(SUBSCRIPTION) - .setTimestampAttribute("timestampAttribute") - .setDataSchema(SCHEMA) - .setDeadLetterQueue(TOPIC) - .build()) - .withExpectedPubsubRead( - PubsubIO.readMessages() - .fromSubscription(SUBSCRIPTION) - .withTimestampAttribute("timestampAttribute")) - .withExpectedDeadLetterQueue( - PubsubIO.writeMessages().to(TOPIC).withTimestampAttribute("timestampAttribute"))); - - private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema(); - private static final TypeDescriptor<PubsubReadSchemaTransformConfiguration> TYPE_DESCRIPTOR = - TypeDescriptor.of(PubsubReadSchemaTransformConfiguration.class); - private static final SerializableFunction<PubsubReadSchemaTransformConfiguration, Row> - ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR); - private static final List<Row> ROWS = Arrays.asList( - Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 100L).build(), - Row.withSchema(SCHEMA).withFieldValue("name", "b").withFieldValue("number", 200L).build(), - Row.withSchema(SCHEMA) + Row.withSchema(BEAM_SCHEMA) + .withFieldValue("name", "a") + .withFieldValue("number", 100L) + .build(), + Row.withSchema(BEAM_SCHEMA) + .withFieldValue("name", "b") + .withFieldValue("number", 200L) + .build(), + Row.withSchema(BEAM_SCHEMA) .withFieldValue("name", "c") .withFieldValue("number", 300L) .build()); - private static final Clock CLOCK = (Clock & Serializable) () -> 1656788475425L; + private static final List<Row> ROWSWITHERROR = + Arrays.asList( + Row.withSchema(BEAM_SCHEMA_WITH_ERROR).withFieldValue("error", "a").build(), + Row.withSchema(BEAM_SCHEMA_WITH_ERROR).withFieldValue("error", "b").build(), + Row.withSchema(BEAM_SCHEMA_WITH_ERROR).withFieldValue("error", "c").build()); + + private static final Clock CLOCK = (Clock & Serializable) () -> 1678988970000L; private static final AvroPayloadSerializerProvider AVRO_PAYLOAD_SERIALIZER_PROVIDER = new AvroPayloadSerializerProvider(); private static final PayloadSerializer AVRO_PAYLOAD_SERIALIZER = - AVRO_PAYLOAD_SERIALIZER_PROVIDER.getSerializer(SCHEMA, new HashMap<>()); + AVRO_PAYLOAD_SERIALIZER_PROVIDER.getSerializer(BEAM_SCHEMA, new HashMap<>()); + private static final PayloadSerializer AVRO_PAYLOAD_SERIALIZER_WITH_ERROR = + AVRO_PAYLOAD_SERIALIZER_PROVIDER.getSerializer(BEAM_SCHEMA_WITH_ERROR, new HashMap<>()); - @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Rule public transient TestPipeline p = TestPipeline.create(); @Test - public void testBuildDeadLetterQueueWrite() { - for (TestCase testCase : cases) { - PubsubIO.Write<PubsubMessage> dlq = - testCase.pubsubReadSchemaTransform().buildDeadLetterQueueWrite(); - - if (testCase.expectedDeadLetterQueue == null) { - assertNull(testCase.name, dlq); - return; - } - - Map<DisplayData.Identifier, DisplayData.Item> actual = DisplayData.from(dlq).asMap(); - Map<DisplayData.Identifier, DisplayData.Item> expected = testCase.expectedDeadLetterQueue; - - assertEquals(testCase.name, expected, actual); - } + public void testInvalidConfigNoTopicOrSubscription() { + assertThrows( + IllegalArgumentException.class, + () -> + new PubsubReadSchemaTransformProvider() + .from( + PubsubReadSchemaTransformConfiguration.builder() + .setSchema(SCHEMA) + .setFormat("AVRO") + .build())); } @Test - public void testReadAvro() throws IOException { + public void testInvalidConfigBothTopicAndSubscription() { PCollectionRowTuple begin = PCollectionRowTuple.empty(p); - PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform transform = - schemaTransformWithClock("avro"); - PubsubTestClient.PubsubTestClientFactory clientFactory = - clientFactory(incomingAvroMessagesOf(CLOCK.currentTimeMillis())); - transform.setClientFactory(clientFactory); - PCollectionRowTuple reads = begin.apply(transform); - - PAssert.that(reads.get(PubsubReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(ROWS); - + assertThrows( + IllegalArgumentException.class, + () -> + begin.apply( + new PubsubReadSchemaTransformProvider() + .from( + PubsubReadSchemaTransformConfiguration.builder() + .setSchema(SCHEMA) + .setFormat("AVRO") + .setTopic(TOPIC) + .setSubscription(SUBSCRIPTION) + .build()))); p.run().waitUntilFinish(); - clientFactory.close(); } @Test - public void testReadJson() throws IOException { + public void testInvalidConfigInvalidFormat() { PCollectionRowTuple begin = PCollectionRowTuple.empty(p); - PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform transform = - schemaTransformWithClock("json"); - PubsubTestClient.PubsubTestClientFactory clientFactory = - clientFactory(incomingJsonMessagesOf(CLOCK.currentTimeMillis())); - transform.setClientFactory(clientFactory); - PCollectionRowTuple reads = begin.apply(transform); - - PAssert.that(reads.get(PubsubReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(ROWS); - + assertThrows( + IllegalArgumentException.class, + () -> + begin.apply( + new PubsubReadSchemaTransformProvider() + .from( + PubsubReadSchemaTransformConfiguration.builder() + .setSchema(SCHEMA) + .setFormat("BadFormat") + .setSubscription(SUBSCRIPTION) + .build()))); p.run().waitUntilFinish(); - - clientFactory.close(); - } - - @Test - public void testBuildPubSubRead() { - for (TestCase testCase : cases) { - if (testCase.invalidConfigurationExpected) { - continue; - } - Map<DisplayData.Identifier, DisplayData.Item> actual = - DisplayData.from(testCase.pubsubReadSchemaTransform().buildPubsubRead()).asMap(); - - Map<DisplayData.Identifier, DisplayData.Item> expected = testCase.expectedPubsubRead; - - assertEquals(testCase.name, expected, actual); - } - } - - @Test - public void testInvalidConfiguration() { - for (TestCase testCase : cases) { - PCollectionRowTuple begin = PCollectionRowTuple.empty(p); - if (testCase.invalidConfigurationExpected) { - assertThrows( - testCase.name, - RuntimeException.class, - () -> begin.apply(testCase.pubsubReadSchemaTransform())); - } - } } @Test - public void testInvalidInput() { - PCollectionRowTuple begin = PCollectionRowTuple.of("BadInput", p.apply(Create.of(ROWS))); + public void testNoSchema() { + PCollectionRowTuple begin = PCollectionRowTuple.empty(p); assertThrows( - IllegalArgumentException.class, + IllegalStateException.class, () -> begin.apply( new PubsubReadSchemaTransformProvider() .from( PubsubReadSchemaTransformConfiguration.builder() - .setDataSchema(SCHEMA) + .setSubscription(SUBSCRIPTION) + .setFormat("AVRO") .build()))); + p.run().waitUntilFinish(); } - private PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform schemaTransformWithClock( - String format) { - PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform transform = - (PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform) - new PubsubReadSchemaTransformProvider() - .from( - PubsubReadSchemaTransformConfiguration.builder() - .setDataSchema(SCHEMA) - .setSubscription(SUBSCRIPTION) - .setFormat(format) - .build()); - - transform.setClock(CLOCK); + @Test + public void testReadAvro() throws IOException { + PCollectionRowTuple begin = PCollectionRowTuple.empty(p); - return transform; + try (PubsubTestClientFactory clientFactory = clientFactory(beamRowToMessage())) { + PubsubReadSchemaTransformConfiguration config = + PubsubReadSchemaTransformConfiguration.builder() + .setFormat("AVRO") + .setSchema(SCHEMA) + .setSubscription(SUBSCRIPTION) + .setClientFactory(clientFactory) + .setClock(CLOCK) + .build(); + SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); + PCollectionRowTuple reads = begin.apply(transform); + + PAssert.that(reads.get("output")).containsInAnyOrder(ROWS); + + p.run().waitUntilFinish(); + } catch (Exception e) { + throw e; + } } - private static PubsubTestClient.PubsubTestClientFactory clientFactory( - List<PubsubClient.IncomingMessage> messages) { - return PubsubTestClient.createFactoryForPull( - CLOCK, PubsubClient.subscriptionPathFromPath(SUBSCRIPTION), 60, messages); - } + @Test + public void testReadAvroWithError() throws IOException { + PCollectionRowTuple begin = PCollectionRowTuple.empty(p); - private static List<PubsubClient.IncomingMessage> incomingAvroMessagesOf(long millisSinceEpoch) { - return ROWS.stream() - .map(row -> incomingAvroMessageOf(row, millisSinceEpoch)) - .collect(Collectors.toList()); - } + try (PubsubTestClientFactory clientFactory = clientFactory(beamRowToMessageWithError())) { + PubsubReadSchemaTransformConfiguration config = + PubsubReadSchemaTransformConfiguration.builder() + .setFormat("AVRO") + .setSchema(SCHEMA) + .setSubscription(SUBSCRIPTION) + .setClientFactory(clientFactory) + .setClock(CLOCK) + .build(); + SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config); + PCollectionRowTuple reads = begin.apply(transform); + + PAssert.that(reads.get("output")).empty(); + + PipelineResult result = p.run(); + result.waitUntilFinish(); + + MetricResults metrics = result.metrics(); + MetricQueryResults metricResults = + metrics.queryMetrics( + MetricsFilter.builder() + .addNameFilter( + MetricNameFilter.named( + PubsubReadSchemaTransformProvider.class, "PubSub-read-error-counter")) + .build()); + + Iterable<MetricResult<Long>> counters = metricResults.getCounters(); + if (!counters.iterator().hasNext()) { + throw new RuntimeException("no counters available "); + } - private static PubsubClient.IncomingMessage incomingAvroMessageOf( - Row row, long millisSinceEpoch) { - byte[] bytes = AVRO_PAYLOAD_SERIALIZER.serialize(row); - return incomingMessageOf(bytes, millisSinceEpoch); + Long expectedCount = 3L; + for (MetricResult<Long> count : counters) { + assertEquals(expectedCount, count.getAttempted()); + } + } catch (Exception e) { + throw e; + } } - private static List<PubsubClient.IncomingMessage> incomingJsonMessagesOf(long millisSinceEpoch) { - return PubsubReadSchemaTransformProviderTest.ROWS.stream() - .map(row -> incomingJsonMessageOf(row, millisSinceEpoch)) + private static List<PubsubClient.IncomingMessage> beamRowToMessage() { + long timestamp = CLOCK.currentTimeMillis(); + return ROWS.stream() + .map( + row -> { + byte[] bytes = AVRO_PAYLOAD_SERIALIZER.serialize(row); + return incomingMessageOf(bytes, timestamp); + }) .collect(Collectors.toList()); } - private static PubsubClient.IncomingMessage incomingJsonMessageOf( - Row row, long millisSinceEpoch) { - String name = Objects.requireNonNull(row.getString("name")); - long number = Objects.requireNonNull(row.getInt64("number")); - return incomingJsonMessageOf(name, number, millisSinceEpoch); - } - - private static PubsubClient.IncomingMessage incomingJsonMessageOf( - String name, long number, long millisSinceEpoch) { - Gson gson = new Gson(); - JsonObject obj = new JsonObject(); - obj.add("name", new JsonPrimitive(name)); - obj.add("number", new JsonPrimitive(number)); - byte[] bytes = gson.toJson(obj).getBytes(StandardCharsets.UTF_8); - return incomingMessageOf(bytes, millisSinceEpoch); + private static List<PubsubClient.IncomingMessage> beamRowToMessageWithError() { + long timestamp = CLOCK.currentTimeMillis(); + return ROWSWITHERROR.stream() + .map( + row -> { + byte[] bytes = AVRO_PAYLOAD_SERIALIZER_WITH_ERROR.serialize(row); + return incomingMessageOf(bytes, timestamp); + }) + .collect(Collectors.toList()); } private static PubsubClient.IncomingMessage incomingMessageOf( @@ -329,51 +266,9 @@ public class PubsubReadSchemaTransformProviderTest { UUID.randomUUID().toString()); } - static TestCase testCase(String name, PubsubReadSchemaTransformConfiguration configuration) { - return new TestCase(name, configuration); - } - - private static class TestCase { - - private final String name; - private final PubsubReadSchemaTransformConfiguration configuration; - - private Map<DisplayData.Identifier, DisplayData.Item> expectedDeadLetterQueue; - - private Map<DisplayData.Identifier, DisplayData.Item> expectedPubsubRead = - DisplayData.from(PubsubIO.readMessages()).asMap(); - - private boolean invalidConfigurationExpected = false; - - TestCase(String name, PubsubReadSchemaTransformConfiguration configuration) { - this.name = name; - this.configuration = configuration; - } - - PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform pubsubReadSchemaTransform() { - PubsubReadSchemaTransformProvider provider = new PubsubReadSchemaTransformProvider(); - Row configurationRow = toBeamRow(); - return (PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform) - provider.from(configurationRow); - } - - private Row toBeamRow() { - return ROW_SERIALIZABLE_FUNCTION.apply(configuration); - } - - TestCase withExpectedDeadLetterQueue(PubsubIO.Write<PubsubMessage> value) { - this.expectedDeadLetterQueue = DisplayData.from(value).asMap(); - return this; - } - - TestCase withExpectedPubsubRead(PubsubIO.Read<PubsubMessage> value) { - this.expectedPubsubRead = DisplayData.from(value).asMap(); - return this; - } - - TestCase expectInvalidConfiguration() { - this.invalidConfigurationExpected = true; - return this; - } + private static PubsubTestClient.PubsubTestClientFactory clientFactory( + List<PubsubClient.IncomingMessage> messages) { + return PubsubTestClient.createFactoryForPull( + CLOCK, PubsubClient.subscriptionPathFromPath(SUBSCRIPTION), 60, messages); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java deleted file mode 100644 index 709fc35e02a..00000000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsub; - -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider; -import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer; -import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializerProvider; -import org.apache.beam.sdk.values.Row; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Test for {@link PubsubSchemaTransformMessageToRowFactory}. */ -@RunWith(JUnit4.class) -public class PubsubSchemaTransformMessageToRowFactoryTest { - - List<TestCase> cases = - Arrays.asList( - testCase(PubsubReadSchemaTransformConfiguration.builder().setDataSchema(SCHEMA)) - .expectPayloadSerializerProvider(JSON_PAYLOAD_SERIALIZER_PROVIDER) - .withSerializerInput(), - testCase(PubsubReadSchemaTransformConfiguration.builder().setDataSchema(SCHEMA)) - .expectPubsubToRow( - PubsubMessageToRow.builder() - .messageSchema(SCHEMA) - .useFlatSchema(true) - .useDlq(false)), - testCase( - PubsubReadSchemaTransformConfiguration.builder() - .setDataSchema(SCHEMA) - .setDeadLetterQueue("projects/project/topics/topic")) - .expectPubsubToRow( - PubsubMessageToRow.builder() - .messageSchema(SCHEMA) - .useFlatSchema(true) - .useDlq(true)), - testCase( - PubsubReadSchemaTransformConfiguration.builder() - .setDataSchema(SCHEMA) - .setFormat("avro")) - .expectPayloadSerializerProvider(AVRO_PAYLOAD_SERIALIZER_PROVIDER) - .withSerializerInput(), - testCase( - PubsubReadSchemaTransformConfiguration.builder() - .setDataSchema(Schema.of(ATTRIBUTES_FIELD_ARRAY))) - .schemaShouldHaveValidAttributesField() - .fieldShouldBePresent( - ATTRIBUTES_FIELD_ARRAY.getName(), ATTRIBUTES_FIELD_ARRAY.getType()), - testCase( - PubsubReadSchemaTransformConfiguration.builder() - .setDataSchema(Schema.of(ATTRIBUTES_FIELD_MAP))) - .schemaShouldHaveValidAttributesField() - .fieldShouldBePresent(ATTRIBUTES_FIELD_MAP.getName(), ATTRIBUTES_FIELD_MAP.getType()), - testCase( - PubsubReadSchemaTransformConfiguration.builder() - .setDataSchema(Schema.of(ATTRIBUTES_FIELD_SHOULD_NOT_MATCH))), - testCase( - PubsubReadSchemaTransformConfiguration.builder() - .setDataSchema(Schema.of(PAYLOAD_FIELD_SHOULD_NOT_MATCH))), - testCase( - PubsubReadSchemaTransformConfiguration.builder() - .setDataSchema(Schema.of(PAYLOAD_FIELD_BYTES))) - .schemaShouldHaveValidPayloadField() - .fieldShouldBePresent(PAYLOAD_FIELD_BYTES.getName(), PAYLOAD_FIELD_BYTES.getType()), - testCase( - PubsubReadSchemaTransformConfiguration.builder() - .setDataSchema(Schema.of(PAYLOAD_FIELD_ROW))) - .schemaShouldHaveValidPayloadField() - .fieldShouldBePresent(PAYLOAD_FIELD_ROW.getName(), PAYLOAD_FIELD_ROW.getType()), - testCase( - PubsubReadSchemaTransformConfiguration.builder() - .setDataSchema(Schema.of(ATTRIBUTES_FIELD_ARRAY, PAYLOAD_FIELD_BYTES))) - .schemaShouldHaveValidAttributesField() - .schemaShouldHaveValidPayloadField() - .shouldUseNestedSchema() - .shouldNotNeedSerializer() - .expectPubsubToRow( - PubsubMessageToRow.builder() - .messageSchema(Schema.of(ATTRIBUTES_FIELD_ARRAY, PAYLOAD_FIELD_BYTES)) - .useFlatSchema(false) - .useDlq(false))); - - static final Schema.FieldType ATTRIBUTE_MAP_FIELD_TYPE = - Schema.FieldType.map(Schema.FieldType.STRING.withNullable(false), Schema.FieldType.STRING); - static final Schema ATTRIBUTE_ARRAY_ENTRY_SCHEMA = - Schema.builder().addStringField("key").addStringField("value").build(); - - static final Schema.FieldType ATTRIBUTE_ARRAY_FIELD_TYPE = - Schema.FieldType.array(Schema.FieldType.row(ATTRIBUTE_ARRAY_ENTRY_SCHEMA)); - - private static final Schema.Field ATTRIBUTES_FIELD_SHOULD_NOT_MATCH = - Schema.Field.of(ATTRIBUTES_FIELD, Schema.FieldType.STRING); - - private static final Schema.Field ATTRIBUTES_FIELD_MAP = - Schema.Field.of(ATTRIBUTES_FIELD, ATTRIBUTE_MAP_FIELD_TYPE); - - private static final Schema.Field ATTRIBUTES_FIELD_ARRAY = - Schema.Field.of(ATTRIBUTES_FIELD, ATTRIBUTE_ARRAY_FIELD_TYPE); - - private static final Schema.Field PAYLOAD_FIELD_SHOULD_NOT_MATCH = - Schema.Field.of(PAYLOAD_FIELD, Schema.FieldType.STRING); - - private static final Schema.Field PAYLOAD_FIELD_BYTES = - Schema.Field.of(PAYLOAD_FIELD, Schema.FieldType.BYTES); - - private static final Schema.Field PAYLOAD_FIELD_ROW = - Schema.Field.of(PAYLOAD_FIELD, Schema.FieldType.row(Schema.of())); - - private static final PayloadSerializerProvider JSON_PAYLOAD_SERIALIZER_PROVIDER = - new JsonPayloadSerializerProvider(); - - private static final AvroPayloadSerializerProvider AVRO_PAYLOAD_SERIALIZER_PROVIDER = - new AvroPayloadSerializerProvider(); - - private static final Schema SCHEMA = - Schema.of( - Schema.Field.of("name", Schema.FieldType.STRING), - Schema.Field.of("number", Schema.FieldType.INT64)); - - private static final Row ROW = - Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build(); - - @Test - public void testBuildMessageToRow() { - for (TestCase testCase : cases) { - if (testCase.expectPubsubToRow == null) { - continue; - } - - PubsubSchemaTransformMessageToRowFactory factory = testCase.factory(); - - PubsubMessageToRow expected = testCase.expectPubsubToRow; - PubsubMessageToRow actual = factory.buildMessageToRow(); - - assertEquals("messageSchema", expected.messageSchema(), actual.messageSchema()); - assertEquals("useFlatSchema", expected.useFlatSchema(), actual.useFlatSchema()); - assertEquals("useDlq", expected.useDlq(), actual.useDlq()); - } - } - - @Test - public void serializer() { - for (TestCase testCase : cases) { - PubsubSchemaTransformMessageToRowFactory factory = testCase.factory(); - - if (testCase.expectPayloadSerializerProvider == null) { - continue; - } - - Row serializerInput = testCase.serializerInput; - - byte[] expectedBytes = - testCase - .expectSerializerProvider() - .apply(testCase.dataSchema()) - .serialize(serializerInput); - - byte[] actualBytes = - factory.serializer().apply(testCase.dataSchema()).serialize(serializerInput); - - String expected = new String(expectedBytes, StandardCharsets.UTF_8); - String actual = new String(actualBytes, StandardCharsets.UTF_8); - - assertEquals(expected, actual); - } - } - - @Test - public void needsSerializer() { - for (TestCase testCase : cases) { - PubsubSchemaTransformMessageToRowFactory factory = testCase.factory(); - - boolean expected = testCase.shouldNeedSerializer; - boolean actual = factory.needsSerializer(); - - assertEquals(expected, actual); - } - } - - @Test - public void shouldUseNestedSchema() { - for (TestCase testCase : cases) { - PubsubSchemaTransformMessageToRowFactory factory = testCase.factory(); - - boolean expected = testCase.shouldUseNestedSchema; - boolean actual = factory.shouldUseNestedSchema(); - - assertEquals(expected, actual); - } - } - - @Test - public void schemaHasValidPayloadField() { - for (TestCase testCase : cases) { - PubsubSchemaTransformMessageToRowFactory factory = testCase.factory(); - - boolean expected = testCase.shouldSchemaHaveValidPayloadField; - boolean actual = factory.schemaHasValidPayloadField(); - - assertEquals(expected, actual); - } - } - - @Test - public void schemaHasValidAttributesField() { - for (TestCase testCase : cases) { - PubsubSchemaTransformMessageToRowFactory factory = testCase.factory(); - - boolean expected = testCase.shouldSchemaHaveValidAttributesField; - boolean actual = factory.schemaHasValidAttributesField(); - - assertEquals(expected, actual); - } - } - - @Test - public void fieldPresent() { - for (TestCase testCase : cases) { - PubsubSchemaTransformMessageToRowFactory factory = testCase.factory(); - for (Entry<String, FieldType> entry : testCase.shouldFieldPresent.entrySet()) { - - boolean actual = factory.fieldPresent(entry.getKey(), entry.getValue()); - - assertTrue(actual); - } - } - } - - static TestCase testCase(PubsubReadSchemaTransformConfiguration.Builder configurationBuilder) { - return new TestCase(configurationBuilder); - } - - private static class TestCase { - private final PubsubReadSchemaTransformConfiguration configuration; - - private PubsubMessageToRow expectPubsubToRow; - - private PayloadSerializerProvider expectPayloadSerializerProvider; - - private boolean shouldUseNestedSchema = false; - private boolean shouldNeedSerializer = true; - private boolean shouldSchemaHaveValidPayloadField = false; - private boolean shouldSchemaHaveValidAttributesField = false; - private final Map<String, FieldType> shouldFieldPresent = new HashMap<>(); - - private Row serializerInput; - - TestCase(PubsubReadSchemaTransformConfiguration.Builder configurationBuilder) { - this.configuration = configurationBuilder.build(); - } - - PubsubSchemaTransformMessageToRowFactory factory() { - return PubsubSchemaTransformMessageToRowFactory.from(configuration); - } - - Schema dataSchema() { - return configuration.getDataSchema(); - } - - TestCase expectPubsubToRow(PubsubMessageToRow.Builder pubsubMessageToRowBuilder) { - this.expectPubsubToRow = pubsubMessageToRowBuilder.build(); - return this; - } - - TestCase withSerializerInput() { - this.serializerInput = PubsubSchemaTransformMessageToRowFactoryTest.ROW; - return this; - } - - TestCase expectPayloadSerializerProvider(PayloadSerializerProvider value) { - this.expectPayloadSerializerProvider = value; - return this; - } - - PubsubMessageToRow.SerializerProvider expectSerializerProvider() { - Map<String, Object> params = new HashMap<>(); - PayloadSerializer payloadSerializer = - expectPayloadSerializerProvider.getSerializer(configuration.getDataSchema(), params); - - return (input -> payloadSerializer); - } - - TestCase shouldUseNestedSchema() { - this.shouldUseNestedSchema = true; - return this; - } - - TestCase shouldNotNeedSerializer() { - this.shouldNeedSerializer = false; - return this; - } - - TestCase schemaShouldHaveValidPayloadField() { - this.shouldSchemaHaveValidPayloadField = true; - return this; - } - - TestCase schemaShouldHaveValidAttributesField() { - this.shouldSchemaHaveValidAttributesField = true; - return this; - } - - TestCase fieldShouldBePresent(String name, Schema.FieldType expectedType) { - this.shouldFieldPresent.put(name, expectedType); - return this; - } - } -} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java deleted file mode 100644 index cb0e6ec03cc..00000000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsub; - -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformConfiguration.DEFAULT_TIMESTAMP_ATTRIBUTE; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformProvider.INPUT_TAG; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; -import org.apache.beam.sdk.schemas.AutoValueSchema; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.Schema.Field; -import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.PCollectionRowTuple; -import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; -import org.apache.commons.lang3.tuple.Pair; -import org.joda.time.Instant; -import org.joda.time.format.ISODateTimeFormat; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; - -/** Integration tests for {@link PubsubWriteSchemaTransformProvider}. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) -public class PubsubWriteSchemaTransformProviderIT { - - @Rule public transient TestPipeline pipeline = TestPipeline.create(); - - private static final TestPubsubOptions TEST_PUBSUB_OPTIONS = - TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class); - - static { - TEST_PUBSUB_OPTIONS.setBlockOnRun(false); - } - - private static final String HAS_NO_SCHEMA = "has-no-schema"; - - private static PubsubClient pubsubClient; - - private static PubsubClient.TopicPath hasNoSchemaTopic; - - private static PubsubClient.SubscriptionPath hasNoSchemaSubscription; - - private static final Instant TIMESTAMP = Instant.now(); - - private static final String RESOURCE_NAME_POSTFIX = "-" + TIMESTAMP.getMillis(); - - private static final int ACK_DEADLINE_SECONDS = 60; - - private static final int AWAIT_TERMINATED_SECONDS = 30; - - private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema(); - - private static final TypeDescriptor<PubsubWriteSchemaTransformConfiguration> - CONFIGURATION_TYPE_DESCRIPTOR = - TypeDescriptor.of(PubsubWriteSchemaTransformConfiguration.class); - - private static final SerializableFunction<PubsubWriteSchemaTransformConfiguration, Row> - TO_ROW_FN = AUTO_VALUE_SCHEMA.toRowFunction(CONFIGURATION_TYPE_DESCRIPTOR); - - private final Field timestampField = Field.of("timestamp", FieldType.DATETIME); - - private final Field payloadBytesField = Field.of("payload", FieldType.BYTES); - - @BeforeClass - public static void setUp() throws IOException { - String project = TEST_PUBSUB_OPTIONS.as(PubsubOptions.class).getProject(); - pubsubClient = PubsubGrpcClient.FACTORY.newClient(null, null, TEST_PUBSUB_OPTIONS); - hasNoSchemaTopic = - PubsubClient.topicPathFromName(project, HAS_NO_SCHEMA + RESOURCE_NAME_POSTFIX); - hasNoSchemaSubscription = - PubsubClient.subscriptionPathFromName(project, HAS_NO_SCHEMA + RESOURCE_NAME_POSTFIX); - - pubsubClient.createTopic(hasNoSchemaTopic); - pubsubClient.createSubscription( - hasNoSchemaTopic, hasNoSchemaSubscription, ACK_DEADLINE_SECONDS); - } - - @AfterClass - public static void tearDown() throws IOException { - pubsubClient.deleteSubscription(hasNoSchemaSubscription); - pubsubClient.deleteTopic(hasNoSchemaTopic); - - pubsubClient.close(); - } - - @Test - public void testWritePayloadBytes() throws IOException { - Instant timestamp = Instant.ofEpochMilli(100000L); - Schema schema = Schema.of(payloadBytesField, timestampField); - List<Row> input = - Collections.singletonList( - Row.withSchema(schema).attachValues("aaa".getBytes(StandardCharsets.UTF_8), timestamp)); - Row configuration = - TO_ROW_FN.apply( - PubsubWriteSchemaTransformConfiguration.builder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setPayloadFieldName(payloadBytesField.getName()) - .setTimestampFieldName(timestampField.getName()) - .build()) - .setTopic(hasNoSchemaTopic.getPath()) - .setTarget( - PubsubWriteSchemaTransformConfiguration.targetConfigurationBuilder().build()) - .build()); - - PCollectionRowTuple.of(INPUT_TAG, pipeline.apply(Create.of(input).withRowSchema(schema))) - .apply(new PubsubWriteSchemaTransformProvider().from(configuration)); - - PipelineResult job = pipeline.run(TEST_PUBSUB_OPTIONS); - Instant now = Instant.now(); - Instant stop = Instant.ofEpochMilli(now.getMillis() + AWAIT_TERMINATED_SECONDS * 1000); - List<Pair<String, Map<String, String>>> actualList = new ArrayList<>(); - while (now.isBefore(stop)) { - List<IncomingMessage> received = pubsubClient.pull(0, hasNoSchemaSubscription, 1, true); - for (IncomingMessage incoming : received) { - actualList.add( - Pair.of( - incoming.message().getData().toStringUtf8(), - ImmutableMap.of( - DEFAULT_TIMESTAMP_ATTRIBUTE, - incoming - .message() - .getAttributesMap() - .getOrDefault(DEFAULT_TIMESTAMP_ATTRIBUTE, "")))); - } - if (actualList.size() == input.size()) { - break; - } - now = Instant.now(); - } - job.cancel(); - assertFalse( - String.format( - "messages pulled from %s should not be empty", hasNoSchemaSubscription.getPath()), - actualList.isEmpty()); - Pair<String, Map<String, String>> actual = actualList.get(0); - Row expected = input.get(0); - String payload = - new String( - Objects.requireNonNull(expected.getBytes(payloadBytesField.getName())), - StandardCharsets.UTF_8); - assertEquals(payload, actual.getLeft()); - assertEquals( - ISODateTimeFormat.dateTime().print(timestamp), - actual.getRight().get(DEFAULT_TIMESTAMP_ATTRIBUTE)); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderTest.java deleted file mode 100644 index 98939f7ddc6..00000000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderTest.java +++ /dev/null @@ -1,786 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsub; - -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_ATTRIBUTES_KEY_NAME; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_EVENT_TIMESTAMP_KEY_NAME; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_PAYLOAD_KEY_NAME; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessageTest.ALL_DATA_TYPES_SCHEMA; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessageTest.NON_USER_WITH_BYTES_PAYLOAD; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessageTest.rowWithAllDataTypes; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformProvider.INPUT_TAG; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; - -import com.google.api.client.util.Clock; -import java.io.IOException; -import java.io.Serializable; -import java.math.BigDecimal; -import java.nio.charset.StandardCharsets; -import java.util.Map; -import org.apache.avro.SchemaParseException; -import org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformProvider.PubsubWriteSchemaTransform; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.schemas.AutoValueSchema; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.Schema.Field; -import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider; -import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.RowJson.UnsupportedRowJsonException; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionRowTuple; -import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link PubsubWriteSchemaTransformProvider}. */ -@RunWith(JUnit4.class) -public class PubsubWriteSchemaTransformProviderTest { - - private static final String ID_ATTRIBUTE = "id_attribute"; - private static final String TOPIC = "projects/project/topics/topic"; - private static final MockClock CLOCK = new MockClock(Instant.now()); - private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema(); - private static final TypeDescriptor<PubsubWriteSchemaTransformConfiguration> TYPE_DESCRIPTOR = - TypeDescriptor.of(PubsubWriteSchemaTransformConfiguration.class); - private static final SerializableFunction<PubsubWriteSchemaTransformConfiguration, Row> TO_ROW = - AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR); - - private static final PipelineOptions OPTIONS = PipelineOptionsFactory.create(); - - static { - OPTIONS.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF); - } - - @Rule public transient TestPipeline pipeline = TestPipeline.create(); - - @Test - public void testBuildPubsubWrite() { - assertEquals( - "default configuration should yield a topic Pub/Sub write", - pubsubWrite(), - transform(configurationBuilder()).buildPubsubWrite()); - - assertEquals( - "idAttribute in configuration should yield a idAttribute set Pub/Sub write", - pubsubWrite().withIdAttribute(ID_ATTRIBUTE), - transform(configurationBuilder().setIdAttribute(ID_ATTRIBUTE)).buildPubsubWrite()); - } - - @Test - public void testBuildPubsubRowToMessage() { - assertEquals( - "override timestamp attribute on configuration should yield a PubsubRowToMessage with target timestamp", - rowToMessageBuilder().setTargetTimestampAttributeName("custom_timestamp_attribute").build(), - transform( - configurationBuilder() - .setTarget( - PubsubWriteSchemaTransformConfiguration.targetConfigurationBuilder() - .setTimestampAttributeKey("custom_timestamp_attribute") - .build())) - .buildPubsubRowToMessage(NON_USER_WITH_BYTES_PAYLOAD)); - - assertNull( - "failing to set format should yield a null payload serializer", - transform(configurationBuilder()) - .buildPubsubRowToMessage(ALL_DATA_TYPES_SCHEMA) - .getPayloadSerializer()); - - assertThrows( - "setting 'json' format for a unsupported field containing Schema should throw an Exception", - UnsupportedRowJsonException.class, - () -> - transform(configurationBuilder().setFormat("json")) - .buildPubsubRowToMessage( - Schema.of(Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE)))); - - assertThrows( - "setting 'avro' format for a unsupported field containing Schema should throw an Exception", - SchemaParseException.class, - () -> - transform(configurationBuilder().setFormat("avro")) - .buildPubsubRowToMessage( - Schema.of(Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE)))); - - assertNotNull( - "setting 'json' format for valid schema should yield PayloadSerializer", - transform(configurationBuilder().setFormat("json")) - .buildPubsubRowToMessage(ALL_DATA_TYPES_SCHEMA) - .getPayloadSerializer()); - - assertNotNull( - "setting 'avro' format for valid schema should yield PayloadSerializer", - transform(configurationBuilder().setFormat("avro")) - .buildPubsubRowToMessage(ALL_DATA_TYPES_SCHEMA) - .getPayloadSerializer()); - } - - @Test - public void testInvalidTaggedInput() { - Row withAllDataTypes = - rowWithAllDataTypes( - true, - (byte) 0, - Instant.now().toDateTime(), - BigDecimal.valueOf(1L), - 3.12345, - 4.1f, - (short) 5, - 2, - 7L, - "asdfjkl;"); - - PCollection<Row> rows = - pipeline.apply(Create.of(withAllDataTypes)).setRowSchema(ALL_DATA_TYPES_SCHEMA); - - assertThrows( - "empty input should not be allowed", - IllegalArgumentException.class, - () -> transform(configurationBuilder()).expand(PCollectionRowTuple.empty(pipeline))); - - assertThrows( - "input with >1 tagged rows should not be allowed", - IllegalArgumentException.class, - () -> - transform(configurationBuilder()) - .expand(PCollectionRowTuple.of(INPUT_TAG, rows).and("somethingelse", rows))); - - assertThrows( - "input missing INPUT tag should not be allowed", - IllegalArgumentException.class, - () -> - transform(configurationBuilder()) - .expand(PCollectionRowTuple.of("somethingelse", rows))); - - pipeline.run(OPTIONS); - } - - @Test - public void testValidateSourceSchemaAgainstConfiguration() { - // Only containing user fields and no configuration details should be valid - transform(configurationBuilder()) - .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA); - - // Matching attributes, timestamp, and payload (bytes) fields configured with expected types - // should be valid - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setAttributesFieldName("attributes") - .setTimestampFieldName("timestamp") - .setPayloadFieldName("payload") - .build())) - .validateSourceSchemaAgainstConfiguration( - Schema.of( - Field.of("attributes", ATTRIBUTES_FIELD_TYPE), - Field.of("timestamp", EVENT_TIMESTAMP_FIELD_TYPE), - Field.of("payload", Schema.FieldType.BYTES))); - - // Matching attributes, timestamp, and payload (ROW) fields configured with expected types - // should be valid - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setAttributesFieldName("attributes") - .setTimestampFieldName("timestamp") - .setPayloadFieldName("payload") - .build())) - .validateSourceSchemaAgainstConfiguration( - Schema.of( - Field.of("attributes", ATTRIBUTES_FIELD_TYPE), - Field.of("timestamp", EVENT_TIMESTAMP_FIELD_TYPE), - Field.of("payload", Schema.FieldType.row(ALL_DATA_TYPES_SCHEMA)))); - - assertThrows( - "empty Schema should be invalid", - IllegalArgumentException.class, - () -> - transform(configurationBuilder()) - .validateSourceSchemaAgainstConfiguration(Schema.of())); - - assertThrows( - "attributes field in configuration but not in schema should be invalid", - IllegalArgumentException.class, - () -> - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setAttributesFieldName("attributes") - .build())) - .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA)); - - assertThrows( - "timestamp field in configuration but not in schema should be invalid", - IllegalArgumentException.class, - () -> - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setTimestampFieldName("timestamp") - .build())) - .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA)); - - assertThrows( - "payload field in configuration but not in schema should be invalid", - IllegalArgumentException.class, - () -> - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setPayloadFieldName("payload") - .build())) - .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA)); - - assertThrows( - "attributes field in configuration but mismatching attributes type should be invalid", - IllegalArgumentException.class, - () -> - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setAttributesFieldName("attributes") - .build())) - .validateSourceSchemaAgainstConfiguration( - // should be FieldType.map(FieldType.STRING, FieldType.STRING) - Schema.of( - Field.of("attributes", FieldType.map(FieldType.BYTES, FieldType.STRING))))); - - assertThrows( - "timestamp field in configuration but mismatching timestamp type should be invalid", - IllegalArgumentException.class, - () -> - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setAttributesFieldName("timestamp") - .build())) - .validateSourceSchemaAgainstConfiguration( - // should be FieldType.DATETIME - Schema.of(Field.of("timestamp", FieldType.STRING)))); - - assertThrows( - "payload field in configuration but mismatching payload type should be invalid", - IllegalArgumentException.class, - () -> - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setAttributesFieldName("payload") - .build())) - .validateSourceSchemaAgainstConfiguration( - // should be FieldType.BYTES or FieldType.row(...) - Schema.of(Field.of("payload", FieldType.STRING)))); - } - - @Test - public void testValidateTargetSchemaAgainstPubsubSchema() throws IOException { - TopicPath topicPath = PubsubClient.topicPathFromPath(TOPIC); - PubsubTestClientFactory noSchemaFactory = - PubsubTestClient.createFactoryForGetSchema(topicPath, null, null); - - PubsubTestClientFactory schemaDeletedFactory = - PubsubTestClient.createFactoryForGetSchema(topicPath, SchemaPath.DELETED_SCHEMA, null); - - PubsubTestClientFactory mismatchingSchemaFactory = - PubsubTestClient.createFactoryForGetSchema( - topicPath, - PubsubClient.schemaPathFromId("testProject", "misMatch"), - Schema.of(Field.of("StringField", FieldType.STRING))); - - PubsubTestClientFactory matchingSchemaFactory = - PubsubTestClient.createFactoryForGetSchema( - topicPath, - PubsubClient.schemaPathFromId("testProject", "match"), - ALL_DATA_TYPES_SCHEMA); - - // Should pass validation exceptions if Pub/Sub topic lacks schema - transform(configurationBuilder()) - .withPubsubClientFactory(noSchemaFactory) - .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS); - noSchemaFactory.close(); - - // Should pass validation if Pub/Sub topic schema deleted - transform(configurationBuilder()) - .withPubsubClientFactory(schemaDeletedFactory) - .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS); - schemaDeletedFactory.close(); - - assertThrows( - "mismatched schema should be detected from Pub/Sub topic", - IllegalStateException.class, - () -> - transform(configurationBuilder()) - .withPubsubClientFactory(mismatchingSchemaFactory) - .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS)); - mismatchingSchemaFactory.close(); - - // Should pass validation if Pub/Sub topic schema matches - transform(configurationBuilder()) - .withPubsubClientFactory(matchingSchemaFactory) - .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS); - matchingSchemaFactory.close(); - } - - @Test - public void testBuildTargetSchema() { - - Field sourceAttributesField = Field.of("attributes", ATTRIBUTES_FIELD_TYPE); - Field sourceTimestampField = Field.of("timestamp", EVENT_TIMESTAMP_FIELD_TYPE); - Field sourcePayloadBytesField = Field.of("payload", FieldType.BYTES); - Field sourcePayloadRowField = Field.of("payload", FieldType.row(ALL_DATA_TYPES_SCHEMA)); - - Field targetAttributesField = Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE); - Field targetTimestampField = - Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE); - Field targetPayloadBytesField = Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES); - Field targetPayloadRowField = - Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.row(ALL_DATA_TYPES_SCHEMA)); - - assertEquals( - "attributes and timestamp field should append to user fields", - Schema.builder() - .addField(targetAttributesField) - .addField(targetTimestampField) - .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) - .build(), - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .build())) - .buildTargetSchema(ALL_DATA_TYPES_SCHEMA)); - - assertEquals( - "timestamp field should append to user fields; attributes field name changed", - Schema.builder() - .addField(targetAttributesField) - .addField(targetTimestampField) - .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) - .build(), - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setAttributesFieldName("attributes") - .build())) - .buildTargetSchema( - Schema.builder() - .addField(sourceAttributesField) - .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) - .build())); - - assertEquals( - "attributes field should append to user fields; timestamp field name changed", - Schema.builder() - .addField(targetAttributesField) - .addField(targetTimestampField) - .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) - .build(), - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setTimestampFieldName("timestamp") - .build())) - .buildTargetSchema( - Schema.builder() - .addField(sourceTimestampField) - .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) - .build())); - - assertEquals( - "attributes and timestamp field appended to user payload bytes field; payload field name changed", - Schema.builder() - .addField(targetAttributesField) - .addField(targetTimestampField) - .addField(targetPayloadBytesField) - .build(), - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setPayloadFieldName("payload") - .build())) - .buildTargetSchema(Schema.builder().addField(sourcePayloadBytesField).build())); - - assertEquals( - "attributes and timestamp field appended to user payload row field; payload field name changed", - Schema.builder() - .addField(targetAttributesField) - .addField(targetTimestampField) - .addField(targetPayloadRowField) - .build(), - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setPayloadFieldName("payload") - .build())) - .buildTargetSchema(Schema.builder().addField(sourcePayloadRowField).build())); - - assertEquals( - "attributes and timestamp fields name changed", - Schema.builder() - .addField(targetAttributesField) - .addField(targetTimestampField) - .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) - .build(), - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setAttributesFieldName("attributes") - .setTimestampFieldName("timestamp") - .build())) - .buildTargetSchema( - Schema.builder() - .addField(sourceAttributesField) - .addField(sourceTimestampField) - .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) - .build())); - - assertEquals( - "attributes, timestamp, payload bytes fields name changed", - Schema.builder() - .addField(targetAttributesField) - .addField(targetTimestampField) - .addFields(targetPayloadBytesField) - .build(), - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setAttributesFieldName("attributes") - .setTimestampFieldName("timestamp") - .setPayloadFieldName("payload") - .build())) - .buildTargetSchema( - Schema.builder() - .addField(sourceAttributesField) - .addField(sourceTimestampField) - .addField(sourcePayloadBytesField) - .build())); - - assertEquals( - "attributes, timestamp, payload row fields name changed", - Schema.builder() - .addField(targetAttributesField) - .addField(targetTimestampField) - .addFields(targetPayloadRowField) - .build(), - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() - .setAttributesFieldName("attributes") - .setTimestampFieldName("timestamp") - .setPayloadFieldName("payload") - .build())) - .buildTargetSchema( - Schema.builder() - .addField(sourceAttributesField) - .addField(sourceTimestampField) - .addField(sourcePayloadRowField) - .build())); - } - - @Test - public void testConvertForRowToMessageTransform() { - Row userRow = - rowWithAllDataTypes( - false, - (byte) 0, - Instant.ofEpochMilli(CLOCK.currentTimeMillis()).toDateTime(), - BigDecimal.valueOf(1L), - 1.12345, - 1.1f, - (short) 1, - 1, - 1L, - "吃葡萄不吐葡萄皮,不吃葡萄倒吐葡萄皮"); - - Field sourceAttributes = Field.of("attributes", ATTRIBUTES_FIELD_TYPE); - Field targetAttributes = Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE); - - Field sourceTimestamp = Field.of("timestamp", EVENT_TIMESTAMP_FIELD_TYPE); - Field targetTimestamp = Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE); - - Field sourcePayloadBytes = Field.of("payload", FieldType.BYTES); - Field targetPayloadBytes = Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES); - - Field sourcePayloadRow = Field.of("payload", FieldType.row(ALL_DATA_TYPES_SCHEMA)); - Field targetPayloadRow = - Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.row(ALL_DATA_TYPES_SCHEMA)); - - Map<String, String> attributes = ImmutableMap.of("a", "1"); - Instant generatedTimestamp = Instant.ofEpochMilli(CLOCK.currentTimeMillis()); - Instant timestampFromSource = Instant.ofEpochMilli(CLOCK.currentTimeMillis() + 10000L); - byte[] payloadBytes = "吃葡萄不吐葡萄皮,不吃葡萄倒吐葡萄皮".getBytes(StandardCharsets.UTF_8); - - PAssert.that( - "attributes only source yields attributes + timestamp target", - pipeline - .apply( - Create.of(Row.withSchema(Schema.of(sourceAttributes)).attachValues(attributes))) - .setRowSchema(Schema.of(sourceAttributes)) - .apply( - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration - .sourceConfigurationBuilder() - .setAttributesFieldName(sourceAttributes.getName()) - .build())) - .convertForRowToMessage( - Schema.of(targetAttributes, targetTimestamp), CLOCK)) - .setRowSchema(Schema.of(targetAttributes, targetTimestamp))) - .containsInAnyOrder( - Row.withSchema(Schema.of(targetAttributes, targetTimestamp)) - .attachValues(attributes, generatedTimestamp)); - - PAssert.that( - "timestamp only source yields attributes + timestamp target", - pipeline - .apply( - Create.of( - Row.withSchema(Schema.of(sourceTimestamp)) - .attachValues(timestampFromSource))) - .setRowSchema(Schema.of(sourceTimestamp)) - .apply( - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration - .sourceConfigurationBuilder() - .setTimestampFieldName(sourceTimestamp.getName()) - .build())) - .convertForRowToMessage( - Schema.of(targetAttributes, targetTimestamp), CLOCK)) - .setRowSchema(Schema.of(targetAttributes, targetTimestamp))) - .containsInAnyOrder( - Row.withSchema(Schema.of(targetAttributes, targetTimestamp)) - .attachValues(ImmutableMap.of(), timestampFromSource)); - - PAssert.that( - "timestamp and attributes source yields renamed fields in target", - pipeline - .apply( - Create.of( - Row.withSchema(Schema.of(sourceAttributes, sourceTimestamp)) - .attachValues(attributes, timestampFromSource))) - .setRowSchema(Schema.of(sourceAttributes, sourceTimestamp)) - .apply( - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration - .sourceConfigurationBuilder() - .setAttributesFieldName(sourceAttributes.getName()) - .setTimestampFieldName(sourceTimestamp.getName()) - .build())) - .convertForRowToMessage( - Schema.of(targetAttributes, targetTimestamp), CLOCK)) - .setRowSchema(Schema.of(targetAttributes, targetTimestamp))) - .containsInAnyOrder( - Row.withSchema(Schema.of(targetAttributes, targetTimestamp)) - .attachValues(attributes, timestampFromSource)); - - PAssert.that( - "bytes payload only source yields attributes + timestamp + renamed bytes payload target", - pipeline - .apply( - Create.of( - Row.withSchema(Schema.of(sourcePayloadBytes)) - .withFieldValue(sourcePayloadBytes.getName(), payloadBytes) - .build())) - .setRowSchema(Schema.of(sourcePayloadBytes)) - .apply( - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration - .sourceConfigurationBuilder() - .setPayloadFieldName(sourcePayloadBytes.getName()) - .build())) - .convertForRowToMessage( - Schema.of(targetAttributes, targetTimestamp, targetPayloadBytes), - CLOCK)) - .setRowSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadBytes))) - .containsInAnyOrder( - Row.withSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadBytes)) - .attachValues(ImmutableMap.of(), generatedTimestamp, payloadBytes)); - - PAssert.that( - "row payload only source yields attributes + timestamp + renamed row payload target", - pipeline - .apply(Create.of(Row.withSchema(Schema.of(sourcePayloadRow)).attachValues(userRow))) - .setRowSchema(Schema.of(sourcePayloadRow)) - .apply( - transform( - configurationBuilder() - .setSource( - PubsubWriteSchemaTransformConfiguration - .sourceConfigurationBuilder() - .setPayloadFieldName(sourcePayloadRow.getName()) - .build())) - .convertForRowToMessage( - Schema.of(targetAttributes, targetTimestamp, targetPayloadRow), CLOCK)) - .setRowSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadRow))) - .containsInAnyOrder( - Row.withSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadRow)) - .attachValues(ImmutableMap.of(), generatedTimestamp, userRow)); - - PAssert.that( - "user only fields source yields attributes + timestamp + user fields target", - pipeline - .apply(Create.of(userRow)) - .setRowSchema(ALL_DATA_TYPES_SCHEMA) - .apply( - transform(configurationBuilder()) - .convertForRowToMessage( - Schema.builder() - .addField(targetAttributes) - .addField(targetTimestamp) - .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) - .build(), - CLOCK)) - .setRowSchema( - Schema.builder() - .addField(targetAttributes) - .addField(targetTimestamp) - .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) - .build())) - .containsInAnyOrder( - Row.withSchema( - Schema.builder() - .addField(targetAttributes) - .addField(targetTimestamp) - .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) - .build()) - .addValue(ImmutableMap.of()) - .addValue(generatedTimestamp) - .addValues(userRow.getValues()) - .build()); - - pipeline.run(OPTIONS); - } - - @Test - public void testGetPayloadSerializer() { - Row withAllDataTypes = - rowWithAllDataTypes( - false, - (byte) 0, - Instant.now().toDateTime(), - BigDecimal.valueOf(-1L), - -3.12345, - -4.1f, - (short) -5, - -2, - -7L, - "吃葡萄不吐葡萄皮,不吃葡萄倒吐葡萄皮"); - - PayloadSerializer jsonPayloadSerializer = - new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of()); - byte[] expectedJson = jsonPayloadSerializer.serialize(withAllDataTypes); - byte[] actualJson = - transform(configurationBuilder().setFormat("json")) - .getPayloadSerializer(ALL_DATA_TYPES_SCHEMA) - .serialize(withAllDataTypes); - - PayloadSerializer avroPayloadSerializer = - new AvroPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of()); - byte[] expectedAvro = avroPayloadSerializer.serialize(withAllDataTypes); - byte[] actualAvro = - transform(configurationBuilder().setFormat("avro")) - .getPayloadSerializer(ALL_DATA_TYPES_SCHEMA) - .serialize(withAllDataTypes); - - assertArrayEquals( - "configuration with json format should yield JSON PayloadSerializer", - expectedJson, - actualJson); - - assertArrayEquals( - "configuration with avro format should yield Avro PayloadSerializer", - expectedAvro, - actualAvro); - } - - private static PubsubWriteSchemaTransformConfiguration.Builder configurationBuilder() { - return PubsubWriteSchemaTransformConfiguration.builder() - .setTopic(TOPIC) - .setTarget(PubsubWriteSchemaTransformConfiguration.targetConfigurationBuilder().build()); - } - - private static PubsubRowToMessage.Builder rowToMessageBuilder() { - return PubsubRowToMessage.builder(); - } - - private static PubsubIO.Write<PubsubMessage> pubsubWrite() { - return PubsubIO.writeMessages().to(TOPIC); - } - - private static PubsubWriteSchemaTransformProvider.PubsubWriteSchemaTransform transform( - PubsubWriteSchemaTransformConfiguration.Builder configurationBuilder) { - Row configurationRow = TO_ROW.apply(configurationBuilder.build()); - PubsubWriteSchemaTransformProvider provider = new PubsubWriteSchemaTransformProvider(); - return (PubsubWriteSchemaTransform) provider.from(configurationRow); - } - - private static class MockClock implements Clock, Serializable { - private final Long millis; - - private MockClock(Instant timestamp) { - this.millis = timestamp.getMillis(); - } - - @Override - public long currentTimeMillis() { - return millis; - } - } -}