This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new cbe2b9e2dde [#31926] [java] call provision service when creating external workers. (#32198) cbe2b9e2dde is described below commit cbe2b9e2dded772c3c114f1c1994f31301849ceb Author: Robert Burke <lostl...@users.noreply.github.com> AuthorDate: Thu Aug 15 15:59:21 2024 -0700 [#31926] [java] call provision service when creating external workers. (#32198) * [#31926] [java] call provision service when creating external workers. * spotless apply --------- Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com> --- .../beam/fn/harness/ExternalWorkerService.java | 44 +++++++++++++++++++--- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/ExternalWorkerService.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/ExternalWorkerService.java index 8ef3f96b00b..dc42f7e1c78 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/ExternalWorkerService.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/ExternalWorkerService.java @@ -20,12 +20,17 @@ package org.apache.beam.fn.harness; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import java.util.Collections; +import java.util.Set; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StartWorkerRequest; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StartWorkerResponse; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StopWorkerRequest; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StopWorkerResponse; import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc.BeamFnExternalWorkerPoolImplBase; +import org.apache.beam.model.fnexecution.v1.ProvisionApi; +import org.apache.beam.model.fnexecution.v1.ProvisionServiceGrpc; import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.sdk.fn.channel.AddHarnessIdInterceptor; +import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; import org.apache.beam.sdk.fn.server.FnService; import org.apache.beam.sdk.fn.server.GrpcFnServer; import org.apache.beam.sdk.fn.server.ServerFactory; @@ -35,6 +40,8 @@ import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.util.construction.Environments; import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,17 +68,42 @@ public class ExternalWorkerService extends BeamFnExternalWorkerPoolImplBase impl request.getWorkerId(), request.getControlEndpoint().getUrl()); LOG.debug("Worker request {}.", request); + + Endpoints.ApiServiceDescriptor loggingEndpoint = request.getLoggingEndpoint(); + Endpoints.ApiServiceDescriptor controlEndpoint = request.getControlEndpoint(); + Set<String> runnerCapabilites = Collections.emptySet(); + if (request.hasProvisionEndpoint()) { + ManagedChannelFactory channelFactory = + ManagedChannelFactory.createDefault() + .withInterceptors( + ImmutableList.of(AddHarnessIdInterceptor.create(request.getWorkerId()))); + + ProvisionServiceGrpc.ProvisionServiceBlockingStub provisionStub = + ProvisionServiceGrpc.newBlockingStub( + channelFactory.forDescriptor(request.getProvisionEndpoint())); + ProvisionApi.ProvisionInfo provisionInfo = + provisionStub + .getProvisionInfo(ProvisionApi.GetProvisionInfoRequest.newBuilder().build()) + .getInfo(); + + runnerCapabilites = Sets.newHashSet(provisionInfo.getRunnerCapabilitiesList()); + if (provisionInfo.hasControlEndpoint()) { + controlEndpoint = provisionInfo.getControlEndpoint(); + } + if (provisionInfo.hasLoggingEndpoint()) { + loggingEndpoint = provisionInfo.getLoggingEndpoint(); + } + } + // Lambda closured variables must be final. + final Endpoints.ApiServiceDescriptor logEndpoint = loggingEndpoint; + final Endpoints.ApiServiceDescriptor ctrlEndpoint = controlEndpoint; + final Set<String> capabilities = runnerCapabilites; Thread th = new Thread( () -> { try { FnHarness.main( - request.getWorkerId(), - options, - Collections.emptySet(), - request.getLoggingEndpoint(), - request.getControlEndpoint(), - null); + request.getWorkerId(), options, capabilities, logEndpoint, ctrlEndpoint, null); LOG.info("Successfully started worker {}.", request.getWorkerId()); } catch (Exception exn) { LOG.error(String.format("Failed to start worker %s.", request.getWorkerId()), exn);