This is an automated email from the ASF dual-hosted git repository.
sniemitz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9cce412e302 Ensure that BigtableIO closes the stream after reading (#31478)
9cce412e302 is described below
commit 9cce412e30257903ab60490a16f7dbe71404f729
Author: Justin Uang <[email protected]>
AuthorDate: Mon Jun 3 15:33:43 2024 -0400
Ensure that BigtableIO closes the stream after reading (#31478)
* Close the stream correctly
* Add CHANGES.md entry
* Fix formatting
* Implement close in FakeBigtableReader
* Apply formatting
---
CHANGES.md | 1 +
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 1 +
.../beam/sdk/io/gcp/bigtable/BigtableService.java | 2 ++
.../sdk/io/gcp/bigtable/BigtableServiceImpl.java | 23 +++++++++++++++++++---
.../beam/sdk/io/gcp/bigtable/BigtableIOTest.java | 3 +++
5 files changed, 27 insertions(+), 3 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 18289e322d9..91bdfef6916 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,6 +63,7 @@
## I/Os
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Ensure that BigtableIO closes the reader streams ([#31477](https://github.com/apache/beam/issues/31477)).
## New Features / Improvements
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 1ed70024f59..4feccd397d7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -1953,6 +1953,7 @@ public class BigtableIO {
public void close() throws IOException {
LOG.info("Closing reader after reading {} records.", recordsReturned);
if (reader != null) {
+ reader.close();
reader = null;
}
if (serviceEntry != null) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
index 1e3839b5df4..261cc3ac081 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -75,6 +75,8 @@ interface BigtableService extends Serializable {
* current row because the last such call was unsuccessful.
*/
Row getCurrentRow() throws NoSuchElementException;
+
+ void close();
}
/** Returns a {@link Reader} that will read from the specified source. */
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index dad3370dae6..06e0108259d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -25,6 +25,7 @@ import com.google.api.gax.batching.BatchingException;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StreamController;
import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
@@ -137,6 +138,10 @@ class BigtableServiceImpl implements BigtableService {
private Row currentRow;
+ private ServerStream<Row> stream;
+
+ private boolean exhausted;
+
@VisibleForTesting
BigtableReaderImpl(
BigtableDataClient client,
@@ -168,11 +173,11 @@ class BigtableServiceImpl implements BigtableService {
query.filter(Filters.FILTERS.fromProto(rowFilter));
}
try {
- results =
+ stream =
client
.readRowsCallable(new BigtableRowProtoAdapter())
- .call(query, GrpcCallContext.createDefault())
- .iterator();
+ .call(query, GrpcCallContext.createDefault());
+ results = stream.iterator();
serviceCallMetric.call("ok");
} catch (StatusRuntimeException e) {
serviceCallMetric.call(e.getStatus().getCode().toString());
@@ -187,6 +192,7 @@ class BigtableServiceImpl implements BigtableService {
currentRow = results.next();
return true;
}
+ exhausted = true;
return false;
}
@@ -197,6 +203,14 @@ class BigtableServiceImpl implements BigtableService {
}
return currentRow;
}
+
+ /** Cancels the server stream if it was opened and has not been read to the end. */
+ @Override
+ public void close() {
+ // stream is still null if start() was never called or its readRows call threw;
+ // guard so that close() after a failed or skipped start() does not throw an NPE.
+ if (!exhausted && stream != null) {
+ stream.cancel();
+ exhausted = true;
+ }
+ }
}
@VisibleForTesting
@@ -295,6 +309,9 @@ class BigtableServiceImpl implements BigtableService {
Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE));
}
+ @Override
+ public void close() {
+ // Intentionally a no-op. NOTE(review): confirm that an in-flight segment request
+ // (the `future` set in start()) does not need cancelling when the reader is closed early.
+ }
+
@Override
public boolean start() throws IOException {
future = fetchNextSegment();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index ae960533340..dd6a55ff437 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -1846,6 +1846,9 @@ public class BigtableIOTest {
}
return currentRow;
}
+
+ @Override
+ public void close() {
+ // Nothing to release; presumably this fake opens no server stream (revisit if it gains one).
+ }
}
/** A {@link FakeBigtableReader} implementation that throw exceptions at
given stage. */