This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new c472530 Change PubSubSource and PubSubSink translation to avoid special transform overrides. new a48abeb Merge pull request #14384 from [BEAM-10861] Change PubSubSource and PubSubSink translation to avoid special transform overrides c472530 is described below commit c4725301da8f1fbc3982bca986f4d9a1b9a4ce19 Author: Boyuan Zhang <boyu...@google.com> AuthorDate: Tue Mar 30 18:00:59 2021 -0700 Change PubSubSource and PubSubSink translation to avoid special transform overrides. --- .../pipeline/src/main/proto/beam_runner_api.proto | 4 + .../beam/runners/dataflow/DataflowRunner.java | 180 +------------------- sdks/java/io/google-cloud-platform/build.gradle | 3 +- .../io/gcp/pubsub/PubSubPayloadTranslation.java | 159 +++++++++++++++++ .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 78 ++++++--- .../sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 73 +++++--- .../sdk/io/gcp/pubsub/RunnerImplementedSink.java | 68 -------- .../pubsub/RunnerImplementedSinkTranslation.java | 87 ---------- .../sdk/io/gcp/pubsub/RunnerImplementedSource.java | 83 --------- .../pubsub/RunnerImplementedSourceTranslation.java | 102 ----------- ....java => PubSubReadPayloadTranslationTest.java} | 189 +++++++++++---------- ...java => PubSubWritePayloadTranslationTest.java} | 37 ++-- .../sdk/io/gcp/pubsub/PubsubIOExternalTest.java | 12 +- .../io/gcp/pubsub/PubsubUnboundedSourceTest.java | 80 +++++++-- 14 files changed, 450 insertions(+), 705 deletions(-) diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 134fcb6..138e352 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -696,6 +696,8 @@ message WriteFilesPayload { // with a native implementation. // The SDK should guarantee that only one of topic, subscription, // topic_runtime_overridden and subscription_runtime_overridden is set. 
+// The output of PubSubReadPayload should be bytes of serialized PubsubMessage
+// proto if with_attributes == true. Otherwise, the bytes are the raw payload. message PubSubReadPayload { // Topic to read from. Exactly one of topic or subscription should be set. @@ -727,6 +729,8 @@ message PubSubReadPayload { // with a native implementation. // The SDK should guarantee that only one of topic and topic_runtime_overridden // is set. +// The output of PubSubWritePayload should be bytes of serialized PubsubMessage +// proto. message PubSubWritePayload { // Topic to write to. diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index a06951f..cdb7e67 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -95,7 +95,6 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.KvCoder; @@ -113,12 +112,8 @@ import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly; import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink; import 
org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource; -import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSink; -import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; @@ -137,11 +132,9 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.Impulse; -import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Reshuffle; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; @@ -165,7 +158,6 @@ import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.sdk.values.WindowingStrategy; @@ -207,7 +199,6 @@ import org.slf4j.LoggerFactory; public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { private static final Logger LOG = LoggerFactory.getLogger(DataflowRunner.class); - /** Provided configuration options. */ private final DataflowPipelineOptions options; @@ -455,27 +446,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>(); } - /** For portable jobs, Dataflow still requires an override of the PubsubIO transforms. 
*/ - private List<PTransformOverride> getPortableOverrides() { - ImmutableList.Builder<PTransformOverride> overridesBuilder = ImmutableList.builder(); - - if (!hasExperiment(options, "enable_custom_pubsub_source")) { - overridesBuilder.add( - PTransformOverride.of( - PTransformMatchers.classEqualTo(PubsubUnboundedSource.class), - new DataflowReadFromPubsubSourceForRunnerV2OverrideFactory())); - } - - if (!hasExperiment(options, "enable_custom_pubsub_sink")) { - overridesBuilder.add( - PTransformOverride.of( - PTransformMatchers.classEqualTo(PubsubUnboundedSink.class), - new DataflowWriteToPubsubRunnerV2OverrideFactory())); - } - - return overridesBuilder.build(); - } - private List<PTransformOverride> getOverrides(boolean streaming) { boolean fnApiEnabled = useUnifiedWorker(options); ImmutableList.Builder<PTransformOverride> overridesBuilder = ImmutableList.builder(); @@ -506,12 +476,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } if (streaming) { if (!hasExperiment(options, "enable_custom_pubsub_source")) { - if (useUnifiedWorker(options)) { - overridesBuilder.add( - PTransformOverride.of( - PTransformMatchers.classEqualTo(PubsubUnboundedSource.class), - new DataflowReadFromPubsubSourceForRunnerV2OverrideFactory())); - } else { + if (!useUnifiedWorker(options)) { overridesBuilder.add( PTransformOverride.of( PTransformMatchers.classEqualTo(PubsubUnboundedSource.class), @@ -519,12 +484,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } } if (!hasExperiment(options, "enable_custom_pubsub_sink")) { - if (useUnifiedWorker(options)) { - overridesBuilder.add( - PTransformOverride.of( - PTransformMatchers.classEqualTo(PubsubUnboundedSink.class), - new DataflowWriteToPubsubRunnerV2OverrideFactory())); - } else { + if (!useUnifiedWorker(options)) { overridesBuilder.add( PTransformOverride.of( PTransformMatchers.classEqualTo(PubsubUnboundedSink.class), @@ -989,9 +949,6 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> { .addAllCapabilities(Environments.getJavaCapabilities()) .build()); - if (useUnifiedWorker(options)) { - pipeline.replaceAll(getPortableOverrides()); - } RunnerApi.Pipeline portablePipelineProto = PipelineTranslation.toProto(pipeline, portableComponents, false); LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(portablePipelineProto)); @@ -1495,25 +1452,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } } - private static class DataflowReadFromPubsubSourceForRunnerV2OverrideFactory - implements PTransformOverrideFactory< - PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> { - - @Override - public PTransformReplacement<PBegin, PCollection<PubsubMessage>> getReplacementTransform( - AppliedPTransform<PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> transform) { - return PTransformReplacement.of( - transform.getPipeline().begin(), - new DataflowReadFromPubsubForRunnerV2(transform.getTransform())); - } - - @Override - public Map<PCollection<?>, ReplacementOutput> mapOutputs( - Map<TupleTag<?>, PCollection<?>> outputs, PCollection<PubsubMessage> newOutput) { - return ReplacementOutputs.singleton(outputs, newOutput); - } - } - /** * Suppress application of {@link PubsubUnboundedSource#expand} in streaming mode so that we can * instead defer to Windmill's implementation. 
@@ -1552,38 +1490,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } } - public static class DataflowReadFromPubsubForRunnerV2 - extends PTransform<PBegin, PCollection<PubsubMessage>> { - - private final PubsubUnboundedSource transform; - - public DataflowReadFromPubsubForRunnerV2(PubsubUnboundedSource transform) { - this.transform = transform; - } - - @Override - public PCollection<PubsubMessage> expand(PBegin input) { - PCollection<byte[]> outputFromRunner = - input.apply( - "DataflowReadFromPubsubRunnerV2", new RunnerImplementedSource(this.transform)); - - SerializableFunction<byte[], PubsubMessage> function; - if (transform.getNeedsAttributes() || transform.getNeedsMessageId()) { - function = new PubsubMessages.ParsePubsubMessageProtoAsPayload(); - } else { - function = new DeserializeBytesIntoPubsubMessagePayloadOnly(); - } - return outputFromRunner.apply( - "MapBytesToPubsubMessages", - MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via(function)); - } - - static { - DataflowPipelineTranslator.registerTransformTranslator( - RunnerImplementedSource.class, new DataflowPubsubSourceRunnerV2Translator()); - } - } - private static void translateOverriddenPubsubSourceStep( PubsubUnboundedSource overriddenTransform, StepTranslationContext stepTranslationContext) { stepTranslationContext.addInput(PropertyNames.FORMAT, "pubsub"); @@ -1642,20 +1548,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } } - private static class DataflowPubsubSourceRunnerV2Translator - implements TransformTranslator<RunnerImplementedSource> { - - @Override - public void translate(RunnerImplementedSource transform, TranslationContext context) { - checkArgument( - context.getPipelineOptions().isStreaming(), - "DataflowPubsubSourceRunnerV2Translator is only for streaming pipelines."); - StepTranslationContext stepContext = context.addStep(transform, "ParallelRead"); - 
translateOverriddenPubsubSourceStep(transform.getOverriddenSource(), stepContext); - stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform)); - } - } - private static class IdentityMessageFn extends SimpleFunction<PubsubMessage, PubsubMessage> { @Override @@ -1715,20 +1607,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } } - /** Rewrite {@link RunnerImplementedSink} to the appropriate internal node. */ - static class DataflowRunnerV2PubsubSinkTranslator - implements TransformTranslator<RunnerImplementedSink> { - @Override - public void translate(RunnerImplementedSink transform, TranslationContext context) { - checkArgument( - context.getPipelineOptions().isStreaming(), - "DataflowRunnerV2PubsubSink is only for streaming pipelines."); - StepTranslationContext stepContext = context.addStep(transform, "ParallelWrite"); - StreamingPubsubSinkTranslators.translate( - transform.getOverrideSink(), stepContext, context.getInput(transform)); - } - } - private static void translate( PubsubUnboundedSink overriddenTransform, StepTranslationContext stepContext, @@ -2206,60 +2084,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } } - /** - * A replacement {@link PTransform} for {@link PubsubUnboundedSink} when using dataflow runner v2. 
- */ - private static class DataflowWriteToPubsubForRunnerV2 - extends PTransform<PCollection<PubsubMessage>, PDone> { - - private final PubsubUnboundedSink transform; - - public DataflowWriteToPubsubForRunnerV2(PubsubUnboundedSink transform) { - this.transform = transform; - } - - @Override - public PDone expand(PCollection<PubsubMessage> input) { - input - .apply( - "Output Serialized PubsubMessage Proto", - MapElements.into(new TypeDescriptor<byte[]>() {}) - .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto())) - .setCoder(ByteArrayCoder.of()) - .apply("Write to Runner Implemented Sink", new RunnerImplementedSink(transform)); - - return PDone.in(input.getPipeline()); - } - - static { - DataflowPipelineTranslator.registerTransformTranslator( - RunnerImplementedSink.class, - new StreamingPubsubSinkTranslators.DataflowRunnerV2PubsubSinkTranslator()); - } - } - - /** - * A {@link PTransformOverrideFactory} to provide replacement {@link PTransform} for {@link - * PubsubUnboundedSink} when using dataflow runner v2. 
- */ - private static class DataflowWriteToPubsubRunnerV2OverrideFactory - implements PTransformOverrideFactory<PCollection<PubsubMessage>, PDone, PubsubUnboundedSink> { - - @Override - public PTransformReplacement<PCollection<PubsubMessage>, PDone> getReplacementTransform( - AppliedPTransform<PCollection<PubsubMessage>, PDone, PubsubUnboundedSink> transform) { - return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), - new DataflowWriteToPubsubForRunnerV2(transform.getTransform())); - } - - @Override - public Map<PCollection<?>, ReplacementOutput> mapOutputs( - Map<TupleTag<?>, PCollection<?>> outputs, PDone newOutput) { - return Collections.emptyMap(); - } - } - @VisibleForTesting static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT> implements PTransformOverrideFactory< diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 4b25f56..579a352 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -23,8 +23,7 @@ applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.io.gcp', enableSpotbugs: false, classesTriggerCheckerBugs: [ - 'RunnerImplementedSourceTranslation': 'https://github.com/typetools/checker-framework/issues/3791', - 'RunnerImplementedSinkTranslation': 'https://github.com/typetools/checker-framework/issues/3791', + 'PubSubPayloadTranslation': 'https://github.com/typetools/checker-framework/issues/3791', ], ) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java new file mode 100644 index 0000000..a1a2675 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.auto.service.AutoService; +import java.util.Collections; +import java.util.Map; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; +import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubReadPayload; +import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubWritePayload; +import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.runners.core.construction.SdkComponents; +import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Read.Unbounded; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.PubsubSource; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.runners.AppliedPTransform; +import 
org.apache.beam.sdk.transforms.PTransform; + +@SuppressWarnings({ + "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556) + "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) +}) +/** + * Utility methods for translating a {@link Unbounded} which reads from {@link + * PubsubUnboundedSource} to {@link RunnerApi} representations. + */ +public class PubSubPayloadTranslation { + static class PubSubReadPayloadTranslator + implements TransformPayloadTranslator<Read.Unbounded<?>> { + + @Override + public String getUrn(Read.Unbounded<?> transform) { + if (!(transform.getSource() instanceof PubsubUnboundedSource.PubsubSource)) { + return null; + } + return PTransformTranslation.PUBSUB_READ; + } + + @Override + public RunnerApi.FunctionSpec translate( + AppliedPTransform<?, ?, Unbounded<?>> transform, SdkComponents components) { + if (!(transform.getTransform().getSource() instanceof PubsubUnboundedSource.PubsubSource)) { + return null; + } + PubSubReadPayload.Builder payloadBuilder = PubSubReadPayload.newBuilder(); + PubsubUnboundedSource pubsubUnboundedSource = + ((PubsubSource) transform.getTransform().getSource()).outer; + ValueProvider<TopicPath> topicProvider = pubsubUnboundedSource.getTopicProvider(); + if (topicProvider != null) { + if (topicProvider.isAccessible()) { + payloadBuilder.setTopic(topicProvider.get().getFullPath()); + } else { + payloadBuilder.setTopicRuntimeOverridden( + ((NestedValueProvider) topicProvider).propertyName()); + } + } + ValueProvider<SubscriptionPath> subscriptionProvider = + pubsubUnboundedSource.getSubscriptionProvider(); + if (subscriptionProvider != null) { + if (subscriptionProvider.isAccessible()) { + payloadBuilder.setSubscription(subscriptionProvider.get().getFullPath()); + } else { + payloadBuilder.setSubscriptionRuntimeOverridden( + ((NestedValueProvider) subscriptionProvider).propertyName()); + } + } + + if (pubsubUnboundedSource.getTimestampAttribute() != null) { + 
payloadBuilder.setTimestampAttribute(pubsubUnboundedSource.getTimestampAttribute()); + } + if (pubsubUnboundedSource.getIdAttribute() != null) { + payloadBuilder.setIdAttribute(pubsubUnboundedSource.getIdAttribute()); + } + payloadBuilder.setWithAttributes( + pubsubUnboundedSource.getNeedsAttributes() || pubsubUnboundedSource.getNeedsMessageId()); + return FunctionSpec.newBuilder() + .setUrn(getUrn(transform.getTransform())) + .setPayload(payloadBuilder.build().toByteString()) + .build(); + } + } + + static class PubSubWritePayloadTranslator + implements TransformPayloadTranslator<PubsubUnboundedSink.PubsubSink> { + + @Override + public String getUrn(PubsubUnboundedSink.PubsubSink transform) { + return PTransformTranslation.PUBSUB_WRITE; + } + + @Override + public RunnerApi.FunctionSpec translate( + AppliedPTransform<?, ?, PubsubUnboundedSink.PubsubSink> transform, + SdkComponents components) { + PubSubWritePayload.Builder payloadBuilder = PubSubWritePayload.newBuilder(); + ValueProvider<TopicPath> topicProvider = transform.getTransform().outer.getTopicProvider(); + if (topicProvider.isAccessible()) { + payloadBuilder.setTopic(topicProvider.get().getFullPath()); + } else { + payloadBuilder.setTopicRuntimeOverridden( + ((NestedValueProvider) topicProvider).propertyName()); + } + if (transform.getTransform().outer.getTimestampAttribute() != null) { + payloadBuilder.setTimestampAttribute( + transform.getTransform().outer.getTimestampAttribute()); + } + if (transform.getTransform().outer.getIdAttribute() != null) { + payloadBuilder.setIdAttribute(transform.getTransform().outer.getIdAttribute()); + } + return FunctionSpec.newBuilder() + .setUrn(getUrn(transform.getTransform())) + .setPayload(payloadBuilder.build().toByteString()) + .build(); + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar { + + @Override + public Map<? extends Class<? extends PTransform>, ? 
extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return Collections.singletonMap( + PubsubUnboundedSink.PubsubSink.class, new PubSubWritePayloadTranslator()); + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar { + + @Override + public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return Collections.singletonMap(Read.Unbounded.class, new PubSubReadPayloadTranslator()); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index 5c9fca3..e6ce089 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -29,6 +29,7 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; @@ -46,6 +47,7 @@ import org.apache.beam.sdk.metrics.SinkMetrics; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -59,6 +61,7 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import 
org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing; import org.checkerframework.checker.nullness.qual.Nullable; @@ -141,7 +144,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, // ================================================================================ /** Convert elements to messages and shard them. */ - private static class ShardFn extends DoFn<PubsubMessage, KV<Integer, OutgoingMessage>> { + private static class ShardFn extends DoFn<byte[], KV<Integer, OutgoingMessage>> { private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements"); private final int numShards; private final RecordIdMethod recordIdMethod; @@ -154,8 +157,9 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, @ProcessElement public void processElement(ProcessContext c) throws Exception { elementCounter.inc(); - PubsubMessage message = c.element(); - byte[] elementBytes = message.getPayload(); + com.google.pubsub.v1.PubsubMessage message = + com.google.pubsub.v1.PubsubMessage.parseFrom(c.element()); + byte[] elementBytes = message.getData().toByteArray(); long timestampMsSinceEpoch = c.timestamp().getMillis(); @Nullable String recordId = null; @@ -409,29 +413,51 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, @Override public PDone expand(PCollection<PubsubMessage> input) { - input - .apply( - "PubsubUnboundedSink.Window", - Window.<PubsubMessage>into(new GlobalWindows()) - .triggering( - Repeatedly.forever( - AfterFirst.of( - AfterPane.elementCountAtLeast(publishBatchSize), - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(maxLatency)))) - .discardingFiredPanes()) - .apply("PubsubUnboundedSink.Shard", ParDo.of(new 
ShardFn(numShards, recordIdMethod))) - .setCoder(KvCoder.of(VarIntCoder.of(), CODER)) - .apply(GroupByKey.create()) + return input .apply( - "PubsubUnboundedSink.Writer", - ParDo.of( - new WriterFn( - pubsubFactory, - topic, - timestampAttribute, - idAttribute, - publishBatchSize, - publishBatchBytes))); - return PDone.in(input.getPipeline()); + "Output Serialized PubsubMessage Proto", + MapElements.into(new TypeDescriptor<byte[]>() {}) + .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto())) + .setCoder(ByteArrayCoder.of()) + .apply(new PubsubSink(this)); + } + + static class PubsubSink extends PTransform<PCollection<byte[]>, PDone> { + public final PubsubUnboundedSink outer; + + PubsubSink(PubsubUnboundedSink outer) { + this.outer = outer; + } + + @Override + public PDone expand(PCollection<byte[]> input) { + input + .apply( + "PubsubUnboundedSink.Window", + Window.<byte[]>into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterFirst.of( + AfterPane.elementCountAtLeast(outer.publishBatchSize), + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(outer.maxLatency)))) + .discardingFiredPanes()) + .apply( + "PubsubUnboundedSink.Shard", + ParDo.of(new ShardFn(outer.numShards, outer.recordIdMethod))) + .setCoder(KvCoder.of(VarIntCoder.of(), CODER)) + .apply(GroupByKey.create()) + .apply( + "PubsubUnboundedSink.Writer", + ParDo.of( + new WriterFn( + outer.pubsubFactory, + outer.topic, + outer.timestampAttribute, + outer.idAttribute, + outer.publishBatchSize, + outer.publishBatchBytes))); + return PDone.in(input.getPipeline()); + } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 37d0e94..bd5e868 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.NullableCoder; @@ -53,15 +54,19 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.SourceMetrics; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; @@ -70,6 +75,7 @@ import org.apache.beam.sdk.util.BucketingFunction; import org.apache.beam.sdk.util.MovingFunction; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; @@ -358,7 +364,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub * consumed downstream and/or ACKed back to Pubsub. */ @VisibleForTesting - static class PubsubReader extends UnboundedSource.UnboundedReader<PubsubMessage> { + static class PubsubReader extends UnboundedSource.UnboundedReader<byte[]> { /** For access to topic and checkpointCoder. */ private final PubsubSource outer; @@ -882,14 +888,14 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub } @Override - public PubsubMessage getCurrent() throws NoSuchElementException { + public byte[] getCurrent() throws NoSuchElementException { if (current == null) { throw new NoSuchElementException(); } - return new PubsubMessage( - current.message().getData().toByteArray(), - current.message().getAttributesMap(), - current.recordId()); + if (this.outer.outer.getNeedsMessageId() || this.outer.outer.getNeedsAttributes()) { + return current.message().toByteArray(); + } + return current.message().getData().toByteArray(); } @Override @@ -1010,7 +1016,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub // ================================================================================ @VisibleForTesting - static class PubsubSource extends UnboundedSource<PubsubMessage, PubsubCheckpoint> { + static class PubsubSource extends UnboundedSource<byte[], PubsubCheckpoint> { public final PubsubUnboundedSource outer; // The subscription to read from. 
@VisibleForTesting final ValueProvider<SubscriptionPath> subscriptionPath; @@ -1086,16 +1092,8 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub } @Override - public Coder<PubsubMessage> getOutputCoder() { - if (outer.getNeedsMessageId()) { - return outer.getNeedsAttributes() - ? PubsubMessageWithAttributesAndMessageIdCoder.of() - : PubsubMessageWithMessageIdCoder.of(); - } else { - return outer.getNeedsAttributes() - ? PubsubMessageWithAttributesCoder.of() - : PubsubMessagePayloadOnlyCoder.of(); - } + public Coder<byte[]> getOutputCoder() { + return ByteArrayCoder.of(); } @Override @@ -1336,14 +1334,39 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub @Override public PCollection<PubsubMessage> expand(PBegin input) { - return input - .getPipeline() - .begin() - .apply(Read.from(new PubsubSource(this))) - .apply( - "PubsubUnboundedSource.Stats", - ParDo.of( - new StatsFn(pubsubFactory, subscription, topic, timestampAttribute, idAttribute))); + SerializableFunction<byte[], PubsubMessage> function; + if (getNeedsAttributes() || getNeedsMessageId()) { + function = new PubsubMessages.ParsePubsubMessageProtoAsPayload(); + } else { + function = new DeserializeBytesIntoPubsubMessagePayloadOnly(); + } + PCollection<PubsubMessage> messages = + input + .getPipeline() + .begin() + .apply(Read.from(new PubsubSource(this))) + .apply( + "MapBytesToPubsubMessages", + MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via(function)); + if (usesStatsFn(input.getPipeline().getOptions())) { + messages = + messages.apply( + "PubsubUnboundedSource.Stats", + ParDo.of( + new StatsFn( + pubsubFactory, subscription, topic, timestampAttribute, idAttribute))); + } + return messages; + } + + private boolean usesStatsFn(PipelineOptions options) { + if (ExperimentalOptions.hasExperiment(options, "enable_custom_pubsub_source")) { + return true; + } + if 
(!options.getRunner().getName().startsWith("org.apache.beam.runners.dataflow.")) { + return true; + } + return false; } private SubscriptionPath createRandomSubscription(PipelineOptions options) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSink.java deleted file mode 100644 index 6c70b50..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSink.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.gcp.pubsub; - -import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; - -@SuppressWarnings({ - "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556) - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -}) -/** - * A {@link PTransform} which represents a runner implemented Pubsub sink. {@link - * RunnerImplementedSinkTranslator} will translate this transform into well-known composite. - */ -@Internal -public class RunnerImplementedSink extends PTransform<PCollection<byte[]>, PDone> { - private final PubsubUnboundedSink sink; - - public RunnerImplementedSink(PubsubUnboundedSink sink) { - this.sink = sink; - } - - public PubsubUnboundedSink getOverrideSink() { - return sink; - } - - public ValueProvider<TopicPath> getTopicProvider() { - return sink.getTopicProvider(); - } - - public String getTimestampAttribute() { - return sink.getTimestampAttribute(); - } - - public String getIdAttribute() { - return sink.getIdAttribute(); - } - - @Override - public PDone expand(PCollection<byte[]> input) { - return PDone.in(input.getPipeline()); - } - - @Override - protected String getKindString() { - return "RunnerImplementedSink"; - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslation.java deleted file mode 100644 index 056871f..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslation.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * 
or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsub; - -import com.google.auto.service.AutoService; -import java.util.Collections; -import java.util.Map; -import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; -import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubWritePayload; -import org.apache.beam.runners.core.construction.PTransformTranslation; -import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; -import org.apache.beam.runners.core.construction.SdkComponents; -import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; - -@SuppressWarnings({ - "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556) - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -}) -/** - * Utility methods for translating a {@link RunnerImplementedSink} to {@link RunnerApi} - * representations. 
- */ -public class RunnerImplementedSinkTranslation { - static class RunnerImplementedSinkTranslator - implements TransformPayloadTranslator<RunnerImplementedSink> { - - @Override - public String getUrn(RunnerImplementedSink transform) { - return PTransformTranslation.PUBSUB_WRITE; - } - - @Override - public RunnerApi.FunctionSpec translate( - AppliedPTransform<?, ?, RunnerImplementedSink> transform, SdkComponents components) { - PubSubWritePayload.Builder payloadBuilder = PubSubWritePayload.newBuilder(); - ValueProvider<TopicPath> topicProvider = transform.getTransform().getTopicProvider(); - if (topicProvider.isAccessible()) { - payloadBuilder.setTopic(topicProvider.get().getFullPath()); - } else { - payloadBuilder.setTopicRuntimeOverridden( - ((NestedValueProvider) topicProvider).propertyName()); - } - if (transform.getTransform().getTimestampAttribute() != null) { - payloadBuilder.setTimestampAttribute(transform.getTransform().getTimestampAttribute()); - } - if (transform.getTransform().getIdAttribute() != null) { - payloadBuilder.setIdAttribute(transform.getTransform().getIdAttribute()); - } - return FunctionSpec.newBuilder() - .setUrn(getUrn(transform.getTransform())) - .setPayload(payloadBuilder.build().toByteString()) - .build(); - } - } - - @AutoService(TransformPayloadTranslatorRegistrar.class) - public static class Registrar implements TransformPayloadTranslatorRegistrar { - - @Override - public Map<? extends Class<? extends PTransform>, ? 
extends TransformPayloadTranslator> - getTransformPayloadTranslators() { - return Collections.singletonMap( - RunnerImplementedSink.class, new RunnerImplementedSinkTranslator()); - } - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSource.java deleted file mode 100644 index b1170fc..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSource.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.gcp.pubsub; - -import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.WindowingStrategy; - -@SuppressWarnings({ - "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556) - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -}) -/** - * A {@link PTransform} which represents a runner implemented Pubsub source. {@link - * RunnerImplementedSourceTranslator} will translate this transform into well-known composite. - */ -@Internal -public class RunnerImplementedSource extends PTransform<PBegin, PCollection<byte[]>> { - private final PubsubUnboundedSource source; - - public RunnerImplementedSource(PubsubUnboundedSource source) { - this.source = source; - } - - public PubsubUnboundedSource getOverriddenSource() { - return source; - } - - public ValueProvider<TopicPath> getTopicProvider() { - return source.getTopicProvider(); - } - - public ValueProvider<SubscriptionPath> getSubscriptionProvider() { - return source.getSubscriptionProvider(); - } - - public String getTimestampAttribute() { - return source.getTimestampAttribute(); - } - - public String getIdAttribute() { - return source.getIdAttribute(); - } - - public boolean isWithAttributes() { - return source.getNeedsAttributes() || source.getNeedsMessageId(); - } - - @Override - public PCollection<byte[]> expand(PBegin input) { - ByteArrayCoder coder = ByteArrayCoder.of(); - return PCollection.createPrimitiveOutputInternal( - input.getPipeline(), WindowingStrategy.globalDefault(), 
IsBounded.UNBOUNDED, coder) - .setCoder(coder); - } - - @Override - protected String getKindString() { - return "RunnerImplementedSource"; - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslation.java deleted file mode 100644 index 5f002d0..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslation.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.gcp.pubsub; - -import com.google.auto.service.AutoService; -import java.util.Collections; -import java.util.Map; -import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; -import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubReadPayload; -import org.apache.beam.runners.core.construction.PTransformTranslation; -import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; -import org.apache.beam.runners.core.construction.SdkComponents; -import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; -import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; - -@SuppressWarnings({ - "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556) - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -}) -/** - * Utility methods for translating a {@link RunnerImplementedSource} to {@link RunnerApi} - * representations. 
- */ -public class RunnerImplementedSourceTranslation { - static class RunnerImplementedSourceTranslator - implements TransformPayloadTranslator<RunnerImplementedSource> { - - @Override - public String getUrn(RunnerImplementedSource transform) { - return PTransformTranslation.PUBSUB_READ; - } - - @Override - public RunnerApi.FunctionSpec translate( - AppliedPTransform<?, ?, RunnerImplementedSource> transform, SdkComponents components) { - PubSubReadPayload.Builder payloadBuilder = PubSubReadPayload.newBuilder(); - ValueProvider<TopicPath> topicProvider = transform.getTransform().getTopicProvider(); - if (topicProvider != null) { - if (topicProvider.isAccessible()) { - payloadBuilder.setTopic(topicProvider.get().getFullPath()); - } else { - payloadBuilder.setTopicRuntimeOverridden( - ((NestedValueProvider) topicProvider).propertyName()); - } - } - ValueProvider<SubscriptionPath> subscriptionProvider = - transform.getTransform().getSubscriptionProvider(); - if (subscriptionProvider != null) { - if (subscriptionProvider.isAccessible()) { - payloadBuilder.setSubscription(subscriptionProvider.get().getFullPath()); - } else { - payloadBuilder.setSubscriptionRuntimeOverridden( - ((NestedValueProvider) subscriptionProvider).propertyName()); - } - } - - if (transform.getTransform().getTimestampAttribute() != null) { - payloadBuilder.setTimestampAttribute(transform.getTransform().getTimestampAttribute()); - } - if (transform.getTransform().getIdAttribute() != null) { - payloadBuilder.setIdAttribute(transform.getTransform().getIdAttribute()); - } - payloadBuilder.setWithAttributes(transform.getTransform().isWithAttributes()); - return FunctionSpec.newBuilder() - .setUrn(getUrn(transform.getTransform())) - .setPayload(payloadBuilder.build().toByteString()) - .build(); - } - } - - @AutoService(TransformPayloadTranslatorRegistrar.class) - public static class Registrar implements TransformPayloadTranslatorRegistrar { - - @Override - public Map<? extends Class<? 
extends PTransform>, ? extends TransformPayloadTranslator> - getTransformPayloadTranslators() { - return Collections.singletonMap( - RunnerImplementedSource.class, new RunnerImplementedSourceTranslator()); - } - } -} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubReadPayloadTranslationTest.java similarity index 53% rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslationTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubReadPayloadTranslationTest.java index d1cea3a..9e43b59 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSourceTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubReadPayloadTranslationTest.java @@ -25,9 +25,9 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubReadPayload; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.SdkComponents; +import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; -import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSourceTranslation.RunnerImplementedSourceTranslator; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -43,15 +43,15 @@ import org.junit.runners.Parameterized.Parameters; /** Test RunnerImplementedSourceTranslator. 
*/ @RunWith(Parameterized.class) -public class RunnerImplementedSourceTranslationTest { +public class PubSubReadPayloadTranslationTest { private static final String TIMESTAMP_ATTRIBUTE = "timestamp"; private static final String ID_ATTRIBUTE = "id"; private static final String PROJECT = "project"; private static final TopicPath TOPIC = PubsubClient.topicPathFromName(PROJECT, "testTopic"); private static final SubscriptionPath SUBSCRIPTION = PubsubClient.subscriptionPathFromName(PROJECT, "testSubscription"); - private final RunnerImplementedSourceTranslator sourceTranslator = - new RunnerImplementedSourceTranslator(); + private final PubSubPayloadTranslation.PubSubReadPayloadTranslator sourceTranslator = + new PubSubPayloadTranslation.PubSubReadPayloadTranslator(); public static TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false); private static final ValueProvider<TopicPath> TOPIC_PROVIDER = pipeline.newProvider(TOPIC); @@ -64,16 +64,17 @@ public class RunnerImplementedSourceTranslationTest { new Object[][] { { // Read payload only from TOPIC. 
- new RunnerImplementedSource( - new PubsubUnboundedSource( - PubsubTestClient.createFactoryForCreateSubscription(), - StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), - StaticValueProvider.of(TOPIC), - null /* subscription */, - null /* timestampLabel */, - null /* idLabel */, - false /* needsAttributes */, - false /* needsMessageId*/)), + Read.from( + new PubsubUnboundedSource.PubsubSource( + new PubsubUnboundedSource( + PubsubTestClient.createFactoryForCreateSubscription(), + StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), + StaticValueProvider.of(TOPIC), + null /* subscription */, + null /* timestampLabel */, + null /* idLabel */, + false /* needsAttributes */, + false /* needsMessageId*/))), PubSubReadPayload.newBuilder() .setTopic(TOPIC.getFullPath()) .setWithAttributes(false) @@ -81,16 +82,17 @@ public class RunnerImplementedSourceTranslationTest { }, { // Read with attributes and message id from TOPIC. - new RunnerImplementedSource( - new PubsubUnboundedSource( - PubsubTestClient.createFactoryForCreateSubscription(), - StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), - StaticValueProvider.of(TOPIC), - null /* subscription */, - TIMESTAMP_ATTRIBUTE /* timestampLabel */, - ID_ATTRIBUTE /* idLabel */, - true /* needsAttributes */, - true /* needsMessageId */)), + Read.from( + new PubsubUnboundedSource.PubsubSource( + new PubsubUnboundedSource( + PubsubTestClient.createFactoryForCreateSubscription(), + StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), + StaticValueProvider.of(TOPIC), + null /* subscription */, + TIMESTAMP_ATTRIBUTE /* timestampLabel */, + ID_ATTRIBUTE /* idLabel */, + true /* needsAttributes */, + true /* needsMessageId */))), PubSubReadPayload.newBuilder() .setTopic(TOPIC.getFullPath()) .setIdAttribute(ID_ATTRIBUTE) @@ -100,16 +102,17 @@ public class RunnerImplementedSourceTranslationTest { }, { // Read payload from runtime provided topic. 
- new RunnerImplementedSource( - new PubsubUnboundedSource( - PubsubTestClient.createFactoryForCreateSubscription(), - StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), - TOPIC_PROVIDER, - null /* subscription */, - null /* timestampLabel */, - null /* idLabel */, - false /* needsAttributes */, - false /* needsMessageId */)), + Read.from( + new PubsubUnboundedSource.PubsubSource( + new PubsubUnboundedSource( + PubsubTestClient.createFactoryForCreateSubscription(), + StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), + TOPIC_PROVIDER, + null /* subscription */, + null /* timestampLabel */, + null /* idLabel */, + false /* needsAttributes */, + false /* needsMessageId */))), PubSubReadPayload.newBuilder() .setTopicRuntimeOverridden(((NestedValueProvider) TOPIC_PROVIDER).propertyName()) .setWithAttributes(false) @@ -117,16 +120,17 @@ public class RunnerImplementedSourceTranslationTest { }, { // Read payload with attributes and message id from runtime provided topic. 
- new RunnerImplementedSource( - new PubsubUnboundedSource( - PubsubTestClient.createFactoryForCreateSubscription(), - StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), - TOPIC_PROVIDER, - null /* subscription */, - TIMESTAMP_ATTRIBUTE /* timestampLabel */, - ID_ATTRIBUTE /* idLabel */, - true /* needsAttributes */, - true /* needsMessageId */)), + Read.from( + new PubsubUnboundedSource.PubsubSource( + new PubsubUnboundedSource( + PubsubTestClient.createFactoryForCreateSubscription(), + StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), + TOPIC_PROVIDER, + null /* subscription */, + TIMESTAMP_ATTRIBUTE /* timestampLabel */, + ID_ATTRIBUTE /* idLabel */, + true /* needsAttributes */, + true /* needsMessageId */))), PubSubReadPayload.newBuilder() .setTopicRuntimeOverridden(((NestedValueProvider) TOPIC_PROVIDER).propertyName()) .setIdAttribute(ID_ATTRIBUTE) @@ -136,16 +140,17 @@ public class RunnerImplementedSourceTranslationTest { }, { // Read payload only from SUBSCRIPTION. 
- new RunnerImplementedSource( - new PubsubUnboundedSource( - PubsubTestClient.createFactoryForCreateSubscription(), - StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), - null /* topic */, - StaticValueProvider.of(SUBSCRIPTION), - null /* timestampLabel */, - null /* idLabel */, - false /* needsAttributes */, - false /* needsMessageId */)), + Read.from( + new PubsubUnboundedSource.PubsubSource( + new PubsubUnboundedSource( + PubsubTestClient.createFactoryForCreateSubscription(), + StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), + null /* topic */, + StaticValueProvider.of(SUBSCRIPTION), + null /* timestampLabel */, + null /* idLabel */, + false /* needsAttributes */, + false /* needsMessageId */))), PubSubReadPayload.newBuilder() .setSubscription(SUBSCRIPTION.getFullPath()) .setWithAttributes(false) @@ -153,16 +158,17 @@ public class RunnerImplementedSourceTranslationTest { }, { // Read payload with attributes and message id from SUBSCRIPTION. - new RunnerImplementedSource( - new PubsubUnboundedSource( - PubsubTestClient.createFactoryForCreateSubscription(), - StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), - null /* topic */, - StaticValueProvider.of(SUBSCRIPTION), - TIMESTAMP_ATTRIBUTE /* timestampLabel */, - ID_ATTRIBUTE /* idLabel */, - true /* needsAttributes */, - true /* needsMessageId */)), + Read.from( + new PubsubUnboundedSource.PubsubSource( + new PubsubUnboundedSource( + PubsubTestClient.createFactoryForCreateSubscription(), + StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), + null /* topic */, + StaticValueProvider.of(SUBSCRIPTION), + TIMESTAMP_ATTRIBUTE /* timestampLabel */, + ID_ATTRIBUTE /* idLabel */, + true /* needsAttributes */, + true /* needsMessageId */))), PubSubReadPayload.newBuilder() .setSubscription(SUBSCRIPTION.getFullPath()) .setIdAttribute(ID_ATTRIBUTE) @@ -172,16 +178,17 @@ public class RunnerImplementedSourceTranslationTest { }, { // Read payload only from runtime 
provided subscription. - new RunnerImplementedSource( - new PubsubUnboundedSource( - PubsubTestClient.createFactoryForCreateSubscription(), - StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), - null /* topic */, - SUBSCRIPTION_PROVIDER, - null /* timestampLabel */, - null /* idLabel */, - false /* needsAttributes */, - false /* needsMessageId */)), + Read.from( + new PubsubUnboundedSource.PubsubSource( + new PubsubUnboundedSource( + PubsubTestClient.createFactoryForCreateSubscription(), + StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), + null /* topic */, + SUBSCRIPTION_PROVIDER, + null /* timestampLabel */, + null /* idLabel */, + false /* needsAttributes */, + false /* needsMessageId */))), PubSubReadPayload.newBuilder() .setSubscriptionRuntimeOverridden( ((NestedValueProvider) SUBSCRIPTION_PROVIDER).propertyName()) @@ -190,16 +197,17 @@ public class RunnerImplementedSourceTranslationTest { }, { // Read payload with attributes and message id from runtime provided subscription. 
- new RunnerImplementedSource( - new PubsubUnboundedSource( - PubsubTestClient.createFactoryForCreateSubscription(), - StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), - null /* topic */, - SUBSCRIPTION_PROVIDER, - TIMESTAMP_ATTRIBUTE /* timestampLabel */, - ID_ATTRIBUTE /* idLabel */, - true /* needsAttributes */, - true /* needsMessageId */)), + Read.from( + new PubsubUnboundedSource.PubsubSource( + new PubsubUnboundedSource( + PubsubTestClient.createFactoryForCreateSubscription(), + StaticValueProvider.of(PubsubClient.projectPathFromId(PROJECT)), + null /* topic */, + SUBSCRIPTION_PROVIDER, + TIMESTAMP_ATTRIBUTE /* timestampLabel */, + ID_ATTRIBUTE /* idLabel */, + true /* needsAttributes */, + true /* needsMessageId */))), PubSubReadPayload.newBuilder() .setSubscriptionRuntimeOverridden( ((NestedValueProvider) SUBSCRIPTION_PROVIDER).propertyName()) @@ -212,24 +220,25 @@ public class RunnerImplementedSourceTranslationTest { } @Parameter(0) - public RunnerImplementedSource runnerImplementedSource; + public Read.Unbounded<byte[]> readFromPubSub; @Parameter(1) public PubSubReadPayload pubsubReadPayload; @Test public void testTranslateSourceToFunctionSpec() throws Exception { - PCollection<byte[]> output = pipeline.apply(runnerImplementedSource); - AppliedPTransform<?, ?, RunnerImplementedSource> appliedPTransform = + PCollection<byte[]> output = pipeline.apply(readFromPubSub); + AppliedPTransform<?, ?, Read.Unbounded<byte[]>> appliedPTransform = AppliedPTransform.of( - "sink", + "ReadFromPubsub", PValues.expandInput(pipeline.begin()), PValues.expandOutput(output), - runnerImplementedSource, + readFromPubSub, pipeline); SdkComponents components = SdkComponents.create(); components.registerEnvironment(Environments.createDockerEnvironment("java")); - RunnerApi.FunctionSpec spec = sourceTranslator.translate(appliedPTransform, components); + RunnerApi.FunctionSpec spec = + sourceTranslator.translate((AppliedPTransform) appliedPTransform, components); 
assertEquals(PTransformTranslation.PUBSUB_READ, spec.getUrn()); PubSubReadPayload result = PubSubReadPayload.parseFrom(spec.getPayload()); assertEquals(pubsubReadPayload, result); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java similarity index 80% rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslationTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java index 52ebd22..872e671 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/RunnerImplementedSinkTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java @@ -25,8 +25,9 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PubSubWritePayload; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.SdkComponents; +import org.apache.beam.sdk.io.gcp.pubsub.PubSubPayloadTranslation.PubSubWritePayloadTranslator; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; -import org.apache.beam.sdk.io.gcp.pubsub.RunnerImplementedSinkTranslation.RunnerImplementedSinkTranslator; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink.PubsubSink; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -44,18 +45,18 @@ import org.junit.runners.JUnit4; /** Test RunnerImplementedSinkTranslator. 
*/ @RunWith(JUnit4.class) -public class RunnerImplementedSinkTranslationTest { +public class PubSubWritePayloadTranslationTest { private static final String TIMESTAMP_ATTRIBUTE = "timestamp"; private static final String ID_ATTRIBUTE = "id"; private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); - private final RunnerImplementedSinkTranslator sinkTranslator = - new RunnerImplementedSinkTranslator(); + private final PubSubPayloadTranslation.PubSubWritePayloadTranslator sinkTranslator = + new PubSubWritePayloadTranslator(); @Rule public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false); @Test public void testTranslateSinkWithTopic() throws Exception { - PubsubUnboundedSink pubsubSink = + PubsubUnboundedSink pubsubUnboundedSink = new PubsubUnboundedSink( null, StaticValueProvider.of(TOPIC), @@ -66,16 +67,12 @@ public class RunnerImplementedSinkTranslationTest { 0, Duration.ZERO, null); - RunnerImplementedSink runnerImplementedSink = new RunnerImplementedSink(pubsubSink); + PubsubUnboundedSink.PubsubSink pubsubSink = new PubsubSink(pubsubUnboundedSink); PCollection<byte[]> input = pipeline.apply(Create.of(new byte[0])); - PDone output = input.apply(runnerImplementedSink); - AppliedPTransform<?, ?, RunnerImplementedSink> appliedPTransform = + PDone output = input.apply(pubsubSink); + AppliedPTransform<?, ?, PubsubSink> appliedPTransform = AppliedPTransform.of( - "sink", - PValues.expandInput(input), - PValues.expandOutput(output), - runnerImplementedSink, - pipeline); + "sink", PValues.expandInput(input), PValues.expandOutput(output), pubsubSink, pipeline); SdkComponents components = SdkComponents.create(); components.registerEnvironment(Environments.createDockerEnvironment("java")); RunnerApi.FunctionSpec spec = sinkTranslator.translate(appliedPTransform, components); @@ -91,19 +88,15 @@ public class RunnerImplementedSinkTranslationTest { @Test public void 
testTranslateSinkWithTopicOverridden() throws Exception { ValueProvider<TopicPath> runtimeProvider = pipeline.newProvider(TOPIC); - PubsubUnboundedSink pubsubSink = + PubsubUnboundedSink pubsubUnboundedSinkSink = new PubsubUnboundedSink( null, runtimeProvider, TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 0, 0, 0, Duration.ZERO, null); - RunnerImplementedSink runnerImplementedSink = new RunnerImplementedSink(pubsubSink); + PubsubSink pubsubSink = new PubsubSink(pubsubUnboundedSinkSink); PCollection<byte[]> input = pipeline.apply(Create.of(new byte[0])); - PDone output = input.apply(runnerImplementedSink); - AppliedPTransform<?, ?, RunnerImplementedSink> appliedPTransform = + PDone output = input.apply(pubsubSink); + AppliedPTransform<?, ?, PubsubSink> appliedPTransform = AppliedPTransform.of( - "sink", - PValues.expandInput(input), - PValues.expandOutput(output), - runnerImplementedSink, - pipeline); + "sink", PValues.expandInput(input), PValues.expandOutput(output), pubsubSink, pipeline); SdkComponents components = SdkComponents.create(); components.registerEnvironment(Environments.createDockerEnvironment("java")); RunnerApi.FunctionSpec spec = sinkTranslator.translate(appliedPTransform, components); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java index e9c2a15..7e7e2e2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java @@ -159,13 +159,17 @@ public class PubsubIOExternalTest { RunnerApi.PTransform writeComposite = result.getComponents().getTransformsOrThrow(transform.getSubtransforms(1)); - // test_namespacetest/PubsubUnboundedSink/PubsubUnboundedSink.Writer + // 
test_namespacetest/PubsubUnboundedSink/PubsubSink RunnerApi.PTransform writeComposite2 = - result.getComponents().getTransformsOrThrow(writeComposite.getSubtransforms(3)); + result.getComponents().getTransformsOrThrow(writeComposite.getSubtransforms(1)); - // test_namespacetest/PubsubUnboundedSink/PubsubUnboundedSink.Writer/ParMultiDo(Writer) + // test_namespacetest/PubsubUnboundedSink/PubsubSink/PubsubUnboundedSink.Writer + RunnerApi.PTransform writeComposite3 = + result.getComponents().getTransformsOrThrow(writeComposite2.getSubtransforms(3)); + + // test_namespacetest/PubsubUnboundedSink/PubsubSink/PubsubUnboundedSink.Writer/ParMultiDo(Writer) RunnerApi.PTransform writeParDo = - result.getComponents().getTransformsOrThrow(writeComposite2.getSubtransforms(0)); + result.getComponents().getTransformsOrThrow(writeComposite3.getSubtransforms(0)); RunnerApi.ParDoPayload parDoPayload = RunnerApi.ParDoPayload.parseFrom(writeParDo.getSpec().getPayload()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java index 8c25c40..a9564d3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java @@ -32,6 +32,7 @@ import static org.junit.Assert.assertTrue; import com.google.api.client.util.Clock; import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -120,8 +121,12 @@ public class PubsubUnboundedSourceTest { factory = null; } - private static String data(PubsubMessage message) { - return new String(message.getPayload(), StandardCharsets.UTF_8); + private static String data(byte[] 
message, boolean payloadOnly) throws Exception { + if (payloadOnly) { + return new String(message, StandardCharsets.UTF_8); + } + PubsubMessage data = PubsubMessage.parseFrom(message); + return new String(data.getData().toByteArray(), StandardCharsets.UTF_8); } @Test @@ -134,12 +139,16 @@ public class PubsubUnboundedSourceTest { } @Test - public void readOneMessage() throws IOException { + public void readOneMessage() throws Exception { setupOneMessage(); PubsubReader reader = primSource.createReader(p.getOptions(), null); // Read one message. assertTrue(reader.start()); - assertEquals(DATA, data(reader.getCurrent())); + assertEquals( + DATA, + data( + reader.getCurrent(), + !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId()))); assertFalse(reader.advance()); // ACK the message. PubsubCheckpoint checkpoint = reader.getCheckpointMark(); @@ -148,18 +157,26 @@ public class PubsubUnboundedSourceTest { } @Test - public void timeoutAckAndRereadOneMessage() throws IOException { + public void timeoutAckAndRereadOneMessage() throws Exception { setupOneMessage(); PubsubReader reader = primSource.createReader(p.getOptions(), null); PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); assertTrue(reader.start()); - assertEquals(DATA, data(reader.getCurrent())); + assertEquals( + DATA, + data( + reader.getCurrent(), + !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId()))); // Let the ACK deadline for the above expire. now.addAndGet(65 * 1000); pubsubClient.advance(); // We'll now receive the same message again. assertTrue(reader.advance()); - assertEquals(DATA, data(reader.getCurrent())); + assertEquals( + DATA, + data( + reader.getCurrent(), + !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId()))); assertFalse(reader.advance()); // Now ACK the message. 
PubsubCheckpoint checkpoint = reader.getCheckpointMark(); @@ -168,13 +185,17 @@ public class PubsubUnboundedSourceTest { } @Test - public void extendAck() throws IOException { + public void extendAck() throws Exception { setupOneMessage(); PubsubReader reader = primSource.createReader(p.getOptions(), null); PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); // Pull the first message but don't take a checkpoint for it. assertTrue(reader.start()); - assertEquals(DATA, data(reader.getCurrent())); + assertEquals( + DATA, + data( + reader.getCurrent(), + !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId()))); // Extend the ack now.addAndGet(55 * 1000); pubsubClient.advance(); @@ -190,13 +211,17 @@ public class PubsubUnboundedSourceTest { } @Test - public void timeoutAckExtensions() throws IOException { + public void timeoutAckExtensions() throws Exception { setupOneMessage(); PubsubReader reader = primSource.createReader(p.getOptions(), null); PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); // Pull the first message but don't take a checkpoint for it. assertTrue(reader.start()); - assertEquals(DATA, data(reader.getCurrent())); + assertEquals( + DATA, + data( + reader.getCurrent(), + !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId()))); // Extend the ack. now.addAndGet(55 * 1000); pubsubClient.advance(); @@ -212,7 +237,11 @@ public class PubsubUnboundedSourceTest { pubsubClient.advance(); // Reread the same message. assertTrue(reader.advance()); - assertEquals(DATA, data(reader.getCurrent())); + assertEquals( + DATA, + data( + reader.getCurrent(), + !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId()))); // Now ACK the message. 
PubsubCheckpoint checkpoint = reader.getCheckpointMark(); checkpoint.finalizeCheckpoint(); @@ -220,7 +249,7 @@ public class PubsubUnboundedSourceTest { } @Test - public void multipleReaders() throws IOException { + public void multipleReaders() throws Exception { List<IncomingMessage> incoming = new ArrayList<>(); for (int i = 0; i < 2; i++) { String data = String.format("data_%d", i); @@ -239,7 +268,11 @@ public class PubsubUnboundedSourceTest { PubsubReader reader = primSource.createReader(p.getOptions(), null); // Consume two messages, only read one. assertTrue(reader.start()); - assertEquals("data_0", data(reader.getCurrent())); + assertEquals( + "data_0", + data( + reader.getCurrent(), + !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId()))); // Grab checkpoint. PubsubCheckpoint checkpoint = reader.getCheckpointMark(); @@ -249,7 +282,11 @@ public class PubsubUnboundedSourceTest { // Read second message. assertTrue(reader.advance()); - assertEquals("data_1", data(reader.getCurrent())); + assertEquals( + "data_1", + data( + reader.getCurrent(), + !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId()))); // Restore from checkpoint. byte[] checkpointBytes = @@ -262,7 +299,11 @@ public class PubsubUnboundedSourceTest { // Re-read second message. reader = primSource.createReader(p.getOptions(), checkpoint); assertTrue(reader.start()); - assertEquals("data_1", data(reader.getCurrent())); + assertEquals( + "data_1", + data( + reader.getCurrent(), + !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId()))); // We are done. 
assertFalse(reader.advance()); @@ -278,7 +319,7 @@ public class PubsubUnboundedSourceTest { } @Test - public void readManyMessages() throws IOException { + public void readManyMessages() throws Exception { Map<String, Integer> dataToMessageNum = new HashMap<>(); final int m = 97; @@ -315,7 +356,10 @@ public class PubsubUnboundedSourceTest { // We'll checkpoint and ack within the 2min limit. now.addAndGet(30); pubsubClient.advance(); - String data = data(reader.getCurrent()); + String data = + data( + reader.getCurrent(), + !(primSource.outer.getNeedsAttributes() || primSource.outer.getNeedsMessageId())); Integer messageNum = dataToMessageNum.remove(data); // No duplicate messages. assertNotNull(messageNum);