This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new c50e3fc [BEAM-6488] Portable Flink runner support for running cross-language transforms (#7709) c50e3fc is described below commit c50e3fc96020125c1afd3afb09bca500ee110987 Author: Heejong Lee <heej...@gmail.com> AuthorDate: Fri Feb 8 20:31:44 2019 -0800 [BEAM-6488] Portable Flink runner support for running cross-language transforms (#7709) Multi-language support in DefaultJobBundleFactory --- .../control/DefaultJobBundleFactory.java | 199 ++++++++++++--------- .../control/DefaultJobBundleFactoryTest.java | 28 ++- 2 files changed, 133 insertions(+), 94 deletions(-) diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java index 0764aa0..5881f4c 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java @@ -17,12 +17,12 @@ */ package org.apache.beam.runners.fnexecution.control; +import com.google.auto.value.AutoValue; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target; import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; @@ -66,25 +66,17 @@ import org.slf4j.LoggerFactory; * EnvironmentFactory} for environment management. Note that returned {@link StageBundleFactory * stage bundle factories} are not thread-safe. Instead, a new stage factory should be created for * each client. {@link DefaultJobBundleFactory} initializes the Environment lazily when the forStage - * is called for a stage. This factory is not capable of handling mixed types of environment. + * is called for a stage. */ @ThreadSafe public class DefaultJobBundleFactory implements JobBundleFactory { private static final Logger LOG = LoggerFactory.getLogger(DefaultJobBundleFactory.class); - private final IdGenerator stageIdGenerator; private final LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache; - // Using environment as the initialization marker. - private Environment environment; - private ExecutorService executor; - private GrpcFnServer<FnApiControlClientPoolService> controlServer; - private GrpcFnServer<GrpcLoggingService> loggingServer; - private GrpcFnServer<ArtifactRetrievalService> retrievalServer; - private GrpcFnServer<StaticGrpcProvisionService> provisioningServer; - private GrpcFnServer<GrpcDataService> dataServer; - private GrpcFnServer<GrpcStateService> stateServer; - private MapControlClientPool clientPool; - private EnvironmentFactory environmentFactory; + private final Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap; + private final ExecutorService executor; + private final MapControlClientPool clientPool; + private final IdGenerator stageIdGenerator; public static DefaultJobBundleFactory create( JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap) { @@ -94,42 +86,42 @@ public class DefaultJobBundleFactory implements JobBundleFactory { DefaultJobBundleFactory( JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryMap) { IdGenerator stageIdGenerator = IdGenerators.incrementingLongs(); + this.environmentFactoryProviderMap = environmentFactoryMap; + this.executor = Executors.newCachedThreadPool(); + this.clientPool = MapControlClientPool.create(); this.stageIdGenerator = stageIdGenerator; this.environmentCache = - createEnvironmentCache( - environment -> { - synchronized (this) { - checkAndInitialize(jobInfo, environmentFactoryMap, environment); - } - return environmentFactory.createEnvironment(environment); - }); + createEnvironmentCache(serverFactory -> createServerInfo(jobInfo, serverFactory)); } @VisibleForTesting DefaultJobBundleFactory( - EnvironmentFactory environmentFactory, + Map<String, EnvironmentFactory.Provider> environmentFactoryMap, IdGenerator stageIdGenerator, GrpcFnServer<FnApiControlClientPoolService> controlServer, GrpcFnServer<GrpcLoggingService> loggingServer, GrpcFnServer<ArtifactRetrievalService> retrievalServer, GrpcFnServer<StaticGrpcProvisionService> provisioningServer, GrpcFnServer<GrpcDataService> dataServer, - GrpcFnServer<GrpcStateService> stateServer) - throws Exception { + GrpcFnServer<GrpcStateService> stateServer) { + this.environmentFactoryProviderMap = environmentFactoryMap; this.executor = Executors.newCachedThreadPool(); + this.clientPool = MapControlClientPool.create(); this.stageIdGenerator = stageIdGenerator; - this.controlServer = controlServer; - this.loggingServer = loggingServer; - this.retrievalServer = retrievalServer; - this.provisioningServer = provisioningServer; - this.dataServer = dataServer; - this.stateServer = stateServer; - this.environmentCache = - createEnvironmentCache(env -> environmentFactory.createEnvironment(env)); + ServerInfo serverInfo = + new AutoValue_DefaultJobBundleFactory_ServerInfo.Builder() + .setControlServer(controlServer) + .setLoggingServer(loggingServer) + .setRetrievalServer(retrievalServer) + .setProvisioningServer(provisioningServer) + .setDataServer(dataServer) + .setStateServer(stateServer) + .build(); + this.environmentCache = createEnvironmentCache(serverFactory -> serverInfo); } private LoadingCache<Environment, WrappedSdkHarnessClient> createEnvironmentCache( - ThrowingFunction<Environment, RemoteEnvironment> environmentCreator) { + ThrowingFunction<ServerFactory, ServerInfo> serverInfoCreator) { return CacheBuilder.newBuilder() .removalListener( (RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> { @@ -145,8 +137,21 @@ public class DefaultJobBundleFactory implements JobBundleFactory { new CacheLoader<Environment, WrappedSdkHarnessClient>() { @Override public WrappedSdkHarnessClient load(Environment environment) throws Exception { + EnvironmentFactory.Provider environmentFactoryProvider = + environmentFactoryProviderMap.get(environment.getUrn()); + ServerFactory serverFactory = environmentFactoryProvider.getServerFactory(); + ServerInfo serverInfo = serverInfoCreator.apply(serverFactory); + + EnvironmentFactory environmentFactory = + environmentFactoryProvider.createEnvironmentFactory( + serverInfo.getControlServer(), + serverInfo.getLoggingServer(), + serverInfo.getRetrievalServer(), + serverInfo.getProvisioningServer(), + clientPool, + stageIdGenerator); return WrappedSdkHarnessClient.wrapping( - environmentCreator.apply(environment), dataServer); + environmentFactory.createEnvironment(environment), serverInfo); } }); } @@ -161,12 +166,13 @@ public class DefaultJobBundleFactory implements JobBundleFactory { ProcessBundleDescriptors.fromExecutableStage( stageIdGenerator.getId(), executableStage, - dataServer.getApiServiceDescriptor(), - stateServer.getApiServiceDescriptor()); + wrappedClient.getServerInfo().getDataServer().getApiServiceDescriptor(), + wrappedClient.getServerInfo().getStateServer().getApiServiceDescriptor()); } catch (IOException e) { throw new RuntimeException(e); } - return SimpleStageBundleFactory.create(wrappedClient, processBundleDescriptor, stateServer); + return SimpleStageBundleFactory.create( + wrappedClient, processBundleDescriptor, wrappedClient.getServerInfo().getStateServer()); } @Override @@ -176,14 +182,6 @@ public class DefaultJobBundleFactory implements JobBundleFactory { environmentCache.invalidateAll(); environmentCache.cleanUp(); - // Tear down common servers. - stateServer.close(); - dataServer.close(); - controlServer.close(); - loggingServer.close(); - retrievalServer.close(); - provisioningServer.close(); - executor.shutdown(); } @@ -269,85 +267,114 @@ public class DefaultJobBundleFactory implements JobBundleFactory { private final RemoteEnvironment environment; private final SdkHarnessClient client; + private final ServerInfo serverInfo; - static WrappedSdkHarnessClient wrapping( - RemoteEnvironment environment, GrpcFnServer<GrpcDataService> dataServer) { + static WrappedSdkHarnessClient wrapping(RemoteEnvironment environment, ServerInfo serverInfo) { SdkHarnessClient client = SdkHarnessClient.usingFnApiClient( - environment.getInstructionRequestHandler(), dataServer.getService()); - return new WrappedSdkHarnessClient(environment, client); + environment.getInstructionRequestHandler(), serverInfo.getDataServer().getService()); + return new WrappedSdkHarnessClient(environment, client, serverInfo); } - private WrappedSdkHarnessClient(RemoteEnvironment environment, SdkHarnessClient client) { + private WrappedSdkHarnessClient( + RemoteEnvironment environment, SdkHarnessClient client, ServerInfo serverInfo) { this.environment = environment; this.client = client; + this.serverInfo = serverInfo; } SdkHarnessClient getClient() { return client; } + ServerInfo getServerInfo() { + return serverInfo; + } + @Override public void close() throws Exception { try (AutoCloseable envCloser = environment) { // Wrap resources in try-with-resources to ensure all are cleaned up. } + try (AutoCloseable stateServer = serverInfo.getStateServer(); + AutoCloseable dateServer = serverInfo.getDataServer(); + AutoCloseable controlServer = serverInfo.getControlServer(); + AutoCloseable loggingServer = serverInfo.getLoggingServer(); + AutoCloseable retrievalServer = serverInfo.getRetrievalServer(); + AutoCloseable provisioningServer = serverInfo.getProvisioningServer()) {} // TODO: Wait for executor shutdown? } } - @GuardedBy("this") - private void checkAndInitialize( - JobInfo jobInfo, - Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap, - Environment environment) + private ServerInfo createServerInfo(JobInfo jobInfo, ServerFactory serverFactory) throws IOException { - Preconditions.checkNotNull(environment, "Environment can not be null"); - if (this.environment != null) { - Preconditions.checkArgument( - this.environment.getUrn().equals(environment.getUrn()), - "Unsupported: Mixing environment types (%s, %s) is not supported for a job.", - this.environment.getUrn(), - environment.getUrn()); - // Nothing to do. Already initialized. - return; - } - - EnvironmentFactory.Provider environmentFactoryProvider = - environmentFactoryProviderMap.get(environment.getUrn()); - ServerFactory serverFactory = environmentFactoryProvider.getServerFactory(); + Preconditions.checkNotNull(serverFactory, "serverFactory can not be null"); - this.clientPool = MapControlClientPool.create(); - this.executor = Executors.newCachedThreadPool(); - this.controlServer = + GrpcFnServer<FnApiControlClientPoolService> controlServer = GrpcFnServer.allocatePortAndCreateFor( FnApiControlClientPoolService.offeringClientsToPool( clientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()), serverFactory); - this.loggingServer = + GrpcFnServer<GrpcLoggingService> loggingServer = GrpcFnServer.allocatePortAndCreateFor( GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory); - this.retrievalServer = + GrpcFnServer<ArtifactRetrievalService> retrievalServer = GrpcFnServer.allocatePortAndCreateFor( BeamFileSystemArtifactRetrievalService.create(), serverFactory); - this.provisioningServer = + GrpcFnServer<StaticGrpcProvisionService> provisioningServer = GrpcFnServer.allocatePortAndCreateFor( StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), serverFactory); - this.dataServer = + GrpcFnServer<GrpcDataService> dataServer = GrpcFnServer.allocatePortAndCreateFor( GrpcDataService.create(executor, OutboundObserverFactory.serverDirect()), serverFactory); - this.stateServer = + GrpcFnServer<GrpcStateService> stateServer = GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory); - this.environmentFactory = - environmentFactoryProvider.createEnvironmentFactory( - controlServer, - loggingServer, - retrievalServer, - provisioningServer, - clientPool, - stageIdGenerator); - this.environment = environment; + ServerInfo serverInfo = + new AutoValue_DefaultJobBundleFactory_ServerInfo.Builder() + .setControlServer(controlServer) + .setLoggingServer(loggingServer) + .setRetrievalServer(retrievalServer) + .setProvisioningServer(provisioningServer) + .setDataServer(dataServer) + .setStateServer(stateServer) + .build(); + return serverInfo; + } + + /** A container for EnvironmentFactory and its corresponding Grpc servers. */ + @AutoValue + public abstract static class ServerInfo { + abstract GrpcFnServer<FnApiControlClientPoolService> getControlServer(); + + abstract GrpcFnServer<GrpcLoggingService> getLoggingServer(); + + abstract GrpcFnServer<ArtifactRetrievalService> getRetrievalServer(); + + abstract GrpcFnServer<StaticGrpcProvisionService> getProvisioningServer(); + + abstract GrpcFnServer<GrpcDataService> getDataServer(); + + abstract GrpcFnServer<GrpcStateService> getStateServer(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setControlServer(GrpcFnServer<FnApiControlClientPoolService> server); + + abstract Builder setLoggingServer(GrpcFnServer<GrpcLoggingService> server); + + abstract Builder setRetrievalServer(GrpcFnServer<ArtifactRetrievalService> server); + + abstract Builder setProvisioningServer(GrpcFnServer<StaticGrpcProvisionService> server); + + abstract Builder setDataServer(GrpcFnServer<GrpcDataService> server); + + abstract Builder setStateServer(GrpcFnServer<GrpcStateService> server); + + abstract ServerInfo build(); + } } } diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java index 5c90c31..20c7578 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java @@ -53,7 +53,6 @@ import org.apache.beam.sdk.fn.IdGenerators; import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; -import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -82,6 +81,15 @@ public class DefaultJobBundleFactoryTest { private final IdGenerator stageIdGenerator = IdGenerators.incrementingLongs(); private final InstructionResponse instructionResponse = InstructionResponse.newBuilder().setInstructionId("instruction-id").build(); + private final EnvironmentFactory.Provider envFactoryProvider = + (GrpcFnServer<FnApiControlClientPoolService> controlServiceServer, + GrpcFnServer<GrpcLoggingService> loggingServiceServer, + GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer, + GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer, + ControlClientPool clientPool, + IdGenerator idGenerator) -> envFactory; + private final Map<String, EnvironmentFactory.Provider> envFactoryProviderMap = + ImmutableMap.of(environment.getUrn(), envFactoryProvider); @Before public void setUpMocks() throws Exception { @@ -100,7 +108,7 @@ public class DefaultJobBundleFactoryTest { public void createsCorrectEnvironment() throws Exception { try (DefaultJobBundleFactory bundleFactory = new DefaultJobBundleFactory( - envFactory, + envFactoryProviderMap, stageIdGenerator, controlServer, loggingServer, @@ -164,7 +172,7 @@ public class DefaultJobBundleFactoryTest { verify(envFactoryA, Mockito.times(0)).createEnvironment(environmentAA); bundleFactory.forStage(getExecutableStage(environmentAA)); - verify(environmentProviderFactoryA, Mockito.times(1)) + verify(environmentProviderFactoryA, Mockito.times(2)) .createEnvironmentFactory(any(), any(), any(), any(), any(), any()); verify(environmentProviderFactoryB, Mockito.times(0)) .createEnvironmentFactory(any(), any(), any(), any(), any(), any()); @@ -174,7 +182,7 @@ public class DefaultJobBundleFactoryTest { } @Test - public void failedCreatingMultipleEnvironmentFromMultipleTypes() throws Exception { + public void creatingMultipleEnvironmentFromMultipleTypes() throws Exception { ServerFactory serverFactory = ServerFactory.createDefault(); Environment environmentA = Environment.newBuilder().setUrn("env:urn:a").build(); @@ -206,16 +214,17 @@ public class DefaultJobBundleFactoryTest { JobInfo.create("testJob", "testJob", "token", Struct.getDefaultInstance()), environmentFactoryProviderMap)) { bundleFactory.forStage(getExecutableStage(environmentB)); - thrown.expectCause(Matchers.any(IllegalArgumentException.class)); bundleFactory.forStage(getExecutableStage(environmentA)); } + verify(envFactoryA).createEnvironment(environmentA); + verify(envFactoryB).createEnvironment(environmentB); } @Test public void closesEnvironmentOnCleanup() throws Exception { DefaultJobBundleFactory bundleFactory = new DefaultJobBundleFactory( - envFactory, + envFactoryProviderMap, stageIdGenerator, controlServer, loggingServer, @@ -233,7 +242,7 @@ public class DefaultJobBundleFactoryTest { public void cachesEnvironment() throws Exception { try (DefaultJobBundleFactory bundleFactory = new DefaultJobBundleFactory( - envFactory, + envFactoryProviderMap, stageIdGenerator, controlServer, loggingServer, @@ -258,6 +267,9 @@ public class DefaultJobBundleFactoryTest { Environment envFoo = Environment.newBuilder().setUrn("dummy:urn:another").build(); RemoteEnvironment remoteEnvFoo = mock(RemoteEnvironment.class); InstructionRequestHandler fooInstructionHandler = mock(InstructionRequestHandler.class); + Map<String, EnvironmentFactory.Provider> envFactoryProviderMapFoo = + ImmutableMap.of( + environment.getUrn(), envFactoryProvider, envFoo.getUrn(), envFactoryProvider); when(envFactory.createEnvironment(envFoo)).thenReturn(remoteEnvFoo); when(remoteEnvFoo.getInstructionRequestHandler()).thenReturn(fooInstructionHandler); // Don't bother creating a distinct instruction response because we don't use it here. @@ -266,7 +278,7 @@ public class DefaultJobBundleFactoryTest { try (DefaultJobBundleFactory bundleFactory = new DefaultJobBundleFactory( - envFactory, + envFactoryProviderMapFoo, stageIdGenerator, controlServer, loggingServer,