This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 857eccedc55 Refactor PubsubIO Lineage metrics to work with all runners (#32319) 857eccedc55 is described below commit 857eccedc5544ed920e9d445d18b1de74843488d Author: Yi Hu <ya...@google.com> AuthorDate: Tue Aug 27 15:03:52 2024 -0400 Refactor PubsubIO Lineage metrics to work with all runners (#32319) * Move Lineage report outside of PubsubUnboundedSource * Move Lineage report outside of PubsubUnboundedSink --- .../java/org/apache/beam/sdk/metrics/Lineage.java | 6 ++- .../sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java | 10 +++++ .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 47 +++++++++++++++++++--- .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 12 ------ .../sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 14 ------- 5 files changed, 56 insertions(+), 33 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index 302ae4f2fef..07fda807bd4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -130,7 +130,11 @@ public class Lineage { .build(); Set<String> result = new HashSet<>(); for (MetricResult<StringSetResult> metrics : results.queryMetrics(filter).getStringSets()) { - result.addAll(metrics.getCommitted().getStringSet()); + try { + result.addAll(metrics.getCommitted().getStringSet()); + } catch (UnsupportedOperationException unused) { + // MetricsResult.getCommitted throws this exception when runner support missing, just skip. + } result.addAll(metrics.getAttempted().getStringSet()); } return result; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java index 47033451ab8..521e65b934b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java @@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets; import java.util.Map; import javax.naming.SizeLimitExceededException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; @@ -43,6 +44,8 @@ public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage> private SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction; @Nullable SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction; + /** Last TopicPath that reported Lineage. */ + private transient @Nullable String reportedLineage; private final BadRecordRouter badRecordRouter; @@ -165,6 +168,13 @@ public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage> return; } } + String topic = message.getTopic(); + // topic shouldn't be null, but lineage report is fail-safe + if (topic != null && !topic.equals(reportedLineage)) { + Lineage.getSinks() + .add("pubsub", "topic", PubsubClient.topicPathFromPath(topic).getDataCatalogSegments()); + reportedLineage = topic; + } try { validatePubsubMessageSize(message, maxPublishBatchSize); } catch (SizeLimitExceededException e) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 8b582c1054f..b561b4711d5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -1148,6 +1149,20 @@ public class PubsubIO { getNeedsOrderingKey()); PCollection<PubsubMessage> preParse = input.apply(source); + return expandReadContinued(preParse, topicPath, subscriptionPath); + } + + /** + * Runner agnostic part of the Expansion. + * + * <p>Common logics (MapElements, SDK metrics, DLQ, etc) live here as PubsubUnboundedSource is + * overridden on Dataflow runner. + */ + private PCollection<T> expandReadContinued( + PCollection<PubsubMessage> preParse, + @Nullable ValueProvider<TopicPath> topicPath, + @Nullable ValueProvider<SubscriptionPath> subscriptionPath) { + TypeDescriptor<T> typeDescriptor = new TypeDescriptor<T>() {}; PCollection<T> read; if (getDeadLetterTopicProvider() == null @@ -1174,7 +1189,7 @@ public class PubsubIO { "Map Failures To BadRecords", ParDo.of(new ParseReadFailuresToBadRecords(preParse.getCoder()))); getBadRecordErrorHandler() - .addErrorCollection(badRecords.setCoder(BadRecord.getCoder(input.getPipeline()))); + .addErrorCollection(badRecords.setCoder(BadRecord.getCoder(preParse.getPipeline()))); } else { // Write out failures to the provided dead-letter topic. result @@ -1215,7 +1230,31 @@ public class PubsubIO { .withClientFactory(getPubsubClientFactory())); } } - + // report Lineage once + preParse + .getPipeline() + .apply(Impulse.create()) + .apply( + ParDo.of( + new DoFn<byte[], Void>() { + @ProcessElement + public void process() { + if (topicPath != null) { + TopicPath topic = topicPath.get(); + if (topic != null) { + Lineage.getSources() + .add("pubsub", "topic", topic.getDataCatalogSegments()); + } + } + if (subscriptionPath != null) { + SubscriptionPath sub = subscriptionPath.get(); + if (sub != null) { + Lineage.getSources() + .add("pubsub", "subscription", sub.getDataCatalogSegments()); + } + } + } + })); return read.setCoder(getCoder()); } @@ -1623,10 +1662,6 @@ public class PubsubIO { for (Map.Entry<PubsubTopic, OutgoingData> entry : output.entrySet()) { publish(entry.getKey(), entry.getValue().messages); } - // Report lineage for all topics seen - for (PubsubTopic topic : output.keySet()) { - Lineage.getSinks().add("pubsub", "topic", topic.dataCatalogSegments()); - } output = null; pubsubClient.close(); pubsubClient = null; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index 38d77aa3aac..aa8e3a41148 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -41,7 +41,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.metrics.SinkMetrics; import org.apache.beam.sdk.options.ValueProvider; @@ -232,9 +231,6 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, /** Client on which to talk to Pubsub. Null until created by {@link #startBundle}. */ private transient @Nullable PubsubClient pubsubClient; - /** Last TopicPath that reported Lineage. */ - private transient @Nullable TopicPath reportedLineage; - private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches"); private final Counter elementCounter = SinkMetrics.elementsWritten(); private final Counter byteCounter = SinkMetrics.bytesWritten(); @@ -294,14 +290,6 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, batchCounter.inc(); elementCounter.inc(messages.size()); byteCounter.inc(bytes); - // Report Lineage multiple once for same topic - if (!topicPath.equals(reportedLineage)) { - List<String> segments = topicPath.getDataCatalogSegments(); - if (segments.size() != 0) { - Lineage.getSinks().add("pubsub", "topic", segments); - } - reportedLineage = topicPath; - } } @StartBundle diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 95fa5c22341..b9a554d54ad 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -56,7 +56,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly; import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.metrics.SourceMetrics; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -1042,19 +1041,6 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub splitSource = new PubsubSource( outer, StaticValueProvider.of(outer.createRandomSubscription(options))); - TopicPath topic = outer.getTopic(); - if (topic != null) { - // is initial split on Read.fromTopic, report Lineage based on topic - Lineage.getSources().add("pubsub", "source", topic.getDataCatalogSegments()); - } - } else { - if (subscriptionPath.equals(outer.getSubscriptionProvider())) { - SubscriptionPath sub = subscriptionPath.get(); - if (sub != null) { - // is a split on Read.fromSubscription - Lineage.getSources().add("pubsub", "subscription", sub.getDataCatalogSegments()); - } - } } for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) { // Since the source is immutable and Pubsub automatically shards we simply