This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 857eccedc55 Refactor PubsubIO Lineage metrics to work with all runners (#32319)
857eccedc55 is described below

commit 857eccedc5544ed920e9d445d18b1de74843488d
Author: Yi Hu <ya...@google.com>
AuthorDate: Tue Aug 27 15:03:52 2024 -0400

    Refactor PubsubIO Lineage metrics to work with all runners (#32319)
    
    * Move Lineage report outside of PubsubUnboundedSource
    
    * Move Lineage report outside of PubsubUnboundedSink
---
 .../java/org/apache/beam/sdk/metrics/Lineage.java  |  6 ++-
 .../sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java  | 10 +++++
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java    | 47 +++++++++++++++++++---
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java     | 12 ------
 .../sdk/io/gcp/pubsub/PubsubUnboundedSource.java   | 14 -------
 5 files changed, 56 insertions(+), 33 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
index 302ae4f2fef..07fda807bd4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
@@ -130,7 +130,11 @@ public class Lineage {
             .build();
     Set<String> result = new HashSet<>();
     for (MetricResult<StringSetResult> metrics : 
results.queryMetrics(filter).getStringSets()) {
-      result.addAll(metrics.getCommitted().getStringSet());
+      try {
+        result.addAll(metrics.getCommitted().getStringSet());
+      } catch (UnsupportedOperationException unused) {
+        // MetricsResult.getCommitted throws this exception when runner support missing, just skip.
+      }
       result.addAll(metrics.getAttempted().getStringSet());
     }
     return result;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
index 47033451ab8..521e65b934b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java
@@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import javax.naming.SizeLimitExceededException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
@@ -43,6 +44,8 @@ public class PreparePubsubWriteDoFn<InputT> extends 
DoFn<InputT, PubsubMessage>
 
   private SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> 
formatFunction;
   @Nullable SerializableFunction<ValueInSingleWindow<InputT>, 
PubsubIO.PubsubTopic> topicFunction;
+  /** Last TopicPath that reported Lineage. */
+  private transient @Nullable String reportedLineage;
 
   private final BadRecordRouter badRecordRouter;
 
@@ -165,6 +168,13 @@ public class PreparePubsubWriteDoFn<InputT> extends 
DoFn<InputT, PubsubMessage>
         return;
       }
     }
+    String topic = message.getTopic();
+    // topic shouldn't be null, but lineage report is fail-safe
+    if (topic != null && !topic.equals(reportedLineage)) {
+      Lineage.getSinks()
+          .add("pubsub", "topic", 
PubsubClient.topicPathFromPath(topic).getDataCatalogSegments());
+      reportedLineage = topic;
+    }
     try {
       validatePubsubMessageSize(message, maxPublishBatchSize);
     } catch (SizeLimitExceededException e) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 8b582c1054f..b561b4711d5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -56,6 +56,7 @@ import 
org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -1148,6 +1149,20 @@ public class PubsubIO {
               getNeedsOrderingKey());
 
       PCollection<PubsubMessage> preParse = input.apply(source);
+      return expandReadContinued(preParse, topicPath, subscriptionPath);
+    }
+
+    /**
+     * Runner agnostic part of the Expansion.
+     *
+     * <p>Common logics (MapElements, SDK metrics, DLQ, etc) live here as PubsubUnboundedSource is
+     * overridden on Dataflow runner.
+     */
+    private PCollection<T> expandReadContinued(
+        PCollection<PubsubMessage> preParse,
+        @Nullable ValueProvider<TopicPath> topicPath,
+        @Nullable ValueProvider<SubscriptionPath> subscriptionPath) {
+
       TypeDescriptor<T> typeDescriptor = new TypeDescriptor<T>() {};
       PCollection<T> read;
       if (getDeadLetterTopicProvider() == null
@@ -1174,7 +1189,7 @@ public class PubsubIO {
                       "Map Failures To BadRecords",
                       ParDo.of(new 
ParseReadFailuresToBadRecords(preParse.getCoder())));
           getBadRecordErrorHandler()
-              
.addErrorCollection(badRecords.setCoder(BadRecord.getCoder(input.getPipeline())));
+              
.addErrorCollection(badRecords.setCoder(BadRecord.getCoder(preParse.getPipeline())));
         } else {
           // Write out failures to the provided dead-letter topic.
           result
@@ -1215,7 +1230,31 @@ public class PubsubIO {
                       .withClientFactory(getPubsubClientFactory()));
         }
       }
