ahmedabu98 commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1107774129


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received 
response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", 
readSession.getStreamsList().size());
     }
 
     if (readSession.getStreamsList().isEmpty()) {
       // The underlying table is empty or all rows have been pruned.
       return ImmutableList.of();
     }
 
+    streamCount = readSession.getStreamsList().size();
+    int streamsPerBundle = 0;
+    double bytesPerStream = 0;
+    LOG.info(
+        "readSession.getEstimatedTotalBytesScanned(): '{}'",

Review Comment:
   `"Estimated bytes this read session will scan when all streams are 
consumed"` ?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received 
response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", 
readSession.getStreamsList().size());

Review Comment:
   Is there a reason these are split into two logs? Combining may be more 
readable.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1585,6 +1527,233 @@ void cleanup(ContextContainer c) throws Exception {
       return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, 
jobIdTokenView));
     }
 
+    private PCollectionTuple createTupleForDirectRead(
+        PCollection<String> jobIdTokenCollection,
+        Coder<T> outputCoder,
+        TupleTag<ReadStream> readStreamsTag,
+        TupleTag<ReadSession> readSessionTag,
+        TupleTag<String> tableSchemaTag) {
+      PCollectionTuple tuple =
+          jobIdTokenCollection.apply(
+              "RunQueryJob",
+              ParDo.of(
+                      new DoFn<String, ReadStream>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) throws 
Exception {
+                          BigQueryOptions options =
+                              c.getPipelineOptions().as(BigQueryOptions.class);
+                          String jobUuid = c.element();
+                          // Execute the query and get the destination table 
holding the results.
+                          // The getTargetTable call runs a new instance of 
the query and returns
+                          // the destination table created to hold the results.
+                          BigQueryStorageQuerySource<T> querySource =
+                              createStorageQuerySource(jobUuid, outputCoder);
+                          Table queryResultTable = 
querySource.getTargetTable(options);
+
+                          // Create a read session without specifying a 
desired stream count and
+                          // let the BigQuery storage server pick the number 
of streams.
+                          CreateReadSessionRequest request =
+                              CreateReadSessionRequest.newBuilder()
+                                  .setParent(
+                                      BigQueryHelpers.toProjectResourceName(
+                                          options.getBigQueryProject() == null
+                                              ? options.getProject()
+                                              : options.getBigQueryProject()))
+                                  .setReadSession(
+                                      ReadSession.newBuilder()
+                                          .setTable(
+                                              
BigQueryHelpers.toTableResourceName(
+                                                  
queryResultTable.getTableReference()))
+                                          .setDataFormat(DataFormat.AVRO))
+                                  .setMaxStreamCount(0)
+                                  .build();
+
+                          ReadSession readSession;
+                          try (StorageClient storageClient =
+                              getBigQueryServices().getStorageClient(options)) 
{
+                            readSession = 
storageClient.createReadSession(request);
+                          }
+
+                          for (ReadStream readStream : 
readSession.getStreamsList()) {
+                            c.output(readStream);
+                          }
+
+                          c.output(readSessionTag, readSession);
+                          c.output(
+                              tableSchemaTag,
+                              
BigQueryHelpers.toJsonString(queryResultTable.getSchema()));
+                        }
+                      })
+                  .withOutputTags(
+                      readStreamsTag, 
TupleTagList.of(readSessionTag).and(tableSchemaTag)));
+
+      return tuple;
+    }
+
+    private PCollectionTuple createTupleForDirectReadWithStreamBundle(
+        PCollection<String> jobIdTokenCollection,
+        Coder<T> outputCoder,
+        TupleTag<List<ReadStream>> listReadStreamsTag,
+        TupleTag<ReadSession> readSessionTag,
+        TupleTag<String> tableSchemaTag) {
+
+      PCollectionTuple tuple =
+          jobIdTokenCollection.apply(
+              "RunQueryJob",
+              ParDo.of(
+                      new DoFn<String, List<ReadStream>>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) throws 
Exception {
+                          BigQueryOptions options =
+                              c.getPipelineOptions().as(BigQueryOptions.class);
+                          String jobUuid = c.element();
+                          // Execute the query and get the destination table 
holding the results.
+                          // The getTargetTable call runs a new instance of 
the query and returns
+                          // the destination table created to hold the results.
+                          BigQueryStorageQuerySource<T> querySource =
+                              createStorageQuerySource(jobUuid, outputCoder);
+                          Table queryResultTable = 
querySource.getTargetTable(options);
+
+                          // Create a read session without specifying a 
desired stream count and
+                          // let the BigQuery storage server pick the number 
of streams.
+                          CreateReadSessionRequest request =
+                              CreateReadSessionRequest.newBuilder()
+                                  .setParent(
+                                      BigQueryHelpers.toProjectResourceName(
+                                          options.getBigQueryProject() == null
+                                              ? options.getProject()
+                                              : options.getBigQueryProject()))
+                                  .setReadSession(
+                                      ReadSession.newBuilder()
+                                          .setTable(
+                                              
BigQueryHelpers.toTableResourceName(
+                                                  
queryResultTable.getTableReference()))
+                                          .setDataFormat(DataFormat.AVRO))
+                                  .setMaxStreamCount(0)
+                                  .build();
+
+                          ReadSession readSession;
+                          try (StorageClient storageClient =
+                              getBigQueryServices().getStorageClient(options)) 
{
+                            readSession = 
storageClient.createReadSession(request);
+                          }
+                          int streamIndex = 0;
+                          int streamsPerBundle = 10;
+                          List<ReadStream> streamBundle = Lists.newArrayList();
+                          for (ReadStream readStream : 
readSession.getStreamsList()) {
+                            streamIndex++;
+                            streamBundle.add(readStream);
+                            if (streamIndex % streamsPerBundle == 0) {
+                              c.output(streamBundle);
+                              streamBundle = Lists.newArrayList();
+                            }
+                          }
+

Review Comment:
   ```suggestion
                               if (streamIndex % streamsPerBundle == 0) {
                                 c.output(streamBundle);
                                 streamBundle = Lists.newArrayList();
                               }
                             }
                             if (streamIndex % streamsPerBundle != 0) {
                               c.output(streamBundle);
                             }
   ```
   
   Should also account for the last `streamBundle` that may not have a perfect 
10 readStreams.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received 
response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", 
readSession.getStreamsList().size());
     }
 
     if (readSession.getStreamsList().isEmpty()) {
       // The underlying table is empty or all rows have been pruned.
       return ImmutableList.of();
     }
 
+    streamCount = readSession.getStreamsList().size();
+    int streamsPerBundle = 0;
+    double bytesPerStream = 0;
+    LOG.info(
+        "readSession.getEstimatedTotalBytesScanned(): '{}'",
+        readSession.getEstimatedTotalBytesScanned());
+    if (bqOptions.getEnableBundling()) {
+      if (desiredBundleSizeBytes > 0) {
+        bytesPerStream =
+            (double) readSession.getEstimatedTotalBytesScanned() / 
readSession.getStreamsCount();
+        LOG.info("bytesPerStream: '{}'", bytesPerStream);

Review Comment:
   `"Estimated bytes each ReadStream will consume"`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
     try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
       readSession = client.createReadSession(createReadSessionRequest);
       LOG.info(
-          "Sent BigQuery Storage API CreateReadSession request '{}'; received 
response '{}'.",
-          createReadSessionRequest,
-          readSession);
+          "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+          createReadSessionRequest);
+      LOG.info(
+          "Received number of streams in response: '{}'.", 
readSession.getStreamsList().size());
     }
 
     if (readSession.getStreamsList().isEmpty()) {
       // The underlying table is empty or all rows have been pruned.
       return ImmutableList.of();
     }
 
+    streamCount = readSession.getStreamsList().size();
+    int streamsPerBundle = 0;
+    double bytesPerStream = 0;
+    LOG.info(
+        "readSession.getEstimatedTotalBytesScanned(): '{}'",
+        readSession.getEstimatedTotalBytesScanned());
+    if (bqOptions.getEnableBundling()) {
+      if (desiredBundleSizeBytes > 0) {
+        bytesPerStream =
+            (double) readSession.getEstimatedTotalBytesScanned() / 
readSession.getStreamsCount();
+        LOG.info("bytesPerStream: '{}'", bytesPerStream);
+        streamsPerBundle = (int) Math.ceil(desiredBundleSizeBytes / 
bytesPerStream);
+      } else {
+        streamsPerBundle = (int) Math.ceil((double) streamCount / 10);
+      }
+      streamsPerBundle = Math.min(streamCount, streamsPerBundle);
+      LOG.info("streamsPerBundle: '{}'", streamsPerBundle);

Review Comment:
   `"Distributing {} ReadStreams per bundle"`



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java:
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.Preconditions;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.beam.sdk.io.Source} representing a bundle of Streams in 
a BigQuery ReadAPI
+ * Session. This Source ONLY supports splitting at the StreamBundle level.
+ *
+ * <p>{@link BigQueryStorageStreamBundleSource} defines a split-point as the 
starting offset of each
+ * Stream. As a result, the number of valid split points in the Source is 
equal to the number of
+ * Streams in the StreamBundle and this Source does NOT support sub-Stream 
splitting.
+ *
+ * <p>Additionally, the underlying {@link 
org.apache.beam.sdk.io.range.OffsetRangeTracker} and
+ * {@link OffsetBasedSource} operate in the split point space and do NOT 
directly interact with the
+ * Streams constituting the StreamBundle. Consequently, fractional values used 
in
+ * `splitAtFraction()` are translated into StreamBundleIndices and the 
underlying RangeTracker
+ * handles the split operation by checking the validity of the split point. 
This has the following
+ * implications for the `splitAtFraction()` operation:
+ *
+ * <p>1. Fraction values that point to the "middle" of a Stream will be 
translated to the
+ * appropriate Stream boundary by the RangeTracker.
+ *
+ * <p>2. Once a Stream is being read from, the RangeTracker will only accept 
`splitAtFraction()`
+ * calls that point to StreamBundleIndices that are greater than the 
StreamBundleIndex of the
+ * current Stream
+ *
+ * @param <T> Type of records represented by the source.
+ * @see OffsetBasedSource
+ * @see org.apache.beam.sdk.io.range.OffsetRangeTracker
+ * @see org.apache.beam.sdk.io.BlockBasedSource (semantically similar to {@link
+ *     BigQueryStorageStreamBundleSource})
+ */
+class BigQueryStorageStreamBundleSource<T> extends OffsetBasedSource<T> {
+
+  public static <T> BigQueryStorageStreamBundleSource<T> create(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      TableSchema tableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        streamBundle,
+        toJsonString(Preconditions.checkArgumentNotNull(tableSchema, 
"tableSchema")),
+        parseFn,
+        outputCoder,
+        bqServices,
+        minBundleSize);
+  }
+
+  /**
+   * Creates a new source with the same properties as this one, except with a 
different {@link
+   * List<ReadStream>}.
+   */
+  public BigQueryStorageStreamBundleSource<T> fromExisting(List<ReadStream> 
newStreamBundle) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        newStreamBundle,
+        jsonTableSchema,
+        parseFn,
+        outputCoder,
+        bqServices,
+        getMinBundleSize());
+  }
+
+  private final ReadSession readSession;
+  private final List<ReadStream> streamBundle;
+  private final String jsonTableSchema;
+  private final SerializableFunction<SchemaAndRecord, T> parseFn;
+  private final Coder<T> outputCoder;
+  private final BigQueryServices bqServices;
+
+  private BigQueryStorageStreamBundleSource(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      String jsonTableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    super(0, streamBundle.size(), minBundleSize);
+    this.readSession = Preconditions.checkArgumentNotNull(readSession, 
"readSession");
+    this.streamBundle = Preconditions.checkArgumentNotNull(streamBundle, 
"streams");
+    this.jsonTableSchema = Preconditions.checkArgumentNotNull(jsonTableSchema, 
"jsonTableSchema");
+    this.parseFn = Preconditions.checkArgumentNotNull(parseFn, "parseFn");
+    this.outputCoder = Preconditions.checkArgumentNotNull(outputCoder, 
"outputCoder");
+    this.bqServices = Preconditions.checkArgumentNotNull(bqServices, 
"bqServices");
+  }
+
+  @Override
+  public Coder<T> getOutputCoder() {
+    return outputCoder;
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .add(DisplayData.item("table", 
readSession.getTable()).withLabel("Table"))
+        .add(DisplayData.item("readSession", 
readSession.getName()).withLabel("Read session"));
+    for (ReadStream readStream : streamBundle) {
+      builder.add(DisplayData.item("stream", 
readStream.getName()).withLabel("Stream"));
+    }
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    // The size of stream source can't be estimated due to server-side liquid 
sharding.
+    // TODO: Implement progress reporting.
+    return 0L;
+  }
+
+  @Override
+  public List<? extends OffsetBasedSource<T>> split(
+      long desiredBundleSizeBytes, PipelineOptions options) {
+    // A stream source can't be split without reading from it due to 
server-side liquid sharding.
+    // TODO: Implement dynamic work rebalancing.
+    return ImmutableList.of(this);
+  }

