scwhittle commented on code in PR #31784: URL: https://github.com/apache/beam/pull/31784#discussion_r1688451038
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/RefreshableWork.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming; + +import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.joda.time.Instant; + +/** View of {@link Work} that exposes an interface for work refreshing. */ +@Internal +public interface RefreshableWork { + + WorkId id(); + + ShardedKey getShardedKey(); + + boolean isRefreshable(Instant refreshDeadline); Review Comment: the meaning of refreshDeadline isn't particularly clear. We could add a comment but I'm wondering if it makes more sense to have this determination outside of refreshablework class and instead when we are generating refreshablework. Maybe should we just expose the start time or last refresh time and just compare when generating refreshablework? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/GetDataClient.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.getdata; + +import java.io.PrintWriter; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalData; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse; +import org.apache.beam.sdk.annotations.Internal; + +/** Client for streaming backend GetData API. */ +@Internal +public interface GetDataClient { + KeyedGetDataResponse getStateData(String computation, KeyedGetDataRequest request); Review Comment: would be good to add comments to interface, for example // Blocking fetch of requested data from windmill backend for the given computation. does it wrap alle exceptions with GetDataException? if so document that if it would want to be caught specifically. BTW is that necessary or should callers themselves just try/catch around the call and know it was from the client? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/WorkRefreshClient.java: ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.getdata; + +import java.util.Map; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.Heartbeats; + +/** Client for requesting work refresh via heartbeats. */ +public interface WorkRefreshClient { Review Comment: I don't think this needs to be an interface, we can have a single implementation that handles both the appliance and SE cases because the HeartbeatSender is different for those implementations. So I would: - remove this interface and current appliance impl - change the fanoutworkrefreshclient to execute single heartbeat sender inline (will also benefit SE cases where fanout not needed as well). - could keep the fanoutworkrefreshclient as separate class or if it's really just a single method move it into whatever used to be using it. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcherFactory.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming.sideinput; + +import java.util.function.Function; +import org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalData; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest; +import org.apache.beam.sdk.annotations.Internal; + +/** + * Factory class for generating {@link SideInputStateFetcher} instances that share a {@link + * SideInputCache}. + */ +@Internal +public final class SideInputStateFetcherFactory { Review Comment: this seems like overkill to me, I don't see any other implementations other than what is done internally to StreamingWorkScheduler. Can this be removed and just be a private cache member and method in that class to create SideInputFetchers? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -255,6 +266,28 @@ public final Instant startTime() { return new Instant(startTimeMs.get()); } + @Override + public String backendWorkerToken() { + return backendWorkerToken; + } + + @Override + public void shutdown() { + if (isShutdown.compareAndSet(false, true)) { + halfClose(); Review Comment: it's not clear to me we want to halfclose here, it seems it may be better to do requestObserver.onError(...) also if we've already called halfclose it may be invalid to call it again. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/HeartbeatSender.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.refresh; + +/** Interface for sending heartbeats. 
*/ Review Comment: add comment that batching is performed by HeartbeatSender equality ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/ApplianceGetDataClient.java: ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.getdata; + +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.WindmillComputationKey; +import org.apache.beam.runners.dataflow.worker.windmill.ApplianceWindmillClient; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetDataRequest; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.Heartbeats; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Appliance implementation of {@link GetDataClient}. 
*/ +@Internal +@ThreadSafe +public final class ApplianceGetDataClient implements GetDataClient, WorkRefreshClient { + private static final int MAX_READS_PER_BATCH = 60; + private static final int MAX_ACTIVE_READS = 10; + + private final ApplianceWindmillClient windmillClient; + private final ThrottlingGetDataMetricTracker getDataMetricTracker; + + @GuardedBy("this") + private final List<ReadBatch> pendingReadBatches; + + @GuardedBy("this") + private int activeReadThreads; + + public ApplianceGetDataClient( + ApplianceWindmillClient windmillClient, ThrottlingGetDataMetricTracker getDataMetricTracker) { + this.windmillClient = windmillClient; + this.getDataMetricTracker = getDataMetricTracker; + this.pendingReadBatches = new ArrayList<>(); + this.activeReadThreads = 0; + } + + public static GetDataClient create( Review Comment: the constructor is public, so I think this factory method could be removed ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java: ########## @@ -94,7 +97,16 @@ public void start() { @Override public void commit(Commit commit) { - commitQueue.put(commit); + if (commit.work().isFailed() || !isRunning.get()) { Review Comment: the log should only be emitted if !isRunning; the commit's work could be failed due to a heartbeat response, and in that case the log would be inaccurate ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/GetDataClient.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.getdata; + +import java.io.PrintWriter; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalData; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse; +import org.apache.beam.sdk.annotations.Internal; + +/** Client for streaming backend GetData API. */ +@Internal +public interface GetDataClient { + KeyedGetDataResponse getStateData(String computation, KeyedGetDataRequest request); + + GlobalData getSideInputData(GlobalDataRequest request); + + default void printHtml(PrintWriter writer) {} Review Comment: rm default? might as well force implementations to add something ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java: ########## @@ -32,15 +33,37 @@ /** Superclass for streams returned by streaming Windmill methods.
*/ @ThreadSafe public interface WindmillStream { + + /** An identifier for the backend worker where the stream is sending/receiving RPCs. */ + String backendWorkerToken(); + /** Indicates that no more requests will be sent. */ - void close(); + void halfClose(); /** Waits for the server to close its end of the connection, with timeout. */ boolean awaitTermination(int time, TimeUnit unit) throws InterruptedException; /** Returns when the stream was opened. */ Instant startTime(); + /** + * Shutdown the stream. There should be no further interactions with the stream once this has been + * called. + */ + void shutdown(); + + /** Reflects that {@link #shutdown()} was explicitly called. */ + boolean isShutdown(); Review Comment: should this be removed? shutdown indicates that no further interaction with the stream should happen after shutdown. So if external callers are checking shutdown state before doing something it seems racy. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -238,12 +248,15 @@ private StreamingDataflowWorker( dispatchThread.setName("DispatchThread"); this.clientId = clientId; this.windmillServer = windmillServer; - this.metricTrackingWindmillServer = - MetricTrackingWindmillServerStub.builder(windmillServer, memoryMonitor) - .setUseStreamingRequests(windmillServiceEnabled) - .setUseSeparateHeartbeatStreams(options.getUseSeparateWindmillHeartbeatStreams()) - .setNumGetDataStreams(options.getWindmillGetDataStreamCount()) - .build(); + + ThrottlingGetDataMetricTracker getDataMetricTracker = + new ThrottlingGetDataMetricTracker(memoryMonitor); + + WindmillStreamPool<GetDataStream> getDataStreamPool = Review Comment: move into the if (windmillServiceEnabled) below it's not used otherwise and windmillServer::getDataStream isn't valid in non-SE. could move the heartbeat initialization into the same if/else as well ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/ApplianceGetDataClient.java: ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.getdata; + +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.WindmillComputationKey; +import org.apache.beam.runners.dataflow.worker.windmill.ApplianceWindmillClient; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetDataRequest; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.Heartbeats; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Appliance implementation of {@link GetDataClient}. */ +@Internal +@ThreadSafe +public final class ApplianceGetDataClient implements GetDataClient, WorkRefreshClient { + private static final int MAX_READS_PER_BATCH = 60; + private static final int MAX_ACTIVE_READS = 10; + + private final ApplianceWindmillClient windmillClient; + private final ThrottlingGetDataMetricTracker getDataMetricTracker; + + @GuardedBy("this") + private final List<ReadBatch> pendingReadBatches; + + @GuardedBy("this") + private int activeReadThreads; + + public ApplianceGetDataClient( + ApplianceWindmillClient windmillClient, ThrottlingGetDataMetricTracker getDataMetricTracker) { + this.windmillClient = windmillClient; + this.getDataMetricTracker = getDataMetricTracker; + this.pendingReadBatches = new ArrayList<>(); + this.activeReadThreads = 0; + } + + public static GetDataClient create( + ApplianceWindmillClient windmillClient, ThrottlingGetDataMetricTracker getDataMetricTracker) { + return new ApplianceGetDataClient(windmillClient, getDataMetricTracker); + } + + @Override + public Windmill.KeyedGetDataResponse getStateData( + String computation, Windmill.KeyedGetDataRequest request) { + try (AutoCloseable ignored = + getDataMetricTracker.trackSingleCallWithThrottling( + ThrottlingGetDataMetricTracker.Type.STATE)) { + SettableFuture<Windmill.KeyedGetDataResponse> response = SettableFuture.create(); + ReadBatch batch = addToReadBatch(new QueueEntry(computation, request, response)); + if (batch != null) { + issueReadBatch(batch); + } + return response.get(); + } catch (Exception e) { + throw new GetDataException( + "Error occurred fetching state for computation=" + + computation + + ", key=" + + request.getShardingKey(), + e); + } + } + + @Override + public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request) { + try (AutoCloseable ignored = + getDataMetricTracker.trackSingleCallWithThrottling( + ThrottlingGetDataMetricTracker.Type.STATE)) { Review Comment: this should be SIDE_INPUT since we just have STATE and SIDE_INPUT. I wonder if it would be less error prone to have the public methods on the tracker be trackGetDataWithThrottling and trackGetSideInputWithThrottling instead of the enum.
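A minimal sketch of the tracker-method alternative suggested above. The two method names come from the comment; field names, debug strings, and the inlined counters are assumptions rather than the PR's actual code, and imports match the existing ThrottlingGetDataMetricTracker:

@Internal
@ThreadSafe
public final class ThrottlingGetDataMetricTracker {
  private final MemoryMonitor gcThrashingMonitor;
  // Counters inlined as plain members (see the later comment on the GetDataMetrics holder).
  private final AtomicInteger activeStateReads = new AtomicInteger();
  private final AtomicInteger activeSideInputFetches = new AtomicInteger();

  public ThrottlingGetDataMetricTracker(MemoryMonitor gcThrashingMonitor) {
    this.gcThrashingMonitor = gcThrashingMonitor;
  }

  /** Tracks a keyed state fetch, throttling first if there is memory pressure. */
  public AutoCloseable trackGetDataWithThrottling() {
    gcThrashingMonitor.waitForResources("GetStateData");
    activeStateReads.getAndIncrement();
    return activeStateReads::getAndDecrement;
  }

  /** Tracks a side input fetch, throttling first if there is memory pressure. */
  public AutoCloseable trackGetSideInputWithThrottling() {
    gcThrashingMonitor.waitForResources("GetSideInputData");
    activeSideInputFetches.getAndIncrement();
    return activeSideInputFetches::getAndDecrement;
  }
}

With explicit methods, the bug flagged above (passing Type.STATE where SIDE_INPUT was intended) cannot be expressed in the first place.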
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java: ########## @@ -32,15 +33,37 @@ /** Superclass for streams returned by streaming Windmill methods. */ @ThreadSafe public interface WindmillStream { + + /** An identifier for the backend worker where the stream is sending/receiving RPCs. */ + String backendWorkerToken(); + /** Indicates that no more requests will be sent. */ - void close(); + void halfClose(); /** Waits for the server to close its end of the connection, with timeout. */ boolean awaitTermination(int time, TimeUnit unit) throws InterruptedException; /** Returns when the stream was opened. */ Instant startTime(); + /** + * Shutdown the stream. There should be no further interactions with the stream once this has been + * called. + */ + void shutdown(); + + /** Reflects that {@link #shutdown()} was explicitly called. */ + boolean isShutdown(); + + Type streamType(); Review Comment: why is this needed? I don't see any callers and generally it is a code-smell because it means that if we add a type we have to update scattered usages of the type where if instead whatever callers are switching on the type of was part of the interface everything is grouped. https://code-smells.com/object-orientation-abusers/switch-statement ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/sideinput/SideInputStateFetcher.java: ########## @@ -64,13 +62,6 @@ public class SideInputStateFetcher { private final Function<GlobalDataRequest, GlobalData> fetchGlobalDataFn; private long bytesRead = 0L; - public SideInputStateFetcher( - Function<GlobalDataRequest, GlobalData> fetchGlobalDataFn, - DataflowStreamingPipelineOptions options) { - this(fetchGlobalDataFn, SideInputCache.create(options)); - } - - @VisibleForTesting SideInputStateFetcher( Review Comment: should this be public? or could the class itself be package private? mark class Internal ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/FanOutWorkRefreshClient.java: ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.getdata; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.Heartbeats; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link WorkRefreshClient} that fans out heartbeats to all {@link HeartbeatSender}(s) in parallel + * passed into {@link #refreshActiveWork(Map)} + */ +@Internal +public final class FanOutWorkRefreshClient implements WorkRefreshClient { + private static final Logger LOG = LoggerFactory.getLogger(FanOutWorkRefreshClient.class); + private static final String FAN_OUT_REFRESH_WORK_EXECUTOR_NAME = + "FanOutActiveWorkRefreshExecutor"; Review Comment: I think the format allows for specifying a number for number of thread in executor, might as well add it if it might help debug something. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/DirectGetDataClient.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.getdata; + +import java.util.function.Function; +import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.sdk.annotations.Internal; + +/** {@link GetDataClient} that fetches data directly from a specific {@link GetDataStream}. 
*/ +@Internal +public final class DirectGetDataClient implements GetDataClient { + + private final GetDataStream directGetDataStream; + private final Function<String, GetDataStream> sideInputGetDataStreamFactory; + private final ThrottlingGetDataMetricTracker getDataMetricTracker; + + private DirectGetDataClient( + GetDataStream directGetDataStream, + Function<String, GetDataStream> sideInputGetDataStreamFactory, + ThrottlingGetDataMetricTracker getDataMetricTracker) { + this.directGetDataStream = directGetDataStream; + this.sideInputGetDataStreamFactory = sideInputGetDataStreamFactory; + this.getDataMetricTracker = getDataMetricTracker; + } + + public static GetDataClient create( + GetDataStream getDataStream, + Function<String, GetDataStream> sideInputGetDataStreamFactory, + ThrottlingGetDataMetricTracker getDataMetricTracker) { + return new DirectGetDataClient( + getDataStream, sideInputGetDataStreamFactory, getDataMetricTracker); + } + + @Override + public Windmill.KeyedGetDataResponse getStateData( + String computation, Windmill.KeyedGetDataRequest request) { + if (directGetDataStream.isShutdown()) { + throw new WorkItemCancelledException(request.getShardingKey()); + } + + try (AutoCloseable ignored = + getDataMetricTracker.trackSingleCallWithThrottling( + ThrottlingGetDataMetricTracker.Type.STATE)) { + return directGetDataStream.requestKeyedData(computation, request); + } catch (Exception e) { + if (directGetDataStream.isShutdown()) { + throw new WorkItemCancelledException(request.getShardingKey()); + } + + throw new GetDataException( + "Error occurred fetching state for computation=" + + computation + + ", key=" + + request.getShardingKey(), + e); + } + } + + @Override + public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request) { + GetDataStream sideInputGetDataStream = + sideInputGetDataStreamFactory.apply(request.getDataId().getTag()); + if (sideInputGetDataStream.isShutdown()) { Review Comment: ditto, rm and just rely on below ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/FanOutWorkRefreshClient.java: ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.getdata; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.Heartbeats; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link WorkRefreshClient} that fans out heartbeats to all {@link HeartbeatSender}(s) in parallel + * passed into {@link #refreshActiveWork(Map)} + */ +@Internal +public final class FanOutWorkRefreshClient implements WorkRefreshClient { + private static final Logger LOG = LoggerFactory.getLogger(FanOutWorkRefreshClient.class); + private static final String FAN_OUT_REFRESH_WORK_EXECUTOR_NAME = + "FanOutActiveWorkRefreshExecutor"; + + private final ThrottlingGetDataMetricTracker getDataMetricTracker; + private final ExecutorService fanOutActiveWorkRefreshExecutor; + + public FanOutWorkRefreshClient(ThrottlingGetDataMetricTracker getDataMetricTracker) { + this.getDataMetricTracker = getDataMetricTracker; + this.fanOutActiveWorkRefreshExecutor = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + // FanOutWorkRefreshClient runs as a background process, don't let failures crash + // the worker. + .setUncaughtExceptionHandler( + (t, e) -> LOG.error("Unexpected failure in {}", t.getName(), e)) + .setNameFormat(FAN_OUT_REFRESH_WORK_EXECUTOR_NAME) + .build()); + } + + @Override + public void refreshActiveWork(Map<HeartbeatSender, Heartbeats> heartbeats) { Review Comment: see other comment for details But here I think if there is just a single heartbeatsender you should run inline without having to use the executor since we're blocking anyway ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/ThrottlingGetDataMetricTracker.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.getdata; + +import com.google.auto.value.AutoValue; +import java.io.PrintWriter; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; + +/** + * Wraps GetData calls that tracks metrics for the number of in-flight requests and throttles + * requests when memory pressure is high. + */ +@Internal +@ThreadSafe +public final class ThrottlingGetDataMetricTracker { + private final MemoryMonitor gcThrashingMonitor; + private final GetDataMetrics getDataMetrics; + + public ThrottlingGetDataMetricTracker(MemoryMonitor gcThrashingMonitor) { + this.gcThrashingMonitor = gcThrashingMonitor; + this.getDataMetrics = GetDataMetrics.create(); + } + + /** + * Tracks a GetData call. If there is memory pressure, may throttle requests. Returns an {@link + * AutoCloseable} that will decrement the metric after the call is finished. + */ + public AutoCloseable trackSingleCallWithThrottling(Type callType) { + gcThrashingMonitor.waitForResources(callType.debugName); + AtomicInteger getDataMetricTracker = getDataMetrics.getMetricFor(callType); + getDataMetricTracker.getAndIncrement(); + return getDataMetricTracker::getAndDecrement; + } + + /** + * Tracks heartbeat request metrics. Returns an {@link AutoCloseable} that will decrement the + * metric after the call is finished. + */ + public AutoCloseable trackHeartbeats(int numHeartbeats) { + getDataMetrics + .activeHeartbeats() + .getAndUpdate(currentActiveHeartbeats -> currentActiveHeartbeats + numHeartbeats); + return () -> + getDataMetrics.activeHeartbeats().getAndUpdate(existing -> existing - numHeartbeats); + } + + public void printHtml(PrintWriter writer) { + writer.println("Active Fetches:"); + getDataMetrics.printMetrics(writer); + } + + @VisibleForTesting + GetDataMetrics.ReadOnlySnapshot getMetricsSnapshot() { + return getDataMetrics.snapshot(); + } + + public enum Type { + STATE("GetStateData"), + SIDE_INPUT("GetSideInputData"), + HEARTBEAT("RefreshActiveWork"); + private final String debugName; + + Type(String debugName) { + this.debugName = debugName; + } + + public final String debugName() { + return debugName; + } + } + + @AutoValue + abstract static class GetDataMetrics { Review Comment: I don't think this adds much since nothing is ever set differently and ThrottlingGetDataMetricTracker is already a pretty focused class. It would be less code to just inline the AtomicIntegers as members in the tracker ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/ApplianceGetDataClient.java: ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.getdata; + +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.WindmillComputationKey; +import org.apache.beam.runners.dataflow.worker.windmill.ApplianceWindmillClient; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetDataRequest; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.Heartbeats; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Appliance implementation of {@link GetDataClient}. */ +@Internal +@ThreadSafe +public final class ApplianceGetDataClient implements GetDataClient, WorkRefreshClient { + private static final int MAX_READS_PER_BATCH = 60; + private static final int MAX_ACTIVE_READS = 10; + + private final ApplianceWindmillClient windmillClient; + private final ThrottlingGetDataMetricTracker getDataMetricTracker; + + @GuardedBy("this") + private final List<ReadBatch> pendingReadBatches; + + @GuardedBy("this") + private int activeReadThreads; + + public ApplianceGetDataClient( + ApplianceWindmillClient windmillClient, ThrottlingGetDataMetricTracker getDataMetricTracker) { + this.windmillClient = windmillClient; + this.getDataMetricTracker = getDataMetricTracker; + this.pendingReadBatches = new ArrayList<>(); + this.activeReadThreads = 0; + } + + public static GetDataClient create( + ApplianceWindmillClient windmillClient, ThrottlingGetDataMetricTracker getDataMetricTracker) { + return new ApplianceGetDataClient(windmillClient, getDataMetricTracker); + } + + @Override + public Windmill.KeyedGetDataResponse getStateData( + String computation, Windmill.KeyedGetDataRequest request) { + try (AutoCloseable ignored = + getDataMetricTracker.trackSingleCallWithThrottling( + ThrottlingGetDataMetricTracker.Type.STATE)) { + SettableFuture<Windmill.KeyedGetDataResponse> response = SettableFuture.create(); + ReadBatch batch = addToReadBatch(new QueueEntry(computation, request, response)); + if (batch != null) { + issueReadBatch(batch); + } + return response.get(); + } catch (Exception e) { + throw new GetDataException( + "Error occurred fetching state for computation=" + + computation + + ", key=" + + request.getShardingKey(), + e); + } + } + + @Override + public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request) { + try (AutoCloseable ignored = + 
getDataMetricTracker.trackSingleCallWithThrottling( + ThrottlingGetDataMetricTracker.Type.STATE)) { + return windmillClient + .getData(Windmill.GetDataRequest.newBuilder().addGlobalDataFetchRequests(request).build()) + .getGlobalData(0); + } catch (Exception e) { + throw new GetDataException( + "Error occurred fetching side input for tag=" + request.getDataId(), e); + } + } + + /** + * Appliance sends heartbeats (used to refresh active work) as KeyedGetDataRequests. So we must + * translate the HeartbeatRequest to a KeyedGetDataRequest. + */ + @Override + public void refreshActiveWork(Map<HeartbeatSender, Heartbeats> heartbeats) { + Map.Entry<HeartbeatSender, Heartbeats> heartbeat = + Iterables.getOnlyElement(heartbeats.entrySet()); + HeartbeatSender heartbeatSender = heartbeat.getKey(); + Heartbeats heartbeatToSend = heartbeat.getValue(); + + if (heartbeatToSend.heartbeatRequests().isEmpty()) { + return; + } + + try (AutoCloseable ignored = getDataMetricTracker.trackHeartbeats(heartbeatToSend.size())) { + heartbeatSender.sendHeartbeats(heartbeatToSend); + } catch (Exception e) { + throw new GetDataException("Error occurred refreshing heartbeats=" + heartbeatToSend, e); + } + } + + @Override + public synchronized void printHtml(PrintWriter writer) { + getDataMetricTracker.printHtml(writer); + writer.println(" Read threads: " + activeReadThreads); + writer.println(" Pending read batches: " + pendingReadBatches.size()); + } + + private void issueReadBatch(ReadBatch batch) { + try { + Preconditions.checkState(batch.startRead.get()); Review Comment: // Possibly block until the batch is allowed to start. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/DirectGetDataClient.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.getdata; + +import java.util.function.Function; +import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.sdk.annotations.Internal; + +/** {@link GetDataClient} that fetches data directly from a specific {@link GetDataStream}. */ +@Internal +public final class DirectGetDataClient implements GetDataClient { Review Comment: nit: instead of Direct how about StreamGetDataClient? and instead of StreamingEngineGetDataClient it is StreamPoolGetDataClient? direct is a little confusing if it is going to dispatcher but with a token, and StreamingEngineGetDataClient is confusing since this is also streaming engine. 
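For the earlier issueReadBatch comment ("// Possibly block until the batch is allowed to start."), this is how the suggestion would read in context; the surrounding code is quoted from the PR's ApplianceGetDataClient with only that comment added:

private void issueReadBatch(ReadBatch batch) {
  try {
    // Possibly block until the batch is allowed to start.
    Preconditions.checkState(batch.startRead.get());
  } catch (InterruptedException e) {
    // We don't expect this thread to be interrupted. To simplify handling, we just fall through
    // to issuing the call.
    assert (false);
    Thread.currentThread().interrupt();
  } catch (ExecutionException e) {
    // startRead is a SettableFuture so this should never occur.
    throw new AssertionError("Should not have exception on startRead", e);
  }
  // ... remainder of issueReadBatch unchanged ...
}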
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/DirectGetDataClient.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.getdata; + +import java.util.function.Function; +import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.sdk.annotations.Internal; + +/** {@link GetDataClient} that fetches data directly from a specific {@link GetDataStream}. */ +@Internal +public final class DirectGetDataClient implements GetDataClient { + + private final GetDataStream directGetDataStream; + private final Function<String, GetDataStream> sideInputGetDataStreamFactory; + private final ThrottlingGetDataMetricTracker getDataMetricTracker; + + private DirectGetDataClient( + GetDataStream directGetDataStream, + Function<String, GetDataStream> sideInputGetDataStreamFactory, + ThrottlingGetDataMetricTracker getDataMetricTracker) { + this.directGetDataStream = directGetDataStream; + this.sideInputGetDataStreamFactory = sideInputGetDataStreamFactory; + this.getDataMetricTracker = getDataMetricTracker; + } + + public static GetDataClient create( + GetDataStream getDataStream, + Function<String, GetDataStream> sideInputGetDataStreamFactory, + ThrottlingGetDataMetricTracker getDataMetricTracker) { + return new DirectGetDataClient( + getDataStream, sideInputGetDataStreamFactory, getDataMetricTracker); + } + + @Override + public Windmill.KeyedGetDataResponse getStateData( + String computation, Windmill.KeyedGetDataRequest request) { + if (directGetDataStream.isShutdown()) { Review Comment: rm and just rely on requestKeyedData throwing exception. 
It's racy to check here so the request below would have to handle it anyway ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java: ########## @@ -101,25 +105,33 @@ private GrpcDirectGetWorkStream( Set<AbstractWindmillStream<?, ?>> streamRegistry, int logEveryNStreamFailures, ThrottleTimer getWorkThrottleTimer, - Supplier<GetDataStream> getDataStream, + Supplier<HeartbeatSender> heartbeatSender, + Supplier<GetDataClient> getDataClient, Supplier<WorkCommitter> workCommitter, WorkItemScheduler workItemScheduler) { super( - startGetWorkRpcFn, backoff, streamObserverFactory, streamRegistry, logEveryNStreamFailures); + startGetWorkRpcFn, + backoff, + streamObserverFactory, + streamRegistry, + logEveryNStreamFailures, + backendWorkerToken); this.request = request; this.getWorkThrottleTimer = getWorkThrottleTimer; this.workItemScheduler = workItemScheduler; this.workItemBuffers = new ConcurrentHashMap<>(); // Use the same GetDataStream and CommitWorkStream instances to process all the work in this Review Comment: rm comment, taken care at higher level ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java: ########## @@ -103,5 +115,32 @@ private void invalidateStuckCommits() { } } - protected abstract void refreshActiveWork(); + private void refreshActiveWork() { + Instant refreshDeadline = clock.get().minus(Duration.millis(activeWorkRefreshPeriodMillis)); + + Map<HeartbeatSender, Heartbeats.Builder> heartbeatsBySender = new HashMap<>(); + + // Aggregate the heartbeats across computations by HeartbeatSender for correct fan out. + for (ComputationState computationState : computations.get()) { + for (RefreshableWork work : computationState.getRefreshableWork(refreshDeadline)) { + heartbeatsBySender + .computeIfAbsent(work.heartbeatSender(), ignored -> Heartbeats.builder()) + .addWork(work) Review Comment: the chaining confused me here since I thought the heartbeat requests were scoped to the work added could createHeartbeatRequest be moved into HeartbeatsBuilder.addWork() and addHeartbeatRequest be removed? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/ApplianceGetDataClient.java: ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.client.getdata; + +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.WindmillComputationKey; +import org.apache.beam.runners.dataflow.worker.windmill.ApplianceWindmillClient; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetDataRequest; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.Heartbeats; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Appliance implementation of {@link GetDataClient}. */ +@Internal +@ThreadSafe +public final class ApplianceGetDataClient implements GetDataClient, WorkRefreshClient { + private static final int MAX_READS_PER_BATCH = 60; + private static final int MAX_ACTIVE_READS = 10; + + private final ApplianceWindmillClient windmillClient; + private final ThrottlingGetDataMetricTracker getDataMetricTracker; + + @GuardedBy("this") + private final List<ReadBatch> pendingReadBatches; + + @GuardedBy("this") + private int activeReadThreads; + + public ApplianceGetDataClient( + ApplianceWindmillClient windmillClient, ThrottlingGetDataMetricTracker getDataMetricTracker) { + this.windmillClient = windmillClient; + this.getDataMetricTracker = getDataMetricTracker; + this.pendingReadBatches = new ArrayList<>(); + this.activeReadThreads = 0; + } + + public static GetDataClient create( + ApplianceWindmillClient windmillClient, ThrottlingGetDataMetricTracker getDataMetricTracker) { + return new ApplianceGetDataClient(windmillClient, getDataMetricTracker); + } + + @Override + public Windmill.KeyedGetDataResponse getStateData( + String computation, Windmill.KeyedGetDataRequest request) { + try (AutoCloseable ignored = + getDataMetricTracker.trackSingleCallWithThrottling( + ThrottlingGetDataMetricTracker.Type.STATE)) { + SettableFuture<Windmill.KeyedGetDataResponse> response = SettableFuture.create(); + ReadBatch batch = addToReadBatch(new QueueEntry(computation, request, response)); + if (batch != null) { + issueReadBatch(batch); + } + return response.get(); + } catch (Exception e) { + throw new GetDataException( + "Error occurred fetching state for computation=" + + computation + + ", key=" + + request.getShardingKey(), + e); + } + } + + @Override + public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request) { + try (AutoCloseable ignored = + getDataMetricTracker.trackSingleCallWithThrottling( + ThrottlingGetDataMetricTracker.Type.STATE)) { + return windmillClient + .getData(Windmill.GetDataRequest.newBuilder().addGlobalDataFetchRequests(request).build()) + .getGlobalData(0); + } catch (Exception e) { + throw new GetDataException( + "Error occurred fetching side input for tag=" + request.getDataId(), e); + } + } + + /** + * Appliance sends heartbeats (used to refresh active work) as 
KeyedGetDataRequests. So we must + * translate the HeartbeatRequest to a KeyedGetDataRequest. + */ + @Override + public void refreshActiveWork(Map<HeartbeatSender, Heartbeats> heartbeats) { + Map.Entry<HeartbeatSender, Heartbeats> heartbeat = + Iterables.getOnlyElement(heartbeats.entrySet()); + HeartbeatSender heartbeatSender = heartbeat.getKey(); + Heartbeats heartbeatToSend = heartbeat.getValue(); + + if (heartbeatToSend.heartbeatRequests().isEmpty()) { + return; + } + + try (AutoCloseable ignored = getDataMetricTracker.trackHeartbeats(heartbeatToSend.size())) { + heartbeatSender.sendHeartbeats(heartbeatToSend); + } catch (Exception e) { + throw new GetDataException("Error occurred refreshing heartbeats=" + heartbeatToSend, e); + } + } + + @Override + public synchronized void printHtml(PrintWriter writer) { + getDataMetricTracker.printHtml(writer); + writer.println(" Read threads: " + activeReadThreads); + writer.println(" Pending read batches: " + pendingReadBatches.size()); + } + + private void issueReadBatch(ReadBatch batch) { + try { + Preconditions.checkState(batch.startRead.get()); + } catch (InterruptedException e) { + // We don't expect this thread to be interrupted. To simplify handling, we just fall through + // to issuing the call. + assert (false); + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + // startRead is a SettableFuture so this should never occur. + throw new AssertionError("Should not have exception on startRead", e); + } + Map<WindmillComputationKey, SettableFuture<Windmill.KeyedGetDataResponse>> pendingResponses = + new HashMap<>(batch.reads.size()); + Map<String, ComputationGetDataRequest.Builder> computationBuilders = new HashMap<>(); + for (QueueEntry entry : batch.reads) { + ComputationGetDataRequest.Builder computationBuilder = + computationBuilders.computeIfAbsent( + entry.computation, k -> ComputationGetDataRequest.newBuilder().setComputationId(k)); + + computationBuilder.addRequests(entry.request); + pendingResponses.put( + WindmillComputationKey.create( + entry.computation, entry.request.getKey(), entry.request.getShardingKey()), + entry.response); + } + + // Build the full GetDataRequest from the KeyedGetDataRequests pulled from the queue. + Windmill.GetDataRequest.Builder builder = Windmill.GetDataRequest.newBuilder(); + for (ComputationGetDataRequest.Builder computationBuilder : computationBuilders.values()) { + builder.addRequests(computationBuilder); + } + + try { + Windmill.GetDataResponse response = windmillClient.getData(builder.build()); + // Dispatch the per-key responses back to the waiting threads. + for (Windmill.ComputationGetDataResponse computationResponse : response.getDataList()) { + for (Windmill.KeyedGetDataResponse keyResponse : computationResponse.getDataList()) { + pendingResponses + .get( + WindmillComputationKey.create( + computationResponse.getComputationId(), + keyResponse.getKey(), + keyResponse.getShardingKey())) + .set(keyResponse); + } + } + } catch (RuntimeException e) { + // Fan the exception out to the reads. + for (QueueEntry entry : batch.reads) { + entry.response.setException(e); + } + } finally { + synchronized (this) { + Preconditions.checkState(activeReadThreads >= 1); + if (pendingReadBatches.isEmpty()) { + activeReadThreads--; + } else { + // Notify the thread responsible for issuing the next batch read. + ReadBatch startBatch = pendingReadBatches.remove(0); + startBatch.startRead.set(true); + } + } + } + } + + /** + * Adds the entry to a read batch for sending to the windmill server. 
If a non-null batch is + * returned, this thread will be responsible for sending the batch and should wait for the batch + * startRead to be notified. If null is returned, the entry was added to a read batch that will be + * issued by another thread. + */ + private @Nullable ReadBatch addToReadBatch(QueueEntry entry) { + synchronized (this) { + ReadBatch batch; + if (activeReadThreads < MAX_ACTIVE_READS) { + assert (pendingReadBatches.isEmpty()); + activeReadThreads += 1; + // fall through to below synchronized block + } else if (pendingReadBatches.isEmpty() + || pendingReadBatches.get(pendingReadBatches.size() - 1).reads.size() + >= MAX_READS_PER_BATCH) { + // This is the first read of a batch, it will be responsible for sending the batch. + batch = new ReadBatch(); + pendingReadBatches.add(batch); + batch.reads.add(entry); + return batch; + } else { + // This fits within an existing batch, it will be sent by the first blocking thread in the + // batch. + pendingReadBatches.get(pendingReadBatches.size() - 1).reads.add(entry); + return null; + } + } + ReadBatch batch = new ReadBatch(); + batch.reads.add(entry); + batch.startRead.set(true); + return batch; + } + + private static final class ReadBatch { + ArrayList<QueueEntry> reads = new ArrayList<>(); + SettableFuture<Boolean> startRead = SettableFuture.create(); Review Comment: nit: does SettableFuture<Void> work? we never set to false, we're just using the future to trigger. Void would capture the value doesn't matter better. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/FixedStreamHeartbeatSender.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.refresh; + +import java.util.Objects; +import org.apache.beam.runners.dataflow.worker.streaming.RefreshableWork; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; +import org.apache.beam.sdk.annotations.Internal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link HeartbeatSender} implementation that sends heartbeats directly on the underlying stream if + * the stream is not closed. + * + * @implNote + * <p>{@link #equals(Object)} and {@link #hashCode()} implementations delegate to internal + * {@link GetDataStream} implementations so that requests can be grouped and sent on the same + * stream instance. + * <p>This class is a stateless decorator to the underlying stream. 
+ */ +@Internal +public final class FixedStreamHeartbeatSender implements HeartbeatSender { + private static final Logger LOG = LoggerFactory.getLogger(FixedStreamHeartbeatSender.class); + private final GetDataStream getDataStream; + + private FixedStreamHeartbeatSender(GetDataStream getDataStream) { + this.getDataStream = getDataStream; + } + + public static FixedStreamHeartbeatSender create(GetDataStream getDataStream) { + return new FixedStreamHeartbeatSender(getDataStream); + } + + @Override + public void sendHeartbeats(Heartbeats heartbeats) { + if (getDataStream.isShutdown()) { Review Comment: instead of the isShutdown check, should it be a try/catch around getDataStream.refreshActiveWork? That would also log and fail the work items in the case where we started refreshing but then the stream was shut down. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java: ########## @@ -85,7 +88,7 @@ public static StreamingEngineWorkCommitter create( @Override @SuppressWarnings("FutureReturnValueIgnored") public void start() { - if (!commitSenders.isShutdown()) { + if (isRunning.compareAndSet(false, true) && !commitSenders.isShutdown()) { Review Comment: I think for some things like this, which we expect to be started once and live forever, it can be simpler to just prevent some behavior instead of trying to support it. If start can be called multiple times, one caller could stop while the other doesn't, but the end state is very different: start1/start2/stop1 -> stopped; start1/stop1/start2 -> started. So it seems safer to just prevent it, since we only really expect these methods to be called by the main startup path and then in teardown for tests.
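A sketch of the "just prevent it" option for start(); this is an assumed shape rather than the PR's code, reusing the isRunning and commitSenders members from the quoted diff and the Guava Preconditions helper already used elsewhere in the worker:

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void start() {
  // Fail loudly on a second start() instead of trying to make repeated starts safe.
  Preconditions.checkState(
      isRunning.compareAndSet(false, true),
      "StreamingEngineWorkCommitter.start() may only be called once.");
  if (!commitSenders.isShutdown()) {
    // ... existing startup logic ...
  }
}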