-
+      // report Lineage once
+      preParse
+          .getPipeline()
+          .apply(Impulse.create())
+          .apply(
+              ParDo.of(
+                  new DoFn<byte[], Void>() {
+                    @ProcessElement
+                    public void process() {
+                      if (topicPath != null) {
+                        TopicPath topic = topicPath.get();
+                        if (topic != null) {
+                          Lineage.getSources()
+                              .add("pubsub", "topic", 
topic.getDataCatalogSegments());
+                        }
+                      }
+                      if (subscriptionPath != null) {
+                        SubscriptionPath sub = subscriptionPath.get();
+                        if (sub != null) {
+                          Lineage.getSources()
+                              .add("pubsub", "subscription", 
sub.getDataCatalogSegments());
+                        }
+                      }
+                    }
+                  }));
       return read.setCoder(getCoder());
     }
 
@@ -1623,10 +1662,6 @@ public class PubsubIO {
         for (Map.Entry<PubsubTopic, OutgoingData> entry : output.entrySet()) {
           publish(entry.getKey(), entry.getValue().messages);
         }
-        // Report lineage for all topics seen
-        for (PubsubTopic topic : output.keySet()) {
-          Lineage.getSinks().add("pubsub", "topic", 
topic.dataCatalogSegments());
-        }
         output = null;
         pubsubClient.close();
         pubsubClient = null;
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 38d77aa3aac..aa8e3a41148 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -41,7 +41,6 @@ import 
org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
 import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -232,9 +231,6 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
     /** Client on which to talk to Pubsub. Null until created by {@link 
#startBundle}. */
     private transient @Nullable PubsubClient pubsubClient;
 
-    /** Last TopicPath that reported Lineage. */
-    private transient @Nullable TopicPath reportedLineage;
-
     private final Counter batchCounter = Metrics.counter(WriterFn.class, 
"batches");
     private final Counter elementCounter = SinkMetrics.elementsWritten();
     private final Counter byteCounter = SinkMetrics.bytesWritten();
@@ -294,14 +290,6 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
       batchCounter.inc();
       elementCounter.inc(messages.size());
       byteCounter.inc(bytes);
-      // Report Lineage multiple once for same topic
-      if (!topicPath.equals(reportedLineage)) {
-        List<String> segments = topicPath.getDataCatalogSegments();
-        if (segments.size() != 0) {
-          Lineage.getSinks().add("pubsub", "topic", segments);
-        }
-        reportedLineage = topicPath;
-      }
     }
 
     @StartBundle
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 95fa5c22341..b9a554d54ad 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -56,7 +56,6 @@ import 
org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
 import 
org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly;
 import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -1042,19 +1041,6 @@ public class PubsubUnboundedSource extends 
PTransform<PBegin, PCollection<Pubsub
         splitSource =
             new PubsubSource(
                 outer, 
StaticValueProvider.of(outer.createRandomSubscription(options)));
-        TopicPath topic = outer.getTopic();
-        if (topic != null) {
-          // is initial split on Read.fromTopic, report Lineage based on topic
-          Lineage.getSources().add("pubsub", "source", 
topic.getDataCatalogSegments());
-        }
-      } else {
-        if (subscriptionPath.equals(outer.getSubscriptionProvider())) {
-          SubscriptionPath sub = subscriptionPath.get();
-          if (sub != null) {
-            // is a split on Read.fromSubscription
-            Lineage.getSources().add("pubsub", "subscription", 
sub.getDataCatalogSegments());
-          }
-        }
       }
       for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) {
         // Since the source is immutable and Pubsub automatically shards we 
simply

Reply via email to