This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ce71caf  [BEAM-14124] Add display data to BQ storage reads.
     new 57c8647  Merge pull request #17115 from ibzib/bq-display-data
ce71caf is described below

commit ce71cafc4801474ba7791a194200e4599b0b9e33
Author: Kyle Weaver <kcwea...@google.com>
AuthorDate: Thu Mar 17 13:12:35 2022 -0700

    [BEAM-14124] Add display data to BQ storage reads.
    
    Add display data for "Selected fields" and "Projection pushdown
    applied". I also want to add one for "Number of fields pushed down", but
    that will be a little more involved so I'll do it in a separate PR.
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       | 26 ++++++++++-
 .../gcp/bigquery/BigQueryStorageTableSource.java   | 52 +++++++++++++++++-----
 .../io/gcp/bigquery/BigQueryIOStorageReadTest.java |  5 +++
 3 files changed, 70 insertions(+), 13 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 9786d90..c510db7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -590,6 +590,7 @@ public class BigQueryIO {
         .setMethod(TypedRead.Method.DEFAULT)
         .setUseAvroLogicalTypes(false)
         .setFormat(DataFormat.AVRO)
+        .setProjectionPushdownApplied(false)
         .build();
   }
 
@@ -805,6 +806,8 @@ public class BigQueryIO {
       abstract Builder<T> setFromBeamRowFn(FromBeamRowFunction<T> fromRowFn);
 
       abstract Builder<T> setUseAvroLogicalTypes(Boolean useAvroLogicalTypes);
+
+      abstract Builder<T> setProjectionPushdownApplied(boolean projectionPushdownApplied);
     }
 
     abstract @Nullable ValueProvider<String> getJsonTableRef();
@@ -853,6 +856,8 @@ public class BigQueryIO {
 
     abstract Boolean getUseAvroLogicalTypes();
 
+    abstract boolean getProjectionPushdownApplied();
+
     /**
      * An enumeration type for the priority of a query.
      *
@@ -1229,7 +1234,8 @@ public class BigQueryIO {
                     getRowRestriction(),
                     getParseFn(),
                     outputCoder,
-                    getBigQueryServices())));
+                    getBigQueryServices(),
+                    getProjectionPushdownApplied())));
       }
 
       checkArgument(
@@ -1430,6 +1436,10 @@ public class BigQueryIO {
               DisplayData.item("table", BigQueryHelpers.displayTable(getTableProvider()))
                   .withLabel("Table"))
           .addIfNotNull(DisplayData.item("query", getQuery()).withLabel("Query"))
+          .addIfNotDefault(
+              DisplayData.item("projectionPushdownApplied", getProjectionPushdownApplied())
+                  .withLabel("Projection Pushdown Applied"),
+              false)
           .addIfNotNull(
               DisplayData.item("flattenResults", getFlattenResults())
                   .withLabel("Flatten Query Results"))
@@ -1438,6 +1448,13 @@ public class BigQueryIO {
                   .withLabel("Use Legacy SQL Dialect"))
           .addIfNotDefault(
               DisplayData.item("validation", getValidate()).withLabel("Validation Enabled"), true);
+
+      ValueProvider<List<String>> selectedFieldsProvider = getSelectedFields();
+      if (selectedFieldsProvider != null && selectedFieldsProvider.isAccessible()) {
+        builder.add(
+            DisplayData.item("selectedFields", String.join(", ", selectedFieldsProvider.get()))
+                .withLabel("Selected Fields"));
+      }
     }
 
     /** Ensures that methods of the from() / fromQuery() family are called at most once. */
@@ -1623,6 +1640,11 @@ public class BigQueryIO {
       return toBuilder().setUseAvroLogicalTypes(true).build();
     }
 
+    @VisibleForTesting
+    TypedRead<T> withProjectionPushdownApplied() {
+      return toBuilder().setProjectionPushdownApplied(true).build();
+    }
+
     @Override
     public boolean supportsProjectionPushdown() {
       // We can't do projection pushdown when a query is set. The query may project certain fields
@@ -1643,7 +1665,7 @@ public class BigQueryIO {
           outputFields.keySet());
       ImmutableList<String> fields =
           ImmutableList.copyOf(fieldAccessDescriptor.fieldNamesAccessed());
-      return withSelectedFields(fields);
+      return withSelectedFields(fields).withProjectionPushdownApplied();
     }
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
index 2a14cd5..c53cab3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
@@ -45,6 +45,11 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T>
 
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryStorageTableSource.class);
 