Review Comment:
   Would it be useful to implement this method?  The stream bundle could be 
split down the same way it's done in BigQueryStorageSourceBase.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -180,18 +205,32 @@ public List<BigQueryStorageStreamSource<T>> split(
       throw new IllegalArgumentException(
           "data is not in a supported dataFormat: " + 
readSession.getDataFormat());
     }
-
+    int streamIndex = 0;
     Preconditions.checkStateNotNull(
         targetTable); // TODO: this is inconsistent with method above, where 
it can be null
     TableSchema trimmedSchema =
         BigQueryAvroUtils.trimBigQueryTableSchema(targetTable.getSchema(), 
sessionSchema);
-    List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
+    if (!bqOptions.getEnableBundling()) {
+      List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
+      for (ReadStream readStream : readSession.getStreamsList()) {
+        sources.add(
+            BigQueryStorageStreamSource.create(
+                readSession, readStream, trimmedSchema, parseFn, outputCoder, 
bqServices));
+      }
+      return ImmutableList.copyOf(sources);
+    }
+    List<ReadStream> streamBundle = Lists.newArrayList();
+    List<BigQueryStorageStreamBundleSource<T>> sources = Lists.newArrayList();
     for (ReadStream readStream : readSession.getStreamsList()) {
-      sources.add(
-          BigQueryStorageStreamSource.create(
-              readSession, readStream, trimmedSchema, parseFn, outputCoder, 
bqServices));
+      streamIndex++;
+      streamBundle.add(readStream);
+      if (streamIndex % streamsPerBundle == 0) {
+        sources.add(
+            BigQueryStorageStreamBundleSource.create(
+                readSession, streamBundle, trimmedSchema, parseFn, 
outputCoder, bqServices, 1L));
+        streamBundle = Lists.newArrayList();
+      }

Review Comment:
   Similarly, add the last `streamBundle` to `sources` if it didn't make the 
`streamsPerBundle` threshold.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to