This is an automated email from the ASF dual-hosted git repository. scwhittle pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new ba714221d5e organize and refactor GrpcWindmillServer. (#29156) ba714221d5e is described below commit ba714221d5efaea58a59bccaad2eaeef70bd4ec4 Author: martin trieu <marti...@google.com> AuthorDate: Tue Oct 31 02:37:10 2023 -0700 organize and refactor GrpcWindmillServer. (#29156) organize and refactor GrpcWindmillServer to prepare for Streaming Engine Client changes. --- .../google-cloud-dataflow-java/worker/build.gradle | 3 + .../worker/MetricTrackingWindmillServerStub.java | 4 +- .../dataflow/worker/StreamingDataflowWorker.java | 6 +- .../options/StreamingDataflowWorkerOptions.java | 2 +- .../worker/windmill/WindmillServerBase.java | 8 +- .../worker/windmill/WindmillServerStub.java | 8 +- .../{ => client}/AbstractWindmillStream.java | 3 +- .../windmill/{ => client}/WindmillStream.java | 21 +- .../windmill/{ => client}/WindmillStreamPool.java | 2 +- .../grpc}/AppendableInputStream.java | 2 +- .../grpc}/GetWorkTimingInfosTracker.java | 2 +- .../grpc}/GrpcCommitWorkStream.java | 9 +- .../grpc}/GrpcDeadlineClientInterceptor.java | 2 +- .../windmill/client/grpc/GrpcDispatcherClient.java | 136 +++++ .../grpc}/GrpcGetDataStream.java | 13 +- .../grpc}/GrpcGetDataStreamRequests.java | 2 +- .../grpc}/GrpcGetWorkStream.java | 29 +- .../grpc}/GrpcGetWorkerMetadataStream.java | 9 +- .../windmill/client/grpc/GrpcWindmillServer.java | 355 ++++++++++++ .../client/grpc/GrpcWindmillStreamFactory.java | 227 ++++++++ .../grpc/auth/VendoredCredentialsAdapter.java | 81 +++ .../VendoredRequestMetadataCallbackAdapter.java | 51 ++ .../grpc/observers}/DirectStreamObserver.java | 2 +- .../ForwardingClientResponseObserver.java | 2 +- .../grpc/observers}/StreamObserverFactory.java | 2 +- .../client/grpc/stubs/WindmillChannelFactory.java | 137 +++++ .../client/grpc/stubs/WindmillStubFactory.java | 73 +++ .../throttling/StreamingEngineThrottleTimers.java | 41 ++ .../throttling}/ThrottleTimer.java | 6 +- .../windmill/grpcclient/GrpcWindmillServer.java | 607 --------------------- .../worker/windmill/work/WorkItemReceiver.java | 34 ++ .../worker/windmill/work/budget/GetWorkBudget.java | 98 ++++ .../dataflow/worker/FakeWindmillServer.java | 24 +- .../{ => client}/WindmillStreamPoolTest.java | 2 +- .../grpc}/GrpcGetWorkerMetadataStreamTest.java | 9 +- .../grpc}/GrpcWindmillServerTest.java | 15 +- .../windmill/work/budget/GetWorkBudgetTest.java | 72 +++ 37 files changed, 1413 insertions(+), 686 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/build.gradle b/runners/google-cloud-dataflow-java/worker/build.gradle index ce06063c9b5..1ca9eba2b48 100644 --- a/runners/google-cloud-dataflow-java/worker/build.gradle +++ b/runners/google-cloud-dataflow-java/worker/build.gradle @@ -89,6 +89,9 @@ applyJavaNature( // Allow slf4j implementation worker for logging during pipeline execution "org/slf4j/impl/**" ], + generatedClassPatterns: [ + /^org\.apache\.beam\.runners\.dataflow\.worker\.windmill.*/ + ], shadowClosure: { // Each included dependency must also include all of its necessary transitive dependencies // or have them provided by the users pipeline during job submission. Typically a users diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java index 33b55647213..0e929249b3a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java @@ -29,8 +29,8 @@ import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStreamPool; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 811250ee785..11849e8b8c4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -104,9 +104,9 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStreamPool; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader; import org.apache.beam.sdk.coders.Coder; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java index cc5b3302b01..bacfa1eef63 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java @@ -21,7 +21,7 @@ import java.io.IOException; import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; import org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer; -import org.apache.beam.runners.dataflow.worker.windmill.grpcclient.GrpcWindmillServer; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java index fe81eece138..8caa79cd3f7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerBase.java @@ -19,10 +19,10 @@ package org.apache.beam.runners.dataflow.worker.windmill; import java.io.IOException; import java.util.Set; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream.WorkItemReceiver; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; /** diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java index 1bb5359e06f..c327e68d7e9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java @@ -21,10 +21,10 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.Set; import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream.WorkItemReceiver; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; /** Stub for communicating with a Windmill server. */ diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/AbstractWindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java similarity index 98% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/AbstractWindmillStream.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java index ea7efff7a06..4e47676989a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/AbstractWindmillStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill; +package org.apache.beam.runners.dataflow.worker.windmill.client; import java.io.IOException; import java.io.PrintWriter; @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.StatusRuntimeException; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java similarity index 84% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStream.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java index 4dd4164fc4e..fa1f797a191 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java @@ -15,15 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill; +package org.apache.beam.runners.dataflow.worker.windmill.client; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import javax.annotation.concurrent.ThreadSafe; -import org.checkerframework.checker.nullness.qual.Nullable; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; import org.joda.time.Instant; /** Superclass for streams returned by streaming Windmill methods. */ @@ -41,16 +41,11 @@ public interface WindmillStream { /** Handle representing a stream of GetWork responses. */ @ThreadSafe interface GetWorkStream extends WindmillStream { - /** Functional interface for receiving WorkItems. */ - @FunctionalInterface - interface WorkItemReceiver { - void receiveWork( - String computation, - @Nullable Instant inputDataWatermark, - @Nullable Instant synchronizedProcessingTime, - Windmill.WorkItem workItem, - Collection<Windmill.LatencyAttribution> getWorkStreamLatencies); - } + /** Adjusts the {@link GetWorkBudget} for the stream. */ + void adjustBudget(long itemsDelta, long bytesDelta); + + /** Returns the remaining in-flight {@link GetWorkBudget}. */ + GetWorkBudget remainingBudget(); } /** Interface for streaming GetDataRequests to Windmill. */ diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPool.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPool.java similarity index 99% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPool.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPool.java index 9cd4ab0ea4a..9f1b67edc1e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPool.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPool.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill; +package org.apache.beam.runners.dataflow.worker.windmill.client; import java.util.ArrayList; import java.util.HashMap; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/AppendableInputStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/AppendableInputStream.java similarity index 98% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/AppendableInputStream.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/AppendableInputStream.java index dbd3613ee4c..6a0d0a63d5a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/AppendableInputStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/AppendableInputStream.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.grpcclient; +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; import java.io.IOException; import java.io.InputStream; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GetWorkTimingInfosTracker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java similarity index 99% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GetWorkTimingInfosTracker.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java index e6710993af9..221b18be164 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GetWorkTimingInfosTracker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.grpcclient; +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; import java.util.ArrayList; import java.util.Collection; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcCommitWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java similarity index 96% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcCommitWorkStream.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java index 1bba40805de..5d0a5085fe1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcCommitWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.grpcclient; +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; @@ -27,15 +27,16 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Function; -import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream; -import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitRequestChunk; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitResponse; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitWorkRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcDeadlineClientInterceptor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDeadlineClientInterceptor.java similarity index 97% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcDeadlineClientInterceptor.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDeadlineClientInterceptor.java index 6b0e19cbb48..629006e2359 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcDeadlineClientInterceptor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDeadlineClientInterceptor.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.grpcclient; +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; import java.util.concurrent.TimeUnit; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.CallOptions; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java new file mode 100644 index 00000000000..ef9156f9c05 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; + +import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.LOCALHOST; +import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.localhostChannel; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Manages endpoints and stubs for connecting to the Windmill Dispatcher. */ +@ThreadSafe +class GrpcDispatcherClient { + private static final Logger LOG = LoggerFactory.getLogger(GrpcDispatcherClient.class); + private final WindmillStubFactory windmillStubFactory; + + @GuardedBy("this") + private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs; + + @GuardedBy("this") + private final Set<HostAndPort> dispatcherEndpoints; + + @GuardedBy("this") + private final Random rand; + + private GrpcDispatcherClient( + WindmillStubFactory windmillStubFactory, + List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs, + Set<HostAndPort> dispatcherEndpoints, + Random rand) { + this.windmillStubFactory = windmillStubFactory; + this.dispatcherStubs = dispatcherStubs; + this.dispatcherEndpoints = dispatcherEndpoints; + this.rand = rand; + } + + static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) { + return new GrpcDispatcherClient( + windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random()); + } + + @VisibleForTesting + static GrpcDispatcherClient forTesting( + WindmillStubFactory windmillGrpcStubFactory, + List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs, + Set<HostAndPort> dispatcherEndpoints) { + Preconditions.checkArgument(dispatcherEndpoints.size() == dispatcherStubs.size()); + return new GrpcDispatcherClient( + windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new Random()); + } + + synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() { + Preconditions.checkState( + !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been set"); + + return (dispatcherStubs.size() == 1 + ? dispatcherStubs.get(0) + : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size()))); + } + + synchronized boolean isReady() { + return !dispatcherStubs.isEmpty(); + } + + synchronized void consumeWindmillDispatcherEndpoints( + ImmutableSet<HostAndPort> dispatcherEndpoints) { + Preconditions.checkArgument( + dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(), + "Cannot set dispatcher endpoints to nothing."); + if (this.dispatcherEndpoints.equals(dispatcherEndpoints)) { + // The endpoints are equal don't recreate the stubs. + return; + } + + LOG.info("Creating a new windmill stub, endpoints: {}", dispatcherEndpoints); + if (!this.dispatcherEndpoints.isEmpty()) { + LOG.info("Previous windmill stub endpoints: {}", this.dispatcherEndpoints); + } + + resetDispatcherEndpoints(dispatcherEndpoints); + } + + private synchronized void resetDispatcherEndpoints( + ImmutableSet<HostAndPort> newDispatcherEndpoints) { + LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", newDispatcherEndpoints); + this.dispatcherStubs.clear(); + this.dispatcherEndpoints.clear(); + this.dispatcherEndpoints.addAll(newDispatcherEndpoints); + + dispatcherEndpoints.stream() + .map(this::createDispatcherStubForWindmillService) + .forEach(dispatcherStubs::add); + } + + private CloudWindmillServiceV1Alpha1Stub createDispatcherStubForWindmillService( + HostAndPort endpoint) { + if (LOCALHOST.equals(endpoint.getHost())) { + return CloudWindmillServiceV1Alpha1Grpc.newStub(localhostChannel(endpoint.getPort())); + } + + // Use an in-process stub if testing. + return windmillStubFactory.getKind() == WindmillStubFactory.Kind.IN_PROCESS + ? windmillStubFactory.inProcess().get() + : windmillStubFactory.remote().apply(WindmillServiceAddress.create(endpoint)); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java similarity index 95% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStream.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java index 238cc771dce..ea9cd7f0fa3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.grpcclient; +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verify; @@ -33,8 +33,6 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; -import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream; -import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetDataRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalData; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest; @@ -43,9 +41,12 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataReq import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetDataRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetDataResponse; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream; -import org.apache.beam.runners.dataflow.worker.windmill.grpcclient.GrpcGetDataStreamRequests.QueuedBatch; -import org.apache.beam.runners.dataflow.worker.windmill.grpcclient.GrpcGetDataStreamRequests.QueuedRequest; +import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedBatch; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedRequest; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver; import org.joda.time.Instant; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStreamRequests.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java similarity index 98% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStreamRequests.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java index 7da7b13958b..cda9537127d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetDataStreamRequests.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.grpcclient; +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; import com.google.auto.value.AutoOneOf; import java.util.ArrayList; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java similarity index 89% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkStream.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java index 4660fe25b13..d7d9bfddffb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.grpcclient; +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; import java.io.IOException; import java.io.PrintWriter; @@ -27,16 +27,18 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; -import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream; -import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkRequestExtension; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkResponseChunk; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream.WorkItemReceiver; +import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; +import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver; @@ -44,7 +46,7 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class GrpcGetWorkStream +public final class GrpcGetWorkStream extends AbstractWindmillStream<StreamingGetWorkRequest, StreamingGetWorkResponseChunk> implements GetWorkStream { @@ -79,7 +81,7 @@ final class GrpcGetWorkStream this.inflightBytes = new AtomicLong(); } - static GrpcGetWorkStream create( + public static GrpcGetWorkStream create( Function< StreamObserver<StreamingGetWorkResponseChunk>, StreamObserver<StreamingGetWorkRequest>> @@ -190,6 +192,19 @@ final class GrpcGetWorkStream getWorkThrottleTimer.start(); } + @Override + public void adjustBudget(long itemsDelta, long bytesDelta) { + // no-op + } + + @Override + public GetWorkBudget remainingBudget() { + return GetWorkBudget.builder() + .setBytes(request.getMaxBytes() - inflightBytes.get()) + .setItems(request.getMaxItems() - inflightMessages.get()) + .build(); + } + private class WorkItemBuffer { private final GetWorkTimingInfosTracker workTimingInfosTracker; private String computation; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java similarity index 93% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStream.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java index 427fd412ec7..a403feddb45 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.grpcclient; +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.io.PrintWriter; @@ -23,13 +23,14 @@ import java.util.Optional; import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; -import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream; -import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse; import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkerMetadataStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkerMetadataStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver; import org.slf4j.Logger; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java new file mode 100644 index 00000000000..3a881df7146 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; + +import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.LOCALHOST; +import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.inProcessChannel; +import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.localhostChannel; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Supplier; +import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataResponse; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkResponse; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsResponse; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.StreamingEngineThrottleTimers; +import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.StatusRuntimeException; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** gRPC client for communicating with Streaming Engine. */ +@SuppressFBWarnings({ + // Very likely real potentials for bugs. + "JLM_JSR166_UTILCONCURRENT_MONITORENTER", // https://github.com/apache/beam/issues/19273 + "IS2_INCONSISTENT_SYNC" // https://github.com/apache/beam/issues/19271 +}) +@SuppressWarnings("nullness") // TODO(https://github.com/apache/beam/issues/20497 +public final class GrpcWindmillServer extends WindmillServerStub { + private static final Logger LOG = LoggerFactory.getLogger(GrpcWindmillServer.class); + private static final int DEFAULT_LOG_EVERY_N_FAILURES = 20; + private static final Duration MIN_BACKOFF = Duration.millis(1); + private static final Duration MAX_BACKOFF = Duration.standardSeconds(30); + private static final int NO_HEALTH_CHECK = -1; + private static final String GRPC_LOCALHOST = "grpc:localhost"; + + private final GrpcWindmillStreamFactory windmillStreamFactory; + private final GrpcDispatcherClient dispatcherClient; + private final StreamingDataflowWorkerOptions options; + private final StreamingEngineThrottleTimers throttleTimers; + private Duration maxBackoff; + private @Nullable WindmillApplianceGrpc.WindmillApplianceBlockingStub syncApplianceStub; + + private GrpcWindmillServer( + StreamingDataflowWorkerOptions options, GrpcDispatcherClient grpcDispatcherClient) { + this.options = options; + this.throttleTimers = StreamingEngineThrottleTimers.create(); + this.maxBackoff = MAX_BACKOFF; + this.windmillStreamFactory = + GrpcWindmillStreamFactory.of( + JobHeader.newBuilder() + .setJobId(options.getJobId()) + .setProjectId(options.getProject()) + .setWorkerId(options.getWorkerId()) + .build()) + .setWindmillMessagesBetweenIsReadyChecks( + options.getWindmillMessagesBetweenIsReadyChecks()) + .setMaxBackOffSupplier(() -> maxBackoff) + .setLogEveryNStreamFailures( + options.getWindmillServiceStreamingLogEveryNStreamFailures()) + .setStreamingRpcBatchLimit(options.getWindmillServiceStreamingRpcBatchLimit()) + .build(); + windmillStreamFactory.scheduleHealthChecks( + options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()); + + this.dispatcherClient = grpcDispatcherClient; + this.syncApplianceStub = null; + } + + private static StreamingDataflowWorkerOptions testOptions(boolean enableStreamingEngine) { + StreamingDataflowWorkerOptions options = + PipelineOptionsFactory.create().as(StreamingDataflowWorkerOptions.class); + options.setProject("project"); + options.setJobId("job"); + options.setWorkerId("worker"); + List<String> experiments = + options.getExperiments() == null ? new ArrayList<>() : options.getExperiments(); + if (enableStreamingEngine) { + experiments.add(GcpOptions.STREAMING_ENGINE_EXPERIMENT); + } + options.setExperiments(experiments); + + options.setWindmillServiceStreamingRpcBatchLimit(Integer.MAX_VALUE); + options.setWindmillServiceStreamingRpcHealthCheckPeriodMs(NO_HEALTH_CHECK); + options.setWindmillServiceStreamingLogEveryNStreamFailures(DEFAULT_LOG_EVERY_N_FAILURES); + + return options; + } + + /** Create new instance of {@link GrpcWindmillServer}. */ + public static GrpcWindmillServer create(StreamingDataflowWorkerOptions workerOptions) + throws IOException { + + GrpcWindmillServer grpcWindmillServer = + new GrpcWindmillServer( + workerOptions, + GrpcDispatcherClient.create( + WindmillStubFactory.remoteStubFactory( + workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(), + workerOptions.getGcpCredential()))); + if (workerOptions.getWindmillServiceEndpoint() != null) { + grpcWindmillServer.configureWindmillServiceEndpoints(); + } else if (!workerOptions.isEnableStreamingEngine() + && workerOptions.getLocalWindmillHostport() != null) { + grpcWindmillServer.configureLocalHost(); + } + + return grpcWindmillServer; + } + + @VisibleForTesting + static GrpcWindmillServer newTestInstance(String name) { + ManagedChannel inProcessChannel = inProcessChannel(name); + CloudWindmillServiceV1Alpha1Stub stub = + CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel); + List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs = Lists.newArrayList(stub); + Set<HostAndPort> dispatcherEndpoints = Sets.newHashSet(HostAndPort.fromHost(name)); + GrpcDispatcherClient dispatcherClient = + GrpcDispatcherClient.forTesting( + WindmillStubFactory.inProcessStubFactory(name, unused -> inProcessChannel), + dispatcherStubs, + dispatcherEndpoints); + return new GrpcWindmillServer(testOptions(/* enableStreamingEngine= */ true), dispatcherClient); + } + + @VisibleForTesting + static GrpcWindmillServer newApplianceTestInstance(Channel channel) { + GrpcWindmillServer testServer = + new GrpcWindmillServer( + testOptions(/* enableStreamingEngine= */ false), + // No-op, Appliance does not use Dispatcher to call Streaming Engine. + GrpcDispatcherClient.create(WindmillStubFactory.inProcessStubFactory("test"))); + testServer.syncApplianceStub = createWindmillApplianceStubWithDeadlineInterceptor(channel); + return testServer; + } + + private static WindmillApplianceGrpc.WindmillApplianceBlockingStub + createWindmillApplianceStubWithDeadlineInterceptor(Channel channel) { + return WindmillApplianceGrpc.newBlockingStub(channel) + .withInterceptors(GrpcDeadlineClientInterceptor.withDefaultUnaryRpcDeadline()); + } + + private static UnsupportedOperationException unsupportedUnaryRequestInStreamingEngineException( + String rpcName) { + return new UnsupportedOperationException( + String.format("Unary %s calls are not supported in Streaming Engine.", rpcName)); + } + + private void configureWindmillServiceEndpoints() { + Set<HostAndPort> endpoints = new HashSet<>(); + for (String endpoint : Splitter.on(',').split(options.getWindmillServiceEndpoint())) { + endpoints.add( + HostAndPort.fromString(endpoint).withDefaultPort(options.getWindmillServicePort())); + } + + dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints)); + } + + private void configureLocalHost() { + int portStart = options.getLocalWindmillHostport().lastIndexOf(':'); + String endpoint = options.getLocalWindmillHostport().substring(0, portStart); + Preconditions.checkState(GRPC_LOCALHOST.equals(endpoint)); + int port = Integer.parseInt(options.getLocalWindmillHostport().substring(portStart + 1)); + dispatcherClient.consumeWindmillDispatcherEndpoints( + ImmutableSet.of(HostAndPort.fromParts(LOCALHOST, port))); + initializeLocalHost(port); + } + + @Override + public void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) { + dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints)); + } + + @Override + public boolean isReady() { + return dispatcherClient.isReady(); + } + + private synchronized void initializeLocalHost(int port) { + this.maxBackoff = Duration.millis(500); + if (options.isEnableStreamingEngine()) { + dispatcherClient.consumeWindmillDispatcherEndpoints( + ImmutableSet.of(HostAndPort.fromParts(LOCALHOST, port))); + } else { + this.syncApplianceStub = + createWindmillApplianceStubWithDeadlineInterceptor(localhostChannel(port)); + } + } + + @Override + public void appendSummaryHtml(PrintWriter writer) { + windmillStreamFactory.appendSummaryHtml(writer); + } + + private <ResponseT> ResponseT callWithBackoff(Supplier<ResponseT> function) { + // Configure backoff to retry calls forever, with a maximum sane retry interval. + BackOff backoff = + FluentBackoff.DEFAULT.withInitialBackoff(MIN_BACKOFF).withMaxBackoff(maxBackoff).backoff(); + + int rpcErrors = 0; + while (true) { + try { + return function.get(); + } catch (StatusRuntimeException e) { + try { + if (++rpcErrors % 20 == 0) { + LOG.warn( + "Many exceptions calling gRPC. Last exception: {} with status {}", + e, + e.getStatus()); + } + if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) { + throw new RpcException(e); + } + } catch (IOException | InterruptedException i) { + if (i instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + RpcException rpcException = new RpcException(e); + rpcException.addSuppressed(i); + throw rpcException; + } + } + } + } + + @Override + public GetWorkResponse getWork(GetWorkRequest request) { + if (syncApplianceStub != null) { + return callWithBackoff(() -> syncApplianceStub.getWork(request)); + } + + throw new RpcException(unsupportedUnaryRequestInStreamingEngineException("GetWork")); + } + + @Override + public GetDataResponse getData(GetDataRequest request) { + if (syncApplianceStub != null) { + return callWithBackoff(() -> syncApplianceStub.getData(request)); + } + + throw new RpcException(unsupportedUnaryRequestInStreamingEngineException("GetData")); + } + + @Override + public CommitWorkResponse commitWork(CommitWorkRequest request) { + if (syncApplianceStub != null) { + return callWithBackoff(() -> syncApplianceStub.commitWork(request)); + } + throw new RpcException(unsupportedUnaryRequestInStreamingEngineException("CommitWork")); + } + + @Override + public GetWorkStream getWorkStream(GetWorkRequest request, WorkItemReceiver receiver) { + return windmillStreamFactory.createGetWorkStream( + dispatcherClient.getDispatcherStub(), + GetWorkRequest.newBuilder(request) + .setJobId(options.getJobId()) + .setProjectId(options.getProject()) + .setWorkerId(options.getWorkerId()) + .build(), + throttleTimers.getWorkThrottleTimer(), + receiver); + } + + @Override + public GetDataStream getDataStream() { + return windmillStreamFactory.createGetDataStream( + dispatcherClient.getDispatcherStub(), throttleTimers.getDataThrottleTimer()); + } + + @Override + public CommitWorkStream commitWorkStream() { + return windmillStreamFactory.createCommitWorkStream( + dispatcherClient.getDispatcherStub(), throttleTimers.commitWorkThrottleTimer()); + } + + @Override + public GetConfigResponse getConfig(GetConfigRequest request) { + if (syncApplianceStub != null) { + return callWithBackoff(() -> syncApplianceStub.getConfig(request)); + } + + throw new RpcException( + new UnsupportedOperationException("GetConfig not supported in Streaming Engine.")); + } + + @Override + public ReportStatsResponse reportStats(ReportStatsRequest request) { + if (syncApplianceStub != null) { + return callWithBackoff(() -> syncApplianceStub.reportStats(request)); + } + + throw new RpcException( + new UnsupportedOperationException("ReportStats not supported in Streaming Engine.")); + } + + @Override + public long getAndResetThrottleTime() { + return throttleTimers.getAndResetThrottleTime(); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java new file mode 100644 index 00000000000..e474ebf18b2 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; + +import static org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS; + +import com.google.auto.value.AutoBuilder; +import java.io.PrintWriter; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints; +import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkerMetadataStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; +import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Creates gRPC streaming connections to Windmill Service. Maintains a set of all currently opened + * RPC streams for health check/heartbeat requests to keep the streams alive. + */ +@ThreadSafe +public final class GrpcWindmillStreamFactory implements StatusDataProvider { + private static final Duration MIN_BACKOFF = Duration.millis(1); + private static final Duration DEFAULT_MAX_BACKOFF = Duration.standardSeconds(30); + private static final int DEFAULT_LOG_EVERY_N_STREAM_FAILURES = 1; + private static final int DEFAULT_STREAMING_RPC_BATCH_LIMIT = Integer.MAX_VALUE; + private static final int DEFAULT_WINDMILL_MESSAGES_BETWEEN_IS_READY_CHECKS = 1; + + private final JobHeader jobHeader; + private final int logEveryNStreamFailures; + private final int streamingRpcBatchLimit; + private final int windmillMessagesBetweenIsReadyChecks; + private final Supplier<BackOff> grpcBackOff; + private final Set<AbstractWindmillStream<?, ?>> streamRegistry; + private final AtomicLong streamIdGenerator; + + GrpcWindmillStreamFactory( + JobHeader jobHeader, + int logEveryNStreamFailures, + int streamingRpcBatchLimit, + int windmillMessagesBetweenIsReadyChecks, + Supplier<Duration> maxBackOffSupplier) { + this.jobHeader = jobHeader; + this.logEveryNStreamFailures = logEveryNStreamFailures; + this.streamingRpcBatchLimit = streamingRpcBatchLimit; + this.windmillMessagesBetweenIsReadyChecks = windmillMessagesBetweenIsReadyChecks; + // Configure backoff to retry calls forever, with a maximum sane retry interval. + this.grpcBackOff = + Suppliers.memoize( + () -> + FluentBackoff.DEFAULT + .withInitialBackoff(MIN_BACKOFF) + .withMaxBackoff(maxBackOffSupplier.get()) + .backoff()); + this.streamRegistry = ConcurrentHashMap.newKeySet(); + this.streamIdGenerator = new AtomicLong(); + } + + /** + * Returns a new {@link Builder} for {@link GrpcWindmillStreamFactory} with default values set for + * the given {@link JobHeader}. + */ + public static GrpcWindmillStreamFactory.Builder of(JobHeader jobHeader) { + return new AutoBuilder_GrpcWindmillStreamFactory_Builder() + .setJobHeader(jobHeader) + .setWindmillMessagesBetweenIsReadyChecks(DEFAULT_WINDMILL_MESSAGES_BETWEEN_IS_READY_CHECKS) + .setMaxBackOffSupplier(() -> DEFAULT_MAX_BACKOFF) + .setLogEveryNStreamFailures(DEFAULT_LOG_EVERY_N_STREAM_FAILURES) + .setStreamingRpcBatchLimit(DEFAULT_STREAMING_RPC_BATCH_LIMIT); + } + + private static CloudWindmillServiceV1Alpha1Stub withDeadline( + CloudWindmillServiceV1Alpha1Stub stub) { + // Deadlines are absolute points in time, so generate a new one everytime this function is + // called. + return stub.withDeadlineAfter( + AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS); + } + + public GetWorkStream createGetWorkStream( + CloudWindmillServiceV1Alpha1Stub stub, + GetWorkRequest request, + ThrottleTimer getWorkThrottleTimer, + WorkItemReceiver processWorkItem) { + return GrpcGetWorkStream.create( + responseObserver -> withDeadline(stub).getWorkStream(responseObserver), + request, + grpcBackOff.get(), + newStreamObserverFactory(), + streamRegistry, + logEveryNStreamFailures, + getWorkThrottleTimer, + processWorkItem); + } + + public GetDataStream createGetDataStream( + CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer getDataThrottleTimer) { + return GrpcGetDataStream.create( + responseObserver -> withDeadline(stub).getDataStream(responseObserver), + grpcBackOff.get(), + newStreamObserverFactory(), + streamRegistry, + logEveryNStreamFailures, + getDataThrottleTimer, + jobHeader, + streamIdGenerator, + streamingRpcBatchLimit); + } + + public CommitWorkStream createCommitWorkStream( + CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer commitWorkThrottleTimer) { + return GrpcCommitWorkStream.create( + responseObserver -> withDeadline(stub).commitWorkStream(responseObserver), + grpcBackOff.get(), + newStreamObserverFactory(), + streamRegistry, + logEveryNStreamFailures, + commitWorkThrottleTimer, + jobHeader, + streamIdGenerator, + streamingRpcBatchLimit); + } + + public GetWorkerMetadataStream createGetWorkerMetadataStream( + CloudWindmillServiceV1Alpha1Stub stub, + ThrottleTimer getWorkerMetadataThrottleTimer, + Consumer<WindmillEndpoints> onNewWindmillEndpoints) { + return GrpcGetWorkerMetadataStream.create( + responseObserver -> withDeadline(stub).getWorkerMetadataStream(responseObserver), + grpcBackOff.get(), + newStreamObserverFactory(), + streamRegistry, + logEveryNStreamFailures, + jobHeader, + 0, + getWorkerMetadataThrottleTimer, + onNewWindmillEndpoints); + } + + private StreamObserverFactory newStreamObserverFactory() { + return StreamObserverFactory.direct( + DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2, windmillMessagesBetweenIsReadyChecks); + } + + /** + * Schedules streaming RPC health checks to run on a background daemon thread, which will be + * cleaned up when the JVM shutdown. + */ + public void scheduleHealthChecks(int healthCheckInterval) { + if (healthCheckInterval < 0) { + return; + } + + new Timer("WindmillHealthCheckTimer") + .schedule( + new TimerTask() { + @Override + public void run() { + Instant reportThreshold = Instant.now().minus(Duration.millis(healthCheckInterval)); + for (AbstractWindmillStream<?, ?> stream : streamRegistry) { + stream.maybeSendHealthCheck(reportThreshold); + } + } + }, + 0, + healthCheckInterval); + } + + @Override + public void appendSummaryHtml(PrintWriter writer) { + writer.write("Active Streams:<br>"); + for (AbstractWindmillStream<?, ?> stream : streamRegistry) { + stream.appendSummaryHtml(writer); + writer.write("<br>"); + } + } + + @AutoBuilder(ofClass = GrpcWindmillStreamFactory.class) + interface Builder { + Builder setJobHeader(JobHeader jobHeader); + + Builder setLogEveryNStreamFailures(int logEveryNStreamFailures); + + Builder setStreamingRpcBatchLimit(int streamingRpcBatchLimit); + + Builder setWindmillMessagesBetweenIsReadyChecks(int windmillMessagesBetweenIsReadyChecks); + + Builder setMaxBackOffSupplier(Supplier<Duration> maxBackOff); + + GrpcWindmillStreamFactory build(); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredCredentialsAdapter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredCredentialsAdapter.java new file mode 100644 index 00000000000..23f6fb801a4 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredCredentialsAdapter.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.auth; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; + +/** + * Create a wrapper around credentials that delegates to the underlying {@link + * com.google.auth.Credentials}. Note that this class should override every method that is not final + * and not static and call the delegate directly. + * + * <p>TODO: Replace this with an auto generated proxy which calls the underlying implementation + * delegate to reduce maintenance burden. + */ +public class VendoredCredentialsAdapter + extends org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.Credentials { + + private final com.google.auth.Credentials credentials; + + public VendoredCredentialsAdapter(com.google.auth.Credentials credentials) { + this.credentials = credentials; + } + + @Override + public String getAuthenticationType() { + return credentials.getAuthenticationType(); + } + + @Override + public Map<String, List<String>> getRequestMetadata() throws IOException { + return credentials.getRequestMetadata(); + } + + @Override + public void getRequestMetadata( + final URI uri, + Executor executor, + final org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback callback) { + credentials.getRequestMetadata( + uri, executor, new VendoredRequestMetadataCallbackAdapter(callback)); + } + + @Override + public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException { + return credentials.getRequestMetadata(uri); + } + + @Override + public boolean hasRequestMetadata() { + return credentials.hasRequestMetadata(); + } + + @Override + public boolean hasRequestMetadataOnly() { + return credentials.hasRequestMetadataOnly(); + } + + @Override + public void refresh() throws IOException { + credentials.refresh(); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredRequestMetadataCallbackAdapter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredRequestMetadataCallbackAdapter.java new file mode 100644 index 00000000000..8b1b695287e --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/auth/VendoredRequestMetadataCallbackAdapter.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.auth; + +import java.util.List; +import java.util.Map; + +/** + * Create a wrapper around credentials callback that delegates to the underlying vendored {@link + * com.google.auth.RequestMetadataCallback}. Note that this class should override every method that + * is not final and not static and call the delegate directly. + * + * <p>TODO: Replace this with an auto generated proxy which calls the underlying implementation + * delegate to reduce maintenance burden. + */ +public class VendoredRequestMetadataCallbackAdapter + implements com.google.auth.RequestMetadataCallback { + + private final org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback + callback; + + VendoredRequestMetadataCallbackAdapter( + org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback callback) { + this.callback = callback; + } + + @Override + public void onSuccess(Map<String, List<String>> metadata) { + callback.onSuccess(metadata); + } + + @Override + public void onFailure(Throwable exception) { + callback.onFailure(exception); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java similarity index 98% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java index 3c7798126e5..5fb22476ab3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill; +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/ForwardingClientResponseObserver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/ForwardingClientResponseObserver.java similarity index 96% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/ForwardingClientResponseObserver.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/ForwardingClientResponseObserver.java index a1f80598d89..007717d03b5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/ForwardingClientResponseObserver.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/ForwardingClientResponseObserver.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill; +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientCallStreamObserver; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientResponseObserver; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamObserverFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/StreamObserverFactory.java similarity index 97% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamObserverFactory.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/StreamObserverFactory.java index e0878b7b0b9..e3f12687638 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamObserverFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/StreamObserverFactory.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill; +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers; import java.util.function.Function; import org.apache.beam.sdk.fn.stream.AdvancingPhaser; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java new file mode 100644 index 00000000000..48cf8ff3f76 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs; + +import java.net.Inet6Address; +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; +import javax.net.ssl.SSLException; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.GrpcSslContexts; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NegotiationType; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyChannelBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; + +/** Utility class used to create different RPC Channels. */ +public final class WindmillChannelFactory { + public static final String LOCALHOST = "localhost"; + private static final int DEFAULT_GRPC_PORT = 443; + + private WindmillChannelFactory() {} + + public static ManagedChannel inProcessChannel(String channelName) { + return InProcessChannelBuilder.forName(channelName).directExecutor().build(); + } + + public static Channel localhostChannel(int port) { + return NettyChannelBuilder.forAddress(LOCALHOST, port) + .maxInboundMessageSize(Integer.MAX_VALUE) + .negotiationType(NegotiationType.PLAINTEXT) + .build(); + } + + static Channel remoteChannel( + WindmillServiceAddress windmillServiceAddress, int windmillServiceRpcChannelTimeoutSec) { + switch (windmillServiceAddress.getKind()) { + case IPV6: + return remoteChannel(windmillServiceAddress.ipv6(), windmillServiceRpcChannelTimeoutSec); + case GCP_SERVICE_ADDRESS: + return remoteChannel( + windmillServiceAddress.gcpServiceAddress(), windmillServiceRpcChannelTimeoutSec); + // switch is exhaustive will never happen. + default: + throw new UnsupportedOperationException( + "Only IPV6 and GCP_SERVICE_ADDRESS are supported WindmillServiceAddresses."); + } + } + + public static Channel remoteChannel( + HostAndPort endpoint, int windmillServiceRpcChannelTimeoutSec) { + try { + return createRemoteChannel( + NettyChannelBuilder.forAddress(endpoint.getHost(), endpoint.getPort()), + windmillServiceRpcChannelTimeoutSec); + } catch (SSLException sslException) { + throw new WindmillChannelCreationException(endpoint, sslException); + } + } + + public static Channel remoteChannel( + Inet6Address directEndpoint, int port, int windmillServiceRpcChannelTimeoutSec) { + try { + return createRemoteChannel( + NettyChannelBuilder.forAddress(new InetSocketAddress(directEndpoint, port)), + windmillServiceRpcChannelTimeoutSec); + } catch (SSLException sslException) { + throw new WindmillChannelCreationException(directEndpoint.toString(), sslException); + } + } + + public static Channel remoteChannel( + Inet6Address directEndpoint, int windmillServiceRpcChannelTimeoutSec) { + try { + return createRemoteChannel( + NettyChannelBuilder.forAddress(new InetSocketAddress(directEndpoint, DEFAULT_GRPC_PORT)), + windmillServiceRpcChannelTimeoutSec); + } catch (SSLException sslException) { + throw new WindmillChannelCreationException(directEndpoint.toString(), sslException); + } + } + + @SuppressWarnings("nullness") + private static Channel createRemoteChannel( + NettyChannelBuilder channelBuilder, int windmillServiceRpcChannelTimeoutSec) + throws SSLException { + if (windmillServiceRpcChannelTimeoutSec > 0) { + channelBuilder + .keepAliveTime(windmillServiceRpcChannelTimeoutSec, TimeUnit.SECONDS) + .keepAliveTimeout(windmillServiceRpcChannelTimeoutSec, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true); + } + + return channelBuilder + .flowControlWindow(10 * 1024 * 1024) + .maxInboundMessageSize(Integer.MAX_VALUE) + .maxInboundMetadataSize(1024 * 1024) + .negotiationType(NegotiationType.TLS) + // Set ciphers(null) to not use GCM, which is disabled for Dataflow + // due to it being horribly slow. + .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) + .build(); + } + + public static class WindmillChannelCreationException extends IllegalStateException { + private WindmillChannelCreationException(HostAndPort endpoint, SSLException sourceException) { + super( + String.format( + "Exception thrown when trying to create channel to endpoint={host:%s; port:%d}", + endpoint.getHost(), endpoint.getPort()), + sourceException); + } + + WindmillChannelCreationException(String directEndpoint, Throwable sourceException) { + super( + String.format( + "Exception thrown when trying to create channel to endpoint={%s}", directEndpoint), + sourceException); + } + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactory.java new file mode 100644 index 00000000000..0c7719b0bc1 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactory.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs; + +import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel; + +import com.google.auth.Credentials; +import com.google.auto.value.AutoOneOf; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.auth.VendoredCredentialsAdapter; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.auth.MoreCallCredentials; + +/** + * Used to create stubs to talk to Streaming Engine. Stubs are either in-process for testing, or + * remote. + */ +@AutoOneOf(WindmillStubFactory.Kind.class) +public abstract class WindmillStubFactory { + + public static WindmillStubFactory inProcessStubFactory( + String testName, Function<String, ManagedChannel> channelFactory) { + return AutoOneOf_WindmillStubFactory.inProcess( + () -> CloudWindmillServiceV1Alpha1Grpc.newStub(channelFactory.apply(testName))); + } + + public static WindmillStubFactory inProcessStubFactory(String testName) { + return AutoOneOf_WindmillStubFactory.inProcess( + () -> + CloudWindmillServiceV1Alpha1Grpc.newStub( + WindmillChannelFactory.inProcessChannel(testName))); + } + + public static WindmillStubFactory remoteStubFactory( + int rpcChannelTimeoutSec, Credentials gcpCredentials) { + return AutoOneOf_WindmillStubFactory.remote( + directEndpoint -> + CloudWindmillServiceV1Alpha1Grpc.newStub( + remoteChannel(directEndpoint, rpcChannelTimeoutSec)) + .withCallCredentials( + MoreCallCredentials.from(new VendoredCredentialsAdapter(gcpCredentials)))); + } + + public abstract Kind getKind(); + + public abstract Supplier<CloudWindmillServiceV1Alpha1Stub> inProcess(); + + public abstract Function<WindmillServiceAddress, CloudWindmillServiceV1Alpha1Stub> remote(); + + public enum Kind { + IN_PROCESS, + REMOTE + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/StreamingEngineThrottleTimers.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/StreamingEngineThrottleTimers.java new file mode 100644 index 00000000000..6b8dd272037 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/StreamingEngineThrottleTimers.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.throttling; + +import com.google.auto.value.AutoValue; + +@AutoValue +public abstract class StreamingEngineThrottleTimers { + + public static StreamingEngineThrottleTimers create() { + return new AutoValue_StreamingEngineThrottleTimers( + new ThrottleTimer(), new ThrottleTimer(), new ThrottleTimer()); + } + + public long getAndResetThrottleTime() { + return getWorkThrottleTimer().getAndResetThrottleTime() + + getDataThrottleTimer().getAndResetThrottleTime() + + commitWorkThrottleTimer().getAndResetThrottleTime(); + } + + public abstract ThrottleTimer getWorkThrottleTimer(); + + public abstract ThrottleTimer getDataThrottleTimer(); + + public abstract ThrottleTimer commitWorkThrottleTimer(); +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/ThrottleTimer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java similarity index 94% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/ThrottleTimer.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java index 237339aff39..f660112721b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/ThrottleTimer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.grpcclient; +package org.apache.beam.runners.dataflow.worker.windmill.client.throttling; import org.joda.time.Instant; @@ -25,7 +25,7 @@ import org.joda.time.Instant; * CommitWork are both blocked for x, totalTime will be 2x. However, if 2 GetWork streams are both * blocked for x totalTime will be x. All methods are thread safe. */ -class ThrottleTimer { +public final class ThrottleTimer { // This is -1 if not currently being throttled or the time in // milliseconds when throttling for this type started. private long startTime = -1; @@ -36,7 +36,7 @@ class ThrottleTimer { /** * Starts the timer if it has not been started and does nothing if it has already been started. */ - synchronized void start() { + public synchronized void start() { if (!throttled()) { // This timer is not started yet so start it now. startTime = Instant.now().getMillis(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java deleted file mode 100644 index 19cb90297df..00000000000 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java +++ /dev/null @@ -1,607 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.worker.windmill.grpcclient; - -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.io.IOException; -import java.io.PrintWriter; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; -import org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions; -import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream; -import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; -import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigRequest; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataRequest; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataResponse; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkResponse; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsRequest; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsResponse; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream.WorkItemReceiver; -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.util.BackOff; -import org.apache.beam.sdk.util.BackOffUtils; -import org.apache.beam.sdk.util.FluentBackoff; -import org.apache.beam.sdk.util.Sleeper; -import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel; -import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.StatusRuntimeException; -import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.auth.MoreCallCredentials; -import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder; -import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.GrpcSslContexts; -import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NegotiationType; -import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyChannelBuilder; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** gRPC client for communicating with Streaming Engine. */ -// Very likely real potential for bugs - https://github.com/apache/beam/issues/19273 -// Very likely real potential for bugs - https://github.com/apache/beam/issues/19271 -@SuppressFBWarnings({"JLM_JSR166_UTILCONCURRENT_MONITORENTER", "IS2_INCONSISTENT_SYNC"}) -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) -public final class GrpcWindmillServer extends WindmillServerStub { - private static final Logger LOG = LoggerFactory.getLogger(GrpcWindmillServer.class); - - // If a connection cannot be established, gRPC will fail fast so this deadline can be relatively - // high. - private static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300; - private static final int DEFAULT_LOG_EVERY_N_FAILURES = 20; - private static final String LOCALHOST = "localhost"; - private static final Duration MIN_BACKOFF = Duration.millis(1); - private static final Duration MAX_BACKOFF = Duration.standardSeconds(30); - private static final AtomicLong nextId = new AtomicLong(0); - private static final int NO_HEALTH_CHECK = -1; - - private final StreamingDataflowWorkerOptions options; - private final int streamingRpcBatchLimit; - private final List<CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub> stubList; - private final ThrottleTimer getWorkThrottleTimer; - private final ThrottleTimer getDataThrottleTimer; - private final ThrottleTimer commitWorkThrottleTimer; - private final Random rand; - private final Set<AbstractWindmillStream<?, ?>> streamRegistry; - private ImmutableSet<HostAndPort> endpoints; - private int logEveryNStreamFailures; - private Duration maxBackoff = MAX_BACKOFF; - private WindmillApplianceGrpc.WindmillApplianceBlockingStub syncApplianceStub = null; - - private GrpcWindmillServer(StreamingDataflowWorkerOptions options) { - this.options = options; - this.streamingRpcBatchLimit = options.getWindmillServiceStreamingRpcBatchLimit(); - this.stubList = new ArrayList<>(); - this.logEveryNStreamFailures = options.getWindmillServiceStreamingLogEveryNStreamFailures(); - this.endpoints = ImmutableSet.of(); - this.getWorkThrottleTimer = new ThrottleTimer(); - this.getDataThrottleTimer = new ThrottleTimer(); - this.commitWorkThrottleTimer = new ThrottleTimer(); - this.rand = new Random(); - this.streamRegistry = Collections.newSetFromMap(new ConcurrentHashMap<>()); - } - - private static StreamingDataflowWorkerOptions testOptions(boolean enableStreamingEngine) { - StreamingDataflowWorkerOptions options = - PipelineOptionsFactory.create().as(StreamingDataflowWorkerOptions.class); - options.setProject("project"); - options.setJobId("job"); - options.setWorkerId("worker"); - List<String> experiments = - options.getExperiments() == null ? new ArrayList<>() : options.getExperiments(); - if (enableStreamingEngine) { - experiments.add(GcpOptions.STREAMING_ENGINE_EXPERIMENT); - } - options.setExperiments(experiments); - - options.setWindmillServiceStreamingRpcBatchLimit(Integer.MAX_VALUE); - options.setWindmillServiceStreamingRpcHealthCheckPeriodMs(NO_HEALTH_CHECK); - options.setWindmillServiceStreamingLogEveryNStreamFailures(DEFAULT_LOG_EVERY_N_FAILURES); - - return options; - } - - /** Create new instance of {@link GrpcWindmillServer}. */ - public static GrpcWindmillServer create(StreamingDataflowWorkerOptions workerOptions) - throws IOException { - GrpcWindmillServer grpcWindmillServer = new GrpcWindmillServer(workerOptions); - if (workerOptions.getWindmillServiceEndpoint() != null) { - grpcWindmillServer.configureWindmillServiceEndpoints(); - } else if (!workerOptions.isEnableStreamingEngine() - && workerOptions.getLocalWindmillHostport() != null) { - grpcWindmillServer.configureLocalHost(); - } - - if (workerOptions.getWindmillServiceStreamingRpcHealthCheckPeriodMs() > 0) { - grpcWindmillServer.scheduleHealthCheckTimer( - workerOptions, () -> grpcWindmillServer.streamRegistry); - } - - return grpcWindmillServer; - } - - @VisibleForTesting - static GrpcWindmillServer newTestInstance(String name) { - GrpcWindmillServer testServer = - new GrpcWindmillServer(testOptions(/* enableStreamingEngine= */ true)); - testServer.stubList.add(CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel(name))); - return testServer; - } - - @VisibleForTesting - static GrpcWindmillServer newApplianceTestInstance(Channel channel) { - GrpcWindmillServer testServer = - new GrpcWindmillServer(testOptions(/* enableStreamingEngine= */ false)); - testServer.syncApplianceStub = createWindmillApplianceStubWithDeadlineInterceptor(channel); - return testServer; - } - - private static WindmillApplianceGrpc.WindmillApplianceBlockingStub - createWindmillApplianceStubWithDeadlineInterceptor(Channel channel) { - return WindmillApplianceGrpc.newBlockingStub(channel) - .withInterceptors(GrpcDeadlineClientInterceptor.withDefaultUnaryRpcDeadline()); - } - - private static Channel inProcessChannel(String name) { - return InProcessChannelBuilder.forName(name).directExecutor().build(); - } - - private static Channel localhostChannel(int port) { - return NettyChannelBuilder.forAddress(LOCALHOST, port) - .maxInboundMessageSize(Integer.MAX_VALUE) - .negotiationType(NegotiationType.PLAINTEXT) - .build(); - } - - private static UnsupportedOperationException unsupportedUnaryRequestInStreamingEngineException( - String rpcName) { - return new UnsupportedOperationException( - String.format("Unary %s calls are not supported in Streaming Engine.", rpcName)); - } - - private void scheduleHealthCheckTimer( - StreamingDataflowWorkerOptions options, Supplier<Set<AbstractWindmillStream<?, ?>>> streams) { - new Timer("WindmillHealthCheckTimer") - .schedule( - new HealthCheckTimerTask(options, streams), - 0, - options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()); - } - - private void configureWindmillServiceEndpoints() throws IOException { - Set<HostAndPort> endpoints = new HashSet<>(); - for (String endpoint : Splitter.on(',').split(options.getWindmillServiceEndpoint())) { - endpoints.add( - HostAndPort.fromString(endpoint).withDefaultPort(options.getWindmillServicePort())); - } - initializeWindmillService(endpoints); - } - - private void configureLocalHost() { - int portStart = options.getLocalWindmillHostport().lastIndexOf(':'); - String endpoint = options.getLocalWindmillHostport().substring(0, portStart); - assert ("grpc:localhost".equals(endpoint)); - int port = Integer.parseInt(options.getLocalWindmillHostport().substring(portStart + 1)); - this.endpoints = ImmutableSet.of(HostAndPort.fromParts(LOCALHOST, port)); - initializeLocalHost(port); - } - - @Override - public synchronized void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) - throws IOException { - Preconditions.checkNotNull(endpoints); - if (endpoints.equals(this.endpoints)) { - // The endpoints are equal don't recreate the stubs. - return; - } - LOG.info("Creating a new windmill stub, endpoints: {}", endpoints); - if (this.endpoints != null) { - LOG.info("Previous windmill stub endpoints: {}", this.endpoints); - } - initializeWindmillService(endpoints); - } - - @Override - public synchronized boolean isReady() { - return !stubList.isEmpty(); - } - - private synchronized void initializeLocalHost(int port) { - this.logEveryNStreamFailures = 1; - this.maxBackoff = Duration.millis(500); - Channel channel = localhostChannel(port); - if (options.isEnableStreamingEngine()) { - this.stubList.add(CloudWindmillServiceV1Alpha1Grpc.newStub(channel)); - } else { - this.syncApplianceStub = createWindmillApplianceStubWithDeadlineInterceptor(channel); - } - } - - private synchronized void initializeWindmillService(Set<HostAndPort> endpoints) - throws IOException { - LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", endpoints); - this.stubList.clear(); - this.endpoints = ImmutableSet.copyOf(endpoints); - for (HostAndPort endpoint : this.endpoints) { - if (LOCALHOST.equals(endpoint.getHost())) { - initializeLocalHost(endpoint.getPort()); - } else { - this.stubList.add( - CloudWindmillServiceV1Alpha1Grpc.newStub(remoteChannel(endpoint)) - .withCallCredentials( - MoreCallCredentials.from( - new VendoredCredentialsAdapter(options.getGcpCredential())))); - } - } - } - - private Channel remoteChannel(HostAndPort endpoint) throws IOException { - NettyChannelBuilder builder = - NettyChannelBuilder.forAddress(endpoint.getHost(), endpoint.getPort()); - int timeoutSec = options.getWindmillServiceRpcChannelAliveTimeoutSec(); - if (timeoutSec > 0) { - builder - .keepAliveTime(timeoutSec, TimeUnit.SECONDS) - .keepAliveTimeout(timeoutSec, TimeUnit.SECONDS) - .keepAliveWithoutCalls(true); - } - return builder - .flowControlWindow(10 * 1024 * 1024) - .maxInboundMessageSize(Integer.MAX_VALUE) - .maxInboundMetadataSize(1024 * 1024) - .negotiationType(NegotiationType.TLS) - // Set ciphers(null) to not use GCM, which is disabled for Dataflow - // due to it being horribly slow. - .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) - .build(); - } - - /** - * Stubs returned from this method do not (and should not) have {@link - * org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Deadline}(s) set since they represent an absolute - * point in time. {@link org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Deadline}(s) should not be - * treated as a timeout which represents a relative point in time. - * - * @see <a href=https://grpc.io/blog/deadlines/>Official gRPC deadline documentation for more - * details.</a> - */ - private synchronized CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub stub() { - if (stubList.isEmpty()) { - throw new RuntimeException("windmillServiceEndpoint has not been set"); - } - - return stubList.size() == 1 ? stubList.get(0) : stubList.get(rand.nextInt(stubList.size())); - } - - @Override - public void appendSummaryHtml(PrintWriter writer) { - writer.write("Active Streams:<br>"); - for (AbstractWindmillStream<?, ?> stream : streamRegistry) { - stream.appendSummaryHtml(writer); - writer.write("<br>"); - } - } - - // Configure backoff to retry calls forever, with a maximum sane retry interval. - private BackOff grpcBackoff() { - return FluentBackoff.DEFAULT - .withInitialBackoff(MIN_BACKOFF) - .withMaxBackoff(maxBackoff) - .backoff(); - } - - private <ResponseT> ResponseT callWithBackoff(Supplier<ResponseT> function) { - BackOff backoff = grpcBackoff(); - int rpcErrors = 0; - while (true) { - try { - return function.get(); - } catch (StatusRuntimeException e) { - try { - if (++rpcErrors % 20 == 0) { - LOG.warn( - "Many exceptions calling gRPC. Last exception: {} with status {}", - e, - e.getStatus()); - } - if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) { - throw new RpcException(e); - } - } catch (IOException | InterruptedException i) { - if (i instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - RpcException rpcException = new RpcException(e); - rpcException.addSuppressed(i); - throw rpcException; - } - } - } - } - - @Override - public GetWorkResponse getWork(GetWorkRequest request) { - if (syncApplianceStub != null) { - return callWithBackoff(() -> syncApplianceStub.getWork(request)); - } - - throw new RpcException(unsupportedUnaryRequestInStreamingEngineException("GetWork")); - } - - @Override - public GetDataResponse getData(GetDataRequest request) { - if (syncApplianceStub != null) { - return callWithBackoff(() -> syncApplianceStub.getData(request)); - } - - throw new RpcException(unsupportedUnaryRequestInStreamingEngineException("GetData")); - } - - @Override - public CommitWorkResponse commitWork(CommitWorkRequest request) { - if (syncApplianceStub != null) { - return callWithBackoff(() -> syncApplianceStub.commitWork(request)); - } - throw new RpcException(unsupportedUnaryRequestInStreamingEngineException("CommitWork")); - } - - private StreamObserverFactory newStreamObserverFactory() { - return StreamObserverFactory.direct( - DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2, options.getWindmillMessagesBetweenIsReadyChecks()); - } - - @Override - public GetWorkStream getWorkStream(GetWorkRequest request, WorkItemReceiver receiver) { - GetWorkRequest getWorkRequest = - GetWorkRequest.newBuilder(request) - .setJobId(options.getJobId()) - .setProjectId(options.getProject()) - .setWorkerId(options.getWorkerId()) - .build(); - - return GrpcGetWorkStream.create( - responseObserver -> - stub() - // Deadlines are absolute points in time, so generate a new one everytime this - // function is called. - .withDeadlineAfter( - AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS) - .getWorkStream(responseObserver), - getWorkRequest, - grpcBackoff(), - newStreamObserverFactory(), - streamRegistry, - logEveryNStreamFailures, - getWorkThrottleTimer, - receiver); - } - - @Override - public GetDataStream getDataStream() { - return GrpcGetDataStream.create( - responseObserver -> - stub() - // Deadlines are absolute points in time, so generate a new one everytime this - // function is called. - .withDeadlineAfter( - AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS) - .getDataStream(responseObserver), - grpcBackoff(), - newStreamObserverFactory(), - streamRegistry, - logEveryNStreamFailures, - getDataThrottleTimer, - makeHeader(), - nextId, - streamingRpcBatchLimit); - } - - @Override - public CommitWorkStream commitWorkStream() { - return GrpcCommitWorkStream.create( - responseObserver -> - stub() - // Deadlines are absolute points in time, so generate a new one everytime this - // function is called. - .withDeadlineAfter( - AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS) - .commitWorkStream(responseObserver), - grpcBackoff(), - newStreamObserverFactory(), - streamRegistry, - logEveryNStreamFailures, - commitWorkThrottleTimer, - makeHeader(), - nextId, - streamingRpcBatchLimit); - } - - @Override - public GetConfigResponse getConfig(GetConfigRequest request) { - if (syncApplianceStub != null) { - return callWithBackoff(() -> syncApplianceStub.getConfig(request)); - } - - throw new RpcException( - new UnsupportedOperationException("GetConfig not supported in Streaming Engine.")); - } - - @Override - public ReportStatsResponse reportStats(ReportStatsRequest request) { - if (syncApplianceStub != null) { - return callWithBackoff(() -> syncApplianceStub.reportStats(request)); - } - - throw new RpcException( - new UnsupportedOperationException("ReportStats not supported in Streaming Engine.")); - } - - @Override - public long getAndResetThrottleTime() { - return getWorkThrottleTimer.getAndResetThrottleTime() - + getDataThrottleTimer.getAndResetThrottleTime() - + commitWorkThrottleTimer.getAndResetThrottleTime(); - } - - private JobHeader makeHeader() { - return JobHeader.newBuilder() - .setJobId(options.getJobId()) - .setProjectId(options.getProject()) - .setWorkerId(options.getWorkerId()) - .build(); - } - - /** - * Create a wrapper around credentials callback that delegates to the underlying vendored {@link - * com.google.auth.RequestMetadataCallback}. Note that this class should override every method - * that is not final and not static and call the delegate directly. - * - * <p>TODO: Replace this with an auto generated proxy which calls the underlying implementation - * delegate to reduce maintenance burden. - */ - private static class VendoredRequestMetadataCallbackAdapter - implements com.google.auth.RequestMetadataCallback { - - private final org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback - callback; - - private VendoredRequestMetadataCallbackAdapter( - org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback callback) { - this.callback = callback; - } - - @Override - public void onSuccess(Map<String, List<String>> metadata) { - callback.onSuccess(metadata); - } - - @Override - public void onFailure(Throwable exception) { - callback.onFailure(exception); - } - } - - /** - * Create a wrapper around credentials that delegates to the underlying {@link - * com.google.auth.Credentials}. Note that this class should override every method that is not - * final and not static and call the delegate directly. - * - * <p>TODO: Replace this with an auto generated proxy which calls the underlying implementation - * delegate to reduce maintenance burden. - */ - private static class VendoredCredentialsAdapter - extends org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.Credentials { - - private final com.google.auth.Credentials credentials; - - private VendoredCredentialsAdapter(com.google.auth.Credentials credentials) { - this.credentials = credentials; - } - - @Override - public String getAuthenticationType() { - return credentials.getAuthenticationType(); - } - - @Override - public Map<String, List<String>> getRequestMetadata() throws IOException { - return credentials.getRequestMetadata(); - } - - @Override - public void getRequestMetadata( - final URI uri, - Executor executor, - final org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback - callback) { - credentials.getRequestMetadata( - uri, executor, new VendoredRequestMetadataCallbackAdapter(callback)); - } - - @Override - public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException { - return credentials.getRequestMetadata(uri); - } - - @Override - public boolean hasRequestMetadata() { - return credentials.hasRequestMetadata(); - } - - @Override - public boolean hasRequestMetadataOnly() { - return credentials.hasRequestMetadataOnly(); - } - - @Override - public void refresh() throws IOException { - credentials.refresh(); - } - } - - private static class HealthCheckTimerTask extends TimerTask { - private final StreamingDataflowWorkerOptions options; - private final Supplier<Set<AbstractWindmillStream<?, ?>>> streams; - - public HealthCheckTimerTask( - StreamingDataflowWorkerOptions options, - Supplier<Set<AbstractWindmillStream<?, ?>>> streams) { - this.options = options; - this.streams = streams; - } - - @Override - public void run() { - Instant reportThreshold = - Instant.now() - .minus(Duration.millis(options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())); - for (AbstractWindmillStream<?, ?> stream : streams.get()) { - stream.maybeSendHealthCheck(reportThreshold); - } - } - } -} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java new file mode 100644 index 00000000000..307dfdfa17b --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work; + +import java.util.Collection; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; + +/** Functional interface for receiving WorkItems. */ +@FunctionalInterface +public interface WorkItemReceiver { + void receiveWork( + String computation, + @Nullable Instant inputDataWatermark, + @Nullable Instant synchronizedProcessingTime, + Windmill.WorkItem workItem, + Collection<Windmill.LatencyAttribution> getWorkStreamLatencies); +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java new file mode 100644 index 00000000000..0038e3e9cc6 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.budget; + +import com.google.auto.value.AutoValue; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; + +/** + * Budget of items and bytes for fetching {@link + * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem}(s) via {@link + * WindmillStream.GetWorkStream}. Used to control how "much" work is returned from Windmill. + */ +@AutoValue +public abstract class GetWorkBudget { + public static GetWorkBudget.Builder builder() { + return new AutoValue_GetWorkBudget.Builder(); + } + + /** {@link GetWorkBudget} of 0. */ + public static GetWorkBudget noBudget() { + return builder().setItems(0).setBytes(0).build(); + } + + public static GetWorkBudget from(GetWorkRequest getWorkRequest) { + return builder() + .setItems(getWorkRequest.getMaxItems()) + .setBytes(getWorkRequest.getMaxBytes()) + .build(); + } + + /** + * Adds the given bytes and items or the current budget, returning a new {@link GetWorkBudget}. + * Does not drop below 0. + */ + public GetWorkBudget add(long items, long bytes) { + Preconditions.checkArgument(items >= 0 && bytes >= 0); + return GetWorkBudget.builder().setBytes(bytes() + bytes).setItems(items() + items).build(); + } + + public GetWorkBudget add(GetWorkBudget other) { + return add(other.items(), other.bytes()); + } + + /** + * Subtracts the given bytes and items or the current budget, returning a new {@link + * GetWorkBudget}. Does not drop below 0. + */ + public GetWorkBudget subtract(long items, long bytes) { + Preconditions.checkArgument(items >= 0 && bytes >= 0); + return GetWorkBudget.builder().setBytes(bytes() - bytes).setItems(items() - items).build(); + } + + public GetWorkBudget subtract(GetWorkBudget other) { + return subtract(other.items(), other.bytes()); + } + + /** Budget of bytes for GetWork. Does not drop below 0. */ + public abstract long bytes(); + + /** Budget of items for GetWork. Does not drop below 0. */ + public abstract long items(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setBytes(long bytes); + + public abstract Builder setItems(long budget); + + abstract long items(); + + abstract long bytes(); + + abstract GetWorkBudget autoBuild(); + + public final GetWorkBudget build() { + setItems(Math.max(0, items())); + setBytes(Math.max(0, bytes())); + return autoBuild(); + } + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java index 4700217dc8a..092f5e59a13 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java @@ -53,9 +53,11 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribut import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.State; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; import org.joda.time.Duration; @@ -198,8 +200,7 @@ class FakeWindmillServer extends WindmillServerStub { } @Override - public GetWorkStream getWorkStream( - Windmill.GetWorkRequest request, GetWorkStream.WorkItemReceiver receiver) { + public GetWorkStream getWorkStream(Windmill.GetWorkRequest request, WorkItemReceiver receiver) { LOG.debug("getWorkStream: {}", request.toString()); Instant startTime = Instant.now(); final CountDownLatch done = new CountDownLatch(1); @@ -209,6 +210,19 @@ class FakeWindmillServer extends WindmillServerStub { done.countDown(); } + @Override + public void adjustBudget(long itemsDelta, long bytesDelta) { + // no-op. + } + + @Override + public GetWorkBudget remainingBudget() { + return GetWorkBudget.builder() + .setItems(request.getMaxItems()) + .setBytes(request.getMaxBytes()) + .build(); + } + @Override public boolean awaitTermination(int time, TimeUnit unit) throws InterruptedException { while (done.getCount() > 0) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPoolTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolTest.java similarity index 99% rename from runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPoolTest.java rename to runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolTest.java index 9924bb7d2b2..264540531bf 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillStreamPoolTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill; +package org.apache.beam.runners.dataflow.worker.windmill.client; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStreamTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStreamTest.java similarity index 96% rename from runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStreamTest.java rename to runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStreamTest.java index 45ed3381a8b..e3b07bf7aa4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStreamTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStreamTest.java @@ -15,10 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.grpcclient; +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; import static com.google.common.truth.Truth.assertThat; -import static org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS; +import static org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.verify; @@ -33,13 +33,14 @@ import java.util.Set; import java.util.function.Consumer; import java.util.stream.Collectors; import javax.annotation.Nullable; -import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; -import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse; import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints; +import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Server; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java similarity index 98% rename from runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServerTest.java rename to runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java index 53afc6990e4..d9f4b72716c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.windmill.grpcclient; +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -68,9 +68,9 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Value; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest; import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.CommitWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.CallOptions; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel; @@ -99,10 +99,7 @@ import org.junit.runners.JUnit4; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Unit tests for {@link - * org.apache.beam.runners.dataflow.worker.windmill.grpcclient.GrpcWindmillServer}. - */ +/** Unit tests for {@link GrpcWindmillServer}. */ @RunWith(JUnit4.class) @SuppressWarnings({ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) @@ -114,7 +111,7 @@ public class GrpcWindmillServerTest { private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); @Rule public ErrorCollector errorCollector = new ErrorCollector(); private Server server; - private org.apache.beam.runners.dataflow.worker.windmill.grpcclient.GrpcWindmillServer client; + private GrpcWindmillServer client; private int remainingErrors = 20; @Before diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetTest.java new file mode 100644 index 00000000000..76d50839785 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.budget; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class GetWorkBudgetTest { + + @Test + public void testCreateWithNoBudget() { + GetWorkBudget getWorkBudget = GetWorkBudget.noBudget(); + assertEquals(0, getWorkBudget.items()); + assertEquals(0, getWorkBudget.bytes()); + } + + @Test + public void testBuild_itemsAndBytesNeverBelowZero() { + GetWorkBudget getWorkBudget = GetWorkBudget.builder().setItems(-10).setBytes(-10).build(); + assertEquals(0, getWorkBudget.items()); + assertEquals(0, getWorkBudget.bytes()); + } + + @Test + public void testAdd_doesNotAllowNegativeParameters() { + GetWorkBudget getWorkBudget = GetWorkBudget.builder().setItems(1).setBytes(1).build(); + assertThrows(IllegalArgumentException.class, () -> getWorkBudget.add(-1, -1)); + } + + @Test + public void testSubtract_itemsAndBytesNeverBelowZero() { + GetWorkBudget getWorkBudget = GetWorkBudget.builder().setItems(1).setBytes(1).build(); + GetWorkBudget subtracted = getWorkBudget.subtract(10, 10); + assertEquals(0, subtracted.items()); + assertEquals(0, subtracted.bytes()); + } + + @Test + public void testSubtractGetWorkBudget_itemsAndBytesNeverBelowZero() { + GetWorkBudget getWorkBudget = GetWorkBudget.builder().setItems(1).setBytes(1).build(); + GetWorkBudget subtracted = + getWorkBudget.subtract(GetWorkBudget.builder().setItems(10).setBytes(10).build()); + assertEquals(0, subtracted.items()); + assertEquals(0, subtracted.bytes()); + } + + @Test + public void testSubtract_doesNotAllowNegativeParameters() { + GetWorkBudget getWorkBudget = GetWorkBudget.builder().setItems(1).setBytes(1).build(); + assertThrows(IllegalArgumentException.class, () -> getWorkBudget.subtract(-1, -1)); + } +}