scwhittle commented on code in PR #28835: URL: https://github.com/apache/beam/pull/28835#discussion_r1352262902
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/util/WindmillChannelFactory.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.util; + +import java.net.Inet6Address; +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; +import javax.net.ssl.SSLException; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.GrpcSslContexts; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NegotiationType; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyChannelBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; + +/** Utilities for creating {@link Channel} for gRPC stubs. */ +public final class WindmillChannelFactory { Review Comment: move refactoring to separate PR? 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/util/VendoredCredentialsAdapter.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.util; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import javax.annotation.Nullable; +import org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.Credentials; +import org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; + +/** + * Create a wrapper around credentials that delegates to the underlying {@link + * com.google.auth.Credentials}. Note that this class should override every method that is not final + * and not static and call the delegate directly. + * + * <p>TODO: Replace this with an auto generated proxy which calls the underlying implementation + * delegate to reduce maintenance burden. + */ +class VendoredCredentialsAdapter extends Credentials { Review Comment: move to refactoring PR? 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java: ########## @@ -67,12 +70,33 @@ public abstract class WindmillServerStub implements StatusDataProvider { public abstract GetWorkStream getWorkStream( Windmill.GetWorkRequest request, WorkItemReceiver receiver); + public GetWorkStream getWorkStream( + CloudWindmillServiceV1Alpha1Stub stub, Review Comment: These don't make sense to me on this interface. IsReady/setWindmillServiceEndpoints etc. have no effect if we are passing in the stub. Perhaps the WindmillStreamSender could create the streams itself rather than going through this class? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillConnection.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.dataflow.worker.windmill; + +import com.google.auto.value.AutoValue; +import java.util.Optional; +import java.util.function.Function; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; + +@AutoValue +public abstract class WindmillConnection { Review Comment: Do we need this class? It seems like we can just have a map from Endpoint to streams based upon the endpoint with a shared stub without needing to keep this object around. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.budget; + +import com.google.auto.value.AutoValue; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; + +/** + * Budget of items and bytes for fetching {@link + * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem}(s) via {@link + * org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream}. 
Used to control + * how "much" work is returned from Windmill. + */ +@AutoValue +public abstract class GetWorkBudget { + public static GetWorkBudget.Builder builder() { + return new AutoValue_GetWorkBudget.Builder(); + } + + /** {@link GetWorkBudget} of 0. */ + public static GetWorkBudget noBudget() { + return builder().setItems(0).setBytes(0).build(); + } + + /** + * Adds the given bytes and items or the current budget, returning a new {@link GetWorkBudget}. + */ + public GetWorkBudget add(long items, long bytes) { + return GetWorkBudget.builder().setBytes(bytes() + bytes).setItems(items() + items).build(); + } + + /** + * Subtracts the given bytes and items or the current budget, returning a new {@link + * GetWorkBudget}. + */ + public GetWorkBudget subtract(long items, long bytes) { + return GetWorkBudget.builder().setBytes(bytes() - bytes).setItems(items() - items).build(); Review Comment: do we want to go negative? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStream.java: ########## @@ -141,6 +142,7 @@ private Optional<WindmillEndpoints> extractWindmillEndpointsFrom( @Override protected synchronized void onNewStream() { + LOG.info("New GetWorkerMetadataStream started for {}", workerMetadataRequest); Review Comment: this is going to happen every 5 minutes which seems a little spammy I think seeing this on the status page woudl be enough since we capture it. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/ProcessWorkItemClient.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work; + +import com.google.api.services.dataflow.model.WorkItem; +import java.io.PrintWriter; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import org.apache.beam.runners.dataflow.worker.streaming.Commit; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalData; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse; + +/** + * A client context to process {@link WorkItem} and route all subsequent Windmill WorkItem API calls + * to the same backend worker. Wraps the {@link WorkItem}. + */ +public interface ProcessWorkItemClient { + + /** {@link WorkItem} being processed. */ + WorkItem workItem(); + + /** Refreshes the active work for each computation owned by the backend Windmill worker. */ + void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> activeWorkPerComputation); Review Comment: I don't think that this should be part of the per-work item client. The information related to this particular workItem for refreshing is the computation+work_token. 
The batching happens across many work items and is done at a higher level. Since KeyedGetDataRequest/Commit contain key+work_token, they are not necessarily tied to the workItem by the interface. An alternative would be to change this interface to expose the streams instead of wrapping the stream methods: `WorkItem workItem(); GetDataStream getDataStream(); CommitWorkStream commitWorkStream();` Then the streams can be used directly for getting data or committing. For heartbeats, we can iterate over all the clients, grouping the requests into batches by their corresponding GetDataStream. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
