This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cbe2b9e2dde [#31926] [java] call provision service when creating external workers. (#32198)
cbe2b9e2dde is described below

commit cbe2b9e2dded772c3c114f1c1994f31301849ceb
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Thu Aug 15 15:59:21 2024 -0700

    [#31926] [java] call provision service when creating external workers. (#32198)
    
    * [#31926] [java] call provision service when creating external workers.
    
    * spotless apply
    
    ---------
    
    Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com>
---
 .../beam/fn/harness/ExternalWorkerService.java     | 44 +++++++++++++++++++---
 1 file changed, 38 insertions(+), 6 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/ExternalWorkerService.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/ExternalWorkerService.java
index 8ef3f96b00b..dc42f7e1c78 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/ExternalWorkerService.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/ExternalWorkerService.java
@@ -20,12 +20,17 @@ package org.apache.beam.fn.harness;
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
 import java.util.Collections;
+import java.util.Set;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StartWorkerRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StartWorkerResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StopWorkerRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StopWorkerResponse;
 import 
org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc.BeamFnExternalWorkerPoolImplBase;
+import org.apache.beam.model.fnexecution.v1.ProvisionApi;
+import org.apache.beam.model.fnexecution.v1.ProvisionServiceGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.channel.AddHarnessIdInterceptor;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
 import org.apache.beam.sdk.fn.server.FnService;
 import org.apache.beam.sdk.fn.server.GrpcFnServer;
 import org.apache.beam.sdk.fn.server.ServerFactory;
@@ -35,6 +40,8 @@ import org.apache.beam.sdk.util.Sleeper;
 import org.apache.beam.sdk.util.construction.Environments;
 import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,17 +68,42 @@ public class ExternalWorkerService extends 
BeamFnExternalWorkerPoolImplBase impl
         request.getWorkerId(),
         request.getControlEndpoint().getUrl());
     LOG.debug("Worker request {}.", request);
+
+    Endpoints.ApiServiceDescriptor loggingEndpoint = 
request.getLoggingEndpoint();
+    Endpoints.ApiServiceDescriptor controlEndpoint = 
request.getControlEndpoint();
+    Set<String> runnerCapabilites = Collections.emptySet();
+    if (request.hasProvisionEndpoint()) {
+      ManagedChannelFactory channelFactory =
+          ManagedChannelFactory.createDefault()
+              .withInterceptors(
+                  
ImmutableList.of(AddHarnessIdInterceptor.create(request.getWorkerId())));
+
+      ProvisionServiceGrpc.ProvisionServiceBlockingStub provisionStub =
+          ProvisionServiceGrpc.newBlockingStub(
+              channelFactory.forDescriptor(request.getProvisionEndpoint()));
+      ProvisionApi.ProvisionInfo provisionInfo =
+          provisionStub
+              
.getProvisionInfo(ProvisionApi.GetProvisionInfoRequest.newBuilder().build())
+              .getInfo();
+
+      runnerCapabilites = 
Sets.newHashSet(provisionInfo.getRunnerCapabilitiesList());
+      if (provisionInfo.hasControlEndpoint()) {
+        controlEndpoint = provisionInfo.getControlEndpoint();
+      }
+      if (provisionInfo.hasLoggingEndpoint()) {
+        loggingEndpoint = provisionInfo.getLoggingEndpoint();
+      }
+    }
+    // Lambda closured variables must be final.
+    final Endpoints.ApiServiceDescriptor logEndpoint = loggingEndpoint;
+    final Endpoints.ApiServiceDescriptor ctrlEndpoint = controlEndpoint;
+    final Set<String> capabilities = runnerCapabilites;
     Thread th =
         new Thread(
             () -> {
               try {
                 FnHarness.main(
-                    request.getWorkerId(),
-                    options,
-                    Collections.emptySet(),
-                    request.getLoggingEndpoint(),
-                    request.getControlEndpoint(),
-                    null);
+                    request.getWorkerId(), options, capabilities, logEndpoint, 
ctrlEndpoint, null);
                 LOG.info("Successfully started worker {}.", 
request.getWorkerId());
               } catch (Exception exn) {
                 LOG.error(String.format("Failed to start worker %s.", 
request.getWorkerId()), exn);

Reply via email to