This is an automated email from the ASF dual-hosted git repository.

sniemitz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cce412e302 Ensure that BigtableIO closes the stream after reading 
(#31478)
9cce412e302 is described below

commit 9cce412e30257903ab60490a16f7dbe71404f729
Author: Justin Uang <[email protected]>
AuthorDate: Mon Jun 3 15:33:43 2024 -0400

    Ensure that BigtableIO closes the stream after reading (#31478)
    
    * Close the stream correctly
    
    * Add changes
    
    * Fix formatting
    
    * Implement close in FakeBigtableIOReader
    
    * format
---
 CHANGES.md                                         |  1 +
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java       |  1 +
 .../beam/sdk/io/gcp/bigtable/BigtableService.java  |  2 ++
 .../sdk/io/gcp/bigtable/BigtableServiceImpl.java   | 23 +++++++++++++++++++---
 .../beam/sdk/io/gcp/bigtable/BigtableIOTest.java   |  3 +++
 5 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 18289e322d9..91bdfef6916 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,6 +63,7 @@
 ## I/Os
 
 * Support for X source added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
+* Ensure that BigtableIO closes the reader streams 
([#31477](https://github.com/apache/beam/issues/31477)).
 
 ## New Features / Improvements
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 1ed70024f59..4feccd397d7 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -1953,6 +1953,7 @@ public class BigtableIO {
     public void close() throws IOException {
       LOG.info("Closing reader after reading {} records.", recordsReturned);
       if (reader != null) {
+        reader.close();
         reader = null;
       }
       if (serviceEntry != null) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
index 1e3839b5df4..261cc3ac081 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -75,6 +75,8 @@ interface BigtableService extends Serializable {
      * current row because the last such call was unsuccessful.
      */
     Row getCurrentRow() throws NoSuchElementException;
+
+    void close();
   }
 
   /** Returns a {@link Reader} that will read from the specified source. */
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index dad3370dae6..06e0108259d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -25,6 +25,7 @@ import com.google.api.gax.batching.BatchingException;
 import com.google.api.gax.grpc.GrpcCallContext;
 import com.google.api.gax.rpc.ApiException;
 import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.ServerStream;
 import com.google.api.gax.rpc.StreamController;
 import com.google.bigtable.v2.Cell;
 import com.google.bigtable.v2.Column;
@@ -137,6 +138,10 @@ class BigtableServiceImpl implements BigtableService {
 
     private Row currentRow;
 
+    private ServerStream<Row> stream;
+
+    private boolean exhausted;
+
     @VisibleForTesting
     BigtableReaderImpl(
         BigtableDataClient client,
@@ -168,11 +173,11 @@ class BigtableServiceImpl implements BigtableService {
         query.filter(Filters.FILTERS.fromProto(rowFilter));
       }
       try {
-        results =
+        stream =
             client
                 .readRowsCallable(new BigtableRowProtoAdapter())
-                .call(query, GrpcCallContext.createDefault())
-                .iterator();
+                .call(query, GrpcCallContext.createDefault());
+        results = stream.iterator();
         serviceCallMetric.call("ok");
       } catch (StatusRuntimeException e) {
         serviceCallMetric.call(e.getStatus().getCode().toString());
@@ -187,6 +192,7 @@ class BigtableServiceImpl implements BigtableService {
         currentRow = results.next();
         return true;
       }
+      exhausted = true;
       return false;
     }
 
@@ -197,6 +203,14 @@ class BigtableServiceImpl implements BigtableService {
       }
       return currentRow;
     }
+
+    @Override
+    public void close() {
+      if (!exhausted) {
+        stream.cancel();
+        exhausted = true;
+      }
+    }
   }
 
   @VisibleForTesting
@@ -295,6 +309,9 @@ class BigtableServiceImpl implements BigtableService {
           Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE));
     }
 
+    @Override
+    public void close() {}
+
     @Override
     public boolean start() throws IOException {
       future = fetchNextSegment();
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index ae960533340..dd6a55ff437 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -1846,6 +1846,9 @@ public class BigtableIOTest {
       }
       return currentRow;
     }
+
+    @Override
+    public void close() {}
   }
 
   /** A {@link FakeBigtableReader} implementation that throw exceptions at 
given stage. */

Reply via email to