Remove google api BackOff usage from sdks/core
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/367fcb28 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/367fcb28 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/367fcb28 Branch: refs/heads/master Commit: 367fcb28d544934797d25cb34d54136b2d7d6e99 Parents: 87578a6 Author: Vikas Kedigehalli <vika...@google.com> Authored: Fri May 5 19:24:51 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Mon May 8 19:19:29 2017 -0700 ---------------------------------------------------------------------- .../beam/examples/common/ExampleUtils.java | 6 +- .../beam/examples/WindowedWordCountIT.java | 2 +- runners/google-cloud-dataflow-java/pom.xml | 2 +- .../runners/dataflow/DataflowPipelineJob.java | 18 +++- .../beam/runners/dataflow/util/PackageUtil.java | 3 +- .../dataflow/DataflowPipelineJobTest.java | 13 ++- .../runners/dataflow/util/PackageUtilTest.java | 2 +- runners/spark/pom.xml | 5 - .../beam/runners/spark/io/MicrobatchSource.java | 2 +- sdks/java/core/pom.xml | 5 - .../sdk/io/BoundedReadFromUnboundedSource.java | 2 +- .../beam/sdk/testing/FileChecksumMatcher.java | 2 +- .../java/org/apache/beam/sdk/util/BackOff.java | 81 ++++++++++++++++ .../org/apache/beam/sdk/util/BackOffUtils.java | 57 ++++++++++++ .../beam/sdk/util/ExplicitShardedFile.java | 3 - .../org/apache/beam/sdk/util/FluentBackoff.java | 1 - .../beam/sdk/util/NumberedShardedFile.java | 3 - .../org/apache/beam/sdk/util/ShardedFile.java | 2 - .../java/org/apache/beam/sdk/util/Sleeper.java | 48 ++++++++++ .../sdk/util/UploadIdResponseInterceptor.java | 60 ------------ .../org/apache/beam/SdkCoreApiSurfaceTest.java | 1 - .../beam/sdk/testing/ExpectedLogsTest.java | 2 +- .../sdk/testing/FastNanoClockAndSleeper.java | 47 ---------- .../testing/FastNanoClockAndSleeperTest.java | 47 ---------- .../sdk/testing/FileChecksumMatcherTest.java | 5 - .../beam/sdk/testing/SystemNanoTimeSleeper.java | 2 +- .../apache/beam/sdk/util/FluentBackoffTest.java | 1 - .../beam/sdk/util/NumberedShardedFileTest.java | 14 ++- .../util/UploadIdResponseInterceptorTest.java | 98 -------------------- .../sdk/extensions/gcp/options/GcpOptions.java | 3 +- .../apache/beam/sdk/util/BackOffAdapter.java | 43 +++++++++ .../java/org/apache/beam/sdk/util/GcsUtil.java | 13 ++- .../sdk/util/UploadIdResponseInterceptor.java | 60 ++++++++++++ .../beam/sdk/util/FastNanoClockAndSleeper.java | 47 ++++++++++ .../sdk/util/FastNanoClockAndSleeperTest.java | 47 ++++++++++ .../org/apache/beam/sdk/util/GcsUtilTest.java | 23 +++-- .../util/UploadIdResponseInterceptorTest.java | 98 ++++++++++++++++++++ .../io/gcp/bigquery/BigQueryServicesImpl.java | 81 +++++++--------- .../gcp/bigquery/BigQueryTableRowIterator.java | 7 +- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 6 +- .../gcp/bigquery/BigQueryServicesImplTest.java | 24 +++-- .../sdk/io/gcp/bigquery/FakeJobService.java | 12 ++- .../beam/sdk/io/gcp/datastore/V1TestUtil.java | 6 +- .../sdk/io/gcp/testing/BigqueryMatcher.java | 4 +- .../sdk/io/gcp/testing/BigqueryMatcherTest.java | 7 +- 45 files changed, 621 insertions(+), 394 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java index 409085a..6e4698f 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java @@ -20,9 +20,6 @@ package org.apache.beam.examples.common; import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.Bigquery.Datasets; import com.google.api.services.bigquery.Bigquery.Tables; @@ -51,8 +48,11 @@ import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; +import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.util.Transport; import org.joda.time.Duration; http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index f7e35c0..93c4543 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -19,7 +19,6 @@ package org.apache.beam.examples; import static org.hamcrest.Matchers.equalTo; -import com.google.api.client.util.Sleeper; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -47,6 +46,7 @@ import org.apache.beam.sdk.util.ExplicitShardedFile; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.NumberedShardedFile; import org.apache.beam.sdk.util.ShardedFile; +import org.apache.beam.sdk.util.Sleeper; import org.hamcrest.Description; import org.hamcrest.TypeSafeMatcher; import org.joda.time.Duration; http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index b579041..adbd4d7 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -33,7 +33,7 @@ <packaging>jar</packaging> <properties> - <dataflow.container_version>beam-master-20170506</dataflow.container_version> + <dataflow.container_version>beam-master-20170508</dataflow.container_version> <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version> <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version> </properties> http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 23084ed..2d23983 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -46,6 +46,7 @@ import org.apache.beam.runners.dataflow.util.MonitoringUtil; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.util.BackOffAdapter; import org.apache.beam.sdk.util.FluentBackoff; import org.joda.time.Duration; import org.slf4j.Logger; @@ -285,9 +286,10 @@ public class DataflowPipelineJob implements PipelineResult { BackOff backoff; if (!duration.isLongerThan(Duration.ZERO)) { - backoff = MESSAGES_BACKOFF_FACTORY.backoff(); + backoff = BackOffAdapter.toGcpBackOff(MESSAGES_BACKOFF_FACTORY.backoff()); } else { - backoff = MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff(); + backoff = BackOffAdapter.toGcpBackOff( + MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff()); } // This function tracks the cumulative time from the *first request* to enforce the wall-clock @@ -299,7 +301,10 @@ public class DataflowPipelineJob implements PipelineResult { do { // Get the state of the job before listing messages. This ensures we always fetch job // messages after the job finishes to ensure we have all them. - state = getStateWithRetries(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff(), sleeper); + state = getStateWithRetries( + BackOffAdapter.toGcpBackOff( + STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff()), + sleeper); boolean hasError = state == State.UNKNOWN; if (messageHandler != null && !hasError) { @@ -354,7 +359,8 @@ public class DataflowPipelineJob implements PipelineResult { Duration consumed = Duration.millis((nanosConsumed + 999999) / 1000000); Duration remaining = duration.minus(consumed); if (remaining.isLongerThan(Duration.ZERO)) { - backoff = MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(remaining).backoff(); + backoff = BackOffAdapter.toGcpBackOff( + MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(remaining).backoff()); } else { // If there is no time remaining, don't bother backing off. backoff = BackOff.STOP_BACKOFF; @@ -437,7 +443,9 @@ public class DataflowPipelineJob implements PipelineResult { return terminalState; } - return getStateWithRetries(STATUS_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); + return getStateWithRetries( + BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.backoff()), + Sleeper.DEFAULT); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index 5ddcd29..931f7ea 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -54,6 +54,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; +import org.apache.beam.sdk.util.BackOffAdapter; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.ZipFiles; import org.joda.time.Duration; @@ -210,7 +211,7 @@ class PackageUtil { } // Upload file, retrying on failure. - BackOff backoff = BACKOFF_FACTORY.backoff(); + BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff()); while (true) { try { LOG.debug("Uploading classpath element {} to {}", source, target); http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index df894d2..3c261d0 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -56,8 +56,9 @@ import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.testing.ExpectedLogs; -import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.BackOffAdapter; +import org.apache.beam.sdk.util.FastNanoClockAndSleeper; import org.apache.beam.sdk.util.NoopPathValidator; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; @@ -346,7 +347,10 @@ public class DataflowPipelineJobTest { assertEquals( State.RUNNING, - job.getStateWithRetries(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff(), fastClock)); + job.getStateWithRetries( + BackOffAdapter.toGcpBackOff( + DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()), + fastClock)); } @Test @@ -367,7 +371,10 @@ public class DataflowPipelineJobTest { long startTime = fastClock.nanoTime(); assertEquals( State.UNKNOWN, - job.getStateWithRetries(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff(), fastClock)); + job.getStateWithRetries( + BackOffAdapter.toGcpBackOff( + DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()), + fastClock)); long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime); checkValidInterval(DataflowPipelineJob.STATUS_POLLING_INTERVAL, DataflowPipelineJob.STATUS_POLLING_RETRIES, timeDiff); http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index 4ae3a77..c7a660e 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -79,8 +79,8 @@ import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; -import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; import org.apache.beam.sdk.testing.RegexMatcher; +import org.apache.beam.sdk.util.FastNanoClockAndSleeper; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException; import org.apache.beam.sdk.util.MimeTypes; http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 94f163d..2c8372b 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -190,11 +190,6 @@ <artifactId>joda-time</artifactId> </dependency> <dependency> - <groupId>com.google.http-client</groupId> - <artifactId>google-http-client</artifactId> - <version>${google-clients.version}</version> - </dependency> - <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-compress</artifactId> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java index 53d1ba7..3b48caf 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.spark.io; -import com.google.api.client.util.BackOff; import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -36,6 +35,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.FluentBackoff; import org.joda.time.Duration; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index a6b89b8..7ed6c8a 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -125,11 +125,6 @@ <dependencies> <dependency> - <groupId>com.google.http-client</groupId> - <artifactId>google-http-client</artifactId> - </dependency> - - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java index d9adf92..c882447 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io; -import com.google.api.client.util.BackOff; import com.google.auto.value.AutoValue; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; @@ -35,6 +34,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.NameUtils; import org.apache.beam.sdk.values.PBegin; http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java index 82a6b71..5ed0525 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java @@ -21,7 +21,6 @@ package org.apache.beam.sdk.testing; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.api.client.util.Sleeper; import com.google.common.base.Strings; import com.google.common.hash.HashCode; import com.google.common.hash.Hashing; @@ -34,6 +33,7 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.NumberedShardedFile; import org.apache.beam.sdk.util.ShardedFile; +import org.apache.beam.sdk.util.Sleeper; import org.hamcrest.Description; import org.hamcrest.TypeSafeMatcher; import org.joda.time.Duration; http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java new file mode 100644 index 0000000..5bc6027 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.io.IOException; + +/** + * Back-off policy when retrying an operation. + * + * <p><b>Note</b>: This interface is copied from Google API client library to avoid its dependency. + */ +public interface BackOff { + + /** Indicates that no more retries should be made for use in {@link #nextBackOffMillis()}. */ + long STOP = -1L; + + /** Reset to initial state. */ + void reset() throws IOException; + + /** + * Gets the number of milliseconds to wait before retrying the operation or {@link #STOP} to + * indicate that no retries should be made. + * + * <p> + * Example usage: + * </p> + * + * <pre> + long backOffMillis = backoff.nextBackOffMillis(); + if (backOffMillis == Backoff.STOP) { + // do not retry operation + } else { + // sleep for backOffMillis milliseconds and retry operation + } + * </pre> + */ + long nextBackOffMillis() throws IOException; + + /** + * Fixed back-off policy whose back-off time is always zero, meaning that the operation is retried + * immediately without waiting. + */ + BackOff ZERO_BACKOFF = new BackOff() { + + public void reset() throws IOException { + } + + public long nextBackOffMillis() throws IOException { + return 0; + } + }; + + /** + * Fixed back-off policy that always returns {@code #STOP} for {@link #nextBackOffMillis()}, + * meaning that the operation should not be retried. + */ + BackOff STOP_BACKOFF = new BackOff() { + + public void reset() throws IOException { + } + + public long nextBackOffMillis() throws IOException { + return STOP; + } + }; +} http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java new file mode 100644 index 0000000..aa7461c --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import java.io.IOException; + +/** + * Utilities for {@link BackOff}. + * + * <p><b>Note</b>: This is copied from Google API client library to avoid its dependency. + */ +public final class BackOffUtils { + + /** + * Runs the next iteration of the back-off policy, and returns whether to continue to retry the + * operation. + * + * <p> + * If {@code true}, it will call {@link Sleeper#sleep(long)} with the specified number of + * milliseconds from {@link BackOff#nextBackOffMillis()}. + * </p> + * + * @param sleeper sleeper + * @param backOff back-off policy + * @return whether to continue to back off; in other words, whether + * {@link BackOff#nextBackOffMillis()} did not return {@link BackOff#STOP} + * @throws InterruptedException if any thread has interrupted the current thread + */ + public static boolean next(Sleeper sleeper, BackOff backOff) + throws InterruptedException, IOException { + long backOffTime = backOff.nextBackOffMillis(); + if (backOffTime == BackOff.STOP) { + return false; + } + sleeper.sleep(backOffTime); + return true; + } + + private BackOffUtils() { + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java index 0f184de..50e5ed1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExplicitShardedFile.java @@ -18,9 +18,6 @@ package org.apache.beam.sdk.util; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.collect.Lists; http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java index 479d7a8..468b742 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.util; import static com.google.common.base.Preconditions.checkArgument; -import com.google.api.client.util.BackOff; import com.google.common.base.MoreObjects; import org.joda.time.Duration; http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java index e18dd96..8889358 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java @@ -21,9 +21,6 @@ package org.apache.beam.sdk.util; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.Iterables; http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java index ec9ed64..5961c4d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ShardedFile.java @@ -18,8 +18,6 @@ package org.apache.beam.sdk.util; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.Sleeper; import java.io.IOException; import java.io.Serializable; import java.util.List; http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Sleeper.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Sleeper.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Sleeper.java new file mode 100644 index 0000000..d180ec6 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Sleeper.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +/** + * Sleeper interface to use for requesting the current thread to sleep as specified in + * {@link Thread#sleep(long)}. + * + * <p> + * The default implementation can be accessed at {@link #DEFAULT}. Primarily used for testing. + * </p> + * + * <p><b>Note</b>: This interface is copied from Google API client library to avoid its dependency. + */ +public interface Sleeper { + + /** + * Causes the currently executing thread to sleep (temporarily cease execution) for the specified + * number of milliseconds as specified in {@link Thread#sleep(long)}. + * + * @param millis length of time to sleep in milliseconds + * @throws InterruptedException if any thread has interrupted the current thread + */ + void sleep(long millis) throws InterruptedException; + + /** Provides the default implementation based on {@link Thread#sleep(long)}. */ + Sleeper DEFAULT = new Sleeper() { + + public void sleep(long millis) throws InterruptedException { + Thread.sleep(millis); + } + }; +} http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java deleted file mode 100644 index f685b69..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.api.client.http.GenericUrl; -import com.google.api.client.http.HttpResponse; -import com.google.api.client.http.HttpResponseInterceptor; -import java.io.IOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements a response intercepter that logs the upload id if the upload - * id header exists and it is the first request (does not have upload_id parameter in the request). - * Only logs if debug level is enabled. - */ -public class UploadIdResponseInterceptor implements HttpResponseInterceptor { - - private static final Logger LOG = LoggerFactory.getLogger(UploadIdResponseInterceptor.class); - private static final String UPLOAD_ID_PARAM = "upload_id"; - private static final String UPLOAD_TYPE_PARAM = "uploadType"; - private static final String UPLOAD_HEADER = "X-GUploader-UploadID"; - - @Override - public void interceptResponse(HttpResponse response) throws IOException { - if (!LOG.isDebugEnabled()) { - return; - } - String uploadId = response.getHeaders().getFirstHeaderStringValue(UPLOAD_HEADER); - if (uploadId == null) { - return; - } - - GenericUrl url = response.getRequest().getUrl(); - // The check for no upload id limits the output to one log line per upload. - // The check for upload type makes sure this is an upload and not a read. - if (url.get(UPLOAD_ID_PARAM) == null && url.get(UPLOAD_TYPE_PARAM) != null) { - LOG.debug( - "Upload ID for url {} on worker {} is {}", - url, - System.getProperty("worker_id"), - uploadId); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java index 26dd9f9..080f34a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java @@ -38,7 +38,6 @@ public class SdkCoreApiSurfaceTest { final Set<String> allowed = ImmutableSet.of( "org.apache.beam", - "com.google.api.client", "com.fasterxml.jackson.annotation", "com.fasterxml.jackson.core", "com.fasterxml.jackson.databind", http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java index 1762d0d..d307bed 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java @@ -38,7 +38,7 @@ import org.junit.runners.model.Statement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Tests for {@link FastNanoClockAndSleeper}. */ +/** Tests for {@link ExpectedLogs}. */ @RunWith(JUnit4.class) public class ExpectedLogsTest { private static final Logger LOG = LoggerFactory.getLogger(ExpectedLogsTest.class); http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeper.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeper.java deleted file mode 100644 index 6bfafa5..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeper.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.testing; - -import com.google.api.client.util.NanoClock; -import com.google.api.client.util.Sleeper; -import org.junit.rules.ExternalResource; -import org.junit.rules.TestRule; - -/** - * This object quickly moves time forward based upon how much it has been asked to sleep, - * without actually sleeping, to simulate the backoff. - */ -public class FastNanoClockAndSleeper extends ExternalResource - implements NanoClock, Sleeper, TestRule { - private long fastNanoTime; - - @Override - public long nanoTime() { - return fastNanoTime; - } - - @Override - protected void before() throws Throwable { - fastNanoTime = NanoClock.SYSTEM.nanoTime(); - } - - @Override - public void sleep(long millis) throws InterruptedException { - fastNanoTime += millis * 1000000L; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeperTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeperTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeperTest.java deleted file mode 100644 index 7d20951..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FastNanoClockAndSleeperTest.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.testing; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.TimeUnit; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link FastNanoClockAndSleeper}. */ -@RunWith(JUnit4.class) -public class FastNanoClockAndSleeperTest { - @Rule public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper(); - - @Test - public void testClockAndSleeper() throws Exception { - long sleepTimeMs = TimeUnit.SECONDS.toMillis(30); - long sleepTimeNano = TimeUnit.MILLISECONDS.toNanos(sleepTimeMs); - long fakeTimeNano = fastNanoClockAndSleeper.nanoTime(); - long startTimeNano = System.nanoTime(); - fastNanoClockAndSleeper.sleep(sleepTimeMs); - long maxTimeNano = startTimeNano + TimeUnit.SECONDS.toNanos(1); - // Verify that actual time didn't progress as much as was requested - assertTrue(System.nanoTime() < maxTimeNano); - // Verify that the fake time did go up by the amount requested - assertEquals(fakeTimeNano + sleepTimeNano, fastNanoClockAndSleeper.nanoTime()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java index 4ee6750..80f02e0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/FileChecksumMatcherTest.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.testing; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; -import com.google.api.client.util.BackOff; import com.google.common.io.Files; import java.io.File; import java.io.IOException; @@ -43,14 +42,10 @@ public class FileChecksumMatcherTest { public TemporaryFolder tmpFolder = new TemporaryFolder(); @Rule public ExpectedException thrown = ExpectedException.none(); - @Rule - public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper(); @Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class); - private BackOff backOff = FileChecksumMatcher.BACK_OFF_FACTORY.backoff(); - @Test public void testPreconditionChecksumIsNull() throws IOException { String tmpPath = tmpFolder.newFile().getPath(); http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java index 3677e84..dd57669 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java @@ -17,9 +17,9 @@ */ package org.apache.beam.sdk.testing; -import com.google.api.client.util.Sleeper; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; +import org.apache.beam.sdk.util.Sleeper; /** * This class provides an expensive sleeper to deal with issues around Java's http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java index 20b03cf..e810278 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java @@ -26,7 +26,6 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import com.google.api.client.util.BackOff; import java.io.IOException; import org.joda.time.Duration; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java index 43a9166..cf8c722 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NumberedShardedFileTest.java @@ -25,16 +25,13 @@ import static org.mockito.Matchers.anyCollection; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; -import com.google.api.client.util.BackOff; import com.google.common.io.Files; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.regex.Pattern; -import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.LocalResources; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; -import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -42,17 +39,18 @@ import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; /** Tests for {@link NumberedShardedFile}. */ @RunWith(JUnit4.class) public class NumberedShardedFileTest { @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @Rule public ExpectedException thrown = ExpectedException.none(); - @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper(); - - @Mock private PipelineResult pResult = Mockito.mock(PipelineResult.class); + private Sleeper fastClock = new Sleeper() { + @Override + public void sleep(long millis) throws InterruptedException { + // No sleep. + } + }; private final BackOff backOff = NumberedShardedFile.BACK_OFF_FACTORY.backoff(); private String filePattern; http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java deleted file mode 100644 index 8b9f77e..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.api.client.http.GenericUrl; -import com.google.api.client.http.HttpResponse; -import com.google.api.client.testing.http.HttpTesting; -import com.google.api.client.testing.http.MockHttpTransport; -import com.google.api.client.testing.http.MockLowLevelHttpResponse; -import java.io.IOException; -import org.apache.beam.sdk.testing.ExpectedLogs; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * A test for {@link UploadIdResponseInterceptor}. - */ - -@RunWith(JUnit4.class) -public class UploadIdResponseInterceptorTest { - - @Rule public ExpectedException expectedException = ExpectedException.none(); - // Note that expected logs also turns on debug logging. - @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(UploadIdResponseInterceptor.class); - - /** - * Builds a HttpResponse with the given string response. - * - * @param header header value to provide or null if none. - * @param uploadId upload id to provide in the url upload id param or null if none. - * @param uploadType upload type to provide in url upload type param or null if none. - * @return HttpResponse with the given parameters - * @throws IOException - */ - private HttpResponse buildHttpResponse(String header, String uploadId, String uploadType) - throws IOException { - MockHttpTransport.Builder builder = new MockHttpTransport.Builder(); - MockLowLevelHttpResponse resp = new MockLowLevelHttpResponse(); - builder.setLowLevelHttpResponse(resp); - resp.setStatusCode(200); - GenericUrl url = new GenericUrl(HttpTesting.SIMPLE_URL); - if (header != null) { - resp.addHeader("X-GUploader-UploadID", header); - } - if (uploadId != null) { - url.put("upload_id", uploadId); - } - if (uploadType != null) { - url.put("uploadType", uploadType); - } - return builder.build().createRequestFactory().buildGetRequest(url).execute(); - } - - /** - * Tests the responses that should not log. - */ - @Test - public void testResponseNoLogging() throws IOException { - new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, null, null)); - new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", "a", null)); - new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, "h", null)); - new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", null, null)); - new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, null, "type")); - new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", "a", "type")); - new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, "h", "type")); - expectedLogs.verifyNotLogged(""); - } - - /** - * Check that a response logs with the correct log. - */ - @Test - public void testResponseLogs() throws IOException { - new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("abc", null, "type")); - GenericUrl url = new GenericUrl(HttpTesting.SIMPLE_URL); - url.put("uploadType", "type"); - String worker = System.getProperty("worker_id"); - expectedLogs.verifyDebug("Upload ID for url " + url + " on worker " + worker + " is abc"); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java index a4128e8..985520f 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -53,6 +53,7 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.BackOffAdapter; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; @@ -319,7 +320,7 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { return getProjectNumber( projectId, crmClient, - BACKOFF_FACTORY.backoff(), + BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff()), Sleeper.DEFAULT); } http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/BackOffAdapter.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/BackOffAdapter.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/BackOffAdapter.java new file mode 100644 index 0000000..e5a0a6e --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/BackOffAdapter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.io.IOException; + +/** + * An adapter for converting between Apache Beam and Google API client representations of backoffs. + */ +public class BackOffAdapter { + /** + * Returns an adapter to convert from {@link BackOff} to + * {@link com.google.api.client.util.BackOff}. + */ + public static com.google.api.client.util.BackOff toGcpBackOff(final BackOff backOff) { + return new com.google.api.client.util.BackOff() { + @Override + public void reset() throws IOException { + backOff.reset(); + } + + @Override + public long nextBackOffMillis() throws IOException { + return backOff.nextBackOffMillis(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index ee2e231..18e3e2b 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -306,6 +306,9 @@ public class GcsUtil { return uploadBufferSizeBytes; } + private static BackOff createBackOff() { + return BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff()); + } /** * Returns the file size from GCS or throws {@link FileNotFoundException} * if the resource does not exist. @@ -318,7 +321,7 @@ public class GcsUtil { * Returns the {@link StorageObject} for the given {@link GcsPath}. */ public StorageObject getObject(GcsPath gcsPath) throws IOException { - return getObject(gcsPath, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); + return getObject(gcsPath, createBackOff(), Sleeper.DEFAULT); } @VisibleForTesting @@ -377,7 +380,7 @@ public class GcsUtil { try { return ResilientOperation.retry( ResilientOperation.getGoogleRequestCallable(listObject), - BACKOFF_FACTORY.backoff(), + createBackOff(), RetryDeterminer.SOCKET_ERRORS, IOException.class); } catch (Exception e) { @@ -469,7 +472,7 @@ public class GcsUtil { public boolean bucketAccessible(GcsPath path) throws IOException { return bucketAccessible( path, - BACKOFF_FACTORY.backoff(), + createBackOff(), Sleeper.DEFAULT); } @@ -482,7 +485,7 @@ public class GcsUtil { public long bucketOwner(GcsPath path) throws IOException { return getBucket( path, - BACKOFF_FACTORY.backoff(), + createBackOff(), Sleeper.DEFAULT).getProjectNumber().longValue(); } @@ -492,7 +495,7 @@ public class GcsUtil { */ public void createBucket(String projectId, Bucket bucket) throws IOException { createBucket( - projectId, bucket, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); + projectId, bucket, createBackOff(), Sleeper.DEFAULT); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java new file mode 100644 index 0000000..f685b69 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/UploadIdResponseInterceptor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpResponseInterceptor; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements a response intercepter that logs the upload id if the upload + * id header exists and it is the first request (does not have upload_id parameter in the request). + * Only logs if debug level is enabled. + */ +public class UploadIdResponseInterceptor implements HttpResponseInterceptor { + + private static final Logger LOG = LoggerFactory.getLogger(UploadIdResponseInterceptor.class); + private static final String UPLOAD_ID_PARAM = "upload_id"; + private static final String UPLOAD_TYPE_PARAM = "uploadType"; + private static final String UPLOAD_HEADER = "X-GUploader-UploadID"; + + @Override + public void interceptResponse(HttpResponse response) throws IOException { + if (!LOG.isDebugEnabled()) { + return; + } + String uploadId = response.getHeaders().getFirstHeaderStringValue(UPLOAD_HEADER); + if (uploadId == null) { + return; + } + + GenericUrl url = response.getRequest().getUrl(); + // The check for no upload id limits the output to one log line per upload. + // The check for upload type makes sure this is an upload and not a read. + if (url.get(UPLOAD_ID_PARAM) == null && url.get(UPLOAD_TYPE_PARAM) != null) { + LOG.debug( + "Upload ID for url {} on worker {} is {}", + url, + System.getProperty("worker_id"), + uploadId); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java new file mode 100644 index 0000000..f1392d7 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.api.client.util.NanoClock; +import com.google.api.client.util.Sleeper; +import org.junit.rules.ExternalResource; +import org.junit.rules.TestRule; + +/** + * This object quickly moves time forward based upon how much it has been asked to sleep, + * without actually sleeping, to simulate the backoff. + */ +public class FastNanoClockAndSleeper extends ExternalResource + implements NanoClock, Sleeper, TestRule { + private long fastNanoTime; + + @Override + public long nanoTime() { + return fastNanoTime; + } + + @Override + protected void before() throws Throwable { + fastNanoTime = SYSTEM.nanoTime(); + } + + @Override + public void sleep(long millis) throws InterruptedException { + fastNanoTime += millis * 1000000L; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java new file mode 100644 index 0000000..03f9588 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link FastNanoClockAndSleeper}. */ +@RunWith(JUnit4.class) +public class FastNanoClockAndSleeperTest { + @Rule public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper(); + + @Test + public void testClockAndSleeper() throws Exception { + long sleepTimeMs = TimeUnit.SECONDS.toMillis(30); + long sleepTimeNano = TimeUnit.MILLISECONDS.toNanos(sleepTimeMs); + long fakeTimeNano = fastNanoClockAndSleeper.nanoTime(); + long startTimeNano = System.nanoTime(); + fastNanoClockAndSleeper.sleep(sleepTimeMs); + long maxTimeNano = startTimeNano + TimeUnit.SECONDS.toNanos(1); + // Verify that actual time didn't progress as much as was requested + assertTrue(System.nanoTime() < maxTimeNano); + // Verify that the fake time did go up by the amount requested + assertEquals(fakeTimeNano + sleepTimeNano, fastNanoClockAndSleeper.nanoTime()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java index 6ffcaeb..0af584e 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java @@ -76,7 +76,6 @@ import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.junit.Rule; @@ -374,7 +373,8 @@ public class GcsUtilTest { Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); - BackOff mockBackOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff(); + BackOff mockBackOff = BackOffAdapter.toGcpBackOff( + FluentBackoff.DEFAULT.withMaxRetries(2).backoff()); when(mockStorage.objects()).thenReturn(mockStorageObjects); when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet); @@ -491,7 +491,7 @@ public class GcsUtilTest { Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff()); when(mockStorage.buckets()).thenReturn(mockStorageObjects); when(mockStorageObjects.insert( @@ -514,7 +514,7 @@ public class GcsUtilTest { Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff()); GoogleJsonResponseException expectedException = googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, "Waves hand mysteriously", "These aren't the buckets you're looking for"); @@ -541,7 +541,8 @@ public class GcsUtilTest { Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + BackOff mockBackOff = BackOffAdapter.toGcpBackOff( + FluentBackoff.DEFAULT.backoff()); when(mockStorage.buckets()).thenReturn(mockStorageObjects); when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); @@ -564,7 +565,8 @@ public class GcsUtilTest { Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + BackOff mockBackOff = BackOffAdapter.toGcpBackOff( + FluentBackoff.DEFAULT.backoff()); GoogleJsonResponseException expectedException = googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, "Waves hand mysteriously", "These aren't the buckets you're looking for"); @@ -589,7 +591,8 @@ public class GcsUtilTest { Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + BackOff mockBackOff = BackOffAdapter.toGcpBackOff( + FluentBackoff.DEFAULT.backoff()); when(mockStorage.buckets()).thenReturn(mockStorageObjects); when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); @@ -612,7 +615,8 @@ public class GcsUtilTest { Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + BackOff mockBackOff = BackOffAdapter.toGcpBackOff( + FluentBackoff.DEFAULT.backoff()); when(mockStorage.buckets()).thenReturn(mockStorageObjects); when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); @@ -635,7 +639,8 @@ public class GcsUtilTest { Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + BackOff mockBackOff = BackOffAdapter.toGcpBackOff( + FluentBackoff.DEFAULT.backoff()); when(mockStorage.buckets()).thenReturn(mockStorageObjects); when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java new file mode 100644 index 0000000..8b9f77e --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/UploadIdResponseInterceptorTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.testing.http.HttpTesting; +import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.client.testing.http.MockLowLevelHttpResponse; +import java.io.IOException; +import org.apache.beam.sdk.testing.ExpectedLogs; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * A test for {@link UploadIdResponseInterceptor}. + */ + +@RunWith(JUnit4.class) +public class UploadIdResponseInterceptorTest { + + @Rule public ExpectedException expectedException = ExpectedException.none(); + // Note that expected logs also turns on debug logging. + @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(UploadIdResponseInterceptor.class); + + /** + * Builds a HttpResponse with the given string response. + * + * @param header header value to provide or null if none. + * @param uploadId upload id to provide in the url upload id param or null if none. + * @param uploadType upload type to provide in url upload type param or null if none. + * @return HttpResponse with the given parameters + * @throws IOException + */ + private HttpResponse buildHttpResponse(String header, String uploadId, String uploadType) + throws IOException { + MockHttpTransport.Builder builder = new MockHttpTransport.Builder(); + MockLowLevelHttpResponse resp = new MockLowLevelHttpResponse(); + builder.setLowLevelHttpResponse(resp); + resp.setStatusCode(200); + GenericUrl url = new GenericUrl(HttpTesting.SIMPLE_URL); + if (header != null) { + resp.addHeader("X-GUploader-UploadID", header); + } + if (uploadId != null) { + url.put("upload_id", uploadId); + } + if (uploadType != null) { + url.put("uploadType", uploadType); + } + return builder.build().createRequestFactory().buildGetRequest(url).execute(); + } + + /** + * Tests the responses that should not log. + */ + @Test + public void testResponseNoLogging() throws IOException { + new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, null, null)); + new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", "a", null)); + new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, "h", null)); + new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", null, null)); + new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, null, "type")); + new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("hh", "a", "type")); + new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse(null, "h", "type")); + expectedLogs.verifyNotLogged(""); + } + + /** + * Check that a response logs with the correct log. + */ + @Test + public void testResponseLogs() throws IOException { + new UploadIdResponseInterceptor().interceptResponse(buildHttpResponse("abc", null, "type")); + GenericUrl url = new GenericUrl(HttpTesting.SIMPLE_URL); + url.put("uploadType", "type"); + String worker = System.getProperty("worker_id"); + expectedLogs.verifyDebug("Upload ID for url " + url + " on worker " + worker + " is abc"); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index b348abd..5d5a519 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -65,6 +65,7 @@ import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.BackOffAdapter; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.util.Transport; @@ -93,6 +94,9 @@ class BigQueryServicesImpl implements BigQueryServices { // The initial backoff for polling the status of a BigQuery job. private static final Duration INITIAL_JOB_STATUS_POLL_BACKOFF = Duration.standardSeconds(1); + private static final FluentBackoff DEFAULT_BACKOFF_FACTORY = + FluentBackoff.DEFAULT.withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF); + @Override public JobService getJobService(BigQueryOptions options) { return new JobServiceImpl(options); @@ -114,6 +118,10 @@ class BigQueryServicesImpl implements BigQueryServices { return BigQueryJsonReaderImpl.fromQuery(bqOptions, projectId, queryConfig); } + private static BackOff createDefaultBackoff() { + return BackOffAdapter.toGcpBackOff(DEFAULT_BACKOFF_FACTORY.backoff()); + } + @VisibleForTesting static class JobServiceImpl implements BigQueryServices.JobService { private final ApiErrorExtractor errorExtractor; @@ -205,10 +213,7 @@ class BigQueryServicesImpl implements BigQueryServices { private static void startJob(Job job, ApiErrorExtractor errorExtractor, Bigquery client) throws IOException, InterruptedException { - BackOff backoff = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); - startJob(job, errorExtractor, client, Sleeper.DEFAULT, backoff); + startJob(job, errorExtractor, client, Sleeper.DEFAULT, createDefaultBackoff()); } @VisibleForTesting @@ -249,11 +254,12 @@ class BigQueryServicesImpl implements BigQueryServices { @Override public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException { BackOff backoff = - FluentBackoff.DEFAULT - .withMaxRetries(maxAttempts) - .withInitialBackoff(INITIAL_JOB_STATUS_POLL_BACKOFF) - .withMaxBackoff(Duration.standardMinutes(1)) - .backoff(); + BackOffAdapter.toGcpBackOff( + FluentBackoff.DEFAULT + .withMaxRetries(maxAttempts) + .withInitialBackoff(INITIAL_JOB_STATUS_POLL_BACKOFF) + .withMaxBackoff(Duration.standardMinutes(1)) + .backoff()); return pollJob(jobRef, Sleeper.DEFAULT, backoff); } @@ -299,16 +305,13 @@ class BigQueryServicesImpl implements BigQueryServices { .setConfiguration(new JobConfiguration() .setQuery(queryConfig) .setDryRun(true)); - BackOff backoff = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); return executeWithRetries( client.jobs().insert(projectId, job), String.format( "Unable to dry run query: %s, aborting after %d retries.", queryConfig, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff, + createDefaultBackoff(), ALWAYS_RETRY).getStatistics(); } @@ -321,10 +324,7 @@ class BigQueryServicesImpl implements BigQueryServices { */ @Override public Job getJob(JobReference jobRef) throws IOException, InterruptedException { - BackOff backoff = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); - return getJob(jobRef, Sleeper.DEFAULT, backoff); + return getJob(jobRef, Sleeper.DEFAULT, createDefaultBackoff()); } @VisibleForTesting @@ -371,7 +371,7 @@ class BigQueryServicesImpl implements BigQueryServices { FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5); // A backoff for rate limit exceeded errors. Retries forever. - private static final FluentBackoff DEFAULT_BACKOFF_FACTORY = + private static final FluentBackoff RATE_LIMIT_BACKOFF_FACTORY = FluentBackoff.DEFAULT .withInitialBackoff(Duration.standardSeconds(1)) .withMaxBackoff(Duration.standardMinutes(2)); @@ -420,10 +420,7 @@ class BigQueryServicesImpl implements BigQueryServices { @Nullable public Table getTable(TableReference tableRef) throws IOException, InterruptedException { - BackOff backoff = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); - return getTable(tableRef, backoff, Sleeper.DEFAULT); + return getTable(tableRef, createDefaultBackoff(), Sleeper.DEFAULT); } @VisibleForTesting @@ -528,9 +525,6 @@ class BigQueryServicesImpl implements BigQueryServices { */ @Override public void deleteTable(TableReference tableRef) throws IOException, InterruptedException { - BackOff backoff = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); executeWithRetries( client.tables().delete( tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()), @@ -538,16 +532,13 @@ class BigQueryServicesImpl implements BigQueryServices { "Unable to delete table: %s, aborting after %d retries.", tableRef.getTableId(), MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff, + createDefaultBackoff(), ALWAYS_RETRY); } @Override public boolean isTableEmpty(TableReference tableRef) throws IOException, InterruptedException { - BackOff backoff = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); - return isTableEmpty(tableRef, backoff, Sleeper.DEFAULT); + return isTableEmpty(tableRef, createDefaultBackoff(), Sleeper.DEFAULT); } @VisibleForTesting @@ -575,16 +566,13 @@ class BigQueryServicesImpl implements BigQueryServices { @Override public Dataset getDataset(String projectId, String datasetId) throws IOException, InterruptedException { - BackOff backoff = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); return executeWithRetries( client.datasets().get(projectId, datasetId), String.format( "Unable to get dataset: %s, aborting after %d retries.", datasetId, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff, + createDefaultBackoff(), DONT_RETRY_NOT_FOUND); } @@ -599,10 +587,8 @@ class BigQueryServicesImpl implements BigQueryServices { public void createDataset( String projectId, String datasetId, @Nullable String location, @Nullable String description) throws IOException, InterruptedException { - BackOff backoff = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); - createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff); + createDataset( + projectId, datasetId, location, description, Sleeper.DEFAULT, createDefaultBackoff()); } private void createDataset( @@ -659,16 +645,13 @@ class BigQueryServicesImpl implements BigQueryServices { @Override public void deleteDataset(String projectId, String datasetId) throws IOException, InterruptedException { - BackOff backoff = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); executeWithRetries( client.datasets().delete(projectId, datasetId), String.format( "Unable to delete table: %s, aborting after %d retries.", datasetId, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff, + createDefaultBackoff(), ALWAYS_RETRY); } @@ -725,7 +708,9 @@ class BigQueryServicesImpl implements BigQueryServices { executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() { @Override public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException { - BackOff backoff = DEFAULT_BACKOFF_FACTORY.backoff(); + // A backoff for rate limit exceeded errors. Retries forever. + BackOff backoff = BackOffAdapter.toGcpBackOff( + RATE_LIMIT_BACKOFF_FACTORY.backoff()); while (true) { try { return insert.execute().getInsertErrors(); @@ -811,7 +796,10 @@ class BigQueryServicesImpl implements BigQueryServices { TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList) throws IOException, InterruptedException { return insertAll( - ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); + ref, rowList, insertIdList, + BackOffAdapter.toGcpBackOff( + INSERT_BACKOFF_FACTORY.backoff()), + Sleeper.DEFAULT); } @@ -822,9 +810,6 @@ class BigQueryServicesImpl implements BigQueryServices { Table table = new Table(); table.setDescription(tableDescription); - BackOff backoff = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); return executeWithRetries( client.tables().patch( tableReference.getProjectId(), @@ -835,7 +820,7 @@ class BigQueryServicesImpl implements BigQueryServices { "Unable to patch table description: %s, aborting after %d retries.", tableReference, MAX_RPC_RETRIES), Sleeper.DEFAULT, - backoff, + createDefaultBackoff(), ALWAYS_RETRY); } } http://git-wip-us.apache.org/repos/asf/beam/blob/367fcb28/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java index 59f2bb6..ba19cf0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java @@ -57,6 +57,8 @@ import java.util.Objects; import java.util.Random; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; + +import org.apache.beam.sdk.util.BackOffAdapter; import org.apache.beam.sdk.util.FluentBackoff; import org.joda.time.Duration; import org.slf4j.Logger; @@ -453,8 +455,9 @@ class BigQueryTableRowIterator implements AutoCloseable { throws IOException, InterruptedException { Sleeper sleeper = Sleeper.DEFAULT; BackOff backOff = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_TIME).backoff(); + BackOffAdapter.toGcpBackOff( + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_TIME).backoff()); T result = null; while (true) {