+  private final ValueProvider<TableReference> tableReferenceProvider;
+  private final boolean projectionPushdownApplied;
+
+  private transient AtomicReference<Table> cachedTable;
+
   public static <T> BigQueryStorageTableSource<T> create(
       ValueProvider<TableReference> tableRefProvider,
       DataFormat format,
@@ -52,9 +57,17 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T>
       @Nullable ValueProvider<String> rowRestriction,
       SerializableFunction<SchemaAndRecord, T> parseFn,
       Coder<T> outputCoder,
-      BigQueryServices bqServices) {
+      BigQueryServices bqServices,
+      boolean projectionPushdownApplied) {
     return new BigQueryStorageTableSource<>(
-        tableRefProvider, format, selectedFields, rowRestriction, parseFn, outputCoder, bqServices);
+        tableRefProvider,
+        format,
+        selectedFields,
+        rowRestriction,
+        parseFn,
+        outputCoder,
+        bqServices,
+        projectionPushdownApplied);
   }
 
   public static <T> BigQueryStorageTableSource<T> create(
@@ -65,13 +78,16 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T>
       Coder<T> outputCoder,
       BigQueryServices bqServices) {
     return new BigQueryStorageTableSource<>(
-        tableRefProvider, null, selectedFields, rowRestriction, parseFn, outputCoder, bqServices);
+        tableRefProvider,
+        null,
+        selectedFields,
+        rowRestriction,
+        parseFn,
+        outputCoder,
+        bqServices,
+        false);
   }
 
-  private final ValueProvider<TableReference> tableReferenceProvider;
-
-  private transient AtomicReference<Table> cachedTable;
-
   private BigQueryStorageTableSource(
       ValueProvider<TableReference> tableRefProvider,
       DataFormat format,
@@ -79,9 +95,11 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T>
       @Nullable ValueProvider<String> rowRestriction,
       SerializableFunction<SchemaAndRecord, T> parseFn,
       Coder<T> outputCoder,
-      BigQueryServices bqServices) {
+      BigQueryServices bqServices,
+      boolean projectionPushdownApplied) {
     super(format, selectedFields, rowRestriction, parseFn, outputCoder, bqServices);
     this.tableReferenceProvider = checkNotNull(tableRefProvider, "tableRefProvider");
+    this.projectionPushdownApplied = projectionPushdownApplied;
     cachedTable = new AtomicReference<>();
   }
 
@@ -93,9 +111,21 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T>
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-    builder.addIfNotNull(
-        DisplayData.item("table", BigQueryHelpers.displayTable(tableReferenceProvider))
-            .withLabel("Table"));
+    builder
+        .addIfNotNull(
+            DisplayData.item("table", BigQueryHelpers.displayTable(tableReferenceProvider))
+                .withLabel("Table"))
+        .addIfNotDefault(
+            DisplayData.item("projectionPushdownApplied", projectionPushdownApplied)
+                .withLabel("Projection Pushdown Applied"),
+            false);
+
+    if (selectedFieldsProvider != null && selectedFieldsProvider.isAccessible()) {
+      builder.add(
+          DisplayData.item("selectedFields", String.join(", ", selectedFieldsProvider.get()))
+              .withLabel("Selected Fields"));
+    }
+
     // Note: This transform does not set launchesBigQueryJobs because it doesn't launch
     // BigQuery jobs, but instead uses the storage api to directly read the table.
   }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index 8f8ab86..e9a98a5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -279,9 +279,13 @@ public class BigQueryIOStorageReadTest {
         BigQueryIO.read(new TableRowParser())
             .withCoder(TableRowJsonCoder.of())
             .withMethod(Method.DIRECT_READ)
+            .withSelectedFields(ImmutableList.of("foo", "bar"))
+            .withProjectionPushdownApplied()
             .from(tableSpec);
     DisplayData displayData = DisplayData.from(typedRead);
     assertThat(displayData, hasDisplayItem("table", tableSpec));
+    assertThat(displayData, hasDisplayItem("selectedFields", "foo, bar"));
+    assertThat(displayData, hasDisplayItem("projectionPushdownApplied", true));
   }
 
   @Test
@@ -2097,6 +2101,7 @@ public class BigQueryIOStorageReadTest {
     TypedRead<Row> pushdownRead = (TypedRead<Row>) pushdownT;
     assertEquals(Method.DIRECT_READ, pushdownRead.getMethod());
     assertThat(pushdownRead.getSelectedFields().get(), Matchers.containsInAnyOrder("foo"));
+    assertTrue(pushdownRead.getProjectionPushdownApplied());
   }
 
   @Test

Reply via email to