[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5431


---


[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5431#discussion_r168193217
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---
@@ -163,29 +210,130 @@ protected void runCluster(Configuration configuration) throws Exception {
blobServer,
heartbeatServices,
metricRegistry);
+
+   // TODO: Make shutDownAndTerminate non blocking to not use the global executor
+   dispatcher.getTerminationFuture().whenCompleteAsync(
+   (Boolean success, Throwable throwable) -> {
+   if (throwable != null) {
+   LOG.info("Could not properly terminate the Dispatcher.", throwable);
+   }
+
+   shutDownAndTerminate(
+   SUCCESS_RETURN_CODE,
+   ApplicationStatus.SUCCEEDED,
+   true);
+   });
}
}
 
protected void initializeServices(Configuration configuration) throws Exception {
-   assert(Thread.holdsLock(lock));
 
LOG.info("Initializing cluster services.");
 
-   final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
-   // TODO: Add support for port ranges
-   final String portRange = String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
-
-   commonRpcService = createRpcService(configuration, bindAddress, portRange);
-   haServices = createHaServices(configuration, commonRpcService.getExecutor());
-   blobServer = new BlobServer(configuration, haServices.createBlobStore());
-   blobServer.start();
-   heartbeatServices = createHeartbeatServices(configuration);
-   metricRegistry = createMetricRegistry(configuration);
-
-   // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
-   // start the MetricQueryService
-   final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
-   metricRegistry.startQueryService(actorSystem, null);
+   synchronized (lock) {
+   final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
+   // TODO: Add support for port ranges
+   final String portRange = String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
+
+   commonRpcService = createRpcService(configuration, bindAddress, portRange);
+   haServices = createHaServices(configuration, commonRpcService.getExecutor());
+   blobServer = new BlobServer(configuration, haServices.createBlobStore());
+   blobServer.start();
+   heartbeatServices = createHeartbeatServices(configuration);
+   metricRegistry = createMetricRegistry(configuration);
+
+   // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
+   // start the MetricQueryService
+   final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
+   metricRegistry.startQueryService(actorSystem, null);
+   }
+   }
+
+   protected void startClusterComponents(
+   Configuration configuration,
--- End diff --

Good catch. Will fix it.
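The TODO in the quoted diff concerns `whenCompleteAsync(action)` called without an executor argument, which runs the callback on the JDK's default async executor (normally `ForkJoinPool.commonPool()`). The TODO itself plans to make `shutDownAndTerminate` non-blocking; a different, interim option is to pass a dedicated executor to the `whenCompleteAsync(action, executor)` overload. A minimal sketch of that option, assuming a single-thread executor is acceptable for the blocking shutdown; `TerminationSketch` and its `shutDownAndTerminate` stand-in are hypothetical, not Flink code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class TerminationSketch {
    static final int SUCCESS_RETURN_CODE = 0;

    // Hypothetical stand-in for the blocking shutDownAndTerminate(...): it only records the code.
    static void shutDownAndTerminate(AtomicInteger exitCode, int returnCode) {
        exitCode.set(returnCode);
    }

    // Registers the shutdown callback on an explicit executor instead of the common pool.
    static CompletableFuture<Boolean> onTermination(
            CompletableFuture<Boolean> terminationFuture,
            ExecutorService shutdownExecutor,
            AtomicInteger exitCode) {
        return terminationFuture.whenCompleteAsync(
            (Boolean success, Throwable throwable) -> {
                if (throwable != null) {
                    System.err.println("Could not properly terminate the Dispatcher: " + throwable);
                }
                // Mirrors the diff: shut down with the success code in both cases.
                shutDownAndTerminate(exitCode, SUCCESS_RETURN_CODE);
            },
            shutdownExecutor);
    }

    public static void main(String[] args) {
        ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor();
        AtomicInteger exitCode = new AtomicInteger(-1);
        CompletableFuture<Boolean> terminationFuture = new CompletableFuture<>();

        CompletableFuture<Boolean> done = onTermination(terminationFuture, shutdownExecutor, exitCode);
        terminationFuture.complete(true); // the dispatcher has terminated
        done.join();                      // wait until the callback has run on shutdownExecutor

        System.out.println("exit code: " + exitCode.get()); // prints "exit code: 0"
        shutdownExecutor.shutdown();
    }
}
```

The returned future completes only after the callback has run, so callers can still wait for the shutdown to finish.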


---


[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5431#discussion_r168132269
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the {@link MiniDispatcher}.
+ */
+@Category(Flip6.class)
+public class MiniDispatcherTest extends TestLogger {
+
+   private static final JobGraph jobGraph = new JobGraph();
+
+   private static final Time timeout = Time.seconds(10L);
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private static TestingRpcService rpcService;
+
+   private static Configuration configuration;
+
+   private static BlobServer blobServer;
+
+   private MiniDispatcher miniDispatcher;
+
+   private CompletableFuture jobGraphFuture;
+
+   private TestingLeaderElectionService dispatcherLeaderElectionService;
+
+   @BeforeClass
+   public static void setupClass() throws IOException {
+   rpcService = new TestingRpcService();
+   configuration = new Configuration();
+
+   configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+   blobServer = new BlobServer(configuration, new VoidBlobStore());
+   }
+
+   @Before
+   public void setup() throws Exc

[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5431#discussion_r168126480
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---
@@ -163,29 +210,130 @@ protected void runCluster(Configuration configuration) throws Exception {
blobServer,
heartbeatServices,
metricRegistry);
+
+   // TODO: Make shutDownAndTerminate non blocking to not use the global executor
+   dispatcher.getTerminationFuture().whenCompleteAsync(
+   (Boolean success, Throwable throwable) -> {
+   if (throwable != null) {
+   LOG.info("Could not properly terminate the Dispatcher.", throwable);
+   }
+
+   shutDownAndTerminate(
+   SUCCESS_RETURN_CODE,
+   ApplicationStatus.SUCCEEDED,
+   true);
+   });
}
}
 
protected void initializeServices(Configuration configuration) throws Exception {
-   assert(Thread.holdsLock(lock));
 
LOG.info("Initializing cluster services.");
 
-   final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
-   // TODO: Add support for port ranges
-   final String portRange = String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
-
-   commonRpcService = createRpcService(configuration, bindAddress, portRange);
-   haServices = createHaServices(configuration, commonRpcService.getExecutor());
-   blobServer = new BlobServer(configuration, haServices.createBlobStore());
-   blobServer.start();
-   heartbeatServices = createHeartbeatServices(configuration);
-   metricRegistry = createMetricRegistry(configuration);
-
-   // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
-   // start the MetricQueryService
-   final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
-   metricRegistry.startQueryService(actorSystem, null);
+   synchronized (lock) {
+   final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
+   // TODO: Add support for port ranges
+   final String portRange = String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
+
+   commonRpcService = createRpcService(configuration, bindAddress, portRange);
+   haServices = createHaServices(configuration, commonRpcService.getExecutor());
+   blobServer = new BlobServer(configuration, haServices.createBlobStore());
+   blobServer.start();
+   heartbeatServices = createHeartbeatServices(configuration);
+   metricRegistry = createMetricRegistry(configuration);
+
+   // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
+   // start the MetricQueryService
+   final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
+   metricRegistry.startQueryService(actorSystem, null);
+   }
+   }
+
+   protected void startClusterComponents(
+   Configuration configuration,
--- End diff --

nit: indentation
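In the quoted diff, `initializeServices` no longer asserts `Thread.holdsLock(lock)` (a check that the caller already holds the lock, evaluated only when the JVM runs with `-ea`) and instead wraps its body in `synchronized (lock)`. Because Java monitors are reentrant, callers that already hold `lock` keep working. A minimal sketch of the difference; `LockSketch` and its string-valued "services" are hypothetical placeholders:

```java
class LockSketch {
    private final Object lock = new Object();

    // Hypothetical placeholders for commonRpcService, blobServer, etc.
    private String rpcService;
    private String blobServer;

    // Old style: relies on the caller holding the lock.
    // The assert is only evaluated when the JVM runs with -ea.
    void initializeServicesOld() {
        assert Thread.holdsLock(lock);
        rpcService = "rpc";
        blobServer = "blob";
    }

    // New style: the method acquires the lock itself.
    void initializeServices() {
        synchronized (lock) {
            rpcService = "rpc";
            blobServer = "blob";
        }
    }

    // Monitors are reentrant, so calling initializeServices() while
    // already holding the lock does not deadlock.
    void runCluster() {
        synchronized (lock) {
            initializeServices();
        }
    }

    boolean isInitialized() {
        synchronized (lock) {
            return rpcService != null && blobServer != null;
        }
    }

    public static void main(String[] args) {
        LockSketch direct = new LockSketch();
        direct.initializeServices();   // no external locking needed
        LockSketch nested = new LockSketch();
        nested.runCluster();           // reentrant acquisition is fine
        System.out.println(direct.isInitialized() && nested.isInitialized()); // prints "true"
    }
}
```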


---


[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5431#discussion_r167853480
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---
@@ -262,32 +430,151 @@ protected void shutDown(boolean cleanupHaData) throws FlinkException {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
}
-
-   terminationFuture.complete(true);
}
 
if (exception != null) {
throw new FlinkException("Could not properly shut down the cluster services.", exception);
}
}
 
+   protected void stopClusterComponents() throws Exception {
+   synchronized (lock) {
+   Throwable exception = null;
+
+   if (webMonitorEndpoint != null) {
+   webMonitorEndpoint.shutdown(Time.seconds(10L));
+   }
+
+   if (dispatcherLeaderRetrievalService != null) {
+   try {
+   dispatcherLeaderRetrievalService.stop();
+   } catch (Throwable t) {
+   exception = ExceptionUtils.firstOrSuppressed(t, exception);
+   }
+   }
+
+   if (dispatcher != null) {
+   try {
+   dispatcher.shutDown();
+   } catch (Throwable t) {
+   exception = ExceptionUtils.firstOrSuppressed(t, exception);
+   }
+   }
+
+   if (resourceManagerRetrievalService != null) {
+   try {
+   resourceManagerRetrievalService.stop();
+   } catch (Throwable t) {
+   exception = ExceptionUtils.firstOrSuppressed(t, exception);
+   }
+   }
+
+   if (resourceManager != null) {
+   try {
+   resourceManager.shutDown();
+   } catch (Throwable t) {
+   exception = ExceptionUtils.firstOrSuppressed(t, exception);
+   }
+   }
+
+   if (archivedExecutionGraphStore != null) {
+   try {
+   archivedExecutionGraphStore.close();
+   } catch (Throwable t) {
+   exception = ExceptionUtils.firstOrSuppressed(t, exception);
+   }
+   }
+
+   if (transientBlobCache != null) {
+   try {
+   transientBlobCache.close();
+   } catch (Throwable t) {
+   exception = ExceptionUtils.firstOrSuppressed(t, exception);
+   }
+   }
+
+   if (exception != null) {
+   throw new FlinkException("Could not properly shut down the session cluster entry point.", exception);
+   }
+   }
+   }
+
@Override
public void onFatalError(Throwable exception) {
LOG.error("Fatal error occurred in the cluster entrypoint.", exception);
 
System.exit(RUNTIME_FAILURE_RETURN_CODE);
}
 
-   protected abstract void startClusterComponents(
+   // --
+   // Internal methods
+   // --
+
+   private void shutDownAndTerminate(
+   int returnCode,
+   ApplicationStatus applicationStatus,
+   @Nullable String diagnostics,
--- End diff --

True, at the moment it won't be used. I initially left it in because of the Yarn-based cluster entrypoints: the old implementation actively deregisters from Yarn and passes this as additional information. For the moment, I'll remove it.
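The quoted `stopClusterComponents()` keeps shutting down the remaining components after one of them fails and reports every failure at the end: the first `Throwable` becomes the primary exception and later ones are attached to it. A self-contained sketch of that pattern; `firstOrSuppressed` below is a local helper written to mirror what Flink's `ExceptionUtils.firstOrSuppressed` is assumed to do (return the first exception, attach later ones via `addSuppressed`), `Component` is hypothetical, and a plain `Exception` stands in for `FlinkException`:

```java
import java.util.Arrays;
import java.util.List;

class ShutdownSketch {
    // Local helper: keep the first exception, attach later ones as suppressed.
    static <T extends Throwable> T firstOrSuppressed(T newException, T previous) {
        if (previous == null) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    // Hypothetical stand-in for dispatcher, resource manager, blob cache, ...
    interface Component {
        void shutDown() throws Exception;
    }

    // Shuts down every component, even if earlier ones fail, then reports all failures at once.
    static void stopAll(List<Component> components) throws Exception {
        Throwable exception = null;
        for (Component component : components) {
            try {
                component.shutDown();
            } catch (Throwable t) {
                exception = firstOrSuppressed(t, exception);
            }
        }
        if (exception != null) {
            throw new Exception("Could not properly shut down the cluster components.", exception);
        }
    }

    public static void main(String[] args) {
        List<Component> components = Arrays.<Component>asList(
            () -> { throw new IllegalStateException("dispatcher failed"); },
            () -> System.out.println("resource manager stopped"),
            () -> { throw new RuntimeException("blob cache failed"); });
        try {
            stopAll(components);
        } catch (Exception e) {
            Throwable first = e.getCause();
            System.out.println(first.getMessage() + ", suppressed: " + first.getSuppressed().length);
        }
    }
}
```

Reporting one exception with the others suppressed keeps the first root cause prominent in logs without losing the later failures.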


---


[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5431#discussion_r167853004
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---
@@ -300,4 +587,12 @@ protected static ClusterConfiguration parseArguments(String[] args) {
protected static Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) {
return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir());
}
+
+   /**
+* Execution mode of the {@link MiniDispatcher}.
+*/
+   public enum ExecutionMode {
+   NORMAL, // waits until the job result has been served
--- End diff --

True. Will do.


---


[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5431#discussion_r167852786
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---
@@ -63,6 +89,10 @@
  */
 public abstract class ClusterEntrypoint implements FatalErrorHandler {
 
+   public static final ConfigOption EXECUTION_MODE = ConfigOptions
+   .key("cluster.execution-mode")
--- End diff --

There are other configuration parameters of this kind as well, like `local.number-taskmanager`. In the old Yarn code, we used environment variables to pass certain settings; I think the configuration is a better place for them. However, I agree that this might be a bit confusing, so I'll add the prefix `internal` to the key.
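Passing the execution mode through the configuration, instead of through environment variables as the old Yarn code did, means the entry point just reads one key. A minimal sketch that uses a plain `Map<String, String>` in place of Flink's `Configuration`/`ConfigOption`; the key `internal.cluster.execution-mode` is an assumption derived from the comment above (the original `cluster.execution-mode` with the `internal` prefix), and so is the `NORMAL` default. The enum constants carry Javadoc, as suggested elsewhere in this review:

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

class ExecutionModeSketch {
    /** Execution mode of the MiniDispatcher (mirrors the enum under review). */
    enum ExecutionMode {
        /** Waits until the job result has been served. */
        NORMAL,

        /** Directly stops after the job has finished. */
        DETACHED
    }

    // Assumed key: "cluster.execution-mode" with the "internal" prefix added.
    static final String EXECUTION_MODE_KEY = "internal.cluster.execution-mode";

    // Reads the mode from a plain key/value config; NORMAL is an assumed default.
    // Unknown values make ExecutionMode.valueOf throw IllegalArgumentException.
    static ExecutionMode executionMode(Map<String, String> config) {
        String value = config.getOrDefault(EXECUTION_MODE_KEY, ExecutionMode.NORMAL.name());
        return ExecutionMode.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        System.out.println(executionMode(config)); // prints "NORMAL"
        config.put(EXECUTION_MODE_KEY, "detached");
        System.out.println(executionMode(config)); // prints "DETACHED"
    }
}
```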


---


[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5431#discussion_r167851271
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the {@link MiniDispatcher}.
+ */
+@Category(Flip6.class)
+public class MiniDispatcherTest extends TestLogger {
+
+   private static final JobGraph jobGraph = new JobGraph();
+
+   private static final Time timeout = Time.seconds(10L);
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private static TestingRpcService rpcService;
+
+   private static Configuration configuration;
+
+   private static BlobServer blobServer;
+
+   private MiniDispatcher miniDispatcher;
+
+   private CompletableFuture jobGraphFuture;
+
+   private TestingLeaderElectionService dispatcherLeaderElectionService;
+
+   @BeforeClass
+   public static void setupClass() throws IOException {
+   rpcService = new TestingRpcService();
+   configuration = new Configuration();
+
+   configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
+   blobServer = new BlobServer(configuration, new VoidBlobStore());
+   }
+
+   @Before
+   public void setup() t

[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5431#discussion_r167850583
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the {@link MiniDispatcher}.
+ */
+@Category(Flip6.class)
+public class MiniDispatcherTest extends TestLogger {
+
+   private static final JobGraph jobGraph = new JobGraph();
+
+   private static final Time timeout = Time.seconds(10L);
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private static TestingRpcService rpcService;
+
+   private static Configuration configuration;
+
+   private static BlobServer blobServer;
+
+   private MiniDispatcher miniDispatcher;
+
+   private CompletableFuture jobGraphFuture;
+
+   private TestingLeaderElectionService dispatcherLeaderElectionService;
+
+   @BeforeClass
+   public static void setupClass() throws IOException {
+   rpcService = new TestingRpcService();
+   configuration = new Configuration();
+
+   configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
+   blobServer = new BlobServer(configuration, new VoidBlobStore());
+   }
+
+   @Before
+   public void setup() t

[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5431#discussion_r167850253
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -671,13 +674,68 @@ public void jobFinishedByOther() {
log.info("Job {} was finished by other JobManager.", jobId);
 
runAsync(
-   () -> {
-   try {
-   removeJob(jobId, false);
-   } catch (Exception e) {
-   log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
-   }
-   });
+   () -> Dispatcher.this.jobFinishedByOther(jobId));
}
}
+
+   //--
+   // Factories
+   //--
+
+   /**
+* Factory for a {@link JobManagerRunner}.
+*/
+   @FunctionalInterface
+   public interface JobManagerRunnerFactory {
+   JobManagerRunner createJobManagerRunner(
+   ResourceID resourceId,
+   JobGraph jobGraph,
+   Configuration configuration,
+   RpcService rpcService,
+   HighAvailabilityServices highAvailabilityServices,
+   HeartbeatServices heartbeatServices,
+   BlobServer blobServer,
+   JobManagerServices jobManagerServices,
+   MetricRegistry metricRegistry,
+   OnCompletionActions onCompleteActions,
+   FatalErrorHandler fatalErrorHandler,
+   @Nullable String restAddress) throws Exception;
+   }
+
+   /**
+* Singleton default factory for {@link JobManagerRunner}.
+*/
+   public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
+   INSTANCE {
--- End diff --

True. Will change it.
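The quoted `Dispatcher` change introduces a `@FunctionalInterface` factory together with an enum singleton as its default implementation, presumably so that tests can inject their own factory while production code uses `DefaultJobManagerRunnerFactory.INSTANCE`. A heavily simplified sketch of that pattern; `Runner`, `RunnerFactory` and `DefaultRunnerFactory` are hypothetical stand-ins with a single `String` parameter instead of the real twelve-parameter signature, and the method is implemented at the enum level rather than in a constant-specific `INSTANCE { ... }` body:

```java
class FactorySketch {
    // Hypothetical, heavily simplified stand-in for JobManagerRunner.
    static final class Runner {
        final String jobName;

        Runner(String jobName) {
            this.jobName = jobName;
        }
    }

    // Single abstract method, so a lambda can implement it (e.g. in tests).
    @FunctionalInterface
    interface RunnerFactory {
        Runner createRunner(String jobName) throws Exception;
    }

    // Enum singleton: exactly one INSTANCE, created when the enum class is initialized.
    enum DefaultRunnerFactory implements RunnerFactory {
        INSTANCE;

        @Override
        public Runner createRunner(String jobName) {
            return new Runner(jobName);
        }
    }

    // Code that uses the factory depends only on the interface.
    static String startJob(RunnerFactory factory, String jobName) throws Exception {
        return factory.createRunner(jobName).jobName;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(startJob(DefaultRunnerFactory.INSTANCE, "wordcount"));
        System.out.println(startJob(name -> new Runner("test-" + name), "wordcount"));
    }
}
```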


---


[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-12 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5431#discussion_r167610671
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---
@@ -262,32 +430,151 @@ protected void shutDown(boolean cleanupHaData) throws FlinkException {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
}
-
-   terminationFuture.complete(true);
}
 
if (exception != null) {
throw new FlinkException("Could not properly shut down the cluster services.", exception);
}
}
 
+   protected void stopClusterComponents() throws Exception {
+   synchronized (lock) {
+   Throwable exception = null;
+
+   if (webMonitorEndpoint != null) {
+   webMonitorEndpoint.shutdown(Time.seconds(10L));
+   }
+
+   if (dispatcherLeaderRetrievalService != null) {
+   try {
+   dispatcherLeaderRetrievalService.stop();
+   } catch (Throwable t) {
+   exception = ExceptionUtils.firstOrSuppressed(t, exception);
+   }
+   }
+
+   if (dispatcher != null) {
+   try {
+   dispatcher.shutDown();
+   } catch (Throwable t) {
+   exception = ExceptionUtils.firstOrSuppressed(t, exception);
+   }
+   }
+
+   if (resourceManagerRetrievalService != null) {
+   try {
+   resourceManagerRetrievalService.stop();
+   } catch (Throwable t) {
+   exception = ExceptionUtils.firstOrSuppressed(t, exception);
+   }
+   }
+
+   if (resourceManager != null) {
+   try {
+   resourceManager.shutDown();
+   } catch (Throwable t) {
+   exception = ExceptionUtils.firstOrSuppressed(t, exception);
+   }
+   }
+
+   if (archivedExecutionGraphStore != null) {
+   try {
+   archivedExecutionGraphStore.close();
+   } catch (Throwable t) {
+   exception = ExceptionUtils.firstOrSuppressed(t, exception);
+   }
+   }
+
+   if (transientBlobCache != null) {
+   try {
+   transientBlobCache.close();
+   } catch (Throwable t) {
+   exception = ExceptionUtils.firstOrSuppressed(t, exception);
+   }
+   }
+
+   if (exception != null) {
+   throw new FlinkException("Could not properly shut down the session cluster entry point.", exception);
+   }
+   }
+   }
+
@Override
public void onFatalError(Throwable exception) {
LOG.error("Fatal error occurred in the cluster entrypoint.", exception);
 
System.exit(RUNTIME_FAILURE_RETURN_CODE);
}
 
-   protected abstract void startClusterComponents(
+   // --
+   // Internal methods
+   // --
+
+   private void shutDownAndTerminate(
+   int returnCode,
+   ApplicationStatus applicationStatus,
+   @Nullable String diagnostics,
--- End diff --

`diagnostics` is unused. You already log errors, so I don't think this argument is needed.


---


[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-12 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5431#discussion_r167608001
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---
@@ -300,4 +587,12 @@ protected static ClusterConfiguration parseArguments(String[] args) {
protected static Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) {
return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir());
}
+
+   /**
+* Execution mode of the {@link MiniDispatcher}.
+*/
+   public enum ExecutionMode {
+   NORMAL, // waits until the job result has been served
--- End diff --

Should also be a Javadoc comment:
```
/**
 * Execution mode of the {@link MiniDispatcher}.
 */
public enum ExecutionMode {
	/**
	 * Waits until the job result has been served.
	 */
	NORMAL,

	/**
	 * Directly stops after the job has finished.
	 */
	DETACHED
}
```


---


[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-12 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5431#discussion_r167601541
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the {@link MiniDispatcher}.
+ */
+@Category(Flip6.class)
+public class MiniDispatcherTest extends TestLogger {
+
+   private static final JobGraph jobGraph = new JobGraph();
+
+   private static final Time timeout = Time.seconds(10L);
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private static TestingRpcService rpcService;
+
+   private static Configuration configuration;
+
+   private static BlobServer blobServer;
+
+   private MiniDispatcher miniDispatcher;
+
+   private CompletableFuture jobGraphFuture;
+
+   private TestingLeaderElectionService dispatcherLeaderElectionService;
+
+   @BeforeClass
+   public static void setupClass() throws IOException {
+   rpcService = new TestingRpcService();
+   configuration = new Configuration();
+
+   configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+   blobServer = new BlobServer(configuration, new VoidBlobStore());
+   }
+
+   @Before
+   public void setup() throws Exc

[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-12 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5431#discussion_r167606814
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ---
@@ -63,6 +89,10 @@
  */
 public abstract class ClusterEntrypoint implements FatalErrorHandler {
 
+   public static final ConfigOption EXECUTION_MODE = ConfigOptions
+   .key("cluster.execution-mode")
--- End diff --

Is this the first time that `ConfigOptions` are used for something that the
user should not (or cannot) configure?
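For context on what such an internal option amounts to, here is a minimal sketch that assumes the entrypoint reads the mode as a string and parses it into an enum. `StringOption`, `ExecutionModeOption` and `readMode` are hypothetical stand-ins, not Flink's `ConfigOption` API. Only the key `cluster.execution-mode` comes from the diff; the `NORMAL` default is an assumption.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for a string-keyed option with a default value;
// it mimics the idea of a config option, not Flink's actual API.
final class StringOption {
    final String key;
    final String defaultValue;

    StringOption(String key, String defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }
}

final class ExecutionModeOption {
    enum Mode { NORMAL, DETACHED }

    // Key taken from the diff; the default value is an assumption.
    static final StringOption EXECUTION_MODE =
        new StringOption("cluster.execution-mode", Mode.NORMAL.name());

    /** Reads the option from a plain map and parses it into the enum. */
    static Mode readMode(Map<String, String> config) {
        String raw = config.getOrDefault(EXECUTION_MODE.key, EXECUTION_MODE.defaultValue);
        // valueOf throws IllegalArgumentException for unknown values.
        return Mode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }
}
```

Because nothing in this sketch stops a user from putting the key into their own configuration, it also illustrates why an option meant to be set only by the deployment code can be surprising.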


---


[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-12 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5431#discussion_r167593183
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -671,13 +674,68 @@ public void jobFinishedByOther() {
log.info("Job {} was finished by other JobManager.", jobId);
 
runAsync(
-   () -> {
-   try {
-   removeJob(jobId, false);
-   } catch (Exception e) {
-   log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
-   }
-   });
+   () -> Dispatcher.this.jobFinishedByOther(jobId));
}
}
+
+   //--
+   // Factories
+   //--
+
+   /**
+* Factory for a {@link JobManagerRunner}.
+*/
+   @FunctionalInterface
+   public interface JobManagerRunnerFactory {
+   JobManagerRunner createJobManagerRunner(
+   ResourceID resourceId,
+   JobGraph jobGraph,
+   Configuration configuration,
+   RpcService rpcService,
+   HighAvailabilityServices highAvailabilityServices,
+   HeartbeatServices heartbeatServices,
+   BlobServer blobServer,
+   JobManagerServices jobManagerServices,
+   MetricRegistry metricRegistry,
+   OnCompletionActions onCompleteActions,
+   FatalErrorHandler fatalErrorHandler,
+   @Nullable String restAddress) throws Exception;
+   }
+
+   /**
+* Singleton default factory for {@link JobManagerRunner}.
+*/
+   public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
+   INSTANCE {
--- End diff --

Since there is only one instance, it is ok to write:
```
public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
    INSTANCE;

    @Override
    public JobManagerRunner createJobManagerRunner(
            ResourceID resourceId,
            JobGraph jobGraph,
            Configuration configuration,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            BlobServer blobServer,
            JobManagerServices jobManagerServices,
            MetricRegistry metricRegistry,
            OnCompletionActions onCompleteActions,
            FatalErrorHandler fatalErrorHandler,
            @Nullable String restAddress) throws Exception {
        return new JobManagerRunner(
            resourceId,
            jobGraph,
            configuration,
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            blobServer,
            jobManagerServices,
            metricRegistry,
            onCompleteActions,
            fatalErrorHandler,
            restAddress);
    }
}
```
Saves one level of indentation. 
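A reduced, self-contained version of the suggested pattern is sketched below. `RunnerFactory` and `DefaultRunnerFactory` are hypothetical simplifications of `JobManagerRunnerFactory` (one parameter, no checked exception). They only show the single-constant enum whose method is declared on the enum itself rather than in a constant-specific class body.

```java
// Hypothetical, simplified factory interface; the real JobManagerRunnerFactory
// takes many more parameters and declares "throws Exception".
@FunctionalInterface
interface RunnerFactory {
    String createRunner(String jobName);
}

// Enum singleton: one constant, and the override lives directly in the enum
// body, so no extra "INSTANCE { ... }" block and indentation level is needed.
enum DefaultRunnerFactory implements RunnerFactory {
    INSTANCE;

    @Override
    public String createRunner(String jobName) {
        return "runner-for-" + jobName;
    }
}
```

Because the interface is a `@FunctionalInterface`, a test can pass a lambda such as `jobName -> "mock-" + jobName` wherever production code uses `DefaultRunnerFactory.INSTANCE`.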


---


[GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...

2018-02-08 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5431

[FLINK-8608] [flip6] Implement MiniDispatcher for job mode

## What is the purpose of the change

The MiniDispatcher is responsible for submitting the single job with which
a job mode cluster is started. Once the job has completed and if the cluster
has been started in detached mode, the MiniDispatcher will terminate.

In order to reduce code duplication, the MiniDispatcher is a subclass of the
Dispatcher, which is started with a `SingleJobSubmittedJobGraphStore`
containing only the single submitted job graph.
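A rough sketch of the single-job store idea follows. It assumes that the Dispatcher recovers and runs whatever jobs it finds in its job graph store when it starts, so that a store that can only ever contain one job turns it into a single-job dispatcher. `SingleJobStore` and its methods are hypothetical; job graphs are reduced to ID strings, and rejecting other jobs is an assumed way for such a store to guard its single entry.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

// Hypothetical, simplified stand-in for a submitted-job-graph store that
// holds exactly one job. Job graphs are reduced to their ID string here.
final class SingleJobStore {
    private final String jobId;

    SingleJobStore(String jobId) {
        this.jobId = Objects.requireNonNull(jobId);
    }

    /** Recovery always yields the one job the cluster was started with. */
    String recoverJob(String requestedId) {
        if (!jobId.equals(requestedId)) {
            throw new IllegalArgumentException("Unknown job " + requestedId);
        }
        return jobId;
    }

    /** Only the already-known job may be stored again; others are rejected. */
    void putJob(String id) {
        if (!jobId.equals(id)) {
            throw new IllegalStateException("Cannot add additional job " + id);
        }
    }

    /** The set of job IDs never changes. */
    Collection<String> getJobIds() {
        return Collections.singleton(jobId);
    }
}
```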

## Brief change log

- Introduce `SingleJobSubmittedJobGraphStore` for the `MiniDispatcher`
- Refactor `JobClusterEntrypoint` and `SessionClusterEntrypoint` so that most
of the cluster component logic moves to `ClusterEntrypoint`
- Rename `JobMasterRestEndpoint` to `MiniDispatcherRestEndpoint`
- Initialize the `MiniDispatcher` with the single job retrieved by the
`JobClusterEntrypoint`
- Terminate the `MiniDispatcher` after job completion if its execution mode
is `detached`
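The entrypoint refactoring in the change log follows the template-method pattern. The sketch below is hypothetical: `EntrypointSketch` and its hook methods are not Flink classes, and the component names are illustrative strings. The base class owns the shared startup sequence, and each entrypoint subclass only decides which dispatcher and REST endpoint to start.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the template-method shape: the shared startup
// sequence lives in the base class, subclasses only supply the variable parts.
abstract class EntrypointSketch {
    final List<String> startedComponents = new ArrayList<>();

    /** Shared startup logic; final so subclasses cannot reorder it. */
    final void runCluster() {
        startedComponents.add("rpc-service");
        startedComponents.add("blob-server");
        startedComponents.add(createDispatcherName());
        startedComponents.add(createRestEndpointName());
    }

    /** Subclass hook: which dispatcher to start. */
    protected abstract String createDispatcherName();

    /** Subclass hook: which REST endpoint to start. */
    protected abstract String createRestEndpointName();
}

final class SessionEntrypointSketch extends EntrypointSketch {
    @Override protected String createDispatcherName() { return "Dispatcher"; }
    @Override protected String createRestEndpointName() { return "DispatcherRestEndpoint"; }
}

final class JobEntrypointSketch extends EntrypointSketch {
    @Override protected String createDispatcherName() { return "MiniDispatcher"; }
    @Override protected String createRestEndpointName() { return "MiniDispatcherRestEndpoint"; }
}
```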

## Verifying this change

- Added `MiniDispatcherTest`
- The tests of `Dispatcher` are also valid for the `MiniDispatcher`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink implementMiniDispatcher

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5431.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5431


commit db57b574b0a2bacfa5aa164082c1eeb8f5258ed8
Author: Till Rohrmann 
Date:   2018-02-08T13:34:54Z

[FLINK-8608] [flip6] Implement MiniDispatcher for job mode

The MiniDispatcher is responsible for submitting the single job with which
a job mode cluster is started. Once the job has completed and if the cluster
has been started in detached mode, the MiniDispatcher will terminate.

In order to reduce code duplication, the MiniDispatcher is a subclass of the
Dispatcher which is started with a single job submitted job graph store.




---