This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new eba1ec36adc Bigtable: skip reading large rows (#37586)
eba1ec36adc is described below

commit eba1ec36adcac85d37ab7ea25986ebeec3dada47
Author: Mattie Fu <[email protected]>
AuthorDate: Fri Feb 13 11:51:31 2026 -0500

    Bigtable: skip reading large rows (#37586)
    
    * Bigtable: add experimental option to skip reading large rows in the pipeline
    
    * fix test
    
    * fix test
---
 .../io/gcp/bigtable/BigtableServiceFactory.java    |  7 +++-
 .../sdk/io/gcp/bigtable/BigtableServiceImpl.java   | 31 ++++++++++++----
 .../beam/sdk/io/gcp/bigtable/BigtableReadIT.java   | 41 ++++++++++++++++++++++
 .../io/gcp/bigtable/BigtableServiceImplTest.java   |  3 +-
 4 files changed, 74 insertions(+), 8 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
index 4c7805f6558..9933204d6f6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
@@ -54,6 +54,8 @@ class BigtableServiceFactory implements Serializable {
   private static final String BIGTABLE_ENABLE_CLIENT_SIDE_METRICS =
       "bigtable_enable_client_side_metrics";
 
+  private static final String BIGTABLE_ENABLE_SKIP_LARGE_ROWS = "bigtable_enable_skip_large_rows";
+
   @AutoValue
   abstract static class ConfigId implements Serializable {
 
@@ -133,7 +135,10 @@ class BigtableServiceFactory implements Serializable {
         BigtableDataSettings.enableBuiltinMetrics();
       }
 
-      BigtableService service = new BigtableServiceImpl(settings);
+      boolean skipLargeRows =
+          ExperimentalOptions.hasExperiment(pipelineOptions, BIGTABLE_ENABLE_SKIP_LARGE_ROWS);
+
+      BigtableService service = new BigtableServiceImpl(settings, skipLargeRows);
       entry = BigtableServiceEntry.create(configId, service);
       entries.put(configId.id(), entry);
       refCounts.put(configId.id(), new AtomicInteger(1));
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 3451bbf450c..f7aa50a7437 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -104,10 +104,17 @@ class BigtableServiceImpl implements BigtableService {
   private static final double WATERMARK_PERCENTAGE = .1;
   private static final long MIN_BYTE_BUFFER_SIZE = 100 * 1024 * 1024; // 100MB
 
+  private final boolean skipLargeRows;
+
   BigtableServiceImpl(BigtableDataSettings settings) throws IOException {
+    this(settings, false);
+  }
+
+  BigtableServiceImpl(BigtableDataSettings settings, boolean skipLargeRows) throws IOException {
     this.projectId = settings.getProjectId();
     this.instanceId = settings.getInstanceId();
     this.client = BigtableDataClient.create(settings);
+    this.skipLargeRows = skipLargeRows;
     LOG.info("Started Bigtable service with settings {}", settings);
   }
 
@@ -142,6 +149,7 @@ class BigtableServiceImpl implements BigtableService {
     private ServerStream<Row> stream;
 
     private boolean exhausted;
+    private final boolean skipLargeRows;
 
     @VisibleForTesting
     BigtableReaderImpl(
@@ -150,13 +158,15 @@ class BigtableServiceImpl implements BigtableService {
         String instanceId,
         String tableId,
         List<ByteKeyRange> ranges,
-        @Nullable RowFilter rowFilter) {
+        @Nullable RowFilter rowFilter,
+        boolean skipLargeRows) {
       this.client = client;
       this.projectId = projectId;
       this.instanceId = instanceId;
       this.tableId = tableId;
       this.ranges = ranges;
       this.rowFilter = rowFilter;
+      this.skipLargeRows = skipLargeRows;
     }
 
     @Override
@@ -173,11 +183,19 @@ class BigtableServiceImpl implements BigtableService {
       if (rowFilter != null) {
         query.filter(Filters.FILTERS.fromProto(rowFilter));
       }
+
       try {
-        stream =
-            client
-                .readRowsCallable(new BigtableRowProtoAdapter())
-                .call(query, GrpcCallContext.createDefault());
+        if (skipLargeRows) {
+          stream =
+              client
+                  .skipLargeRowsCallable(new BigtableRowProtoAdapter())
+                  .call(query, GrpcCallContext.createDefault());
+        } else {
+          stream =
+              client
+                  .readRowsCallable(new BigtableRowProtoAdapter())
+                  .call(query, GrpcCallContext.createDefault());
+        }
         results = stream.iterator();
         serviceCallMetric.call("ok");
       } catch (StatusRuntimeException e) {
@@ -667,7 +685,8 @@ class BigtableServiceImpl implements BigtableService {
           instanceId,
           source.getTableId().get(),
           source.getRanges(),
-          source.getRowFilter());
+          source.getRowFilter(),
+          skipLargeRows);
     }
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
index 4ce9ad10b2c..1da38e6d083 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
@@ -27,6 +27,7 @@ import com.google.cloud.bigtable.config.BigtableOptions;
 import com.google.cloud.bigtable.data.v2.BigtableDataClient;
 import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
 import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import com.google.cloud.bigtable.data.v2.models.TableId;
 import java.io.IOException;
 import java.util.Date;
 import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
@@ -34,6 +35,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.metrics.Lineage;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -148,6 +150,45 @@ public class BigtableReadIT {
     checkLineageSourceMetric(r, tableId);
   }
 
+  @Test
+  public void testE2EBigtableReadWithSkippingLargeRows() {
+    tableAdminClient.createTable(CreateTableRequest.of(tableId).addFamily(COLUMN_FAMILY_NAME));
+
+    // Write a few rows first
+    int numRows = 20;
+    int numLargeRows = 3;
+    // Each mutation can't exceed 100 MB. Break it down to 3 columns
+    String value = StringUtils.repeat("v", 90 * 1000 * 1000);
+    for (int i = 0; i < numLargeRows; i++) {
+      for (int j = 0; j < 3; j++) {
+        client.mutateRow(
+            RowMutation.create(TableId.of(tableId), "large_row-" + i)
+                .setCell(COLUMN_FAMILY_NAME, "q" + i, value));
+      }
+    }
+
+    for (int i = 0; i < numRows - numLargeRows; i++) {
+      client.mutateRow(
+          RowMutation.create(TableId.of(tableId), "row-" + i)
+              .setCell(COLUMN_FAMILY_NAME, "q", "value"));
+    }
+
+    ExperimentalOptions.addExperiment(
+        options.as(ExperimentalOptions.class), "bigtable_enable_skip_large_rows");
+
+    Pipeline p = Pipeline.create(options);
+    PCollection<Long> count =
+        p.apply(
+                BigtableIO.read()
+                    .withProjectId(project)
+                    .withInstanceId(options.getInstanceId())
+                    .withTableId(tableId))
+            .apply(Count.globally());
+    PAssert.thatSingleton(count).isEqualTo((long) numRows - numLargeRows);
+    PipelineResult r = p.run();
+    checkLineageSourceMetric(r, tableId);
+  }
+
   private void checkLineageSourceMetric(PipelineResult r, String tableId) {
     // TODO(https://github.com/apache/beam/issues/32071) test malformed,
     //   when pipeline.run() is non-blocking, the metrics are not available by the time of query
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
index 0564493ca1a..37d7d89021d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
@@ -179,7 +179,8 @@ public class BigtableServiceImplTest {
             bigtableDataSettings.getInstanceId(),
             mockBigtableSource.getTableId().get(),
             mockBigtableSource.getRanges(),
-            null);
+            null,
+            false);
 
     underTest.start();
     Assert.assertEquals(expectedRow, underTest.getCurrentRow());

Reply via email to