This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 70e41f4ebef Add docs and warning about using SpannerIO.Read in 
streaming pipelines (#29601)
70e41f4ebef is described below

commit 70e41f4ebef55667943616f322708be950e91ddf
Author: Niel Markwick <ni...@users.noreply.github.com>
AuthorDate: Tue Dec 5 20:59:40 2023 +0100

    Add docs and warning about using SpannerIO.Read in streaming pipelines 
(#29601)
    
    * Add docs and warning about using SpannerIO.Read in streaming.
    
    Add more documentation around SpannerIO.Read and .ReadAll
    explaining PartitionedRead API, batching, and how it should
    not be used for unbounded reads in Streaming pipelines.
    
    Add a warning if SpannerIO.ReadAll is applied to an unbounded input.
---
 .../apache/beam/sdk/io/gcp/spanner/SpannerIO.java  | 87 +++++++++++++++++++---
 1 file changed, 76 insertions(+), 11 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 786fa91f558..b6ec8097a5f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -142,12 +142,15 @@ import org.slf4j.LoggerFactory;
  *
  * <h3>Reading from Cloud Spanner</h3>
  *
- * <p>To read from Cloud Spanner, apply {@link Read} transformation. It will 
return a {@link
- * PCollection} of {@link Struct Structs}, where each element represents an 
individual row returned
- * from the read operation. Both Query and Read APIs are supported. See more 
information about <a
+ * <h4>Bulk reading of a single query or table</h4>
+ *
+ * <p>To perform a single read from Cloud Spanner, construct a {@link Read} 
transform using {@link
+ * SpannerIO#read() SpannerIO.read()}. It will return a {@link PCollection} of 
{@link Struct
+ * Structs}, where each element represents an individual row returned from the 
read operation. Both
+ * Query and Read APIs are supported. See more information about <a
  * href="https://cloud.google.com/spanner/docs/reads";>reading from Cloud 
Spanner</a>
  *
- * <p>To execute a <strong>query</strong>, specify a {@link 
Read#withQuery(Statement)} or {@link
+ * <p>To execute a <strong>Query</strong>, specify a {@link 
Read#withQuery(Statement)} or {@link
  * Read#withQuery(String)} during the construction of the transform.
  *
  * <pre>{@code
@@ -158,8 +161,17 @@ import org.slf4j.LoggerFactory;
  *         .withQuery("SELECT id, name, email FROM users"));
  * }</pre>
  *
- * <p>To use the Read API, specify a {@link Read#withTable(String) table name} 
and a {@link
- * Read#withColumns(List) list of columns}.
+ * <p>Reads by default use the <a
+ * 
href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel";>PartitionQuery
 API</a>
+ * which enforces some limitations on the type of queries that can be used so 
that the data can be
+ * read in parallel. If the query is not supported by the PartitionQuery API, 
then you can specify a
+ * non-partitioned read by setting {@link Read#withBatching(boolean) 
withBatching(false)}. If the
+ * amount of data being read by a non-partitioned read is very large, it may 
be useful to add a
+ * {@link Reshuffle#viaRandomKey()} transform on the output so that the 
downstream transforms can
+ * run in parallel.
+ *
+ * <p>To read an entire <strong>Table</strong>, use {@link 
Read#withTable(String)} and optionally
+ * specify a {@link Read#withColumns(List) list of columns}.
  *
  * <pre>{@code
  * PCollection<Struct> rows = p.apply(
@@ -170,13 +182,26 @@ import org.slf4j.LoggerFactory;
  *        .withColumns("id", "name", "email"));
  * }</pre>
  *
- * <p>To optimally read using index, specify the index name using {@link 
Read#withIndex}.
+ * <p>To read using an <strong>Index</strong>, specify the index name using 
{@link
+ * Read#withIndex(String)}.
+ *
+ * <pre>{@code
+ * PCollection<Struct> rows = p.apply(
+ *    SpannerIO.read()
+ *        .withInstanceId(instanceId)
+ *        .withDatabaseId(dbId)
+ *        .withTable("users")
+ *        .withIndex("users_by_name")
+ *        .withColumns("id", "name", "email"));
+ * }</pre>
+ *
+ * <h4>Read consistency</h4>
  *
  * <p>The transform is guaranteed to be executed on a consistent snapshot of 
data, utilizing the
  * power of read only transactions. Staleness of data can be controlled using 
{@link
  * Read#withTimestampBound} or {@link Read#withTimestamp(Timestamp)} methods. 
<a
- * href="https://cloud.google.com/spanner/docs/transactions";>Read more</a> 
about transactions in
- * Cloud Spanner.
+ * 
href="https://cloud.google.com/spanner/docs/transactions#read-only_transactions";>Read
 more</a>
+ * about transactions in Cloud Spanner.
  *
  * <p>It is possible to read several {@link PCollection PCollections} within a 
single transaction.
  * Apply {@link SpannerIO#createTransaction()} transform, that lazily creates 
a transaction. The
@@ -204,6 +229,29 @@ import org.slf4j.LoggerFactory;
  *        .withTransaction(tx));
  * }</pre>
  *
+ * <h4>Bulk reading of multiple queries or tables</h4>
+ *
+ * You can perform multiple consistent reads on a set of tables or using a set 
of queries by
+ * constructing a {@link ReadAll} transform using {@link SpannerIO#readAll() 
SpannerIO.readAll()}.
+ * This transform takes a {@link PCollection} of {@link ReadOperation} 
elements, and performs the
+ * partitioned read on each of them using the same Read Only Transaction for 
consistent results.
+ *
+ * <p>Note that this transform should <strong>not</strong> be used in 
Streaming pipelines. This is
+ * because the same Read Only Transaction, which is created once when the 
pipeline is first
+ * executed, will be used for all reads. The data being read will therefore 
become stale, and if no
+ * reads are made for more than 1 hour, the transaction will automatically 
timeout and be closed by
+ * the Spanner server, meaning that any subsequent reads will fail.
+ *
+ * <pre>{@code
+ * // Build a collection of ReadOperations.
+ * PCollection<ReadOperation> reads = ...
+ *
+ * PCollection<Struct> rows = reads.apply(
+ *     SpannerIO.readAll()
+ *         .withInstanceId(instanceId)
+ *         .withDatabaseId(dbId)
+ * }</pre>
+ *
  * <h3>Writing to Cloud Spanner</h3>
  *
  * <p>The Cloud Spanner {@link Write} transform writes to Cloud Spanner by 
executing a collection of
@@ -362,6 +410,12 @@ import org.slf4j.LoggerFactory;
  * <p>{@link Write} can be used as a streaming sink, however as with batch 
mode note that the write
  * order of individual {@link Mutation}/{@link MutationGroup} objects is not 
guaranteed.
  *
+ * <p>{@link Read} and {@link ReadAll} can be used in Streaming pipelines to 
read a set of Facts on
+ * pipeline startup.
+ *
+ * <p>{@link ReadAll} should not be used on an unbounded {@code 
PCollection<ReadOperation>}, for the
+ * reasons stated above.
+ *
  * <h3>Updates to the I/O connector code</h3>
  *
  * For any significant significant updates to this I/O connector, please 
consider involving
@@ -564,8 +618,10 @@ public class SpannerIO {
     }
 
     /**
-     * By default Batch API is used to read data from Cloud Spanner. It is 
useful to disable
-     * batching when the underlying query is not root-partitionable.
+     * By default the <a
+     * 
href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel";>PartitionQuery
+     * API</a> is used to read data from Cloud Spanner. It is useful to 
disable batching when the
+     * underlying query is not root-partitionable.
      */
     public ReadAll withBatching(boolean batching) {
       return toBuilder().setBatching(batching).build();
@@ -585,6 +641,15 @@ public class SpannerIO {
 
     @Override
     public PCollection<Struct> expand(PCollection<ReadOperation> input) {
+
+      if (PCollection.IsBounded.UNBOUNDED == input.isBounded()) {
+        // Warn that SpannerIO.ReadAll should not be used on unbounded inputs.
+        LOG.warn(
+            "SpannerIO.ReadAll({}) is being applied to an unbounded input. "
+                + "This is not supported and can lead to runtime failures.",
+            this.getName());
+      }
+
       PTransform<PCollection<ReadOperation>, PCollection<Struct>> 
readTransform;
       if (getBatching()) {
         readTransform =

Reply via email to