[ https://issues.apache.org/jira/browse/BEAM-5288?focusedWorklogId=147781&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-147781 ]
ASF GitHub Bot logged work on BEAM-5288: ---------------------------------------- Author: ASF GitHub Bot Created on: 25/Sep/18 20:42 Start Date: 25/Sep/18 20:42 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #6441: [BEAM-5288] Support environment pipeline option in Java and Python. URL: https://github.com/apache/beam/pull/6441#discussion_r220342692 ########## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/LazyJobBundleFactory.java ########## @@ -53,92 +55,81 @@ import org.apache.beam.sdk.fn.IdGenerator; import org.apache.beam.sdk.fn.IdGenerators; import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.function.ThrowingFunction; import org.apache.beam.sdk.fn.stream.OutboundObserverFactory; import org.apache.beam.sdk.util.WindowedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A base for a {@link JobBundleFactory} for which the implementation can specify a custom {@link + * A {@link JobBundleFactory} for which the implementation can specify a custom {@link * EnvironmentFactory} for environment management. Note that returned {@link StageBundleFactory * stage bundle factories} are not thread-safe. Instead, a new stage factory should be created for - * each client. + * each client. {@link LazyJobBundleFactory} initializes the Environment lazily when the forStage is + * called for a stage. This factory is not capable of handling a mixed types of environment. 
*/ @ThreadSafe -public abstract class JobBundleFactoryBase implements JobBundleFactory { - private static final Logger LOG = LoggerFactory.getLogger(JobBundleFactoryBase.class); +public class LazyJobBundleFactory implements JobBundleFactory { + private static final Logger LOG = LoggerFactory.getLogger(LazyJobBundleFactory.class); private final IdGenerator stageIdGenerator; - private final GrpcFnServer<FnApiControlClientPoolService> controlServer; - private final GrpcFnServer<GrpcLoggingService> loggingServer; - private final GrpcFnServer<ArtifactRetrievalService> retrievalServer; - private final GrpcFnServer<StaticGrpcProvisionService> provisioningServer; - private final LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache; + // Using environment as the initialization marker. + private Environment environment; + private ExecutorService executor; + private GrpcFnServer<FnApiControlClientPoolService> controlServer; + private GrpcFnServer<GrpcLoggingService> loggingServer; + private GrpcFnServer<ArtifactRetrievalService> retrievalServer; + private GrpcFnServer<StaticGrpcProvisionService> provisioningServer; + private GrpcFnServer<GrpcDataService> dataServer; + private GrpcFnServer<GrpcStateService> stateServer; + private MapControlClientPool clientPool; + private EnvironmentFactory environmentFactory; - JobBundleFactoryBase(JobInfo jobInfo) throws Exception { - ServerFactory serverFactory = getServerFactory(); - IdGenerator stageIdGenerator = IdGenerators.incrementingLongs(); - ControlClientPool clientPool = MapControlClientPool.create(); + public static LazyJobBundleFactory create( + JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap) { + return new LazyJobBundleFactory(jobInfo, environmentFactoryProviderMap); + } - GrpcFnServer<FnApiControlClientPoolService> controlServer = - GrpcFnServer.allocatePortAndCreateFor( - FnApiControlClientPoolService.offeringClientsToPool( - clientPool.getSink(), 
GrpcContextHeaderAccessorProvider.getHeaderAccessor()), - serverFactory); - GrpcFnServer<GrpcLoggingService> loggingServer = - GrpcFnServer.allocatePortAndCreateFor( - GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory); - GrpcFnServer<ArtifactRetrievalService> retrievalServer = - GrpcFnServer.allocatePortAndCreateFor( - BeamFileSystemArtifactRetrievalService.create(), serverFactory); - GrpcFnServer<StaticGrpcProvisionService> provisioningServer = - GrpcFnServer.allocatePortAndCreateFor( - StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), serverFactory); - EnvironmentFactory environmentFactory = - getEnvironmentFactory( - controlServer, - loggingServer, - retrievalServer, - provisioningServer, - clientPool.getSource(), - IdGenerators.incrementingLongs()); + LazyJobBundleFactory( + JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryMap) { + IdGenerator stageIdGenerator = IdGenerators.incrementingLongs(); this.stageIdGenerator = stageIdGenerator; - this.controlServer = controlServer; - this.loggingServer = loggingServer; - this.retrievalServer = retrievalServer; - this.provisioningServer = provisioningServer; - this.environmentCache = createEnvironmentCache(environmentFactory, serverFactory); + this.environmentCache = + createEnvironmentCache( + (environment) -> { + synchronized (this) { + checkAndInitialize(jobInfo, environmentFactoryMap, environment); Review comment: I thought about it but avoided it for the following reasons: * This would result in the interpretation of Python/Go pipeline options in flink-java. As the options are not standardized across the languages, this approach seems to be very fragile. * Pipeline options are not always populated, especially in the case of default options. * This would make support of mixed environments difficult in the future. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 147781) Time Spent: 12h 20m (was: 12h 10m) > Modify Environment to support non-dockerized SDK harness deployments > --------------------------------------------------------------------- > > Key: BEAM-5288 > URL: https://issues.apache.org/jira/browse/BEAM-5288 > Project: Beam > Issue Type: New Feature > Components: beam-model > Reporter: Maximilian Michels > Assignee: Ankur Goenka > Priority: Major > Time Spent: 12h 20m > Remaining Estimate: 0h > > As of mailing discussions and BEAM-5187, it has become clear that we need to > extend the Environment information. In addition to the Docker environment, > the extended environment holds deployment options for 1) a process-based > environment, 2) an externally managed environment. > The proto definition, as of now, looks as follows: > {noformat} > message Environment { > // (Required) The URN of the payload > string urn = 1; > // (Optional) The data specifying any parameters to the URN. If > // the URN does not require any arguments, this may be omitted. > bytes payload = 2; > } > message StandardEnvironments { > enum Environments { > DOCKER = 0 [(beam_urn) = "beam:env:docker:v1"]; > PROCESS = 1 [(beam_urn) = "beam:env:process:v1"]; > EXTERNAL = 2 [(beam_urn) = "beam:env:external:v1"]; > } > } > // The payload of a Docker image > message DockerPayload { > string container_image = 1; // implicitly linux_amd64. > } > message ProcessPayload { > string os = 1; // "linux", "darwin", .. > string arch = 2; // "amd64", .. > string command = 3; // process to execute > map<string, string> env = 4; // environment variables > } > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)