This is an automated email from the ASF dual-hosted git repository. gleb pushed a commit to branch kanterov_bigtable_vendored_guava in repository https://gitbox.apache.org/repos/asf/beam.git
commit b038ee74ad4804c4ce06c772b6f80b8936bc1575 Author: Gleb Kanterov <[email protected]> AuthorDate: Wed Feb 27 18:02:45 2019 +0100 Fix usage of non-vendored guava in BigTableIO --- .../src/main/resources/beam/suppressions.xml | 2 +- .../sdk/io/gcp/bigtable/BigtableServiceImpl.java | 6 +- .../bigtable/VendoredListenableFutureAdapter.java | 66 ++++++++++++++++++++++ 3 files changed, 70 insertions(+), 4 deletions(-) diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml index 247a980..05ae412 100644 --- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml @@ -98,7 +98,7 @@ <suppress id="ForbidNonVendoredGuava" files=".*cassandra.*CassandraIO\.java" /> <suppress id="ForbidNonVendoredGuava" files=".*kinesis.*KinesisIO\.java" /> <suppress id="ForbidNonVendoredGuava" files=".*kinesis.*KinesisProducerMock\.java" /> - <suppress id="ForbidNonVendoredGuava" files=".*bigtable.*BigtableServiceImpl\.java" /> + <suppress id="ForbidNonVendoredGuava" files=".*bigtable.*VendoredListenableFutureAdapter\.java" /> <suppress id="ForbidNonVendoredGuava" files=".*bigtable.*BigtableServiceImplTest\.java" /> <suppress id="ForbidNonVendoredGuava" files=".*sql.*BeamValuesRel\.java" /> <suppress id="ForbidNonVendoredGuava" files=".*sql.*BeamEnumerableConverterTest\.java" /> diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index 5d5de21..767faff 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -32,8 +32,6 @@ import com.google.cloud.bigtable.grpc.BigtableSession; import com.google.cloud.bigtable.grpc.BigtableTableName; import com.google.cloud.bigtable.grpc.async.BulkMutation; import com.google.cloud.bigtable.grpc.scanner.ResultScanner; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.protobuf.ByteString; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; @@ -48,6 +46,8 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v20_0.com.google.common.io.Closer; +import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback; +import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Futures; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -228,7 +228,7 @@ class BigtableServiceImpl implements BigtableService { CompletableFuture<MutateRowResponse> result = new CompletableFuture<>(); Futures.addCallback( - bulkMutation.add(request), + new VendoredListenableFutureAdapter<>(bulkMutation.add(request)), new FutureCallback<MutateRowResponse>() { @Override public void onSuccess(MutateRowResponse mutateRowResponse) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/VendoredListenableFutureAdapter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/VendoredListenableFutureAdapter.java new file mode 100644 index 0000000..c736083 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/VendoredListenableFutureAdapter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListenableFuture; + +/** Adapts {@link ListenableFuture} from bigtable-client-core to vendored guava. */ +class VendoredListenableFutureAdapter<V> implements ListenableFuture<V> { + + private final com.google.common.util.concurrent.ListenableFuture<V> underlying; + + VendoredListenableFutureAdapter( + com.google.common.util.concurrent.ListenableFuture<V> underlying) { + this.underlying = underlying; + } + + @Override + public void addListener(Runnable listener, Executor executor) { + underlying.addListener(listener, executor); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return underlying.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return underlying.isCancelled(); + } + + @Override + public boolean isDone() { + return underlying.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return underlying.get(); + } + + @Override + public V get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return underlying.get(timeout, unit); + } +}
