[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5431 ---
[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r168193217 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java --- @@ -163,29 +210,130 @@ protected void runCluster(Configuration configuration) throws Exception { blobServer, heartbeatServices, metricRegistry); + + // TODO: Make shutDownAndTerminate non blocking to not use the global executor + dispatcher.getTerminationFuture().whenCompleteAsync( + (Boolean success, Throwable throwable) -> { + if (throwable != null) { + LOG.info("Could not properly terminate the Dispatcher.", throwable); + } + + shutDownAndTerminate( + SUCCESS_RETURN_CODE, + ApplicationStatus.SUCCEEDED, + true); + }); } } protected void initializeServices(Configuration configuration) throws Exception { - assert(Thread.holdsLock(lock)); LOG.info("Initializing cluster services."); - final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS); - // TODO: Add support for port ranges - final String portRange = String.valueOf(configuration.getInteger(JobManagerOptions.PORT)); - - commonRpcService = createRpcService(configuration, bindAddress, portRange); - haServices = createHaServices(configuration, commonRpcService.getExecutor()); - blobServer = new BlobServer(configuration, haServices.createBlobStore()); - blobServer.start(); - heartbeatServices = createHeartbeatServices(configuration); - metricRegistry = createMetricRegistry(configuration); - - // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint - // start the MetricQueryService - final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem(); - metricRegistry.startQueryService(actorSystem, null); + synchronized (lock) { + final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS); + // TODO: Add support for port ranges + final String portRange = 
String.valueOf(configuration.getInteger(JobManagerOptions.PORT)); + + commonRpcService = createRpcService(configuration, bindAddress, portRange); + haServices = createHaServices(configuration, commonRpcService.getExecutor()); + blobServer = new BlobServer(configuration, haServices.createBlobStore()); + blobServer.start(); + heartbeatServices = createHeartbeatServices(configuration); + metricRegistry = createMetricRegistry(configuration); + + // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint + // start the MetricQueryService + final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem(); + metricRegistry.startQueryService(actorSystem, null); + } + } + + protected void startClusterComponents( + Configuration configuration, --- End diff -- Good catch. Will fix it. ---
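The `whenCompleteAsync` hook in this diff ties the entrypoint's lifetime to the dispatcher. However the dispatcher's termination future completes, the cluster is shut down, and a failure is only logged. Called without an executor argument, `whenCompleteAsync` runs on the JDK's default async facility (typically `ForkJoinPool.commonPool()`), which is what the TODO about "the global executor" refers to. The following is a minimal sketch of the pattern with plain `CompletableFuture`s. `Component`, `terminateWith`, and the `exitCode` future are hypothetical stand-ins rather than Flink classes. Unlike the diff, the sketch passes an explicit executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical stand-ins for the dispatcher and entrypoint in the diff; not Flink's classes.
class TerminationHookSketch {

    static final int SUCCESS_RETURN_CODE = 0;

    /** A component that completes its termination future when it shuts down. */
    static final class Component {
        private final CompletableFuture<Boolean> terminationFuture = new CompletableFuture<>();

        CompletableFuture<Boolean> getTerminationFuture() {
            return terminationFuture;
        }

        /** Completes normally if cause is null, exceptionally otherwise. */
        void shutDown(Throwable cause) {
            if (cause == null) {
                terminationFuture.complete(true);
            } else {
                terminationFuture.completeExceptionally(cause);
            }
        }
    }

    /** Records the return code instead of calling System.exit, so the sketch stays testable. */
    final CompletableFuture<Integer> exitCode = new CompletableFuture<>();

    void shutDownAndTerminate(int returnCode) {
        exitCode.complete(returnCode);
    }

    void terminateWith(Component dispatcher, Executor executor) {
        // An explicit executor keeps the (potentially blocking) shutdown off the
        // default async facility that whenCompleteAsync would otherwise use.
        dispatcher.getTerminationFuture().whenCompleteAsync(
            (Boolean success, Throwable throwable) -> {
                if (throwable != null) {
                    System.err.println("Could not properly terminate the Dispatcher: " + throwable);
                }
                // Mirrors the diff: shut down with the success code however the dispatcher terminated.
                shutDownAndTerminate(SUCCESS_RETURN_CODE);
            },
            executor);
    }
}
```

After `terminateWith` has been called, `exitCode` stays incomplete until the dispatcher's termination future completes. It then completes on the supplied executor, even if the dispatcher failed.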
[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r168132269 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobManagerServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import 
org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; + +/** + * Tests for the {@link MiniDispatcher}. + */ +@Category(Flip6.class) +public class MiniDispatcherTest extends TestLogger { + + private static final JobGraph jobGraph = new JobGraph(); + + private static final Time timeout = Time.seconds(10L); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static TestingRpcService rpcService; + + private static Configuration configuration; + + private static BlobServer blobServer; + + private MiniDispatcher miniDispatcher; + + private CompletableFuture jobGraphFuture; + + private TestingLeaderElectionService dispatcherLeaderElectionService; + + @BeforeClass + public static void setupClass() throws IOException { + rpcService = new TestingRpcService(); + configuration = new Configuration(); + + configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + + blobServer = new BlobServer(configuration, new VoidBlobStore()); + } + + @Before + public void setup() throws Exc
[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r168126480 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java --- @@ -163,29 +210,130 @@ protected void runCluster(Configuration configuration) throws Exception { blobServer, heartbeatServices, metricRegistry); + + // TODO: Make shutDownAndTerminate non blocking to not use the global executor + dispatcher.getTerminationFuture().whenCompleteAsync( + (Boolean success, Throwable throwable) -> { + if (throwable != null) { + LOG.info("Could not properly terminate the Dispatcher.", throwable); + } + + shutDownAndTerminate( + SUCCESS_RETURN_CODE, + ApplicationStatus.SUCCEEDED, + true); + }); } } protected void initializeServices(Configuration configuration) throws Exception { - assert(Thread.holdsLock(lock)); LOG.info("Initializing cluster services."); - final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS); - // TODO: Add support for port ranges - final String portRange = String.valueOf(configuration.getInteger(JobManagerOptions.PORT)); - - commonRpcService = createRpcService(configuration, bindAddress, portRange); - haServices = createHaServices(configuration, commonRpcService.getExecutor()); - blobServer = new BlobServer(configuration, haServices.createBlobStore()); - blobServer.start(); - heartbeatServices = createHeartbeatServices(configuration); - metricRegistry = createMetricRegistry(configuration); - - // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint - // start the MetricQueryService - final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem(); - metricRegistry.startQueryService(actorSystem, null); + synchronized (lock) { + final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS); + // TODO: Add support for port ranges + final String portRange = 
String.valueOf(configuration.getInteger(JobManagerOptions.PORT)); + + commonRpcService = createRpcService(configuration, bindAddress, portRange); + haServices = createHaServices(configuration, commonRpcService.getExecutor()); + blobServer = new BlobServer(configuration, haServices.createBlobStore()); + blobServer.start(); + heartbeatServices = createHeartbeatServices(configuration); + metricRegistry = createMetricRegistry(configuration); + + // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint + // start the MetricQueryService + final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem(); + metricRegistry.startQueryService(actorSystem, null); + } + } + + protected void startClusterComponents( + Configuration configuration, --- End diff -- nit: indentation ---
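The same diff changes `initializeServices` in two ways. It removes `assert(Thread.holdsLock(lock))`, which required every caller to hold the lock and is only checked when assertions are enabled. It then has the method take the lock itself with `synchronized (lock)`. Java monitors are reentrant, so callers that already hold the lock keep working. The following sketch illustrates this. The class, field, and service names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the locking change in initializeServices; names are illustrative.
class LockOwnershipSketch {

    private final Object lock = new Object();
    private final List<String> services = new ArrayList<>();

    // Before: `assert(Thread.holdsLock(lock));` made every caller responsible for
    // holding the lock, and the check only runs when the JVM is started with -ea.

    // After: the method acquires the lock itself.
    void initializeServices() {
        synchronized (lock) {
            services.add("rpc");
            services.add("blob");
        }
    }

    // Monitors are reentrant, so a caller that already holds the lock
    // can still call initializeServices() without deadlocking.
    void runCluster() {
        synchronized (lock) {
            initializeServices();
            services.add("dispatcher");
        }
    }

    int serviceCount() {
        synchronized (lock) {
            return services.size();
        }
    }
}
```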
[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r167853480 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java --- @@ -262,32 +430,151 @@ protected void shutDown(boolean cleanupHaData) throws FlinkException { exception = ExceptionUtils.firstOrSuppressed(t, exception); } } - - terminationFuture.complete(true); } if (exception != null) { throw new FlinkException("Could not properly shut down the cluster services.", exception); } } + protected void stopClusterComponents() throws Exception { + synchronized (lock) { + Throwable exception = null; + + if (webMonitorEndpoint != null) { + webMonitorEndpoint.shutdown(Time.seconds(10L)); + } + + if (dispatcherLeaderRetrievalService != null) { + try { + dispatcherLeaderRetrievalService.stop(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (dispatcher != null) { + try { + dispatcher.shutDown(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (resourceManagerRetrievalService != null) { + try { + resourceManagerRetrievalService.stop(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (resourceManager != null) { + try { + resourceManager.shutDown(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (archivedExecutionGraphStore != null) { + try { + archivedExecutionGraphStore.close(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (transientBlobCache != null) { + try { + transientBlobCache.close(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (exception != null) { + throw new FlinkException("Could not properly shut down the session cluster entry point.", exception); + } + } + } + @Override 
public void onFatalError(Throwable exception) { LOG.error("Fatal error occurred in the cluster entrypoint.", exception); System.exit(RUNTIME_FAILURE_RETURN_CODE); } - protected abstract void startClusterComponents( + // -- + // Internal methods + // -- + + private void shutDownAndTerminate( + int returnCode, + ApplicationStatus applicationStatus, + @Nullable String diagnostics, --- End diff -- True, at the moment it won't be used. Initially I left it in because of the Yarn-based cluster entrypoints: the old implementation actively deregisters from Yarn, passing this as additional information. For the moment, I'll remove it. ---
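`stopClusterComponents` in this diff follows a "stop everything, report the first failure" pattern. Every non-null component is stopped even if an earlier one throws. The first throwable is kept, later ones are attached to it as suppressed exceptions, and a single exception is thrown at the end. The sketch below reproduces this pattern. The local `firstOrSuppressed` helper has the semantics we assume for Flink's `ExceptionUtils.firstOrSuppressed`: return the new exception if there is no previous one, otherwise add it as suppressed to the previous one. `Stoppable` and `stopAll` are hypothetical.

```java
import java.util.List;

// Sketch of the "stop everything, report the first failure" shutdown pattern.
class ShutdownSketch {

    /** A component that can be stopped; hypothetical stand-in for the services in the diff. */
    interface Stoppable {
        void stop() throws Exception;
    }

    /** Local helper with the semantics assumed for ExceptionUtils.firstOrSuppressed. */
    static Throwable firstOrSuppressed(Throwable newException, Throwable previous) {
        if (previous == null) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    static void stopAll(List<Stoppable> components) throws Exception {
        Throwable exception = null;
        for (Stoppable component : components) {
            if (component == null) {
                continue; // component was never started
            }
            try {
                component.stop();
            } catch (Throwable t) {
                // Keep going so that every component gets a chance to stop.
                exception = firstOrSuppressed(t, exception);
            }
        }
        if (exception != null) {
            throw new Exception("Could not properly shut down the cluster components.", exception);
        }
    }
}
```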
[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r167853004 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java --- @@ -300,4 +587,12 @@ protected static ClusterConfiguration parseArguments(String[] args) { protected static Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) { return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir()); } + + /** +* Execution mode of the {@link MiniDispatcher}. +*/ + public enum ExecutionMode { + NORMAL, // waits until the job result has been served --- End diff -- True. Will do. ---
[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r167852786 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java --- @@ -63,6 +89,10 @@ */ public abstract class ClusterEntrypoint implements FatalErrorHandler { + public static final ConfigOption EXECUTION_MODE = ConfigOptions + .key("cluster.execution-mode") --- End diff -- There are other configuration parameters of this kind as well, like `local.number-taskmanager`. In the old Yarn code, we used environment variables to pass certain settings; I think the configuration is a better place for them. However, I agree that this might be a bit confusing, so I'll add the prefix `internal` to the key. ---
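Passing the execution mode through the configuration means the entrypoint only has to read one key and map it to the enum. The following is a minimal sketch of that lookup under several assumptions. A plain `Map<String, String>` stands in for Flink's `Configuration`. The key `internal.cluster.execution-mode` is only a guess at what the `internal`-prefixed option will be called. The `NORMAL` default is likewise assumed.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: a plain Map stands in for Flink's Configuration, and the key
// "internal.cluster.execution-mode" is an assumption about the renamed option.
class ExecutionModeConfigSketch {

    enum ExecutionMode { NORMAL, DETACHED }

    static final String EXECUTION_MODE_KEY = "internal.cluster.execution-mode";

    static ExecutionMode readExecutionMode(Map<String, String> configuration) {
        // Assumed default: NORMAL when a deployment tool did not set the key.
        String value = configuration.getOrDefault(EXECUTION_MODE_KEY, ExecutionMode.NORMAL.name());
        // valueOf throws IllegalArgumentException for unknown modes instead of silently defaulting.
        return ExecutionMode.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}
```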
[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r167851271 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobManagerServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import 
org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; + +/** + * Tests for the {@link MiniDispatcher}. + */ +@Category(Flip6.class) +public class MiniDispatcherTest extends TestLogger { + + private static final JobGraph jobGraph = new JobGraph(); + + private static final Time timeout = Time.seconds(10L); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static TestingRpcService rpcService; + + private static Configuration configuration; + + private static BlobServer blobServer; + + private MiniDispatcher miniDispatcher; + + private CompletableFuture jobGraphFuture; + + private TestingLeaderElectionService dispatcherLeaderElectionService; + + @BeforeClass + public static void setupClass() throws IOException { + rpcService = new TestingRpcService(); + configuration = new Configuration(); + + configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + + blobServer = new BlobServer(configuration, new VoidBlobStore()); + } + + @Before + public void setup() t
[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r167850583 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobManagerServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import 
org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; + +/** + * Tests for the {@link MiniDispatcher}. + */ +@Category(Flip6.class) +public class MiniDispatcherTest extends TestLogger { + + private static final JobGraph jobGraph = new JobGraph(); + + private static final Time timeout = Time.seconds(10L); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static TestingRpcService rpcService; + + private static Configuration configuration; + + private static BlobServer blobServer; + + private MiniDispatcher miniDispatcher; + + private CompletableFuture jobGraphFuture; + + private TestingLeaderElectionService dispatcherLeaderElectionService; + + @BeforeClass + public static void setupClass() throws IOException { + rpcService = new TestingRpcService(); + configuration = new Configuration(); + + configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + + blobServer = new BlobServer(configuration, new VoidBlobStore()); + } + + @Before + public void setup() t
[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r167850253 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -671,13 +674,68 @@ public void jobFinishedByOther() { log.info("Job {} was finished by other JobManager.", jobId); runAsync( - () -> { - try { - removeJob(jobId, false); - } catch (Exception e) { - log.warn("Could not properly remove job {} from the dispatcher.", jobId, e); - } - }); + () -> Dispatcher.this.jobFinishedByOther(jobId)); } } + + //-- + // Factories + //-- + + /** +* Factory for a {@link JobManagerRunner}. +*/ + @FunctionalInterface + public interface JobManagerRunnerFactory { + JobManagerRunner createJobManagerRunner( + ResourceID resourceId, + JobGraph jobGraph, + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + BlobServer blobServer, + JobManagerServices jobManagerServices, + MetricRegistry metricRegistry, + OnCompletionActions onCompleteActions, + FatalErrorHandler fatalErrorHandler, + @Nullable String restAddress) throws Exception; + } + + /** +* Singleton default factory for {@link JobManagerRunner}. +*/ + public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory { + INSTANCE { --- End diff -- True. Will change it. ---
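The diff introduces a `@FunctionalInterface` factory so the `Dispatcher` can be handed either the default factory or a lambda in tests. The default implementation is written as an enum singleton. The following is a simplified, hypothetical sketch of that design. A single `jobName` parameter stands in for the long parameter list of the real `JobManagerRunnerFactory`. The method is implemented on the enum itself instead of in a constant-specific body (`INSTANCE { ... }`). A constant-specific body compiles to an anonymous subclass of the enum.

```java
// Hypothetical, simplified sketch: the real JobManagerRunnerFactory takes many
// more parameters; here a single job name stands in for all of them.
class RunnerFactorySketch {

    /** Stand-in for JobManagerRunner. */
    static final class Runner {
        final String jobName;
        final boolean testDouble;

        Runner(String jobName, boolean testDouble) {
            this.jobName = jobName;
            this.testDouble = testDouble;
        }
    }

    @FunctionalInterface
    interface RunnerFactory {
        Runner createRunner(String jobName) throws Exception;
    }

    /** Enum-based singleton: the JVM guarantees a single INSTANCE. */
    enum DefaultRunnerFactory implements RunnerFactory {
        INSTANCE;

        // Implemented on the enum itself, so INSTANCE.getClass() is DefaultRunnerFactory
        // rather than an anonymous subclass created by a constant-specific body.
        @Override
        public Runner createRunner(String jobName) {
            return new Runner(jobName, false);
        }
    }

    /** A dispatcher-like consumer that only depends on the factory interface. */
    static Runner submit(RunnerFactory factory, String jobName) {
        try {
            return factory.createRunner(jobName);
        } catch (Exception e) {
            throw new IllegalStateException("Could not create runner for " + jobName, e);
        }
    }
}
```

Production code passes `DefaultRunnerFactory.INSTANCE`, while a test can pass a lambda such as `name -> new Runner(name, true)` without subclassing anything.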
[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r167610671 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java --- @@ -262,32 +430,151 @@ protected void shutDown(boolean cleanupHaData) throws FlinkException { exception = ExceptionUtils.firstOrSuppressed(t, exception); } } - - terminationFuture.complete(true); } if (exception != null) { throw new FlinkException("Could not properly shut down the cluster services.", exception); } } + protected void stopClusterComponents() throws Exception { + synchronized (lock) { + Throwable exception = null; + + if (webMonitorEndpoint != null) { + webMonitorEndpoint.shutdown(Time.seconds(10L)); + } + + if (dispatcherLeaderRetrievalService != null) { + try { + dispatcherLeaderRetrievalService.stop(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (dispatcher != null) { + try { + dispatcher.shutDown(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (resourceManagerRetrievalService != null) { + try { + resourceManagerRetrievalService.stop(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (resourceManager != null) { + try { + resourceManager.shutDown(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (archivedExecutionGraphStore != null) { + try { + archivedExecutionGraphStore.close(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (transientBlobCache != null) { + try { + transientBlobCache.close(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + + if (exception != null) { + throw new FlinkException("Could not properly shut down the session cluster entry point.", exception); + } + } + } + @Override public 
void onFatalError(Throwable exception) { LOG.error("Fatal error occurred in the cluster entrypoint.", exception); System.exit(RUNTIME_FAILURE_RETURN_CODE); } - protected abstract void startClusterComponents( + // -- + // Internal methods + // -- + + private void shutDownAndTerminate( + int returnCode, + ApplicationStatus applicationStatus, + @Nullable String diagnostics, --- End diff -- `diagnostics` is unused. You already log errors, so I don't think this argument is needed. ---
[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r167608001 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java --- @@ -300,4 +587,12 @@ protected static ClusterConfiguration parseArguments(String[] args) { protected static Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) { return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir()); } + + /** +* Execution mode of the {@link MiniDispatcher}. +*/ + public enum ExecutionMode { + NORMAL, // waits until the job result has been served --- End diff -- Should also be a Javadoc comment:

```
/**
 * Execution mode of the {@link MiniDispatcher}.
 */
public enum ExecutionMode {
	/**
	 * Waits until the job result has been served.
	 */
	NORMAL,

	/**
	 * Directly stops after the job has finished.
	 */
	DETACHED
}
```
---
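The two modes differ only in *when* the single-job dispatcher may terminate. In `DETACHED` mode, it terminates as soon as the job reaches a terminal state. In `NORMAL` mode, it waits until a client has requested the job result. The following sketch models this with `CompletableFuture`s. The class and method names are hypothetical and do not describe `MiniDispatcher`'s actual implementation.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of how a MiniDispatcher-like component could decide when to
// terminate; class and method names are illustrative, not Flink's implementation.
class MiniDispatcherModeSketch {

    enum ExecutionMode {
        /** Waits until the job result has been served. */
        NORMAL,
        /** Directly stops after the job has finished. */
        DETACHED
    }

    private final ExecutionMode executionMode;
    private final CompletableFuture<String> jobResult = new CompletableFuture<>();
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    MiniDispatcherModeSketch(ExecutionMode executionMode) {
        this.executionMode = executionMode;
    }

    /** Called when the single job reaches a terminal state. */
    void jobReachedTerminalState(String result) {
        jobResult.complete(result);
        if (executionMode == ExecutionMode.DETACHED) {
            // Nobody will ask for the result, so terminate right away.
            terminationFuture.complete(null);
        }
    }

    /** Called when a client requests the job result. */
    CompletableFuture<String> requestJobResult() {
        if (executionMode == ExecutionMode.NORMAL) {
            // Terminate once the result requested by the client is available.
            return jobResult.whenComplete((result, failure) -> terminationFuture.complete(null));
        }
        return jobResult;
    }

    CompletableFuture<Void> getTerminationFuture() {
        return terminationFuture;
    }
}
```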
[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r167601541 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobManagerServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import 
org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; + +/** + * Tests for the {@link MiniDispatcher}. + */ +@Category(Flip6.class) +public class MiniDispatcherTest extends TestLogger { + + private static final JobGraph jobGraph = new JobGraph(); + + private static final Time timeout = Time.seconds(10L); + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static TestingRpcService rpcService; + + private static Configuration configuration; + + private static BlobServer blobServer; + + private MiniDispatcher miniDispatcher; + + private CompletableFuture jobGraphFuture; + + private TestingLeaderElectionService dispatcherLeaderElectionService; + + @BeforeClass + public static void setupClass() throws IOException { + rpcService = new TestingRpcService(); + configuration = new Configuration(); + + configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + + blobServer = new BlobServer(configuration, new VoidBlobStore()); + } + + @Before + public void setup() throws Exc
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r167606814 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java --- @@ -63,6 +89,10 @@ */ public abstract class ClusterEntrypoint implements FatalErrorHandler { + public static final ConfigOption EXECUTION_MODE = ConfigOptions + .key("cluster.execution-mode") --- End diff -- Is this the first time that `ConfigOptions` are used for something that the user should not or cannot configure? ---
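To make the option under discussion concrete, here is a minimal, stdlib-only sketch of how an execution-mode switch could drive the behavior described in the PR (terminate after job completion only in detached mode). It is not the PR's code: it assumes a plain `Map<String, String>` in place of Flink's `Configuration`, a hypothetical `ExecutionMode` enum with `NORMAL` and `DETACHED` values, `NORMAL` as the default, and a hypothetical `MiniDispatcherSketch` class.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ExecutionModeSketch {

    // Hypothetical modes; NORMAL as the default is an assumption of this sketch.
    public enum ExecutionMode { NORMAL, DETACHED }

    public static final String EXECUTION_MODE_KEY = "cluster.execution-mode";

    // Reads the mode from a plain key/value map, falling back to NORMAL if the key is absent.
    public static ExecutionMode readExecutionMode(Map<String, String> config) {
        String value = config.getOrDefault(EXECUTION_MODE_KEY, ExecutionMode.NORMAL.name());
        return ExecutionMode.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    // Hypothetical stand-in for a MiniDispatcher: it completes its termination future
    // after the job finishes, but only when running in DETACHED mode.
    public static class MiniDispatcherSketch {
        private final ExecutionMode mode;
        private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        public MiniDispatcherSketch(ExecutionMode mode) {
            this.mode = mode;
        }

        public void onJobFinished() {
            if (mode == ExecutionMode.DETACHED) {
                // Detached: no client waits for the result, so signal shutdown right away.
                terminationFuture.complete(null);
            }
            // NORMAL: this sketch does not shut down here; some other trigger is assumed.
        }

        public CompletableFuture<Void> getTerminationFuture() {
            return terminationFuture;
        }
    }

    public static void main(String[] args) {
        MiniDispatcherSketch detached = new MiniDispatcherSketch(
            readExecutionMode(Map.of(EXECUTION_MODE_KEY, "detached")));
        detached.onJobFinished();
        System.out.println(detached.getTerminationFuture().isDone()); // true

        MiniDispatcherSketch normal = new MiniDispatcherSketch(readExecutionMode(Map.of()));
        normal.onJobFinished();
        System.out.println(normal.getTerminationFuture().isDone()); // false
    }
}
```

A cluster entrypoint could then chain its own shutdown onto `getTerminationFuture()`, similar in spirit to the `dispatcher.getTerminationFuture().whenCompleteAsync(...)` call in `ClusterEntrypoint` quoted earlier in this thread.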
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r167593183 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -671,13 +674,68 @@ public void jobFinishedByOther() { log.info("Job {} was finished by other JobManager.", jobId); runAsync( - () -> { - try { - removeJob(jobId, false); - } catch (Exception e) { - log.warn("Could not properly remove job {} from the dispatcher.", jobId, e); - } - }); + () -> Dispatcher.this.jobFinishedByOther(jobId)); } } + + //-- + // Factories + //-- + + /** +* Factory for a {@link JobManagerRunner}. +*/ + @FunctionalInterface + public interface JobManagerRunnerFactory { + JobManagerRunner createJobManagerRunner( + ResourceID resourceId, + JobGraph jobGraph, + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + BlobServer blobServer, + JobManagerServices jobManagerServices, + MetricRegistry metricRegistry, + OnCompletionActions onCompleteActions, + FatalErrorHandler fatalErrorHandler, + @Nullable String restAddress) throws Exception; + } + + /** +* Singleton default factory for {@link JobManagerRunner}. 
+*/ + public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory { + INSTANCE { --- End diff -- Since there is only one instance, it is ok to write:

```
public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
    INSTANCE;

    @Override
    public JobManagerRunner createJobManagerRunner(
            ResourceID resourceId,
            JobGraph jobGraph,
            Configuration configuration,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            BlobServer blobServer,
            JobManagerServices jobManagerServices,
            MetricRegistry metricRegistry,
            OnCompletionActions onCompleteActions,
            FatalErrorHandler fatalErrorHandler,
            @Nullable String restAddress) throws Exception {
        return new JobManagerRunner(
            resourceId,
            jobGraph,
            configuration,
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            blobServer,
            jobManagerServices,
            metricRegistry,
            onCompleteActions,
            fatalErrorHandler,
            restAddress);
    }
}
```

Saves one level of indentation. ---
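The reviewer's suggestion can be shown in isolation. The sketch below swaps Flink's `JobManagerRunnerFactory` and `JobManagerRunner` for a hypothetical, heavily simplified `RunnerFactory` interface and `Runner` class (one `jobName` parameter instead of twelve). It illustrates two points: a single-constant enum gives a thread-safe singleton without extra boilerplate, and because the factory is a `@FunctionalInterface`, a test can pass a lambda instead of the production singleton.

```java
public class FactorySketch {

    // Hypothetical stand-in for JobManagerRunner.
    public static final class Runner {
        private final String jobName;
        private final boolean testDouble;

        public Runner(String jobName, boolean testDouble) {
            this.jobName = jobName;
            this.testDouble = testDouble;
        }

        public String getJobName() {
            return jobName;
        }

        public boolean isTestDouble() {
            return testDouble;
        }
    }

    // Hypothetical stand-in for JobManagerRunnerFactory.
    @FunctionalInterface
    public interface RunnerFactory {
        Runner createRunner(String jobName) throws Exception;
    }

    // Suggested shape: the constant has no body of its own, so the method is
    // implemented once at enum level, one indentation level shallower.
    public enum DefaultRunnerFactory implements RunnerFactory {
        INSTANCE;

        @Override
        public Runner createRunner(String jobName) {
            return new Runner(jobName, false);
        }
    }

    // Hypothetical dispatcher that depends only on the factory interface.
    public static final class DispatcherSketch {
        private final RunnerFactory factory;

        public DispatcherSketch(RunnerFactory factory) {
            this.factory = factory;
        }

        public Runner submit(String jobName) throws Exception {
            return factory.createRunner(jobName);
        }
    }

    public static void main(String[] args) throws Exception {
        Runner production = new DispatcherSketch(DefaultRunnerFactory.INSTANCE).submit("wordcount");
        // A test injects a lambda instead of the enum singleton.
        Runner fake = new DispatcherSketch(jobName -> new Runner(jobName, true)).submit("wordcount");
        System.out.println(production.isTestDouble() + " " + fake.isTestDouble()); // false true
    }
}
```

Note that `DefaultRunnerFactory.createRunner` can drop `throws Exception`, because an overriding method may declare fewer checked exceptions than the interface method it implements.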
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5431

[FLINK-8608] [flip6] Implement MiniDispatcher for job mode

## What is the purpose of the change

The MiniDispatcher is responsible for submitting the single job with which a job mode cluster is started. Once the job has completed and if the cluster has been started in detached mode, the MiniDispatcher will terminate. In order to reduce code duplication, the MiniDispatcher is a subclass of the Dispatcher which is started with a single-job submitted job graph store.

## Brief change log

- Introduce `SingleJobSubmittedJobGraphStore` for the `MiniDispatcher`
- Refactored `JobClusterEntrypoint` and `SessionClusterEntrypoint` such that most of the cluster component logic moved to `ClusterEntrypoint`
- Renamed `JobMasterRestEndpoint` to `MiniDispatcherRestEndpoint`
- Initialize `MiniDispatcher` with the single job retrieved by the `JobClusterEntrypoint`
- Terminate `MiniDispatcher` after job completion if its execution mode is `detached`

## Verifying this change

- Added `MiniDispatcherTest`
- The tests of `Dispatcher` are also valid for the `MiniDispatcher`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink implementMiniDispatcher

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5431.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #5431

commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8
Author: Till Rohrmann
Date: 2018-02-08T13:34:54Z

[FLINK-8608] [flip6] Implement MiniDispatcher for job mode

The MiniDispatcher is responsible for submitting the single job with which a job mode cluster is started. Once the job has completed and if the cluster has been started in detached mode, the MiniDispatcher will terminate. In order to reduce code duplication, the MiniDispatcher is a subclass of the Dispatcher which is started with a single-job submitted job graph store.

---
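The change log mentions a `SingleJobSubmittedJobGraphStore`, but its implementation is not shown in this thread. The following is only a sketch of the idea under stated assumptions: a store pre-populated with exactly one job, so that the recovery logic the `MiniDispatcher` presumably inherits from the `Dispatcher` finds that job on startup. Job IDs are modeled as `String`s, the job graph as a hypothetical `JobGraphStub`, and the class and method names are illustrative, not Flink's API.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

public class SingleJobStoreSketch {

    // Hypothetical, minimal stand-in for a JobGraph.
    public static final class JobGraphStub {
        private final String jobId;

        public JobGraphStub(String jobId) {
            this.jobId = Objects.requireNonNull(jobId);
        }

        public String getJobId() {
            return jobId;
        }
    }

    // Hypothetical store that always contains exactly the job the cluster was started with.
    public static final class SingleJobGraphStore {
        private final JobGraphStub jobGraph;

        public SingleJobGraphStore(JobGraphStub jobGraph) {
            this.jobGraph = Objects.requireNonNull(jobGraph);
        }

        // Recovery only succeeds for the single known job.
        public JobGraphStub recoverJobGraph(String jobId) {
            if (jobGraph.getJobId().equals(jobId)) {
                return jobGraph;
            }
            throw new IllegalArgumentException("Unknown job " + jobId + '.');
        }

        // Putting the same job again is a no-op; any other job is rejected.
        public void putJobGraph(JobGraphStub other) {
            if (!jobGraph.getJobId().equals(other.getJobId())) {
                throw new IllegalStateException("Cannot add further jobs to a single-job store.");
            }
        }

        // Nothing is persisted in this sketch, so there is nothing to remove.
        public void removeJobGraph(String jobId) {
            // intentionally empty
        }

        // On startup a dispatcher would see exactly one job to recover.
        public Collection<String> getJobIds() {
            return Collections.singleton(jobGraph.getJobId());
        }
    }

    public static void main(String[] args) {
        SingleJobGraphStore store = new SingleJobGraphStore(new JobGraphStub("job-1"));
        System.out.println(store.getJobIds()); // [job-1]
        System.out.println(store.recoverJobGraph("job-1").getJobId()); // job-1
        try {
            store.putJobGraph(new JobGraphStub("job-2"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Reusing the generic recovery path through such a store is what lets the `MiniDispatcher` stay a thin subclass instead of duplicating the `Dispatcher`'s submission logic, which matches the motivation stated in the PR description.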