Removed coder and parseFn from PubsubIO.Read
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/25dc94bc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/25dc94bc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/25dc94bc Branch: refs/heads/DSL_SQL Commit: 25dc94bc971a0cb8848777c48495929f6dbe8f69 Parents: dc0fdcb Author: Eugene Kirpichov <kirpic...@google.com> Authored: Thu Apr 20 19:03:27 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Tue May 2 23:08:29 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 49 ++++--- .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 116 +++++++++++---- .../pubsub/PubsubMessagePayloadOnlyCoder.java | 48 +++++++ .../PubsubMessageWithAttributesCoder.java | 57 ++++++++ .../io/gcp/pubsub/PubsubUnboundedSource.java | 141 +++++++------------ .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 32 ++--- .../gcp/pubsub/PubsubUnboundedSourceTest.java | 108 +++++++------- 7 files changed, 350 insertions(+), 201 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 68cc8e8..6f29797 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -86,6 +86,8 @@ import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.WriteFiles; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; @@ -103,6 +105,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -856,29 +859,30 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // ================================================================================ /** - * Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we - * can instead defer to Windmill's implementation. + * Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we can + * instead defer to Windmill's implementation. */ - private static class StreamingPubsubIORead<T> extends PTransform<PBegin, PCollection<T>> { - private final PubsubUnboundedSource<T> transform; + private static class StreamingPubsubIORead + extends PTransform<PBegin, PCollection<PubsubIO.PubsubMessage>> { + private final PubsubUnboundedSource transform; /** * Builds an instance of this class from the overridden transform. */ public StreamingPubsubIORead( - DataflowRunner runner, PubsubUnboundedSource<T> transform) { + DataflowRunner runner, PubsubUnboundedSource transform) { this.transform = transform; } - PubsubUnboundedSource<T> getOverriddenTransform() { + PubsubUnboundedSource getOverriddenTransform() { return transform; } @Override - public PCollection<T> expand(PBegin input) { - return PCollection.<T>createPrimitiveOutputInternal( + public PCollection<PubsubIO.PubsubMessage> expand(PBegin input) { + return PCollection.<PubsubIO.PubsubMessage>createPrimitiveOutputInternal( input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) - .setCoder(transform.getElementCoder()); + .setCoder(new PubsubMessageWithAttributesCoder()); } @Override @@ -888,19 +892,19 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { static { DataflowPipelineTranslator.registerTransformTranslator( - StreamingPubsubIORead.class, new StreamingPubsubIOReadTranslator<>()); + StreamingPubsubIORead.class, new StreamingPubsubIOReadTranslator()); } } /** Rewrite {@link StreamingPubsubIORead} to the appropriate internal node. */ - private static class StreamingPubsubIOReadTranslator<T> - implements TransformTranslator<StreamingPubsubIORead<T>> { + private static class StreamingPubsubIOReadTranslator + implements TransformTranslator<StreamingPubsubIORead> { @Override - public void translate(StreamingPubsubIORead<T> transform, TranslationContext context) { + public void translate(StreamingPubsubIORead transform, TranslationContext context) { checkArgument( context.getPipelineOptions().isStreaming(), "StreamingPubsubIORead is only for streaming pipelines."); - PubsubUnboundedSource<T> overriddenTransform = transform.getOverriddenTransform(); + PubsubUnboundedSource overriddenTransform = transform.getOverriddenTransform(); StepTranslationContext stepContext = context.addStep(transform, "ParallelRead"); stepContext.addInput(PropertyNames.FORMAT, "pubsub"); if (overriddenTransform.getTopicProvider() != null) { @@ -932,16 +936,29 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { stepContext.addInput( PropertyNames.PUBSUB_ID_ATTRIBUTE, overriddenTransform.getIdAttribute()); } - if (overriddenTransform.getWithAttributesParseFn() != null) { + // In both cases, the transform needs to read PubsubMessage. However, in case it needs + // the attributes, we supply an identity "parse fn" so the worker will read PubsubMessage's + // from Windmill and simply pass them around; and in case it doesn't need attributes, + // we're already implicitly using a "Coder" that interprets the data as a PubsubMessage's + // payload. + if (overriddenTransform.getNeedsAttributes()) { stepContext.addInput( PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, byteArrayToJsonString( - serializeToByteArray(overriddenTransform.getWithAttributesParseFn()))); + serializeToByteArray(new IdentityMessageFn()))); } stepContext.addOutput(context.getOutput(transform)); } } + private static class IdentityMessageFn + extends SimpleFunction<PubsubIO.PubsubMessage, PubsubIO.PubsubMessage> { + @Override + public PubsubIO.PubsubMessage apply(PubsubIO.PubsubMessage input) { + return input; + } + } + /** * Suppress application of {@link PubsubUnboundedSink#expand} in streaming mode so that we * can instead defer to Windmill's implementation. http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 129a25f..af8b7d6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -24,6 +24,7 @@ import com.google.auto.value.AutoValue; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -32,6 +33,7 @@ import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; @@ -44,6 +46,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -461,8 +464,34 @@ public class PubsubIO { } /** Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. */ - public static <T> Read<T> read() { - return new AutoValue_PubsubIO_Read.Builder<T>().build(); + private static <T> Read<T> read() { + return new AutoValue_PubsubIO_Read.Builder<T>().setNeedsAttributes(false).build(); + } + + /** + * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The + * messages will only contain a {@link PubsubMessage#getMessage() payload}, but no {@link + * PubsubMessage#getAttributeMap() attributes}. + */ + public static Read<PubsubMessage> readPubsubMessagesWithoutAttributes() { + return new AutoValue_PubsubIO_Read.Builder<PubsubMessage>() + .setCoder(PubsubMessagePayloadOnlyCoder.of()) + .setParseFn(new IdentityMessageFn()) + .setNeedsAttributes(false) + .build(); + } + + /** + * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The + * messages will contain both a {@link PubsubMessage#getMessage() payload} and {@link + * PubsubMessage#getAttributeMap() attributes}. + */ + public static Read<PubsubMessage> readPubsubMessagesWithAttributes() { + return new AutoValue_PubsubIO_Read.Builder<PubsubMessage>() + .setCoder(PubsubMessageWithAttributesCoder.of()) + .setParseFn(new IdentityMessageFn()) + .setNeedsAttributes(true) + .build(); } /** @@ -470,7 +499,8 @@ public class PubsubIO { * Pub/Sub stream. */ public static Read<String> readStrings() { - return PubsubIO.<String>read().withCoder(StringUtf8Coder.of()); + return PubsubIO.<String>read().withCoderAndParseFn( + StringUtf8Coder.of(), new ParsePayloadAsUtf8()); } /** @@ -478,7 +508,11 @@ public class PubsubIO { * given type from a Google Cloud Pub/Sub stream. */ public static <T extends Message> Read<T> readProtos(Class<T> messageClass) { - return PubsubIO.<T>read().withCoder(ProtoCoder.of(messageClass)); + // TODO: Stop using ProtoCoder and instead parse the payload directly. + // We should not be relying on the fact that ProtoCoder's wire format is identical to + // the protobuf wire format, as the wire format is not part of a coder's API. + ProtoCoder<T> coder = ProtoCoder.of(messageClass); + return PubsubIO.<T>read().withCoderAndParseFn(coder, new ParsePayloadUsingCoder<>(coder)); } /** @@ -486,7 +520,11 @@ public class PubsubIO { * given type from a Google Cloud Pub/Sub stream. */ public static <T extends Message> Read<T> readAvros(Class<T> clazz) { - return PubsubIO.<T>read().withCoder(AvroCoder.of(clazz)); + // TODO: Stop using AvroCoder and instead parse the payload directly. + // We should not be relying on the fact that AvroCoder's wire format is identical to + // the Avro wire format, as the wire format is not part of a coder's API. + AvroCoder<T> coder = AvroCoder.of(clazz); + return PubsubIO.<T>read().withCoderAndParseFn(coder, new ParsePayloadUsingCoder<>(coder)); } /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */ @@ -543,6 +581,8 @@ public class PubsubIO { @Nullable abstract SimpleFunction<PubsubMessage, T> getParseFn(); + abstract boolean getNeedsAttributes(); + abstract Builder<T> toBuilder(); @AutoValue.Builder @@ -559,6 +599,8 @@ public class PubsubIO { abstract Builder<T> setParseFn(SimpleFunction<PubsubMessage, T> parseFn); + abstract Builder<T> setNeedsAttributes(boolean needsAttributes); + abstract Read<T> build(); } @@ -665,20 +707,13 @@ public class PubsubIO { } /** - * Uses the given {@link Coder} to decode each record into a value of type {@code T}. - */ - public Read<T> withCoder(Coder<T> coder) { - return toBuilder().setCoder(coder).build(); - } - - /** * Causes the source to return a PubsubMessage that includes Pubsub attributes, and uses the * given parsing function to transform the PubsubMessage into an output type. * A Coder for the output type T must be registered or set on the output via * {@link PCollection#setCoder(Coder)}. */ - public Read<T> withParseFn(SimpleFunction<PubsubMessage, T> parseFn) { - return toBuilder().setParseFn(parseFn).build(); + private Read<T> withCoderAndParseFn(Coder<T> coder, SimpleFunction<PubsubMessage, T> parseFn) { + return toBuilder().setCoder(coder).setParseFn(parseFn).build(); } @Override @@ -691,10 +726,6 @@ public class PubsubIO { throw new IllegalStateException( "Can't set both the topic and the subscription for " + "a PubsubIO.Read transform"); } - if (getCoder() == null) { - throw new IllegalStateException( - "PubsubIO.Read requires that a coder be set using " + "the withCoder method."); - } @Nullable ValueProvider<ProjectPath> projectPath = @@ -711,17 +742,23 @@ public class PubsubIO { getSubscriptionProvider() == null ? null : NestedValueProvider.of(getSubscriptionProvider(), new SubscriptionPathTranslator()); - PubsubUnboundedSource<T> source = - new PubsubUnboundedSource<T>( + PubsubUnboundedSource source = + new PubsubUnboundedSource( FACTORY, projectPath, topicPath, subscriptionPath, - getCoder(), getTimestampAttribute(), getIdAttribute(), - getParseFn()); - return input.getPipeline().apply(source); + getNeedsAttributes()); + return input + .getPipeline() + .apply(source) + .setCoder( + getNeedsAttributes() + ? PubsubMessageWithAttributesCoder.of() + : PubsubMessagePayloadOnlyCoder.of()) + .apply(MapElements.via(getParseFn())); } @Override @@ -847,7 +884,7 @@ public class PubsubIO { * function translates the input type T to a PubsubMessage object, which is used by the sink * to separately set the PubSub message's payload and attributes. */ - public Write<T> withFormatFn(SimpleFunction<T, PubsubMessage> formatFn) { + private Write<T> withFormatFn(SimpleFunction<T, PubsubMessage> formatFn) { return toBuilder().setFormatFn(formatFn).build(); } @@ -954,4 +991,35 @@ public class PubsubIO { } } } + + private static class ParsePayloadAsUtf8 extends SimpleFunction<PubsubMessage, String> { + @Override + public String apply(PubsubMessage input) { + return new String(input.getMessage(), StandardCharsets.UTF_8); + } + } + + private static class ParsePayloadUsingCoder<T> extends SimpleFunction<PubsubMessage, T> { + private Coder<T> coder; + + public ParsePayloadUsingCoder(Coder<T> coder) { + this.coder = coder; + } + + @Override + public T apply(PubsubMessage input) { + try { + return CoderUtils.decodeFromByteArray(coder, input.getMessage()); + } catch (CoderException e) { + throw new RuntimeException("Could not decode Pubsub message", e); + } + } + } + + private static class IdentityMessageFn extends SimpleFunction<PubsubMessage, PubsubMessage> { + @Override + public PubsubMessage apply(PubsubMessage input) { + return input; + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java new file mode 100644 index 0000000..f0dae46 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.util.StreamUtils; + +/** A coder for PubsubMessage treating the raw bytes being decoded as the message's payload. */ +public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubIO.PubsubMessage> { + public static PubsubMessagePayloadOnlyCoder of() { + return new PubsubMessagePayloadOnlyCoder(); + } + + @Override + public void encode(PubsubIO.PubsubMessage value, OutputStream outStream, Context context) + throws IOException { + checkState(context.isWholeStream, "Expected to only be used in a whole-stream context"); + outStream.write(value.getMessage()); + } + + @Override + public PubsubIO.PubsubMessage decode(InputStream inStream, Context context) throws IOException { + checkState(context.isWholeStream, "Expected to only be used in a whole-stream context"); + return new PubsubIO.PubsubMessage( + StreamUtils.getBytes(inStream), ImmutableMap.<String, String>of()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java new file mode 100644 index 0000000..27f0f02 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; + +/** A coder for PubsubMessage including attributes. */ +public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubIO.PubsubMessage> { + private static final Coder<byte[]> PAYLOAD_CODER = + NullableCoder.of(ByteArrayCoder.of()); + private static final Coder<Map<String, String>> ATTRIBUTES_CODER = MapCoder.of( + StringUtf8Coder.of(), StringUtf8Coder.of()); + + public static PubsubMessageWithAttributesCoder of() { + return new PubsubMessageWithAttributesCoder(); + } + + public void encode(PubsubIO.PubsubMessage value, OutputStream outStream, Context context) + throws IOException { + PAYLOAD_CODER.encode( + value.getMessage(), + outStream, + context.nested()); + ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream, context); + } + + @Override + public PubsubIO.PubsubMessage decode(InputStream inStream, Context context) throws IOException { + byte[] payload = PAYLOAD_CODER.decode(inStream, context.nested()); + Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context); + return new PubsubIO.PubsubMessage(payload, attributes); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 903ae41..d366949 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -47,7 +47,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.NullableCoder; @@ -66,13 +65,11 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.BucketingFunction; -import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.MovingFunction; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -110,7 +107,7 @@ import org.slf4j.LoggerFactory; * {@link UnboundedSource.UnboundedReader} instances to execute concurrently and thus hide latency. * </ul> */ -public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> { +public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<PubsubIO.PubsubMessage>> { private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class); /** @@ -121,7 +118,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> /** * Coder for checkpoints. */ - private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = new PubsubCheckpointCoder<>(); + private static final PubsubCheckpointCoder CHECKPOINT_CODER = new PubsubCheckpointCoder(); /** * Maximum number of messages per pull. @@ -231,7 +228,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> * we need to restore. */ @VisibleForTesting - static class PubsubCheckpoint<T> implements UnboundedSource.CheckpointMark { + static class PubsubCheckpoint implements UnboundedSource.CheckpointMark { /** * The {@link SubscriptionPath} to the subscription the reader is reading from. May be * {@code null} if the {@link PubsubUnboundedSource} contains the subscription. @@ -247,7 +244,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> * the 'true' active reader may have changed. */ @Nullable - private PubsubReader<T> reader; + private PubsubReader reader; /** * If the checkpoint is for persisting: The ACK ids of messages which have been passed @@ -268,7 +265,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> public PubsubCheckpoint( @Nullable String subscriptionPath, - @Nullable PubsubReader<T> reader, + @Nullable PubsubReader reader, @Nullable List<String> safeToAckIds, List<String> notYetReadIds) { this.subscriptionPath = subscriptionPath; @@ -327,7 +324,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> /** * Return current time according to {@code reader}. */ - private static long now(PubsubReader<?> reader) { + private static long now(PubsubReader reader) { if (reader.outer.outer.clock == null) { return System.currentTimeMillis(); } else { @@ -340,7 +337,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> * NACK all messages which have been read from Pubsub but not passed downstream. * This way Pubsub will send them again promptly. */ - public void nackAll(PubsubReader<T> reader) throws IOException { + public void nackAll(PubsubReader reader) throws IOException { checkState(this.reader == null, "Cannot nackAll on persisting checkpoint"); List<String> batchYetToAckIds = new ArrayList<>(Math.min(notYetReadIds.size(), ACK_BATCH_SIZE)); @@ -360,13 +357,13 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> } /** The coder for our checkpoints. */ - private static class PubsubCheckpointCoder<T> extends CustomCoder<PubsubCheckpoint<T>> { + private static class PubsubCheckpointCoder extends CustomCoder<PubsubCheckpoint> { private static final Coder<String> SUBSCRIPTION_PATH_CODER = NullableCoder.of(StringUtf8Coder.of()); private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of()); @Override - public void encode(PubsubCheckpoint<T> value, OutputStream outStream, Context context) + public void encode(PubsubCheckpoint value, OutputStream outStream, Context context) throws IOException { SUBSCRIPTION_PATH_CODER.encode( value.subscriptionPath, @@ -376,10 +373,10 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> } @Override - public PubsubCheckpoint<T> decode(InputStream inStream, Context context) throws IOException { + public PubsubCheckpoint decode(InputStream inStream, Context context) throws IOException { String path = SUBSCRIPTION_PATH_CODER.decode(inStream, context.nested()); List<String> notYetReadIds = LIST_CODER.decode(inStream, context); - return new PubsubCheckpoint<>(path, null, null, notYetReadIds); + return new PubsubCheckpoint(path, null, null, notYetReadIds); } } @@ -392,16 +389,14 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> * but not yet consumed downstream and/or ACKed back to Pubsub. */ @VisibleForTesting - static class PubsubReader<T> extends UnboundedSource.UnboundedReader<T> { + static class PubsubReader extends UnboundedSource.UnboundedReader<PubsubIO.PubsubMessage> { /** * For access to topic and checkpointCoder. */ - private final PubsubSource<T> outer; + private final PubsubSource outer; @VisibleForTesting final SubscriptionPath subscription; - private final SimpleFunction<PubsubIO.PubsubMessage, T> parseFn; - /** * Client on which to talk to Pubsub. Contains a null value if the client has been closed. */ @@ -593,12 +588,10 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> /** * Construct a reader. */ - public PubsubReader(PubsubOptions options, PubsubSource<T> outer, SubscriptionPath subscription, - SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) + public PubsubReader(PubsubOptions options, PubsubSource outer, SubscriptionPath subscription) throws IOException, GeneralSecurityException { this.outer = outer; this.subscription = subscription; - this.parseFn = parseFn; pubsubClient = new AtomicReference<>( outer.outer.pubsubFactory.newClient( @@ -970,20 +963,11 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> } @Override - public T getCurrent() throws NoSuchElementException { + public PubsubIO.PubsubMessage getCurrent() throws NoSuchElementException { if (current == null) { throw new NoSuchElementException(); } - try { - if (parseFn != null) { - return parseFn.apply(new PubsubIO.PubsubMessage( - current.elementBytes, current.attributes)); - } else { - return CoderUtils.decodeFromByteArray(outer.outer.elementCoder, current.elementBytes); - } - } catch (CoderException e) { - throw new RuntimeException("Unable to decode element from Pubsub message: ", e); - } + return new PubsubIO.PubsubMessage(current.elementBytes, current.attributes); } @Override @@ -1031,7 +1015,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> } @Override - public PubsubSource<T> getCurrentSource() { + public PubsubSource getCurrentSource() { return outer; } @@ -1073,7 +1057,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> } @Override - public PubsubCheckpoint<T> getCheckpointMark() { + public PubsubCheckpoint getCheckpointMark() { int cur = numInFlightCheckpoints.incrementAndGet(); maxInFlightCheckpoints = Math.max(maxInFlightCheckpoints, cur); // It's possible for a checkpoint to be taken but never finalized. @@ -1087,10 +1071,10 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> } if (outer.subscriptionPath == null) { // need to include the subscription in case we resume, as it's not stored in the source. - return new PubsubCheckpoint<>( + return new PubsubCheckpoint( subscription.getPath(), this, snapshotSafeToAckIds, snapshotNotYetReadIds); } - return new PubsubCheckpoint<>(null, this, snapshotSafeToAckIds, snapshotNotYetReadIds); + return new PubsubCheckpoint(null, this, snapshotSafeToAckIds, snapshotNotYetReadIds); } @Override @@ -1104,28 +1088,28 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> // ================================================================================ @VisibleForTesting - static class PubsubSource<T> extends UnboundedSource<T, PubsubCheckpoint<T>> { - public final PubsubUnboundedSource<T> outer; + static class PubsubSource extends UnboundedSource<PubsubIO.PubsubMessage, PubsubCheckpoint> { + public final PubsubUnboundedSource outer; // The subscription to read from. @VisibleForTesting final SubscriptionPath subscriptionPath; - public PubsubSource(PubsubUnboundedSource<T> outer) { + public PubsubSource(PubsubUnboundedSource outer) { this(outer, outer.getSubscription()); } - private PubsubSource(PubsubUnboundedSource<T> outer, SubscriptionPath subscriptionPath) { + private PubsubSource(PubsubUnboundedSource outer, SubscriptionPath subscriptionPath) { this.outer = outer; this.subscriptionPath = subscriptionPath; } @Override - public List<PubsubSource<T>> split( + public List<PubsubSource> split( int desiredNumSplits, PipelineOptions options) throws Exception { - List<PubsubSource<T>> result = new ArrayList<>(desiredNumSplits); - PubsubSource<T> splitSource = this; + List<PubsubSource> result = new ArrayList<>(desiredNumSplits); + PubsubSource splitSource = this; if (subscriptionPath == null) { - splitSource = new PubsubSource<>(outer, outer.createRandomSubscription(options)); + splitSource = new PubsubSource(outer, outer.createRandomSubscription(options)); } for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) { // Since the source is immutable and Pubsub automatically shards we simply @@ -1136,10 +1120,10 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> } @Override - public PubsubReader<T> createReader( + public PubsubReader createReader( PipelineOptions options, - @Nullable PubsubCheckpoint<T> checkpoint) { - PubsubReader<T> reader; + @Nullable PubsubCheckpoint checkpoint) { + PubsubReader reader; SubscriptionPath subscription = subscriptionPath; if (subscription == null) { if (checkpoint == null) { @@ -1151,8 +1135,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> } } try { - reader = new PubsubReader<>(options.as(PubsubOptions.class), this, subscription, - outer.parseFn); + reader = new PubsubReader(options.as(PubsubOptions.class), this, subscription); } catch (GeneralSecurityException | IOException e) { throw new RuntimeException("Unable to subscribe to " + subscriptionPath + ": ", e); } @@ -1171,15 +1154,15 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> @Nullable @Override - public Coder<PubsubCheckpoint<T>> getCheckpointMarkCoder() { - @SuppressWarnings("unchecked") PubsubCheckpointCoder<T> typedCoder = - (PubsubCheckpointCoder<T>) CHECKPOINT_CODER; + public Coder<PubsubCheckpoint> getCheckpointMarkCoder() { + @SuppressWarnings("unchecked") PubsubCheckpointCoder typedCoder = + (PubsubCheckpointCoder) CHECKPOINT_CODER; return typedCoder; } @Override - public Coder<T> getDefaultOutputCoder() { - return outer.elementCoder; + public Coder<PubsubIO.PubsubMessage> getDefaultOutputCoder() { + return new PubsubMessageWithAttributesCoder(); } @Override @@ -1198,7 +1181,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> // StatsFn // ================================================================================ - private static class StatsFn<T> extends DoFn<T, T> { + private static class StatsFn extends DoFn<PubsubIO.PubsubMessage, PubsubIO.PubsubMessage> { private final Counter elementCounter = SourceMetrics.elementsRead(); private final PubsubClientFactory pubsubFactory; @@ -1292,13 +1275,6 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> private ValueProvider<SubscriptionPath> subscription; /** - * Coder for elements. Elements are effectively double-encoded: first to a byte array - * using this checkpointCoder, then to a base-64 string to conform to Pubsub's payload - * conventions. - */ - private final Coder<T> elementCoder; - - /** * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use * Pubsub message publish timestamp instead. */ @@ -1312,12 +1288,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> @Nullable private final String idAttribute; - /** - * If not {@literal null}, the user is asking for PubSub attributes. This parse function will be - * used to parse {@link PubsubIO.PubsubMessage}s containing a payload and attributes. - */ - @Nullable - SimpleFunction<PubsubIO.PubsubMessage, T> parseFn; + /** Whether this source should load the attributes of the PubsubMessage, or only the payload. */ + private final boolean needsAttributes; @VisibleForTesting PubsubUnboundedSource( @@ -1326,10 +1298,9 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> @Nullable ValueProvider<ProjectPath> project, @Nullable ValueProvider<TopicPath> topic, @Nullable ValueProvider<SubscriptionPath> subscription, - Coder<T> elementCoder, @Nullable String timestampAttribute, @Nullable String idAttribute, - @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) { + boolean needsAttributes) { checkArgument((topic == null) != (subscription == null), "Exactly one of topic and subscription must be given"); checkArgument((topic == null) == (project == null), @@ -1339,10 +1310,9 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> this.project = project; this.topic = topic; this.subscription = subscription; - this.elementCoder = checkNotNull(elementCoder); this.timestampAttribute = timestampAttribute; this.idAttribute = idAttribute; - this.parseFn = parseFn; + this.needsAttributes = needsAttributes; } /** @@ -1353,27 +1323,18 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> @Nullable ValueProvider<ProjectPath> project, @Nullable ValueProvider<TopicPath> topic, @Nullable ValueProvider<SubscriptionPath> subscription, - Coder<T> elementCoder, @Nullable String timestampAttribute, @Nullable String idAttribute, - @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) { + boolean needsAttributes) { this( null, pubsubFactory, project, topic, subscription, - elementCoder, timestampAttribute, idAttribute, - parseFn); - } - - /** - * Get the coder used for elements. - */ - public Coder<T> getElementCoder() { - return elementCoder; + needsAttributes); } /** @@ -1432,20 +1393,16 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> return idAttribute; } - /** - * Get the parsing function for PubSub attributes. - */ - @Nullable - public SimpleFunction<PubsubIO.PubsubMessage, T> getWithAttributesParseFn() { - return parseFn; + public boolean getNeedsAttributes() { + return needsAttributes; } @Override - public PCollection<T> expand(PBegin input) { + public PCollection<PubsubIO.PubsubMessage> expand(PBegin input) { return input.getPipeline().begin() - .apply(Read.from(new PubsubSource<T>(this))) + .apply(Read.from(new PubsubSource(this))) .apply("PubsubUnboundedSource.Stats", - ParDo.of(new StatsFn<T>( + ParDo.of(new StatsFn( pubsubFactory, subscription, topic, timestampAttribute, idAttribute))); } http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index 5f06b88..8f5d1ea 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -49,19 +49,19 @@ public class PubsubIOTest { @Test public void testPubsubIOGetName() { assertEquals("PubsubIO.Read", - PubsubIO.<String>read().fromTopic("projects/myproject/topics/mytopic").getName()); + PubsubIO.readStrings().fromTopic("projects/myproject/topics/mytopic").getName()); assertEquals("PubsubIO.Write", - PubsubIO.<String>write().to("projects/myproject/topics/mytopic").getName()); + PubsubIO.writeStrings().to("projects/myproject/topics/mytopic").getName()); } @Test public void testTopicValidationSuccess() throws Exception { - PubsubIO.<String>read().fromTopic("projects/my-project/topics/abc"); - PubsubIO.<String>read().fromTopic("projects/my-project/topics/ABC"); - PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-DeF"); - PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-1234"); - PubsubIO.<String>read().fromTopic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc"); - PubsubIO.<String>read().fromTopic(new StringBuilder() + PubsubIO.readStrings().fromTopic("projects/my-project/topics/abc"); + PubsubIO.readStrings().fromTopic("projects/my-project/topics/ABC"); + PubsubIO.readStrings().fromTopic("projects/my-project/topics/AbC-DeF"); + PubsubIO.readStrings().fromTopic("projects/my-project/topics/AbC-1234"); + PubsubIO.readStrings().fromTopic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc"); + PubsubIO.readStrings().fromTopic(new StringBuilder() .append("projects/my-project/topics/A-really-long-one-") .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") @@ -72,13 +72,13 @@ public class PubsubIOTest { @Test public void testTopicValidationBadCharacter() throws Exception { thrown.expect(IllegalArgumentException.class); - PubsubIO.<String>read().fromTopic("projects/my-project/topics/abc-*-abc"); + PubsubIO.readStrings().fromTopic("projects/my-project/topics/abc-*-abc"); } @Test public void testTopicValidationTooLong() throws Exception { thrown.expect(IllegalArgumentException.class); - PubsubIO.<String>read().fromTopic(new StringBuilder().append + PubsubIO.readStrings().fromTopic(new StringBuilder().append ("projects/my-project/topics/A-really-long-one-") .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111") @@ -91,7 +91,7 @@ public class PubsubIOTest { String topic = "projects/project/topics/topic"; String subscription = "projects/project/subscriptions/subscription"; Duration maxReadTime = Duration.standardMinutes(5); - PubsubIO.Read<String> read = PubsubIO.<String>read() + PubsubIO.Read<String> read = PubsubIO.readStrings() .fromTopic(StaticValueProvider.of(topic)) .withTimestampAttribute("myTimestamp") .withIdAttribute("myId"); @@ -108,7 +108,7 @@ public class PubsubIOTest { String topic = "projects/project/topics/topic"; String subscription = "projects/project/subscriptions/subscription"; Duration maxReadTime = Duration.standardMinutes(5); - PubsubIO.Read<String> read = PubsubIO.<String>read() + PubsubIO.Read<String> read = PubsubIO.readStrings() .fromSubscription(StaticValueProvider.of(subscription)) .withTimestampAttribute("myTimestamp") .withIdAttribute("myId"); @@ -123,7 +123,7 @@ public class PubsubIOTest { @Test public void testNullTopic() { String subscription = "projects/project/subscriptions/subscription"; - PubsubIO.Read<String> read = PubsubIO.<String>read() + PubsubIO.Read<String> read = PubsubIO.readStrings() .fromSubscription(StaticValueProvider.of(subscription)); assertNull(read.getTopicProvider()); assertNotNull(read.getSubscriptionProvider()); @@ -133,7 +133,7 @@ public class PubsubIOTest { @Test public void testNullSubscription() { String topic = "projects/project/topics/topic"; - PubsubIO.Read<String> read = PubsubIO.<String>read() + PubsubIO.Read<String> read = PubsubIO.readStrings() .fromTopic(StaticValueProvider.of(topic)); assertNotNull(read.getTopicProvider()); assertNull(read.getSubscriptionProvider()); @@ -166,7 +166,7 @@ public class PubsubIOTest { @Test public void testWriteDisplayData() { String topic = "projects/project/topics/topic"; - PubsubIO.Write<?> write = PubsubIO.<String>write() + PubsubIO.Write<?> write = PubsubIO.writeStrings() .to(topic) .withTimestampAttribute("myTimestamp") .withIdAttribute("myId"); @@ -182,7 +182,7 @@ public class PubsubIOTest { @Category(ValidatesRunner.class) public void testPrimitiveWriteDisplayData() { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - PubsubIO.Write<?> write = PubsubIO.<String>write().to("projects/project/topics/topic"); + PubsubIO.Write<?> write = PubsubIO.writeStrings().to("projects/project/topics/topic"); Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); assertThat("PubsubIO.Write should include the topic in its primitive display data", http://git-wip-us.apache.org/repos/asf/beam/blob/25dc94bc/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java index dc66ea1..592dfa3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java @@ -34,12 +34,12 @@ import static org.junit.Assert.assertTrue; import com.google.api.client.util.Clock; import com.google.common.collect.ImmutableList; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; @@ -79,7 +79,7 @@ public class PubsubUnboundedSourceTest { private AtomicLong now; private Clock clock; private PubsubTestClientFactory factory; - private PubsubSource<String> primSource; + private PubsubSource primSource; @Rule public TestPipeline p = TestPipeline.create(); @@ -93,11 +93,11 @@ public class PubsubUnboundedSourceTest { } }; factory = PubsubTestClient.createFactoryForPull(clock, SUBSCRIPTION, ACK_TIMEOUT_S, incoming); - PubsubUnboundedSource<String> source = - new PubsubUnboundedSource<>( + PubsubUnboundedSource source = + new PubsubUnboundedSource( clock, factory, null, null, StaticValueProvider.of(SUBSCRIPTION), - StringUtf8Coder.of(), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, null); - primSource = new PubsubSource<>(source); + TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, true /* needsAttributes */); + primSource = new PubsubSource(source); } private void setupOneMessage() { @@ -114,6 +114,10 @@ public class PubsubUnboundedSourceTest { factory = null; } + private static String data(PubsubIO.PubsubMessage message) { + return new String(message.getMessage(), StandardCharsets.UTF_8); + } + @Test public void checkpointCoderIsSane() throws Exception { setupOneMessage(ImmutableList.<IncomingMessage>of()); @@ -126,13 +130,13 @@ public class PubsubUnboundedSourceTest { @Test public void readOneMessage() throws IOException { setupOneMessage(); - PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubReader reader = primSource.createReader(p.getOptions(), null); // Read one message. assertTrue(reader.start()); - assertEquals(DATA, reader.getCurrent()); + assertEquals(DATA, data(reader.getCurrent())); assertFalse(reader.advance()); // ACK the message. - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + PubsubCheckpoint checkpoint = reader.getCheckpointMark(); checkpoint.finalizeCheckpoint(); reader.close(); } @@ -140,19 +144,19 @@ public class PubsubUnboundedSourceTest { @Test public void timeoutAckAndRereadOneMessage() throws IOException { setupOneMessage(); - PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubReader reader = primSource.createReader(p.getOptions(), null); PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); assertTrue(reader.start()); - assertEquals(DATA, reader.getCurrent()); + assertEquals(DATA, data(reader.getCurrent())); // Let the ACK deadline for the above expire. now.addAndGet(65 * 1000); pubsubClient.advance(); // We'll now receive the same message again. assertTrue(reader.advance()); - assertEquals(DATA, reader.getCurrent()); + assertEquals(DATA, data(reader.getCurrent())); assertFalse(reader.advance()); // Now ACK the message. - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + PubsubCheckpoint checkpoint = reader.getCheckpointMark(); checkpoint.finalizeCheckpoint(); reader.close(); } @@ -160,11 +164,11 @@ public class PubsubUnboundedSourceTest { @Test public void extendAck() throws IOException { setupOneMessage(); - PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubReader reader = primSource.createReader(p.getOptions(), null); PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); // Pull the first message but don't take a checkpoint for it. assertTrue(reader.start()); - assertEquals(DATA, reader.getCurrent()); + assertEquals(DATA, data(reader.getCurrent())); // Extend the ack now.addAndGet(55 * 1000); pubsubClient.advance(); @@ -174,7 +178,7 @@ public class PubsubUnboundedSourceTest { pubsubClient.advance(); assertFalse(reader.advance()); // Now ACK the message. - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + PubsubCheckpoint checkpoint = reader.getCheckpointMark(); checkpoint.finalizeCheckpoint(); reader.close(); } @@ -182,11 +186,11 @@ public class PubsubUnboundedSourceTest { @Test public void timeoutAckExtensions() throws IOException { setupOneMessage(); - PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubReader reader = primSource.createReader(p.getOptions(), null); PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); // Pull the first message but don't take a checkpoint for it. assertTrue(reader.start()); - assertEquals(DATA, reader.getCurrent()); + assertEquals(DATA, data(reader.getCurrent())); // Extend the ack. now.addAndGet(55 * 1000); pubsubClient.advance(); @@ -202,9 +206,9 @@ public class PubsubUnboundedSourceTest { pubsubClient.advance(); // Reread the same message. assertTrue(reader.advance()); - assertEquals(DATA, reader.getCurrent()); + assertEquals(DATA, data(reader.getCurrent())); // Now ACK the message. - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + PubsubCheckpoint checkpoint = reader.getCheckpointMark(); checkpoint.finalizeCheckpoint(); reader.close(); } @@ -218,20 +222,20 @@ public class PubsubUnboundedSourceTest { incoming.add(new IncomingMessage(data.getBytes(), null, TIMESTAMP, 0, ackid, RECORD_ID)); } setupOneMessage(incoming); - PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubReader reader = primSource.createReader(p.getOptions(), null); // Consume two messages, only read one. assertTrue(reader.start()); - assertEquals("data_0", reader.getCurrent()); + assertEquals("data_0", data(reader.getCurrent())); // Grab checkpoint. - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + PubsubCheckpoint checkpoint = reader.getCheckpointMark(); checkpoint.finalizeCheckpoint(); assertEquals(1, checkpoint.notYetReadIds.size()); assertEquals("ackid_1", checkpoint.notYetReadIds.get(0)); // Read second message. assertTrue(reader.advance()); - assertEquals("data_1", reader.getCurrent()); + assertEquals("data_1", data(reader.getCurrent())); // Restore from checkpoint. byte[] checkpointBytes = @@ -244,7 +248,7 @@ public class PubsubUnboundedSourceTest { // Re-read second message. reader = primSource.createReader(p.getOptions(), checkpoint); assertTrue(reader.start()); - assertEquals("data_1", reader.getCurrent()); + assertEquals("data_1", data(reader.getCurrent())); // We are done. assertFalse(reader.advance()); @@ -278,7 +282,7 @@ public class PubsubUnboundedSourceTest { } setupOneMessage(incoming); - PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubReader reader = primSource.createReader(p.getOptions(), null); PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); for (int i = 0; i < n; i++) { @@ -290,7 +294,7 @@ public class PubsubUnboundedSourceTest { // We'll checkpoint and ack within the 2min limit. now.addAndGet(30); pubsubClient.advance(); - String data = reader.getCurrent(); + String data = data(reader.getCurrent()); Integer messageNum = dataToMessageNum.remove(data); // No duplicate messages. assertNotNull(messageNum); @@ -310,7 +314,7 @@ public class PubsubUnboundedSourceTest { } assertThat(watermark, lessThanOrEqualTo(minOutstandingTimestamp)); // Ack messages, but only every other finalization. - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + PubsubCheckpoint checkpoint = reader.getCheckpointMark(); if (i % 2000 == 1999) { checkpoint.finalizeCheckpoint(); } @@ -327,26 +331,25 @@ public class PubsubUnboundedSourceTest { public void noSubscriptionSplitGeneratesSubscription() throws Exception { TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic"); factory = PubsubTestClient.createFactoryForCreateSubscription(); - PubsubUnboundedSource<String> source = - new PubsubUnboundedSource<>( + PubsubUnboundedSource source = + new PubsubUnboundedSource( factory, StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")), StaticValueProvider.of(topicPath), - null, - StringUtf8Coder.of(), - null, - null, - null); + null /* subscription */, + null /* timestampLabel */, + null /* idLabel */, + false /* needsAttributes */); assertThat(source.getSubscription(), nullValue()); assertThat(source.getSubscription(), nullValue()); PipelineOptions options = PipelineOptionsFactory.create(); - List<PubsubSource<String>> splits = - (new PubsubSource<>(source)).split(3, options); + List<PubsubSource> splits = + (new PubsubSource(source)).split(3, options); // We have at least one returned split assertThat(splits, hasSize(greaterThan(0))); - for (PubsubSource<String> split : splits) { + for (PubsubSource split : splits) { // Each split is equal assertThat(split, equalTo(splits.get(0))); } @@ -358,37 +361,36 @@ public class PubsubUnboundedSourceTest { public void noSubscriptionNoSplitGeneratesSubscription() throws Exception { TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic"); factory = PubsubTestClient.createFactoryForCreateSubscription(); - PubsubUnboundedSource<String> source = - new PubsubUnboundedSource<>( + PubsubUnboundedSource source = + new PubsubUnboundedSource( factory, StaticValueProvider.of(PubsubClient.projectPathFromId("my_project")), StaticValueProvider.of(topicPath), - null, - StringUtf8Coder.of(), - null, - null, - null); + null /* subscription */, + null /* timestampLabel */, + null /* idLabel */, + false /* needsAttributes */); assertThat(source.getSubscription(), nullValue()); assertThat(source.getSubscription(), nullValue()); PipelineOptions options = PipelineOptionsFactory.create(); - PubsubSource<String> actualSource = new PubsubSource<>(source); - PubsubReader<String> reader = actualSource.createReader(options, null); + PubsubSource actualSource = new PubsubSource(source); + PubsubReader reader = actualSource.createReader(options, null); SubscriptionPath createdSubscription = reader.subscription; assertThat(createdSubscription, not(nullValue())); - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + PubsubCheckpoint checkpoint = reader.getCheckpointMark(); assertThat(checkpoint.subscriptionPath, equalTo(createdSubscription.getPath())); checkpoint.finalizeCheckpoint(); - PubsubCheckpoint<String> deserCheckpoint = + PubsubCheckpoint deserCheckpoint = CoderUtils.clone(actualSource.getCheckpointMarkCoder(), checkpoint); assertThat(checkpoint.subscriptionPath, not(nullValue())); assertThat(checkpoint.subscriptionPath, equalTo(deserCheckpoint.subscriptionPath)); - PubsubReader<String> readerFromOriginal = actualSource.createReader(options, checkpoint); - PubsubReader<String> readerFromDeser = actualSource.createReader(options, deserCheckpoint); + PubsubReader readerFromOriginal = actualSource.createReader(options, checkpoint); + PubsubReader readerFromDeser = actualSource.createReader(options, deserCheckpoint); assertThat(readerFromOriginal.subscription, equalTo(createdSubscription)); assertThat(readerFromDeser.subscription, equalTo(createdSubscription)); @@ -400,9 +402,9 @@ public class PubsubUnboundedSourceTest { @Test public void closeWithActiveCheckpoints() throws Exception { setupOneMessage(); - PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); + PubsubReader reader = primSource.createReader(p.getOptions(), null); reader.start(); - PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark(); + PubsubCheckpoint checkpoint = reader.getCheckpointMark(); reader.close(); checkpoint.finalizeCheckpoint(); }