[ 
https://issues.apache.org/jira/browse/BEAM-5288?focusedWorklogId=147781&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-147781
 ]

ASF GitHub Bot logged work on BEAM-5288:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Sep/18 20:42
            Start Date: 25/Sep/18 20:42
    Worklog Time Spent: 10m 
      Work Description: angoenka commented on a change in pull request #6441: 
[BEAM-5288] Support environment pipeline option in Java and Python.
URL: https://github.com/apache/beam/pull/6441#discussion_r220342692
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/LazyJobBundleFactory.java
 ##########
 @@ -53,92 +55,81 @@
 import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A base for a {@link JobBundleFactory} for which the implementation can specify a custom {@link
+ * A {@link JobBundleFactory} for which the implementation can specify a custom {@link
  * EnvironmentFactory} for environment management. Note that returned {@link StageBundleFactory
  * stage bundle factories} are not thread-safe. Instead, a new stage factory should be created for
- * each client.
+ * each client. {@link LazyJobBundleFactory} initializes the Environment lazily when forStage is
+ * called for a stage. This factory is not capable of handling mixed types of environments.
  */
 @ThreadSafe
-public abstract class JobBundleFactoryBase implements JobBundleFactory {
-  private static final Logger LOG = LoggerFactory.getLogger(JobBundleFactoryBase.class);
+public class LazyJobBundleFactory implements JobBundleFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(LazyJobBundleFactory.class);
 
   private final IdGenerator stageIdGenerator;
-  private final GrpcFnServer<FnApiControlClientPoolService> controlServer;
-  private final GrpcFnServer<GrpcLoggingService> loggingServer;
-  private final GrpcFnServer<ArtifactRetrievalService> retrievalServer;
-  private final GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
-
   private final LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache;
+  // Using environment as the initialization marker.
+  private Environment environment;
+  private ExecutorService executor;
+  private GrpcFnServer<FnApiControlClientPoolService> controlServer;
+  private GrpcFnServer<GrpcLoggingService> loggingServer;
+  private GrpcFnServer<ArtifactRetrievalService> retrievalServer;
+  private GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
+  private GrpcFnServer<GrpcDataService> dataServer;
+  private GrpcFnServer<GrpcStateService> stateServer;
+  private MapControlClientPool clientPool;
+  private EnvironmentFactory environmentFactory;
 
-  JobBundleFactoryBase(JobInfo jobInfo) throws Exception {
-    ServerFactory serverFactory = getServerFactory();
-    IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
-    ControlClientPool clientPool = MapControlClientPool.create();
+  public static LazyJobBundleFactory create(
+      JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap) {
+    return new LazyJobBundleFactory(jobInfo, environmentFactoryProviderMap);
+  }
 
-    GrpcFnServer<FnApiControlClientPoolService> controlServer =
-        GrpcFnServer.allocatePortAndCreateFor(
-            FnApiControlClientPoolService.offeringClientsToPool(
-                clientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
-            serverFactory);
-    GrpcFnServer<GrpcLoggingService> loggingServer =
-        GrpcFnServer.allocatePortAndCreateFor(
-            GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
-    GrpcFnServer<ArtifactRetrievalService> retrievalServer =
-        GrpcFnServer.allocatePortAndCreateFor(
-            BeamFileSystemArtifactRetrievalService.create(), serverFactory);
-    GrpcFnServer<StaticGrpcProvisionService> provisioningServer =
-        GrpcFnServer.allocatePortAndCreateFor(
-            StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), serverFactory);
-    EnvironmentFactory environmentFactory =
-        getEnvironmentFactory(
-            controlServer,
-            loggingServer,
-            retrievalServer,
-            provisioningServer,
-            clientPool.getSource(),
-            IdGenerators.incrementingLongs());
+  LazyJobBundleFactory(
+      JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryMap) {
+    IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
     this.stageIdGenerator = stageIdGenerator;
-    this.controlServer = controlServer;
-    this.loggingServer = loggingServer;
-    this.retrievalServer = retrievalServer;
-    this.provisioningServer = provisioningServer;
-    this.environmentCache = createEnvironmentCache(environmentFactory, serverFactory);
+    this.environmentCache =
+        createEnvironmentCache(
+            (environment) -> {
+              synchronized (this) {
+                checkAndInitialize(jobInfo, environmentFactoryMap, environment);
 
 Review comment:
   I thought about it but avoided it for the following reasons:
   * It would require interpreting Python/Go pipeline options in the Flink Java runner. Since the options are not standardized across languages, that approach seems very fragile.
   * Pipeline options are not always populated, especially in the case of default options.
   * It would make future support of mixed environments more difficult.
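
For context, here is a minimal, hypothetical sketch of the per-URN lookup implied by the PR's Map<String, EnvironmentFactory.Provider> parameter: the runner picks an EnvironmentFactory by the environment's URN instead of re-interpreting SDK-specific pipeline options. Class and method names below are illustrative, not the code under review.

{noformat}
// Illustrative sketch only; assumes an EnvironmentFactory.Provider interface
// like the one referenced by the PR's constructor parameter and an Environment
// proto with the proposed urn field.
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;

class EnvironmentFactorySelector {
  private final Map<String, EnvironmentFactory.Provider> providers;

  EnvironmentFactorySelector(Map<String, EnvironmentFactory.Provider> providers) {
    this.providers = providers;
  }

  // Pick the provider registered for the environment's URN, e.g.
  // "beam:env:docker:v1" or "beam:env:process:v1".
  EnvironmentFactory.Provider providerFor(Environment environment) {
    EnvironmentFactory.Provider provider = providers.get(environment.getUrn());
    if (provider == null) {
      throw new IllegalArgumentException(
          "No EnvironmentFactory.Provider registered for URN " + environment.getUrn());
    }
    return provider;
  }
}
{noformat}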

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 147781)
    Time Spent: 12h 20m  (was: 12h 10m)

> Modify Environment to support non-dockerized SDK harness deployments 
> ---------------------------------------------------------------------
>
>                 Key: BEAM-5288
>                 URL: https://issues.apache.org/jira/browse/BEAM-5288
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Maximilian Michels
>            Assignee: Ankur Goenka
>            Priority: Major
>          Time Spent: 12h 20m
>  Remaining Estimate: 0h
>
> Following the mailing list discussions and BEAM-5187, it has become clear that we need to
> extend the Environment information. In addition to the Docker environment,
> the extended environment holds deployment options for 1) a process-based
> environment and 2) an externally managed environment.
> The proto definition, as of now, looks as follows:
> {noformat}
>  message Environment {
>    // (Required) The URN of the payload
>    string urn = 1;
>    // (Optional) The data specifying any parameters to the URN. If
>    // the URN does not require any arguments, this may be omitted.
>    bytes payload = 2;
>  }
>  message StandardEnvironments {
>    enum Environments {
>      DOCKER = 0 [(beam_urn) = "beam:env:docker:v1"];
>      PROCESS = 1 [(beam_urn) = "beam:env:process:v1"];
>      EXTERNAL = 2 [(beam_urn) = "beam:env:external:v1"];
>    }
>  }
>  // The payload of a Docker image
>  message DockerPayload {
>    string container_image = 1;  // implicitly linux_amd64.
>  }
>  message ProcessPayload {
>    string os = 1;  // "linux", "darwin", ..
>    string arch = 2;  // "amd64", ..
>    string command = 3; // process to execute
>    map<string, string> env = 4; // environment variables
>  }
> {noformat}
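
As a hedged illustration (not part of the proposal itself), the process environment above could be constructed from Java roughly as follows, assuming the ProcessPayload and urn/payload Environment messages are generated into the RunnerApi classes; names may differ in the final implementation.

{noformat}
// Sketch only: assumes the proposed ProcessPayload/Environment messages are
// generated into org.apache.beam.model.pipeline.v1.RunnerApi.
import org.apache.beam.model.pipeline.v1.RunnerApi;

public class ProcessEnvironmentExample {
  public static RunnerApi.Environment processEnvironment() {
    // Payload describing a locally launched SDK harness process; the boot
    // binary path and environment variables here are placeholders.
    RunnerApi.ProcessPayload payload =
        RunnerApi.ProcessPayload.newBuilder()
            .setOs("linux")
            .setArch("amd64")
            .setCommand("/opt/apache/beam/boot")
            .putEnv("SEMI_PERSISTENT_DIRECTORY", "/tmp")
            .build();
    // Wrap the payload in the generic urn/payload envelope.
    return RunnerApi.Environment.newBuilder()
        .setUrn("beam:env:process:v1")
        .setPayload(payload.toByteString())
        .build();
  }
}
{noformat}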



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
