This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 21cc2ba [BEAM-5855] Remove duplicated code files from the Dataflow
worker donation (#6821)
21cc2ba is described below
commit 21cc2bab72178f1e292b13356b23ed2609a758ba
Author: Sam sam <[email protected]>
AuthorDate: Wed Oct 24 18:47:33 2018 -0700
[BEAM-5855] Remove duplicated code files from the Dataflow worker donation
(#6821)
* Remove duplicated code files from the Dataflow worker donation. Modify
existing files to use deduplicated code.
---
.../dataflow/harness/util/ThrowingBiConsumer.java | 31 ---
.../dataflow/harness/util/ThrowingBiFunction.java | 31 ---
.../dataflow/harness/util/ThrowingConsumer.java | 31 ---
.../dataflow/harness/util/ThrowingFunction.java | 31 ---
.../dataflow/harness/util/ThrowingRunnable.java | 29 ---
.../worker/DataflowBatchWorkerHarness.java | 8 +-
.../dataflow/worker/StreamingDataflowWorker.java | 8 +-
.../dataflow/worker/WorkerCustomSources.java | 4 +-
.../dataflow/worker/util/FluentBackoff.java | 237 ---------------------
.../worker/windmill/GrpcWindmillServer.java | 8 +-
.../RegisterAndProcessBundleOperationTest.java | 2 +-
.../dataflow/worker/util/FluentBackoffTest.java | 218 -------------------
12 files changed, 15 insertions(+), 623 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiConsumer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiConsumer.java
deleted file mode 100644
index a6c7e1d..0000000
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiConsumer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.harness.util;
-
-import java.util.function.BiConsumer;
-
-/**
- * A {@link BiConsumer} which can throw {@link Exception}s.
- *
- * <p>Used to expand the allowed set of method references to be used by Java 8
functional
- * interfaces.
- */
-@FunctionalInterface
-public interface ThrowingBiConsumer<T1, T2> {
- void accept(T1 t1, T2 t2) throws Exception;
-}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiFunction.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiFunction.java
deleted file mode 100644
index 7db62eb..0000000
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.harness.util;
-
-import java.util.function.BiFunction;
-
-/**
- * A {@link BiFunction} which can throw {@link Exception}s.
- *
- * <p>Used to expand the allowed set of method references to be used by Java 8
functional
- * interfaces.
- */
-@FunctionalInterface
-public interface ThrowingBiFunction<T1, T2, T3> {
- T3 apply(T1 t1, T2 t2) throws Exception;
-}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingConsumer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingConsumer.java
deleted file mode 100644
index 9209ad5..0000000
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingConsumer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.harness.util;
-
-import java.util.function.Consumer;
-
-/**
- * A {@link Consumer} which can throw {@link Exception}s.
- *
- * <p>Used to expand the allowed set of method references to be used by Java 8
functional
- * interfaces.
- */
-@FunctionalInterface
-public interface ThrowingConsumer<T> {
- void accept(T t) throws Exception;
-}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingFunction.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingFunction.java
deleted file mode 100644
index 187bb7f..0000000
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.harness.util;
-
-import java.util.function.Function;
-
-/**
- * A {@link Function} which can throw {@link Exception}s.
- *
- * <p>Used to expand the allowed set of method references to be used by Java 8
functional
- * interfaces.
- */
-@FunctionalInterface
-public interface ThrowingFunction<T1, T2> {
- T2 apply(T1 value) throws Exception;
-}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingRunnable.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingRunnable.java
deleted file mode 100644
index d74d157..0000000
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingRunnable.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.harness.util;
-
-/**
- * A {@link Runnable} which can throw {@link Exception}s.
- *
- * <p>Used to expand the allowed set of method references to be used by Java 8
functional
- * interfaces.
- */
-@FunctionalInterface
-public interface ThrowingRunnable {
- void run() throws Exception;
-}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
index 565bbce..6e07e59 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
@@ -19,9 +19,6 @@ package org.apache.beam.runners.dataflow.worker;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
@@ -29,8 +26,11 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
-import org.apache.beam.runners.dataflow.worker.util.FluentBackoff;
import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index dd0ee5e..c0f4b44 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -22,9 +22,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.beam.runners.dataflow.DataflowRunner.hasExperiment;
import static
org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.THROTTLING_MSECS_METRIC_NAME;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.model.CounterStructuredName;
import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.MapTask;
@@ -114,7 +111,6 @@ import
org.apache.beam.runners.dataflow.worker.status.LastExceptionDataProvider;
import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
-import org.apache.beam.runners.dataflow.worker.util.FluentBackoff;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
@@ -127,6 +123,10 @@ import
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.GetWo
import
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.StreamPool;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.util.Transport;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
index 128834c..4597018 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
@@ -26,7 +26,6 @@ import static
org.apache.beam.runners.dataflow.util.Structs.getStrings;
import static
org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
-import com.google.api.client.util.BackOff;
import com.google.api.client.util.Base64;
import com.google.api.services.dataflow.model.ApproximateReportedProgress;
import com.google.api.services.dataflow.model.ApproximateSplitRequest;
@@ -54,13 +53,14 @@ import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.internal.CustomSources;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.CloudObject;
-import org.apache.beam.runners.dataflow.worker.util.FluentBackoff;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.ByteString;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoff.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoff.java
deleted file mode 100644
index 8a1cc1e..0000000
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoff.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.client.util.BackOff;
-import com.google.common.base.MoreObjects;
-import org.joda.time.Duration;
-
-/**
- * A fluent builder for {@link BackOff} objects that allows customization of
the retry algorithm.
- *
- * @see #DEFAULT for the default configuration parameters.
- */
-public final class FluentBackoff {
-
- private static final double DEFAULT_EXPONENT = 1.5;
- private static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5;
- private static final Duration DEFAULT_MIN_BACKOFF =
Duration.standardSeconds(1);
- private static final Duration DEFAULT_MAX_BACKOFF =
Duration.standardDays(1000);
- private static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
- private static final Duration DEFAULT_MAX_CUM_BACKOFF =
Duration.standardDays(1000);
-
- private final double exponent;
- private final Duration initialBackoff;
- private final Duration maxBackoff;
- private final Duration maxCumulativeBackoff;
- private final int maxRetries;
-
- /**
- * By default the {@link BackOff} created by this builder will use
exponential backoff (base
- * exponent 1.5) with an initial backoff of 1 second. These parameters can
be overridden with
- * {@link #withExponent(double)} and {@link #withInitialBackoff(Duration)},
respectively, and the
- * maximum backoff after exponential increase can be capped using {@link
- * FluentBackoff#withMaxBackoff(Duration)}.
- *
- * <p>The default {@link BackOff} does not limit the number of retries. To
limit the backoff, the
- * maximum total number of retries can be set using {@link
#withMaxRetries(int)}. The total time
- * spent in backoff can be time-bounded as well by configuring {@link
- * #withMaxCumulativeBackoff(Duration)}. If either of these limits are
reached, calls to {@link
- * BackOff#nextBackOffMillis()} will return {@link BackOff#STOP} to signal
that no more retries
- * should continue.
- */
- public static final FluentBackoff DEFAULT =
- new FluentBackoff(
- DEFAULT_EXPONENT,
- DEFAULT_MIN_BACKOFF,
- DEFAULT_MAX_BACKOFF,
- DEFAULT_MAX_CUM_BACKOFF,
- DEFAULT_MAX_RETRIES);
-
- /**
- * Instantiates a {@link BackOff} that will obey the current configuration.
- *
- * @see FluentBackoff
- */
- public BackOff backoff() {
- return new BackoffImpl(this);
- }
-
- /**
- * Returns a copy of this {@link FluentBackoff} that instead uses the
specified exponent to
- * control the exponential growth of delay.
- *
- * <p>Does not modify this object.
- *
- * @see FluentBackoff
- */
- public FluentBackoff withExponent(double exponent) {
- checkArgument(exponent > 0, "exponent %s must be greater than 0",
exponent);
- return new FluentBackoff(
- exponent, initialBackoff, maxBackoff, maxCumulativeBackoff,
maxRetries);
- }
-
- /**
- * Returns a copy of this {@link FluentBackoff} that instead uses the
specified initial backoff
- * duration.
- *
- * <p>Does not modify this object.
- *
- * @see FluentBackoff
- */
- public FluentBackoff withInitialBackoff(Duration initialBackoff) {
- checkArgument(
- initialBackoff.isLongerThan(Duration.ZERO),
- "initialBackoff %s must be at least 1 millisecond",
- initialBackoff);
- return new FluentBackoff(
- exponent, initialBackoff, maxBackoff, maxCumulativeBackoff,
maxRetries);
- }
-
- /**
- * Returns a copy of this {@link FluentBackoff} that limits the maximum
backoff of an individual
- * attempt to the specified duration.
- *
- * <p>Does not modify this object.
- *
- * @see FluentBackoff
- */
- public FluentBackoff withMaxBackoff(Duration maxBackoff) {
- checkArgument(
- maxBackoff.getMillis() > 0, "maxBackoff %s must be at least 1
millisecond", maxBackoff);
- return new FluentBackoff(
- exponent, initialBackoff, maxBackoff, maxCumulativeBackoff,
maxRetries);
- }
-
- /**
- * Returns a copy of this {@link FluentBackoff} that limits the total time
spent in backoff
- * returned across all calls to {@link BackOff#nextBackOffMillis()}.
- *
- * <p>Does not modify this object.
- *
- * @see FluentBackoff
- */
- public FluentBackoff withMaxCumulativeBackoff(Duration maxCumulativeBackoff)
{
- checkArgument(
- maxCumulativeBackoff.isLongerThan(Duration.ZERO),
- "maxCumulativeBackoff %s must be at least 1 millisecond",
- maxCumulativeBackoff);
- return new FluentBackoff(
- exponent, initialBackoff, maxBackoff, maxCumulativeBackoff,
maxRetries);
- }
-
- /**
- * Returns a copy of this {@link FluentBackoff} that limits the total number
of retries, aka the
- * total number of calls to {@link BackOff#nextBackOffMillis()} before
returning {@link
- * BackOff#STOP}.
- *
- * <p>Does not modify this object.
- *
- * @see FluentBackoff
- */
- public FluentBackoff withMaxRetries(int maxRetries) {
- checkArgument(maxRetries >= 0, "maxRetries %s cannot be negative",
maxRetries);
- return new FluentBackoff(
- exponent, initialBackoff, maxBackoff, maxCumulativeBackoff,
maxRetries);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(FluentBackoff.class)
- .add("exponent", exponent)
- .add("initialBackoff", initialBackoff)
- .add("maxBackoff", maxBackoff)
- .add("maxRetries", maxRetries)
- .add("maxCumulativeBackoff", maxCumulativeBackoff)
- .toString();
- }
-
- private static class BackoffImpl implements BackOff {
-
- // Customization of this backoff.
- private final FluentBackoff backoffConfig;
- // Current state
- private Duration currentCumulativeBackoff;
- private int currentRetry;
-
- @Override
- public void reset() {
- currentRetry = 0;
- currentCumulativeBackoff = Duration.ZERO;
- }
-
- @Override
- public long nextBackOffMillis() {
- // Maximum number of retries reached.
- if (currentRetry >= backoffConfig.maxRetries) {
- return BackOff.STOP;
- }
- // Maximum cumulative backoff reached.
- if
(currentCumulativeBackoff.compareTo(backoffConfig.maxCumulativeBackoff) >= 0) {
- return BackOff.STOP;
- }
-
- double currentIntervalMillis =
- Math.min(
- backoffConfig.initialBackoff.getMillis()
- * Math.pow(backoffConfig.exponent, currentRetry),
- backoffConfig.maxBackoff.getMillis());
- double randomOffset =
- (Math.random() * 2 - 1) * DEFAULT_RANDOMIZATION_FACTOR *
currentIntervalMillis;
- long nextBackoffMillis = Math.round(currentIntervalMillis +
randomOffset);
- // Cap to limit on cumulative backoff
- Duration remainingCumulative =
- backoffConfig.maxCumulativeBackoff.minus(currentCumulativeBackoff);
- nextBackoffMillis = Math.min(nextBackoffMillis,
remainingCumulative.getMillis());
-
- // Update state and return backoff.
- currentCumulativeBackoff =
currentCumulativeBackoff.plus(nextBackoffMillis);
- currentRetry += 1;
- return nextBackoffMillis;
- }
-
- private BackoffImpl(FluentBackoff backoffConfig) {
- this.backoffConfig = backoffConfig;
- this.reset();
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(BackoffImpl.class)
- .add("backoffConfig", backoffConfig)
- .add("currentRetry", currentRetry)
- .add("currentCumulativeBackoff", currentCumulativeBackoff)
- .toString();
- }
- }
-
- private FluentBackoff(
- double exponent,
- Duration initialBackoff,
- Duration maxBackoff,
- Duration maxCumulativeBackoff,
- int maxRetries) {
- this.exponent = exponent;
- this.initialBackoff = initialBackoff;
- this.maxBackoff = maxBackoff;
- this.maxRetries = maxRetries;
- this.maxCumulativeBackoff = maxCumulativeBackoff;
- }
-}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index a3974d6..25b2d78 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -17,9 +17,6 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
@@ -60,7 +57,6 @@ import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
import
org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions;
-import org.apache.beam.runners.dataflow.worker.util.FluentBackoff;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse;
@@ -88,6 +84,10 @@ import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWor
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkResponseChunk;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.grpc.v1.io.grpc.CallCredentials;
import org.apache.beam.vendor.grpc.v1.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1.io.grpc.StatusRuntimeException;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
index 6bd0d96..3ca9c7d 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
@@ -61,7 +61,6 @@ import
org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.runners.dataflow.harness.util.ThrowingRunnable;
import
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import
org.apache.beam.runners.dataflow.worker.DataflowPortabilityPCollectionView;
import org.apache.beam.runners.dataflow.worker.fn.IdGenerator;
@@ -76,6 +75,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.MoreFutures;
+import org.apache.beam.sdk.util.ThrowingRunnable;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoffTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoffTest.java
deleted file mode 100644
index ad20448..0000000
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoffTest.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.util;
-
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.lessThan;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import com.google.api.client.util.BackOff;
-import java.io.IOException;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link FluentBackoff}. */
-@RunWith(JUnit4.class)
-public class FluentBackoffTest {
-
- @Rule public ExpectedException thrown = ExpectedException.none();
- private final FluentBackoff defaultBackoff = FluentBackoff.DEFAULT;
-
- @Test
- public void testInvalidExponent() {
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("exponent -2.0 must be greater than 0");
- defaultBackoff.withExponent(-2.0);
- }
-
- @Test
- public void testInvalidInitialBackoff() {
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("initialBackoff PT0S must be at least 1 millisecond");
- defaultBackoff.withInitialBackoff(Duration.ZERO);
- }
-
- @Test
- public void testInvalidMaxBackoff() {
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("maxBackoff PT0S must be at least 1 millisecond");
- defaultBackoff.withMaxBackoff(Duration.ZERO);
- }
-
- @Test
- public void testInvalidMaxRetries() {
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("maxRetries -1 cannot be negative");
- defaultBackoff.withMaxRetries(-1);
- }
-
- @Test
- public void testInvalidCumulativeBackoff() {
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("maxCumulativeBackoff PT-0.002S must be at least 1
millisecond");
- defaultBackoff.withMaxCumulativeBackoff(Duration.millis(-2));
- }
-
- /** Tests with bounded interval, custom exponent, and unlimited retries. */
- @Test
- public void testBoundedIntervalWithReset() throws Exception {
- BackOff backOff =
- FluentBackoff.DEFAULT
- .withInitialBackoff(Duration.millis(500))
- .withMaxBackoff(Duration.standardSeconds(1))
- .backoff();
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L),
lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L),
lessThan(1126L)));
- assertThat(
- backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
lessThanOrEqualTo(1500L)));
- assertThat(
- backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
lessThanOrEqualTo(1500L)));
- assertThat(
- backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
lessThanOrEqualTo(1500L)));
- assertThat(
- backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
lessThanOrEqualTo(1500L)));
- assertThat(
- backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
lessThanOrEqualTo(1500L)));
-
- // Reset, should go back to short times.
- backOff.reset();
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L),
lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L),
lessThan(1126L)));
- assertThat(
- backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
lessThanOrEqualTo(1500L)));
- assertThat(
- backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
lessThanOrEqualTo(1500L)));
- }
-
- /** Tests with bounded interval, custom exponent, limited retries, and a
reset. */
- @Test
- public void testMaxRetriesWithReset() throws Exception {
- BackOff backOff =
-
FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(500)).withMaxRetries(1).backoff();
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L),
lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
- assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
- assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
- assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
-
- backOff.reset();
- assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L),
lessThan(751L)));
- assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
- }
-
- private static long countMaximumBackoff(BackOff backOff) throws IOException {
- long cumulativeBackoffMillis = 0;
- long currentBackoffMillis = backOff.nextBackOffMillis();
- while (currentBackoffMillis != BackOff.STOP) {
- cumulativeBackoffMillis += currentBackoffMillis;
- currentBackoffMillis = backOff.nextBackOffMillis();
- }
- return cumulativeBackoffMillis;
- }
-
- /** Tests with bounded interval, custom exponent, limited cumulative time,
and a reset. */
- @Test
- public void testBoundedIntervalAndCumTimeWithReset() throws Exception {
- BackOff backOff =
- FluentBackoff.DEFAULT
- .withInitialBackoff(Duration.millis(500))
- .withMaxBackoff(Duration.standardSeconds(1))
- .withMaxCumulativeBackoff(Duration.standardMinutes(1))
- .backoff();
-
- assertThat(countMaximumBackoff(backOff),
equalTo(Duration.standardMinutes(1).getMillis()));
-
- backOff.reset();
- assertThat(countMaximumBackoff(backOff),
equalTo(Duration.standardMinutes(1).getMillis()));
- // sanity check: should get 0 if we don't reset
- assertThat(countMaximumBackoff(backOff), equalTo(0L));
-
- backOff.reset();
- assertThat(countMaximumBackoff(backOff),
equalTo(Duration.standardMinutes(1).getMillis()));
- }
-
- /** Tests with bounded interval, custom exponent, limited cumulative time
and retries. */
- @Test
- public void testBoundedIntervalAndCumTimeAndRetriesWithReset() throws
Exception {
- BackOff backOff =
- FluentBackoff.DEFAULT
- .withInitialBackoff(Duration.millis(500))
- .withMaxBackoff(Duration.standardSeconds(1))
- .withMaxCumulativeBackoff(Duration.standardMinutes(1))
- .backoff();
-
- long cumulativeBackoffMillis = 0;
- long currentBackoffMillis = backOff.nextBackOffMillis();
- while (currentBackoffMillis != BackOff.STOP) {
- cumulativeBackoffMillis += currentBackoffMillis;
- currentBackoffMillis = backOff.nextBackOffMillis();
- }
- assertThat(cumulativeBackoffMillis,
equalTo(Duration.standardMinutes(1).getMillis()));
- }
-
- @Test
- public void testFluentBackoffToString() throws IOException {
- FluentBackoff config =
- FluentBackoff.DEFAULT
- .withExponent(3.4)
- .withMaxRetries(4)
- .withInitialBackoff(Duration.standardSeconds(3))
- .withMaxBackoff(Duration.standardHours(1))
- .withMaxCumulativeBackoff(Duration.standardDays(1));
-
- assertEquals(
- "FluentBackoff{exponent=3.4, initialBackoff=PT3S, maxBackoff=PT3600S,"
- + " maxRetries=4, maxCumulativeBackoff=PT86400S}",
- config.toString());
- }
-
- @Test
- public void testBackoffImplToString() throws IOException {
- FluentBackoff config =
- FluentBackoff.DEFAULT
- .withExponent(3.4)
- .withMaxRetries(4)
- .withInitialBackoff(Duration.standardSeconds(3))
- .withMaxBackoff(Duration.standardHours(1))
- .withMaxCumulativeBackoff(Duration.standardDays(1));
- BackOff backOff = config.backoff();
-
- assertEquals(
- "BackoffImpl{backoffConfig="
- + config.toString()
- + ","
- + " currentRetry=0, currentCumulativeBackoff=PT0S}",
- backOff.toString());
-
- // backoff once, ignoring result
- backOff.nextBackOffMillis();
-
- // currentRetry is exact, we can test it.
- assertThat(backOff.toString(), containsString("currentRetry=1"));
- // currentCumulativeBackoff is not exact; we cannot even check that it's
non-zero (randomness).
- }
-}