This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f6b00a2  [BEAM-8137] Add Main method to ExternalWorkerService (#14942)
f6b00a2 is described below

commit f6b00a2fa1fef920ff1e034b85efd65314c0e8b2
Author: Ke Wu <k...@linkedin.com>
AuthorDate: Wed Jun 9 15:30:51 2021 -0700

    [BEAM-8137] Add Main method to ExternalWorkerService (#14942)
    
    * [BEAM-8137] Add Main method to ExternalWorkerService
    
    1. Add main method to ExternalWorkerService to support launching worker 
pool from Java SDK Container.
    
    * Add more javadoc/comments
    
    * Add more javadoc/comments
    
    * Update to sleep for Long.MAX_VALUE
    
    * Update format
    
    * Update LOG.error
    
    * Fix checker error
    
    * Update error message
    
    Co-authored-by: Ke Wu <k...@kwu-mn1.linkedin.biz>
---
 .../runners/core/construction/Environments.java    | 20 ++++----
 .../runners/portability/ExternalWorkerService.java | 60 +++++++++++++++++++++-
 2 files changed, 68 insertions(+), 12 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
index d7ac42e..473ae4e 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
@@ -404,6 +404,16 @@ public class Environments {
     return String.format("%s-%s%s", fileName, encodedHash, suffix);
   }
 
+  public static String getExternalServiceAddress(PortablePipelineOptions 
options) {
+    String environmentConfig = options.getDefaultEnvironmentConfig();
+    String environmentOption =
+        PortablePipelineOptions.getEnvironmentOption(options, 
externalServiceAddressOption);
+    if (environmentConfig != null && !environmentConfig.isEmpty()) {
+      return environmentConfig;
+    }
+    return environmentOption;
+  }
+
   private static File zipDirectory(File directory) throws IOException {
     File zipFile = File.createTempFile(directory.getName(), ".zip");
     try (FileOutputStream fos = new FileOutputStream(zipFile)) {
@@ -454,16 +464,6 @@ public class Environments {
     return environmentOption;
   }
 
-  private static String getExternalServiceAddress(PortablePipelineOptions 
options) {
-    String environmentConfig = options.getDefaultEnvironmentConfig();
-    String environmentOption =
-        PortablePipelineOptions.getEnvironmentOption(options, 
externalServiceAddressOption);
-    if (environmentConfig != null && !environmentConfig.isEmpty()) {
-      return environmentConfig;
-    }
-    return environmentOption;
-  }
-
   private static Map<String, String> 
getProcessVariables(PortablePipelineOptions options) {
     ImmutableMap.Builder<String, String> variables = ImmutableMap.builder();
     String assignments =
diff --git 
a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
 
b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
index 9fbfaac..000ff64 100644
--- 
a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
+++ 
b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.portability;
 
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
 import java.util.Collections;
 import org.apache.beam.fn.harness.FnHarness;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StartWorkerRequest;
@@ -24,10 +26,15 @@ import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.StartWorkerResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StopWorkerRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StopWorkerResponse;
 import 
org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc.BeamFnExternalWorkerPoolImplBase;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.fnexecution.FnService;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.ServerFactory;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.util.Sleeper;
 import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +45,7 @@ import org.slf4j.LoggerFactory;
 public class ExternalWorkerService extends BeamFnExternalWorkerPoolImplBase 
implements FnService {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ExternalWorkerService.class);
+  private static final String PIPELINE_OPTIONS_ENV_VAR = "PIPELINE_OPTIONS";
 
   private final PipelineOptions options;
   private final ServerFactory serverFactory = ServerFactory.createDefault();
@@ -90,10 +98,58 @@ public class ExternalWorkerService extends 
BeamFnExternalWorkerPoolImplBase impl
   public void close() {}
 
   public GrpcFnServer<ExternalWorkerService> start() throws Exception {
-    GrpcFnServer<ExternalWorkerService> server =
-        GrpcFnServer.allocatePortAndCreateFor(this, serverFactory);
+    final String externalServiceAddress =
+        
Environments.getExternalServiceAddress(options.as(PortablePipelineOptions.class));
+    GrpcFnServer<ExternalWorkerService> server;
+    if (externalServiceAddress.isEmpty()) {
+      server = GrpcFnServer.allocatePortAndCreateFor(this, serverFactory);
+    } else {
+      server =
+          GrpcFnServer.create(
+              this,
+              
Endpoints.ApiServiceDescriptor.newBuilder().setUrl(externalServiceAddress).build(),
+              serverFactory);
+    }
     LOG.debug(
         "Listening for worker start requests at {}.", 
server.getApiServiceDescriptor().getUrl());
     return server;
   }
+
+  /**
+   * Worker pool entry point.
+   *
+   * <p>The worker pool exposes an RPC service that is used with EXTERNAL 
environment to start and
+   * stop the SDK workers.
+   *
+   * <p>The worker pool uses threads for parallelism;
+   *
+   * <p>This entry point is used by the Java SDK container in worker pool mode 
and expects the
+   * following environment variables:
+   *
+   * <ul>
+   *   <li>PIPELINE_OPTIONS: A serialized form of {@link PipelineOptions}. It 
needs to be known
+   *       up-front and matches the running job. See {@link PipelineOptions} 
for further details.
+   * </ul>
+   */
+  public static void main(String[] args) {
+    LOG.info("Starting external worker service");
+    final String optionsEnv =
+        checkArgumentNotNull(
+            System.getenv(PIPELINE_OPTIONS_ENV_VAR),
+            "No pipeline options provided in environment variables " + 
PIPELINE_OPTIONS_ENV_VAR);
+    LOG.info("Pipeline options {}", optionsEnv);
+    PipelineOptions options = PipelineOptionsTranslation.fromJson(optionsEnv);
+
+    try (GrpcFnServer<ExternalWorkerService> server = new 
ExternalWorkerService(options).start()) {
+      LOG.info(
+          "External worker service started at address: {}",
+          server.getApiServiceDescriptor().getUrl());
+      // Wait to keep ExternalWorkerService running
+      Sleeper.DEFAULT.sleep(Long.MAX_VALUE);
+    } catch (Exception e) {
+      LOG.error("Error running worker service", e);
+    } finally {
+      LOG.info("External worker service stopped.");
+    }
+  }
 }

Reply via email to