mutianf commented on code in PR #29548:
URL: https://github.com/apache/beam/pull/29548#discussion_r1415791630
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -470,50 +477,50 @@ static class BigtableWriterImpl implements Writer {
private String projectId;
private String instanceId;
private String tableId;
+ private Duration closeWaitTimeout;
private Distribution bulkSize = Metrics.distribution("BigTable-" +
tableId, "batchSize");
private Distribution latency = Metrics.distribution("BigTable-" + tableId,
"batchLatencyMs");
BigtableWriterImpl(
- BigtableDataClient client, String projectId, String instanceId, String
tableId) {
+ BigtableDataClient client,
+ String projectId,
+ String instanceId,
+ String tableId,
+ Duration closeWaitTimeout) {
this.projectId = projectId;
this.instanceId = instanceId;
this.tableId = tableId;
this.bulkMutation = client.newBulkMutationBatcher(tableId);
- }
-
- @Override
- public void flush() throws IOException {
- if (bulkMutation != null) {
- try {
- stopwatch.start();
- bulkMutation.flush();
- bulkSize.update(outstandingMutations);
- outstandingMutations = 0;
- stopwatch.stop();
- latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- // We fail since flush() operation was interrupted.
- throw new IOException(e);
- }
- }
+ this.closeWaitTimeout = closeWaitTimeout;
}
@Override
public void close() throws IOException {
if (bulkMutation != null) {
try {
stopwatch.start();
- bulkMutation.flush();
- bulkMutation.close();
+ // closeAsync will send any remaining elements in the batch.
+ // If the experimental close wait timeout flag is set,
+ // set a timeout waiting for the future.
+ ApiFuture<Void> future = bulkMutation.closeAsync();
+ if (Duration.ZERO.isShorterThan(closeWaitTimeout)) {
+ future.get(closeWaitTimeout.getMillis(), TimeUnit.MILLISECONDS);
+ } else {
+ future.get();
+ }
bulkSize.update(outstandingMutations);
outstandingMutations = 0;
stopwatch.stop();
latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ } catch (TimeoutException e) {
+ // We fail because future.get() timed out
+ throw new IOException("BulkMutation took too long to close", e);
+ } catch (ExecutionException e) {
+ throw new IOException("ExecutionException when closing the
BulkMutation", e.getCause());
Review Comment:
I think entry failure will be retried and if it bubbles up we rely on beam
to retry the bundle?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]