[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159227619
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -717,7 +645,26 @@ public int run(
yarnCluster.waitForClusterToBeReady();
yarnCluster.disconnect();
} else {
-   runInteractiveCli(yarnCluster, 
acceptInteractiveInput);
+
+   ScheduledThreadPoolExecutor 
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+   try (YarnApplicationStatusMonitor 
yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
+   yarnDescriptor.getYarnClient(),
+   yarnCluster.getApplicationId(),
+   new 
ScheduledExecutorServiceAdapter(scheduledExecutorService))){
+   runInteractiveCli(
+   yarnCluster,
+   yarnApplicationStatusMonitor,
+   acceptInteractiveInput);
+   } catch (Exception e) {
+   LOG.info("Could not properly close the 
Yarn application status monitor.", e);
--- End diff --

Same here. Catch block could be avoided.


---


[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159226314
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -660,7 +570,25 @@ public int run(
"yarn application -kill " + 
applicationId.getOpt());
yarnCluster.disconnect();
} else {
-   runInteractiveCli(yarnCluster, true);
+   ScheduledThreadPoolExecutor 
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+   try (YarnApplicationStatusMonitor 
yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
+   yarnDescriptor.getYarnClient(),
+   yarnCluster.getApplicationId(),
+   new 
ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
+   runInteractiveCli(
+   yarnCluster,
+   yarnApplicationStatusMonitor,
+   true);
+   } catch (Exception e) {
--- End diff --

Closing `YarnApplicationStatusMonitor` should not throw any checked 
exceptions. If you change the signature, this catch block won't be needed.


---


[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159224695
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -743,6 +690,142 @@ private void logAndSysout(String message) {
System.out.println(message);
}
 
+   public static void main(final String[] args) throws Exception {
+   final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", 
""); // no prefix for the YARN session
+
+   final String configurationDirectory = 
CliFrontend.getConfigurationDirectoryFromEnv();
+
+   final Configuration flinkConfiguration = 
GlobalConfiguration.loadConfiguration();
+   SecurityUtils.install(new 
SecurityConfiguration(flinkConfiguration));
+   int retCode = 
SecurityUtils.getInstalledContext().runSecured(new Callable() {
+   @Override
+   public Integer call() {
+   return cli.run(args, flinkConfiguration, 
configurationDirectory);
+   }
+   });
+   System.exit(retCode);
+   }
+
+   private static void runInteractiveCli(
+   YarnClusterClient clusterClient,
+   YarnApplicationStatusMonitor yarnApplicationStatusMonitor,
+   boolean readConsoleInput) {
+   try (BufferedReader in = new BufferedReader(new 
InputStreamReader(System.in))) {
+   boolean continueRepl = true;
+   int numTaskmanagers = 0;
+   long unknownStatusSince = System.currentTimeMillis();
+
+   while (continueRepl) {
+
+   final ApplicationStatus applicationStatus = 
yarnApplicationStatusMonitor.getApplicationStatusNow();
+
+   switch (applicationStatus) {
+   case FAILED:
+   case CANCELED:
+   System.err.println("The Flink 
Yarn cluster has failed.");
+   continueRepl = false;
+   break;
+   case UNKNOWN:
+   if (unknownStatusSince < 0L) {
+   unknownStatusSince = 
System.currentTimeMillis();
+   }
+
+   if ((System.currentTimeMillis() 
- unknownStatusSince) > CLIENT_POLLING_INTERVAL_MS) {
+   System.err.println("The 
Flink Yarn cluster is in an unknown state. Please check the Yarn cluster.");
+   continueRepl = false;
+   } else {
+   continueRepl = 
repStep(in, readConsoleInput);
+   }
+   break;
+   case SUCCEEDED:
+   if (unknownStatusSince > 0L) {
+   unknownStatusSince = 
-1L;
+   }
+
+   // -- check if 
there are updates by the cluster ---
+   try {
+   final 
GetClusterStatusResponse status = clusterClient.getClusterStatus();
+
+   if (status != null && 
numTaskmanagers != status.numRegisteredTaskManagers()) {
+   
System.err.println("Number of connected TaskManagers changed to " +
+   
status.numRegisteredTaskManagers() + ". " +
+   "Slots 
available: " + status.totalNumberOfSlots());
+   numTaskmanagers 
= status.numRegisteredTaskManagers();
+   }
+   } catch (Exception e) {
+   LOG.warn("Could not 
retrieve the current cluster status. Skipping current retrieval attempt ...", 
e);
+   }
+
+   
printClusterMessages(clusterClient);
 

[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159228367
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -717,7 +645,26 @@ public int run(
yarnCluster.waitForClusterToBeReady();
yarnCluster.disconnect();
} else {
-   runInteractiveCli(yarnCluster, 
acceptInteractiveInput);
+
+   ScheduledThreadPoolExecutor 
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+   try (YarnApplicationStatusMonitor 
yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
+   yarnDescriptor.getYarnClient(),
+   yarnCluster.getApplicationId(),
+   new 
ScheduledExecutorServiceAdapter(scheduledExecutorService))){
+   runInteractiveCli(
+   yarnCluster,
+   yarnApplicationStatusMonitor,
+   acceptInteractiveInput);
--- End diff --

The code block looks duplicated except for this flag.


---


[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159230885
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -743,6 +690,142 @@ private void logAndSysout(String message) {
System.out.println(message);
}
 
+   public static void main(final String[] args) throws Exception {
+   final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", 
""); // no prefix for the YARN session
+
+   final String configurationDirectory = 
CliFrontend.getConfigurationDirectoryFromEnv();
+
+   final Configuration flinkConfiguration = 
GlobalConfiguration.loadConfiguration();
+   SecurityUtils.install(new 
SecurityConfiguration(flinkConfiguration));
+   int retCode = 
SecurityUtils.getInstalledContext().runSecured(new Callable() {
+   @Override
+   public Integer call() {
+   return cli.run(args, flinkConfiguration, 
configurationDirectory);
+   }
+   });
+   System.exit(retCode);
+   }
+
+   private static void runInteractiveCli(
+   YarnClusterClient clusterClient,
+   YarnApplicationStatusMonitor yarnApplicationStatusMonitor,
+   boolean readConsoleInput) {
+   try (BufferedReader in = new BufferedReader(new 
InputStreamReader(System.in))) {
+   boolean continueRepl = true;
+   int numTaskmanagers = 0;
+   long unknownStatusSince = System.currentTimeMillis();
--- End diff --

nit: `System.nanoTime()` should be preferred to measure elapsed time 
because it does not depend on the wall clock, i.e., it is not affected by the 
user changing the system's time: https://stackoverflow.com/a/351571
However, if you use `nanoTime()`, the trick in line `729` with negative 
`unknownStatusSince` won't work.


---


[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159225871
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.cli;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility class which monitors the specified yarn application status 
periodically.
+ */
+public class YarnApplicationStatusMonitor implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(YarnApplicationStatusMonitor.class);
+
+   private static final long UPDATE_INTERVAL = 1000L;
+
+   private final YarnClient yarnClient;
+
+   private final ApplicationId yarnApplicationId;
+
+   private final ScheduledFuture applicationStatusUpdateFuture;
+
+   private volatile ApplicationStatus applicationStatus;
+
+   public YarnApplicationStatusMonitor(
+   YarnClient yarnClient,
+   ApplicationId yarnApplicationId,
+   ScheduledExecutor scheduledExecutor) {
+   this.yarnClient = Preconditions.checkNotNull(yarnClient);
+   this.yarnApplicationId = 
Preconditions.checkNotNull(yarnApplicationId);
+
+   applicationStatusUpdateFuture = 
scheduledExecutor.scheduleWithFixedDelay(
+   this::updateApplicationStatus,
+   UPDATE_INTERVAL,
+   UPDATE_INTERVAL,
+   TimeUnit.MILLISECONDS);
+
+   applicationStatus = ApplicationStatus.UNKNOWN;
+   }
+
+   public ApplicationStatus getApplicationStatusNow() {
+   return applicationStatus;
+   }
+
+   @Override
+   public void close() throws Exception {
+   applicationStatusUpdateFuture.cancel(false);
--- End diff --

There is no need to declare `throws Exception` here because `cancel()` does 
not throw any checked exceptions.


---


[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159227955
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -660,7 +570,25 @@ public int run(
"yarn application -kill " + 
applicationId.getOpt());
yarnCluster.disconnect();
} else {
-   runInteractiveCli(yarnCluster, true);
+   ScheduledThreadPoolExecutor 
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
--- End diff --

I think the executor could as well be in the Monitor. If needed in the 
future, one could provide a constructor that accepts an external executor 
(e.g., for unit tests).


---


[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...

2018-01-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5215#discussion_r159227421
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -660,7 +570,25 @@ public int run(
"yarn application -kill " + 
applicationId.getOpt());
yarnCluster.disconnect();
} else {
-   runInteractiveCli(yarnCluster, true);
+   ScheduledThreadPoolExecutor 
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+   try (YarnApplicationStatusMonitor 
yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
+   yarnDescriptor.getYarnClient(),
+   yarnCluster.getApplicationId(),
+   new 
ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
--- End diff --

Why do we need to use the `ScheduledExecutor` interface from Flink? Why not 
use Java's `ScheduledExecutorService` directly?


---


[GitHub] flink pull request #5223: [FLINK-8317][flip6] Implement Triggering of Savepo...

2018-01-01 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5223

[FLINK-8317][flip6] Implement Triggering of Savepoints

## What is the purpose of the change

*Implement triggering of savepoints through HTTP and through command line 
in FLIP-6 mode. This PR is based on #5207.*

CC: @tillrohrmann 

## Brief change log

- *Allow triggering of savepoints through RestfulGateway.*
- *Implement REST handlers to trigger and query the status of savepoints.*
- *Implement savepoint command in RestClusterClient.*


## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests for REST handlers and `RestClusterClient`*
  - *Manually deployed the `SocketWindowWordCount` job and triggered a 
savepoint using Flink's command line client and `curl`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8317-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5223.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5223


commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d
Author: gyao 
Date:   2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the 
information in
  the already existing JobExecutionResult.
- Always cache a JobExecutionResult. Even in case of job failures. In case 
of
  job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults

commit 748745ac3521a20040cbda4056dfd9c53bc24a82
Author: gyao 
Date:   2017-12-20T13:44:03Z

[FLINK-8233][flip6] Add JobExecutionResultHandler

- Allow retrieval of the JobExecutionResult cached in Dispatcher.
- Implement serializer and deserializer for JobExecutionResult.

commit adf091a2770f42d6f8a0c19ab88cc7a208943a32
Author: gyao 
Date:   2017-12-20T13:44:26Z

[hotfix] Clean up ExecutionGraph

- Remove unnecessary throws clause.
- Format whitespace.

commit f5c28527b3a1a0c8ec52f2a5616ebb634397b69c
Author: gyao 
Date:   2017-12-22T23:02:10Z

[FLINK-8299][flip6] Retrieve JobExecutionResult after job submission

commit 55d920f628d7ef3f5b0db7fd843dfdd2d96a3917
Author: gyao 
Date:   2018-01-01T17:59:42Z

[FLINK-8317][flip6] Implement savepoints in RestClusterClient

Allow triggering of savepoints through RestfulGateway. Implement REST 
handlers
to trigger and query the status of savepoints. Implement
savepoint command in RestClusterClient.




---


[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability

2018-01-01 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159153854
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/ContentDump.java
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Utility class to simulate in memory file like writes, flushes and 
closing.
+ */
+public class ContentDump {
+   private boolean writable = true;
+   private Map> filesContent = new HashMap<>();
+
+   public Set listFiles() {
+   return filesContent.keySet();
--- End diff --

Maybe return a copy here because the key set will reflect changes in the 
map.


---


[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability

2018-01-01 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5193#discussion_r159154132
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 ---
@@ -492,6 +503,10 @@ public void close() throws Exception {
processingTimeService.shutdownService();
}
setupCalled = false;
+
+   if (internalEnvironment.isPresent()) {
--- End diff --

I think that, to enable this, `Environment` must implement `AutoCloseable` 
as well. Maybe add an empty default `close()` method? 
If you decide to stick with `Optional`, maybe change this line to: 
`internalEnvironment.ifPresent(MockEnvironment::close);`


---


[GitHub] flink issue #5216: [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClust...

2017-12-30 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5216
  
There are some checkstyle violations:
```
[ERROR] 
src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java:[33,8]
 (imports) UnusedImports: Unused import: 
java.util.concurrent.ScheduledExecutorService.
[ERROR] src/main/java/org/apache/flink/yarn/YarnClusterClient.java:[46,8] 
(imports) UnusedImports: Unused import: 
org.apache.hadoop.yarn.client.api.YarnClient.
```


---


[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...

2017-12-24 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5207

[FLINK-8299][flip6] Poll JobExecutionResult after job submission

## What is the purpose of the change

*Poll JobExecutionResult after job submission. This is needed, for example, 
to enable `collect()` calls from the job in FLIP-6 mode. This PR is based on 
#5194.*

CC: @tillrohrmann 


## Brief change log

  - *Retrieve JobExecutionResult after job submission in 
`RestClusterClient`*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests for all new classes and changed classes.*
  - *Manually ran the job in examples/batch/WordCount.jar and verified that the 
results are correctly collected/printed.*
  
## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8299

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5207.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5207


commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d
Author: gyao 
Date:   2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the 
information in
  the already existing JobExecutionResult.
- Always cache a JobExecutionResult. Even in case of job failures. In case 
of
  job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults

commit 748745ac3521a20040cbda4056dfd9c53bc24a82
Author: gyao 
Date:   2017-12-20T13:44:03Z

[FLINK-8233][flip6] Add JobExecutionResultHandler

- Allow retrieval of the JobExecutionResult cached in Dispatcher.
- Implement serializer and deserializer for JobExecutionResult.

commit adf091a2770f42d6f8a0c19ab88cc7a208943a32
Author: gyao 
Date:   2017-12-20T13:44:26Z

[hotfix] Clean up ExecutionGraph

- Remove unnecessary throws clause.
- Format whitespace.

commit f5c28527b3a1a0c8ec52f2a5616ebb634397b69c
Author: gyao 
Date:   2017-12-22T23:02:10Z

[FLINK-8299][flip6] Retrieve JobExecutionResult after job submission




---


[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

2017-12-21 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4896#discussion_r158265225
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
 ---
@@ -19,81 +19,61 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.FileUtils;
 
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Objects;
-
-import scala.concurrent.duration.FiniteDuration;
 
 /**
- * A base class for tests that run test programs in a Flink mini cluster.
+ * Base class for unit tests that run multiple tests and want to reuse the 
same
+ * Flink cluster. This saves a significant amount of time, since the 
startup and
+ * shutdown of the Flink clusters (including actor systems, etc) usually 
dominates
+ * the execution of the actual tests.
+ *
+ * To write a unit test against this test base, simply extend it and add
+ * one or more regular test methods and retrieve the 
StreamExecutionEnvironment from
+ * the context:
+ *
+ * 
+ *   {@literal @}Test
+ *   public void someTest() {
+ *   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+ *   // test code
+ *   env.execute();
+ *   }
+ *
+ *   {@literal @}Test
+ *   public void anotherTest() {
+ *   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+ *   // test code
+ *   env.execute();
+ *   }
+ *
+ * 
  */
 public abstract class AbstractTestBase extends TestBaseUtils {
 
-   /** Configuration to start the testing cluster with. */
-   protected final Configuration config;
+   protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractTestBase.class);
--- End diff --

Maybe `protected final Logger log = LoggerFactory.getLogger(getClass());` 
so that the class name of the implementation is logged.


---


[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

2017-12-21 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4896#discussion_r158264752
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
 ---
@@ -66,47 +47,34 @@ protected void postSubmit() throws Exception {}
 
@Test
public void testJob() throws Exception {
+   // pre-submit
try {
-   // pre-submit
-   try {
-   preSubmit();
-   }
-   catch (Exception e) {
-   System.err.println(e.getMessage());
-   e.printStackTrace();
-   fail("Pre-submit work caused an error: " + 
e.getMessage());
-   }
-
-   // prepare the test environment
-   startCluster();
-
-   TestStreamEnvironment.setAsContext(this.executor, 
getParallelism());
+   preSubmit();
+   }
+   catch (Exception e) {
+   System.err.println(e.getMessage());
+   e.printStackTrace();
+   fail("Pre-submit work caused an error: " + 
e.getMessage());
--- End diff --

nit: The test should fail on an exception anyway. If you prefer to leave it 
as is to keep the diff smaller, that's also ok.


---


[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

2017-12-21 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4896#discussion_r158261437
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
 ---
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.runtime.stream
 
-import java.math.BigDecimal
 import java.lang.{Integer => JInt, Long => JLong}
+import java.math.BigDecimal
--- End diff --

nit: Are we following import orders for Scala as described here for Java: 
http://flink.apache.org/contribute-code.html#imports?


---


[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

2017-12-21 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4896#discussion_r158260663
  
--- Diff: 
flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
 ---
@@ -23,8 +23,7 @@ import java.io.File
 import org.apache.commons.io.FileUtils
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
--- End diff --

nit: The import looks strange. I think

```
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
```
is enough.



---


[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

2017-12-21 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4896#discussion_r158259995
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
 ---
@@ -55,6 +55,7 @@ public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setGlobalJobParameters(params);
+   env.setParallelism(1);
--- End diff --

Is this strictly needed? It's not a Unit or ITCase and the example seems to 
work without this line.


---


[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

2017-12-21 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4896#discussion_r158259149
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
 ---
@@ -215,23 +211,15 @@ private static void 
startSecureFlinkClusterWithRecoveryModeEnabled() {
dfs.mkdirs(new Path("/flink/checkpoints"));
dfs.mkdirs(new Path("/flink/recovery"));
 
-   org.apache.flink.configuration.Configuration config = 
new org.apache.flink.configuration.Configuration();
-
-   
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-   
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
DEFAULT_PARALLELISM);
-   
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
-   
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
-   config.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
-   config.setString(CoreOptions.STATE_BACKEND, 
"filesystem");
-   
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI 
+ "/flink/checkpoints");
-   
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + 
"/flink/recovery");
-   config.setString("state.backend.fs.checkpointdir", 
hdfsURI + "/flink/checkpoints");
-
-   
SecureTestEnvironment.populateFlinkSecureConfigurations(config);
-
-   cluster = TestBaseUtils.startCluster(config, false);
-   TestStreamEnvironment.setAsContext(cluster, 
DEFAULT_PARALLELISM);
+   
MINICLUSTER_CONFIGURATION.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, 
false);
--- End diff --

Because of this:
```
private static void skipIfHadoopVersionIsNotAppropriate() {
// Skips all tests if the Hadoop version doesn't match
String hadoopVersionString = VersionInfo.getVersion();
String[] split = hadoopVersionString.split("\\.");
if (split.length != 3) {
throw new IllegalStateException("Hadoop version was not 
of format 'X.X.X': " + hadoopVersionString);
}
Assume.assumeTrue(
// check whether we're running Hadoop version >= 3.x.x
Integer.parseInt(split[0]) >= 3
);
}
```
I assume that the test will never run.

I wonder if the test has ever worked correctly.


---


[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

2017-12-21 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4896#discussion_r158257284
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
 ---
@@ -19,81 +19,61 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.FileUtils;
 
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Objects;
-
-import scala.concurrent.duration.FiniteDuration;
 
 /**
- * A base class for tests that run test programs in a Flink mini cluster.
+ * Base class for unit tests that run multiple tests and want to reuse the 
same
+ * Flink cluster. This saves a significant amount of time, since the 
startup and
+ * shutdown of the Flink clusters (including actor systems, etc) usually 
dominates
+ * the execution of the actual tests.
+ *
+ * To write a unit test against this test base, simply extend it and add
+ * one or more regular test methods and retrieve the 
StreamExecutionEnvironment from
+ * the context:
+ *
+ * 
+ *   {@literal @}Test
+ *   public void someTest() {
+ *   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+ *   // test code
+ *   env.execute();
+ *   }
+ *
+ *   {@literal @}Test
+ *   public void anotherTest() {
+ *   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+ *   // test code
+ *   env.execute();
+ *   }
+ *
+ * 
  */
 public abstract class AbstractTestBase extends TestBaseUtils {
 
-   /** Configuration to start the testing cluster with. */
-   protected final Configuration config;
+   protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractTestBase.class);
 
-   private final FiniteDuration timeout;
+   private static final int DEFAULT_PARALLELISM = 4;
 
-   protected int taskManagerNumSlots = 1;
+   protected static final Configuration MINICLUSTER_CONFIGURATION = new 
Configuration();
 
-   protected int numTaskManagers = 1;
+   @ClassRule
--- End diff --

`miniClusterResource` will be initialized before 
`MINICLUSTER_CONFIGURATION` can be modified by a `@BeforeClass` method, i.e., 
in some cases the configuration cannot be supplied in time.


---


[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases

2017-12-21 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4896#discussion_r158254745
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
 ---
@@ -19,81 +19,61 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.FileUtils;
 
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Objects;
-
-import scala.concurrent.duration.FiniteDuration;
 
 /**
- * A base class for tests that run test programs in a Flink mini cluster.
+ * Base class for unit tests that run multiple tests and want to reuse the 
same
+ * Flink cluster. This saves a significant amount of time, since the 
startup and
+ * shutdown of the Flink clusters (including actor systems, etc) usually 
dominates
+ * the execution of the actual tests.
+ *
+ * To write a unit test against this test base, simply extend it and add
+ * one or more regular test methods and retrieve the 
StreamExecutionEnvironment from
+ * the context:
+ *
+ * 
+ *   {@literal @}Test
+ *   public void someTest() {
+ *   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+ *   // test code
+ *   env.execute();
+ *   }
+ *
+ *   {@literal @}Test
+ *   public void anotherTest() {
+ *   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+ *   // test code
+ *   env.execute();
+ *   }
+ *
+ * 
  */
 public abstract class AbstractTestBase extends TestBaseUtils {
 
-   /** Configuration to start the testing cluster with. */
-   protected final Configuration config;
+   protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractTestBase.class);
 
-   private final FiniteDuration timeout;
+   private static final int DEFAULT_PARALLELISM = 4;
 
-   protected int taskManagerNumSlots = 1;
+   protected static final Configuration MINICLUSTER_CONFIGURATION = new 
Configuration();
--- End diff --

I think it's dangerous to have mutable global state. 

For example, if I have the following two tests:
```
public class TestTest extends AbstractTestBase {

@BeforeClass
public static void setUp() throws Exception {
MINICLUSTER_CONFIGURATION.setString("foo", "bar");
}

@Test
public void name() throws Exception {
System.out.println(MINICLUSTER_CONFIGURATION);
}

}
```

```
public class TestTest2 extends AbstractTestBase {

@Test
public void name() throws Exception {
System.out.println(MINICLUSTER_CONFIGURATION);
}

}
```
and run them both from IntelliJ, `{foo=bar}` is printed twice.

`MINICLUSTER_CONFIGURATION` is never cleaned up.


---


[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5194#discussion_r158081176
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobExecutionResult;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * JSON deserializer for {@link JobExecutionResult}.
+ *
+ * @see JobExecutionResultSerializer
+ */
+public class JobExecutionResultDeserializer extends 
StdDeserializer {
+
+   private static final long serialVersionUID = 1L;
+
+   private final JobIDDeserializer jobIdDeserializer = new 
JobIDDeserializer();
+
+   private final SerializedThrowableDeserializer 
serializedThrowableDeserializer =
+   new SerializedThrowableDeserializer();
+
+   private final SerializedValueDeserializer serializedValueDeserializer;
+
+   public JobExecutionResultDeserializer() {
+   super(JobExecutionResult.class);
+   final JavaType objectSerializedValueType = 
TypeFactory.defaultInstance()
+   .constructType(new 
TypeReference>() {
+   });
+   serializedValueDeserializer = new 
SerializedValueDeserializer(objectSerializedValueType);
+   }
+
+   @Override
+   public JobExecutionResult deserialize(final JsonParser p, final 
DeserializationContext ctxt) throws IOException {
+   JobID jobId = null;
+   long netRuntime = -1;
+   SerializedThrowable serializedThrowable = null;
+   Map> accumulatorResults = null;
+
+   while (true) {
+   final JsonToken jsonToken = p.nextToken();
+   assertNotEndOfInput(p, jsonToken);
+   if (jsonToken == JsonToken.END_OBJECT) {
+   break;
+   }
+
+   final String fieldName = p.getValueAsString();
+   switch (fieldName) {
+   case 
JobExecutionResultSerializer.FIELD_NAME_JOB_ID:
+   assertNextToken(p, 
JsonToken.VALUE_STRING);
+   jobId = 
jobIdDeserializer.deserialize(p, ctxt);
+   break;
+   case 
JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME:
+   assertNextToken(p, 
JsonToken.VALUE_NUMBER_INT);
+   netRuntime = p.getLongValue();
+   break;
+   case 
JobExecutionResultSerializer.FIELD_NAME_ACCUMULATOR_RESULTS:
+   assertNextToken(p, 
JsonToken.START_OBJECT);
+   accumulatorResults = 
parse

[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5194#discussion_r158078273
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobExecutionResult;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * JSON deserializer for {@link JobExecutionResult}.
+ *
+ * @see JobExecutionResultSerializer
+ */
+public class JobExecutionResultDeserializer extends 
StdDeserializer {
+
+   private static final long serialVersionUID = 1L;
+
+   private final JobIDDeserializer jobIdDeserializer = new 
JobIDDeserializer();
+
+   private final SerializedThrowableDeserializer 
serializedThrowableDeserializer =
+   new SerializedThrowableDeserializer();
+
+   private final SerializedValueDeserializer serializedValueDeserializer;
+
+   public JobExecutionResultDeserializer() {
+   super(JobExecutionResult.class);
+   final JavaType objectSerializedValueType = 
TypeFactory.defaultInstance()
+   .constructType(new 
TypeReference>() {
+   });
+   serializedValueDeserializer = new 
SerializedValueDeserializer(objectSerializedValueType);
+   }
+
+   @Override
+   public JobExecutionResult deserialize(final JsonParser p, final 
DeserializationContext ctxt) throws IOException {
+   JobID jobId = null;
+   long netRuntime = -1;
+   SerializedThrowable serializedThrowable = null;
+   Map> accumulatorResults = null;
+
+   while (true) {
+   final JsonToken jsonToken = p.nextToken();
+   assertNotEndOfInput(p, jsonToken);
+   if (jsonToken == JsonToken.END_OBJECT) {
+   break;
+   }
+
+   final String fieldName = p.getValueAsString();
+   switch (fieldName) {
+   case 
JobExecutionResultSerializer.FIELD_NAME_JOB_ID:
+   assertNextToken(p, 
JsonToken.VALUE_STRING);
+   jobId = 
jobIdDeserializer.deserialize(p, ctxt);
+   break;
+   case 
JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME:
+   assertNextToken(p, 
JsonToken.VALUE_NUMBER_INT);
+   netRuntime = p.getLongValue();
+   break;
+   case 
JobExecutionResultSerializer.FIELD_NAME_ACCUMULATOR_RESULTS:
+   assertNextToken(p, 
JsonToken.START_OBJECT);
+   accumulatorResults = 
parseAccumulatorResults(

[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler

2017-12-20 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5194

[FLINK-8233][flip6] Add JobExecutionResultHandler

## What is the purpose of the change

*Allow retrieval of the JobExecutionResult cached in Dispatcher via HTTP. 
This will be needed so that accumulator results can be transmitted to the 
client.*

This PR is based on #5184.

## Brief change log

  - *Add `JobExecutionResultHandler` to enable retrieval of 
`JobExecutionResult`.*
  - *Add serializer and deserializer for `JobExecutionResult`*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests for all new and changed classes.*
  - *Manually ran the WordCount example job and fetched the 
`JobExecutionResult` with `curl`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8233-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5194.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5194


commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d
Author: gyao 
Date:   2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the 
information in
  the already existing JobExecutionResult.
- Always cache a JobExecutionResult. Even in case of job failures. In case 
of
  job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults

commit 748745ac3521a20040cbda4056dfd9c53bc24a82
Author: gyao 
Date:   2017-12-20T13:44:03Z

[FLINK-8233][flip6] Add JobExecutionResultHandler

- Allow retrieval of the JobExecutionResult cached in Dispatcher.
- Implement serializer and deserializer for JobExecutionResult.

commit adf091a2770f42d6f8a0c19ab88cc7a208943a32
Author: gyao 
Date:   2017-12-20T13:44:26Z

[hotfix] Clean up ExecutionGraph

- Remove unnecessary throws clause.
- Format whitespace.




---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157963387
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 ---
@@ -405,22 +407,27 @@ private void decrementCheckAndCleanup() {
 
private volatile Throwable runnerException;
 
-   private volatile JobExecutionResult result;
+   private volatile 
org.apache.flink.runtime.jobmaster.JobExecutionResult result;

BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
this.jobId = jobId;
this.jobMastersToWaitFor = new 
CountDownLatch(numJobMastersToWaitFor);
}
 
@Override
-   public void jobFinished(JobExecutionResult jobResult) {
-   this.result = jobResult;
+   public void 
jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
+   this.result = result;
jobMastersToWaitFor.countDown();
}
 
@Override
-   public void jobFailed(Throwable cause) {
-   jobException = cause;
+   public void 
jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
+   
checkArgument(result.getSerializedThrowable().isPresent());
+
+   jobException = result
--- End diff --

Actually, the exception does not need to be stored separately because the 
JobExecutionResult already contains it.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157962431
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 ---
@@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws 
JobExecutionException, InterruptedE
}
}
else if (result != null) {
-   return result;
+   try {
+   return new SerializedJobExecutionResult(
+   jobId,
+   result.getNetRuntime(),
+   
result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader());
--- End diff --

Because the exception is serialized in 
`OnCompletionActions#jobFailed(JobExecutionResult);`, I have to deserialize it 
here again. I wonder if this is sane?



---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157961376
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Exception indicating that we could not find a
+ * {@link org.apache.flink.api.common.JobExecutionResult} under the given 
{@link JobID}.
+ */
+public class JobExecutionResultNotFoundException extends FlinkException {
+
+   private final JobID jobId;
+
+   private static final long serialVersionUID = 1L;
--- End diff --

Fixed.


---


[GitHub] flink issue #5168: [FLINK-8234][flip6] WIP

2017-12-20 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5168
  
New PR #5184 


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157961174
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Exception indicating that we could not find a
+ * {@link org.apache.flink.api.common.JobExecutionResult} under the given 
{@link JobID}.
+ */
+public class JobExecutionResultNotFoundException extends FlinkException {
--- End diff --

I think it is not an *is-a* relationship.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157960760
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
 ---
@@ -55,6 +56,26 @@ public SerializedJobExecutionResult(JobID jobID, long 
netRuntime,
this.accumulatorResults = accumulators;
}
 
+   /**
+* Creates an instance from {@link JobExecutionResult}.
+*/
+   public static SerializedJobExecutionResult from(final 
JobExecutionResult jobExecutionResult) {
+   final Map accumulatorResults = 
jobExecutionResult.getAllAccumulatorResults();
+
+   final Map> 
serializedAccumulatorResults = new HashMap<>(accumulatorResults.size());
+   for (final Map.Entry entry : 
accumulatorResults.entrySet()) {
+   try {
+   
serializedAccumulatorResults.put(entry.getKey(), new 
SerializedValue<>(entry.getValue()));
+   } catch (final IOException e) {
+   throw new RuntimeException(e);
--- End diff --

Not needed anymore.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157960722
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -357,6 +362,28 @@ public void start() throws Exception {
return 
CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
}
 
+   @Override
+   public CompletableFuture> getJobExecutionResult(
--- End diff --

Done.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-20 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157960666
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.types.Either;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link SerializedJobExecutionResult}s.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache>
+   jobExecutionResultCache =
+   CacheBuilder.newBuilder()
+   .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, 
TimeUnit.SECONDS)
+   .build();
+
+   public void put(final SerializedJobExecutionResult result) {
+   assertJobExecutionResultNotCached(result.getJobId());
--- End diff --

It's just being strict. I can remove it if it is wrong.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157878862
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 ---
@@ -358,12 +360,12 @@ private DetachedFinalizer(JobID jobID, int 
numJobManagersToWaitFor) {
}
 
@Override
-   public void jobFinished(JobExecutionResult result) {
+   public void 
jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
decrementCheckAndCleanup();
}
 
@Override
-   public void jobFailed(Throwable cause) {
+   public void 
jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
--- End diff --

Maybe rename to `JobResult` after all to avoid the fully qualified name.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157877969
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but 
with an optional
+ * {@link SerializedThrowable} when the job failed.
+ *
+ * This is used by the {@link JobMaster} to send the results to the 
{@link Dispatcher}.
+ */
+public class JobExecutionResult {
+
+   private final JobID jobId;
+
+   private final Map> accumulatorResults;
+
+   private final long netRuntime;
+
+   private final SerializedThrowable serializedThrowable;
+
+   private JobExecutionResult(
+   final JobID jobId,
+   final Map> 
accumulatorResults,
+   final long netRuntime,
+   @Nullable final SerializedThrowable 
serializedThrowable) {
+
+   checkArgument(netRuntime >= 0, "netRuntime must be greater than 
or equals 0");
+
+   this.jobId = requireNonNull(jobId);
+   this.accumulatorResults = requireNonNull(accumulatorResults);
+   this.netRuntime = netRuntime;
+   this.serializedThrowable = serializedThrowable;
+   }
+
+   public boolean isSuccess() {
--- End diff --

Javadocs are missing.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157877406
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 ---
@@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws 
JobExecutionException, InterruptedE
}
}
else if (result != null) {
-   return result;
+   try {
+   return new SerializedJobExecutionResult(
+   jobId,
+   result.getNetRuntime(),
+   
result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader());
--- End diff --

Because the exception is serialized in 
`OnCompletionActions#jobFailed(JobExecutionResult);`, I have to deserialize it 
here again. I wonder if this is sane?

CC: @tillrohrmann 


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157876761
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 ---
@@ -92,4 +94,42 @@
 * @return Future containing the collection of instance ids and the 
corresponding metric query service path
 */
CompletableFuture>> 
requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+
+   /**
+* Returns the {@link SerializedJobExecutionResult} for a job, or in 
case the job failed, the
--- End diff --

Javadoc needs to be updated.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157876793
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 ---
@@ -92,4 +94,42 @@
 * @return Future containing the collection of instance ids and the 
corresponding metric query service path
 */
CompletableFuture>> 
requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+
+   /**
+* Returns the {@link SerializedJobExecutionResult} for a job, or in 
case the job failed, the
+* failure cause.
+*
+* @param jobId ID of the job that we are interested in.
+* @param timeout Timeout for the asynchronous operation.
+*
+* @see #isJobExecutionResultPresent(JobID, Time)
+*
+* @return {@link CompletableFuture} containing the {@link 
JobExecutionResult} or a
+* {@link Throwable} which represents the failure cause. If there is no 
result, the future will
+* be completed exceptionally with
+* {@link 
org.apache.flink.runtime.messages.JobExecutionResultNotFoundException}
+*/
+   default CompletableFuture getJobExecutionResult(
+   JobID jobId,
+   @RpcTimeout Time timeout) {
+   throw new UnsupportedOperationException();
+   }
+
+   /**
+* Tests if the {@link SerializedJobExecutionResult} is present.
--- End diff --

Javadoc needs to be updated.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5184

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

## What is the purpose of the change

Cache `JobExecutionResult` in `Dispatcher`, and add methods to 
`RestfulGateway` to enable retrieval of results through HTTP (not yet 
implemented). This will be needed so that accumulator results can be 
transmitted to the client.

## Brief change log

  - *Introduce new JobExecutionResult used by JobMaster to forward the 
information in the already existing JobExecutionResult.*
  - *Always cache a JobExecutionResult, even in case of job failures. In 
case of job failures, the serialized exception is stored additionally.*
  - *Introduce new methods to RestfulGateway to allow retrieval of cached 
JobExecutionResults.*


## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests to verify that the Dispatcher caches the job results 
when the job finishes either successfully or with a failure.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8234

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5184


commit d05c76e621106810c32bc17aa0576923ba6be401
Author: gyao 
Date:   2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the 
information in
  the already existing JobExecutionResult.
- Always cache a JobExecutionResult. Even in case of job failures. In case 
of
  job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults




---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread GJL
Github user GJL closed the pull request at:

https://github.com/apache/flink/pull/5168


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157489280
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -357,6 +362,28 @@ public void start() throws Exception {
return 
CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
}
 
+   @Override
+   public CompletableFuture> getJobExecutionResult(
+   final JobID jobId,
+   final Time timeout) {
+   final Either 
jobExecutionResult =
+   jobExecutionResultCache.get(jobId);
+   if (jobExecutionResult == null) {
+   return FutureUtils.completedExceptionally(new 
JobExecutionResultNotFoundException(jobId));
+   } else {
+   return 
CompletableFuture.completedFuture(jobExecutionResult);
+   }
+   }
+
+   @Override
+   public CompletableFuture isJobExecutionResultPresent(final 
JobID jobId, final Time timeout) {
+   final boolean jobExecutionResultPresent = 
jobExecutionResultCache.contains(jobId);
+   if (!jobManagerRunners.containsKey(jobId) && 
!jobExecutionResultPresent) {
+   return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
+   }
+   return 
CompletableFuture.completedFuture(jobExecutionResultPresent);
--- End diff --

But this would never return a future containing `false`. 


---


[GitHub] flink pull request #4889: [FLINK-7903] [tests] Add flip6 build profile

2017-12-18 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4889#discussion_r157481924
  
--- Diff: pom.xml ---
@@ -125,6 +125,9 @@ under the License.
1.6.5
1.3
false
+   
--- End diff --

nit: I don't understand this comment. Shouldn't it be something like 
`` ?


---


[GitHub] flink pull request #4889: [FLINK-7903] [tests] Add flip6 build profile

2017-12-18 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4889#discussion_r157468208
  
--- Diff: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/category/Flip6.java
 ---
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.category;
+
+/**
+ * Category marker interface for Junit.
+ */
+public interface Flip6 {
--- End diff --

nit: This is an interface, but `OldAndFlip6` is a class. 
nit: Maybe include *flip6* in the javadoc, e.g., *Category marker interface 
for Flip6 Junit tests.*


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-18 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157462859
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResult.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
+import org.apache.flink.util.SerializedValue;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This class is used to represent the information in {@link 
JobExecutionResult} as JSON. In case
+ * of a job failure, no {@link JobExecutionResult} will be available. In 
this case instances of this
+ * class will only store a {@link Throwable}.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JobExecutionResult {
+
+   private static final String FIELD_NAME_JOB_ID = "id";
+
+   private static final String FIELD_NAME_NET_RUNTIME = "net-runtime";
+
+   private static final String FIELD_NAME_ACCUMULATOR_RESULTS = 
"accumulator-results";
+
+   private static final String FIELD_NAME_FAILURE_CAUSE = "failure-cause";
+
+   @JsonSerialize(using = JobIDSerializer.class)
+   @JsonDeserialize(using = JobIDDeserializer.class)
+   @JsonProperty(value = FIELD_NAME_JOB_ID, required = true)
+   private final JobID jobId;
+
+   @JsonProperty(FIELD_NAME_NET_RUNTIME)
+   private final Long netRuntime;
+
+   @JsonProperty(FIELD_NAME_ACCUMULATOR_RESULTS)
+   private final Map> accumulatorResults;
+
+   @JsonProperty(FIELD_NAME_FAILURE_CAUSE)
+   private final Throwable throwable;
--- End diff --

Maybe only include the error message to avoid serialization/deserialization issues.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027881
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.types.Either;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link SerializedJobExecutionResult}s.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache>
+   jobExecutionResultCache =
+   CacheBuilder.newBuilder()
+   .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, 
TimeUnit.SECONDS)
+   .build();
+
+   public void put(final SerializedJobExecutionResult result) {
+   assertJobExecutionResultNotCached(result.getJobId());
+   jobExecutionResultCache.put(result.getJobId(), 
Either.Right(result));
+   }
+
+   public void put(final JobID jobId, Throwable throwable) {
+   assertJobExecutionResultNotCached(jobId);
+   jobExecutionResultCache.put(jobId, Either.Left(throwable));
+   }
+
+   public boolean contains(final JobID jobId) {
+   return jobExecutionResultCache.getIfPresent(jobId) != null;
+   }
+
+   @Nullable
+   public Either get(final JobID 
jobId) {
--- End diff --

Not sure if I am abusing Flink's `Either` here.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027633
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Exception indicating that we could not find a
+ * {@link org.apache.flink.api.common.JobExecutionResult} under the given 
{@link JobID}.
+ */
+public class JobExecutionResultNotFoundException extends FlinkException {
+
+   private final JobID jobId;
+
+   private static final long serialVersionUID = 1L;
--- End diff --

Should be on top.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027510
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.types.Either;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link SerializedJobExecutionResult}s.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache>
+   jobExecutionResultCache =
+   CacheBuilder.newBuilder()
+   .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, 
TimeUnit.SECONDS)
+   .build();
+
+   public void put(final SerializedJobExecutionResult result) {
--- End diff --

Javadocs are missing.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157027178
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
 ---
@@ -33,10 +40,23 @@
objectMapper.enable(
DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
-   DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
-   
DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
--- End diff --

I had to remove `FAIL_ON_MISSING_CREATOR_PROPERTIES` because `null` fields 
are not always represented in the JSON. The `RestClient` would otherwise run 
into problems.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157026590
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import 
org.apache.flink.runtime.messages.JobExecutionResultNotFoundException;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobExecutionResult;
+import 
org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * Returns the {@link org.apache.flink.api.common.JobExecutionResult} for 
a given {@link JobID}.
+ */
+public class JobExecutionResultHandler
--- End diff --

Sample response after running batch WordCount example:
```
{
  "status": {
"id": "CREATED"
  },
  "job-execution-result": {
"id": "533a165a6de7f70919a54b1d6f36d3b3",
"net-runtime": 0,
"accumulator-results": {
  "94a58184eb17398571f35da42b714517": 
"rO0ABXNyABNqYXZhLnV0aWwuQXJyYXlMaXN0eIHSHZnHYZ0DAAFJAARzaXpleHCqdwQAAACqdXIAAltCrPMX+AYIVOACAAB4cAYCYQV1cQB+AAILB2FjdGlvbgF1cQB+AAIKBmFmdGVyAXVxAH4AAgwIYWdhaW5zdAF1cQB+AAIIBGFsbAJ1cQB+AAIIBGFuZAx1cQB+AAIJBWFybXMBdXEAfgACCwdhcnJvd3MBdXEAfgACCQVhd3J5AXVxAH4AAgcDYXkBdXEAfgACCQViYXJlAXVxAH4AAgcDYmUEdXEAfgACCQViZWFyA3VxAH4AAgsHYm9ka2luAXVxAH4AAgoGYm91cm4BdXEAfgACCARidXQBdXEAfgACBwNieQJ1cQB+AAINCWNhbGFtaXR5AXVxAH4AAgkFY2FzdAF1cQB+AAIJBWNvaWwBdXEAfgACCQVjb21lAXVxAH4AAg8LY29uc2NpZW5jZQF1cQB+AAIRDWNvbnN1bW1hdGlvbgF1cQB+AAIOCmNvbnR1bWVseQF1cQB+AAIMCGNvdW50cnkBdXEAfgACDAhjb3dhcmRzAXVxAH4AAg0JY3VycmVudHMBdXEAfgACBgJkBHVxAH4AAgoGZGVhdGgCdXEAfgACCgZkZWxheQF1cQB+AAILB2Rlc3BpcwF1cQB+AAINCWRldm91dGx5AAA
 
AAXVxAH4AAggEZGllAnVxAH4AAgkFZG9lcwF1cQB+AAIKBmRyZWFkAXVxAH4AAgoGZHJlYW0BdXEAfgACCwdkcmVhbXMBdXEAfgACCARlbmQCdXEAfgACEAxlbnRlcnByaXNlcwF1cQB+AAIHA2VyAXVxAH4AAgkFZmFpcgF1cQB+AAIMCGZhcmRlbHMBdXEAfgACCgZmbGVzaAF1cQB+AAIIBGZseQF1cQB+AAIIBGZvcgJ1cQB+AAIMCGZvcnR1bmUBdXEAfgACCQVmcm9tAXVxAH4AAgkFZ2l2ZQF1cQB+AAIKBmdyZWF0AXVxAH4AAgoGZ3J1bnQBdXEAfgACCQVoYXZlAnVxAH4AAgcDaGUBdXEAfgACDgpoZWFydGFjaGUBdXEAfgACCQVoZWlyAXVxAH4AAgwIaGltc2VsZgF1cQB+AAIIBGhpcwF1cQB+AAIIBGh1ZQF1cQB+AAIJBWlsbHMBdXEAfgACBwNpbgN1cQB+AAIOCmluc29sZW5jZQF1cQB+AAIHA2lzA3VxAH4AAgkF

[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157026313
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
 ---
@@ -33,10 +40,23 @@
objectMapper.enable(
DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
-   DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
-   
DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
+   DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY);
objectMapper.disable(
SerializationFeature.FAIL_ON_EMPTY_BEANS);
+
+   final SimpleModule jacksonFlinkModule = new SimpleModule();
+
+   final JavaType serializedValueWildcardType = objectMapper
+   .getTypeFactory()
+   .constructType(new TypeReference>() {
+   });
+
+   jacksonFlinkModule.addSerializer(new 
SerializedValueSerializer(serializedValueWildcardType));
--- End diff --

Could also be done using the `@JsonSerialize` annotation.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5168#discussion_r157025791
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.types.Either;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link SerializedJobExecutionResult}s.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache>
--- End diff --

Cache isn't size limited.


---


[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP

2017-12-14 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5168

[FLINK-8234][flip6] WIP

WIP

@tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8234

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5168.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5168


commit cc969846791bf818fbc81feb241a188410431ae5
Author: gyao 
Date:   2017-12-14T16:27:16Z

[FLINK-8234][flip6] WIP




---


[GitHub] flink pull request #4987: [FLINK-8029] Create WebMonitorEndpoint

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4987#discussion_r155804733
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -757,6 +775,64 @@ public void heartbeatFromResourceManager(final 
ResourceID resourceID) {
return 
CompletableFuture.completedFuture(executionGraph.getState());
}
 
+   
//--
+   // RestfulGateway RPC methods
+   
//--
+
+   @Override
+   public CompletableFuture requestRestAddress(Time timeout) {
+   return restAddressFuture;
+   }
+
+   @Override
+   public CompletableFuture requestJob(JobID jobId, 
Time timeout) {
+   if (Objects.equals(jobGraph.getJobID(), jobId)) {
--- End diff --

When I see `Objects.equals`, I am assuming that it's possible that both 
arguments can be null. However, `jobGraph.getJobID()` is always non-null.


---


[GitHub] flink pull request #4987: [FLINK-8029] Create WebMonitorEndpoint

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4987#discussion_r155802027
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -27,8 +27,6 @@
 
 /**
  * Interface for a metric registry.
-
-   LOG.debug("Started MetricQueryService under 
{}.", metricQueryServicePath);
--- End diff --

beautiful


---


[GitHub] flink pull request #4988: [FLINK-8030] Instantiate JobMasterRestEndpoint in ...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4988#discussion_r155797999
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 ---
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler;
+import 
org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler;
+import 
org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
+import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
+import 
org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
+import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import 
org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
+import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
+import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.SubtasksTimes

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155770364
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
 ---
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for the {@link SlotSharingManager}.
+ */
+public class SlotSharingManagerTest extends TestLogger {
+
+   private static final SlotSharingGroupId slotSharingGroupId = new 
SlotSharingGroupId();
+
+   private static final DummySlotOwner slotOwner = new DummySlotOwner();
+
+   private static final TestingAllocatedSlotActions allocatedSlotActions = 
new TestingAllocatedSlotActions();
--- End diff --

This instance is mutable... should not be `static`


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155770104
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
 ---
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for the {@link SlotSharingManager}.
+ */
+public class SlotSharingManagerTest extends TestLogger {
+
+   private static final SlotSharingGroupId slotSharingGroupId = new 
SlotSharingGroupId();
--- End diff --

Should be `SLOT_SHARING_GROUP_ID` since it is a constant.


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155768880
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to 
run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link 
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the 
leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the 
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link 
MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link 
MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link 
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} 
which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link 
SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link 
MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we 
cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks 
will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released 
from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map allTaskSlots;
+
+   // Root nodes which have not been completed because the allocated slot 
is still pending
+   private final Map unresolvedRootSlots;
+
+   // Root nodes which have been completed (the underlying allocated slot 
has been assigned)
+   private final Map> 
resolvedRootSlots;
+
+

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155758219
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to 
run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link 
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the 
leave nodes.
--- End diff --

nit: *leaf nodes*


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155754738
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, 
locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotFuture;
+
+   try {
+   if (task.getCoLocationConstraint() != null) {
+   multiTaskSlotFuture = 
allocateCoLocatedMultiTaskSlot(
+   task.getCoLocationConstraint(),
+   multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   } else {
+   multiTaskSlotFuture = 
allocateMultiTaskSlot(
+   task.getJobVertexId(), 
multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   }
+   } catch (NoResourceAvailableException 
noResourceException) {
+   return 
FutureUtils.completedExceptionally(noResourceException);
+   }
+
+   // sanity check
+   
Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+   final SlotSharingManager.SingleTaskSlot leave = 
multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
+   slotRequestId,
+   task.getJobVertexId(),
+   multiTaskSlotFuture.getLocality());
+
+   return leave.getLogicalSlotFuture();
+   } else {
+   // request an allocated slot to assign a single logical 
slot to
+   CompletableFuture 
slotAndLocalityFuture = requestAllocatedSlot(
+   slotRequestId,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+
+   return slotAndLocalityFuture.thenApply(
+   (SlotAndLocality slotAndLocality) -> {
+   final AllocatedSlot allocatedSlot = 
slotAndLocality.g

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155751694
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, 
locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotFuture;
+
+   try {
+   if (task.getCoLocationConstraint() != null) {
+   multiTaskSlotFuture = 
allocateCoLocatedMultiTaskSlot(
+   task.getCoLocationConstraint(),
+   multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   } else {
+   multiTaskSlotFuture = 
allocateMultiTaskSlot(
+   task.getJobVertexId(), 
multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   }
+   } catch (NoResourceAvailableException 
noResourceException) {
+   return 
FutureUtils.completedExceptionally(noResourceException);
+   }
+
+   // sanity check
+   
Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+   final SlotSharingManager.SingleTaskSlot leave = 
multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
+   slotRequestId,
+   task.getJobVertexId(),
+   multiTaskSlotFuture.getLocality());
+
+   return leave.getLogicalSlotFuture();
+   } else {
+   // request an allocated slot to assign a single logical 
slot to
+   CompletableFuture 
slotAndLocalityFuture = requestAllocatedSlot(
+   slotRequestId,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+
+   return slotAndLocalityFuture.thenApply(
+   (SlotAndLocality slotAndLocality) -> {
+   final AllocatedSlot allocatedSlot = 
slotAndLocality.g

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155604499
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, 
locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotFuture;
--- End diff --

The variable name is confusing. `multiTaskSlotFuture` is not of type 
`Future`.


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155605251
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java 
---
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A logical slot represents a resource on a TaskManager into
+ * which a single task can be deployed.
+ */
+public interface LogicalSlot {
+
+Payload TERMINATED_PAYLOAD = new Payload() {
+
+   private final CompletableFuture COMPLETED_TERMINATION_FUTURE 
= CompletableFuture.completedFuture(null);
--- End diff --

nit: `COMPLETED_TERMINATION_FUTURE` should be camel cased because it is not 
actually a constant (not static).


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155590317
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, 
locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotFuture;
+
+   try {
+   if (task.getCoLocationConstraint() != null) {
+   multiTaskSlotFuture = 
allocateCoLocatedMultiTaskSlot(
+   task.getCoLocationConstraint(),
+   multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   } else {
+   multiTaskSlotFuture = 
allocateMultiTaskSlot(
+   task.getJobVertexId(), 
multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   }
+   } catch (NoResourceAvailableException 
noResourceException) {
+   return 
FutureUtils.completedExceptionally(noResourceException);
+   }
+
+   // sanity check
+   
Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+   final SlotSharingManager.SingleTaskSlot leave = 
multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
--- End diff --

nit: variable name should be *leaf* 

https://www.dict.cc/?s=leaf


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155549755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java 
---
@@ -32,6 +34,20 @@
  */
 public interface LogicalSlot {
 
+Payload TERMINATED_PAYLOAD = new Payload() {
+
+   private final CompletableFuture COMPLETED_TERMINATION_FUTURE 
= CompletableFuture.completedFuture(null);
--- End diff --

nit: `COMPLETED_TERMINATION_FUTURE` should be camel cased because it is not 
actually a constant (not static).


---


[GitHub] flink issue #5086: [FLINK-8078] Introduce LogicalSlot interface

2017-12-07 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5086
  
LGTM


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155519870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to 
run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link 
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the 
leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the 
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link 
MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link 
MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link 
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} 
which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link 
SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link 
MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we 
cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks 
will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released 
from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map allTaskSlots;
+
+   // Root nodes which have not been completed because the allocated slot 
is still pending
+   private final Map unresolvedRootSlots;
+
+   // Root nodes which have been completed (the underlying allocated slot 
has been assigned)
+   private final Map> 
resolvedRootSlots;
+
+

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155528224
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to 
run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link 
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the 
leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the 
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link 
MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link 
MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link 
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} 
which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link 
SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link 
MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we 
cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks 
will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released 
from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map allTaskSlots;
+
+   // Root nodes which have not been completed because the allocated slot 
is still pending
+   private final Map unresolvedRootSlots;
+
+   // Root nodes which have been completed (the underlying allocated slot 
has been assigned)
+   private final Map> 
resolvedRootSlots;
+
+

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155520946
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to 
run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link 
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the 
leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the 
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link 
MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link 
MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link 
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} 
which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link 
SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link 
MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we 
cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks 
will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
--- End diff --

nit: All fields are commented with non-javadoc comments. Normally comments 
on fields are also done in Javadoc style, e.g., `SlotPool`. Javadoc comments on 
fields are displayed by IntelliJ (`Ctrl + J`).


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155507607
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
--- End diff --

nit: wrong import order (not sorted lexicographically)
```
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
```
These two imports should appear before the `LogicalSlot` import.


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155507294
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to 
run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link 
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the 
leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the 
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link 
MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link 
MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link 
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} 
which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link 
SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link 
MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we 
cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks 
will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released 
from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map allTaskSlots;
+
+   // Root nodes which have not been completed because the allocated slot 
is still pending
+   private final Map unresolvedRootSlots;
+
+   // Root nodes which have been completed (the underlying allocated slot 
has been assigned)
+   private final Map> 
resolvedRootSlots;
+
+

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155503866
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class SchedulerTest extends TestLogger {
+
+   @Test
+   public void testAddAndRemoveInstance() {
+   try {
+   Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
+
+   Instance i1 = getRandomInstance(2);
+   Instance i2 = getRandomInstance(2);
+   Instance i3 = getRandomInstance(2);
+
+   assertEquals(0, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(0, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i1);
+   assertEquals(1, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(2, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i2);
+   assertEquals(2, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i3);
+   assertEquals(3, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+
+   // cannot add available instance again
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted instance twice");
+   }
+   catch (IllegalArgumentException e) {
+   // bueno!
+   }
+
+   // some instances die
+   assertEquals(3, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+   scheduler.instanceDied(i2);
+   assertEquals(2, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+
+   // try to add a dead instance
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted dead instance");
+   }
+   catch (IllegalArgumentException e) {
+   // stimmt
--- End diff --

😃 


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155503994
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class SchedulerTest extends TestLogger {
+
+   @Test
+   public void testAddAndRemoveInstance() {
+   try {
+   Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
+
+   Instance i1 = getRandomInstance(2);
+   Instance i2 = getRandomInstance(2);
+   Instance i3 = getRandomInstance(2);
+
+   assertEquals(0, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(0, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i1);
+   assertEquals(1, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(2, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i2);
+   assertEquals(2, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i3);
+   assertEquals(3, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+
+   // cannot add available instance again
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted instance twice");
+   }
+   catch (IllegalArgumentException e) {
+   // bueno!
+   }
+
+   // some instances die
+   assertEquals(3, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+   scheduler.instanceDied(i2);
+   assertEquals(2, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+
+   // try to add a dead instance
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted dead instance");
+   }
+   catch (IllegalArgumentException e) {
+   // stimmt
+
+   }
+
+   scheduler.instanceDied(i1);
+   assertEquals(1, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(2, scheduler.getNumberOfAvailableSlots());
+   scheduler.instanceDied(i3);
+   assertEquals(0, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(0, scheduler.getNumberOfAvailableSlots());
+
+   assertFalse(i1.isAlive());
+   assertFalse(i2.isAlive());
+   assertFalse(i3.isAlive());
+   }
+   catch (Exception e) {
--- End diff --

It would be better to propagate the exception, but I guess this file was copy-pasted.


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155502971
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingAllocatedSlotActions.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Simple {@link AllocatedSlotActions} implementations for testing 
purposes.
+ */
+public class TestingAllocatedSlotActions implements AllocatedSlotActions {
+
+   private volatile Consumer> releaseSlotConsumer;
+
+   public void setReleaseSlotConsumer(Consumer> releaseSlotConsumer) {
+   this.releaseSlotConsumer = 
Preconditions.checkNotNull(releaseSlotConsumer);
+   }
+
+   @Override
+   public CompletableFuture releaseSlot(SlotRequestId 
slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, @Nullable 
Throwable cause) {
+   Consumer> 
currentReleaseSlotConsumer = this.releaseSlotConsumer;
+
+   if (currentReleaseSlotConsumer != null) {
+   
currentReleaseSlotConsumer.accept(Tuple3.of(slotRequestId, slotSharingGroupId, 
cause));
--- End diff --

nit: whitespace after `cause`
```
... cause   ));
```


---


[GitHub] flink pull request #5090: [FLINK-8089] Also check for other pending slot req...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5090#discussion_r155496482
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java 
---
@@ -383,6 +386,76 @@ public void 
testSlotRequestCancellationUponFailingRequest() throws Exception {
}
}
 
+   /**
+* Tests that unused offered slots are directly used to fulfil pending 
slot
+* requests.
+*
+* See FLINK-8089
+*/
+   @Test
+   public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws 
Exception {
+   final SlotPool slotPool = new SlotPool(rpcService, jobId);
+
+   final JobMasterId jobMasterId = JobMasterId.generate();
+   final String jobMasterAddress = "foobar";
+   final CompletableFuture allocationIdFuture = new 
CompletableFuture<>();
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway();
+
+   resourceManagerGateway.setRequestSlotConsumer(
+   (SlotRequest slotRequest) -> 
allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+   final SlotRequestID slotRequestId1 = new SlotRequestID();
+   final SlotRequestID slotRequestId2 = new SlotRequestID();
+
+   try {
+   slotPool.start(jobMasterId, jobMasterAddress);
+
+   final SlotPoolGateway slotPoolGateway = 
slotPool.getSelfGateway(SlotPoolGateway.class);
+
+   final ScheduledUnit scheduledUnit = new 
ScheduledUnit(mock(Execution.class));
+
+   
slotPoolGateway.connectToResourceManager(resourceManagerGateway);
+
+   CompletableFuture slotFuture1 = 
slotPoolGateway.allocateSlot(
+   slotRequestId1,
+   scheduledUnit,
+   ResourceProfile.UNKNOWN,
+   Collections.emptyList(),
+   timeout);
+
+   // wait for the first slot request
+   final AllocationID allocationId = 
allocationIdFuture.get();
+
+   CompletableFuture slotFuture2 = 
slotPoolGateway.allocateSlot(
+   slotRequestId2,
+   scheduledUnit,
+   ResourceProfile.UNKNOWN,
+   Collections.emptyList(),
+   timeout);
+
+   slotPoolGateway.cancelSlotRequest(slotRequestId1);
+
+   try {
+   // this should fail with a CancellationException
+   slotFuture1.get();
+   fail("The first slot future should have failed 
because it was cancelled.");
+   } catch (ExecutionException ee) {
+   
assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof 
CancellationException);
+   }
+
+   final SlotOffer slotOffer = new SlotOffer(allocationId, 
0, ResourceProfile.UNKNOWN);
+
+   
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+   
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, 
slotOffer).get());
+
+   // the slot offer should fulfil the second slot request
--- End diff --

nit: same here (*fulfill* instead of *fulfil*)


---


[GitHub] flink pull request #5090: [FLINK-8089] Also check for other pending slot req...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5090#discussion_r155495322
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java 
---
@@ -383,6 +386,76 @@ public void 
testSlotRequestCancellationUponFailingRequest() throws Exception {
}
}
 
+   /**
+* Tests that unused offered slots are directly used to fulfil pending 
slot
--- End diff --

nit: *fulfill* instead of *fulfil*


---


[GitHub] flink pull request #5088: [FLINK-8087] Decouple Slot from AllocatedSlot

2017-12-06 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5088#discussion_r155296898
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
 ---
@@ -144,6 +144,78 @@ public TaskManagerGateway getTaskManagerGateway() {
return taskManagerGateway;
}
 
+   /**
+* Triggers the release of the logical slot.
+*/
+   public void triggerLogicalSlotRelease() {
+   final LogicalSlot logicalSlot = logicalSlotReference.get();
+
+   if (logicalSlot != null) {
+   logicalSlot.releaseSlot();
+   }
+   }
+
+   /**
+* Releases the logical slot.
+*
+* @return true if the logical slot could be released, false otherwise.
+*/
+   public boolean releaseLogicalSlot() {
+   final LogicalSlot logicalSlot = logicalSlotReference.get();
+
+   if (logicalSlot != null) {
+   if (logicalSlot instanceof Slot) {
+   final Slot slot = (Slot) logicalSlot;
+   if (slot.markReleased()) {
+   logicalSlotReference.set(null);
+   return true;
+   }
+   } else {
+   throw new RuntimeException("Unsupported logical 
slot type encounterd " + logicalSlot.getClass());
--- End diff --

Typo: *encounterd* (should be *encountered*)


---


[GitHub] flink pull request #5088: [FLINK-8087] Decouple Slot from AllocatedSlot

2017-12-06 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5088#discussion_r155279729
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -312,24 +312,36 @@ public void returnAllocatedSlot(Slot slot) {
// (1) do we have a slot available already?
SlotAndLocality slotFromPool = availableSlots.poll(resources, 
locationPreferences);
if (slotFromPool != null) {
-   SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), 
slotFromPool.locality());
--- End diff --

Method `createSimpleSlot` is no longer in use.


---


[GitHub] flink pull request #5088: [FLINK-8087] Decouple Slot from AllocatedSlot

2017-12-06 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5088#discussion_r155276432
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Interface for the context of a logical {@link Slot}. This context 
contains information
+ * about the underlying allocated slot and how to communicate with the 
TaskManager on which
+ * it was allocated.
+ */
+public interface SlotContext {
+
+   /**
+* Gets the ID under which the slot is allocated, which uniquely 
identifies the slot.
+*
+* @return The ID under which the slot is allocated
+*/
+   AllocationID getAllocationId();
+
+   /**
+* Gets the location info of the TaskManager that offers this slot.
+*
+* @return The location info of the TaskManager that offers this slot
+*/
+   TaskManagerLocation getTaskManagerLocation();
+
+   /**
+* Gets the number of the slot.
+*
+* @return The number of the slot on the TaskManager.
+*/
+   int getPhysicalSlotNumber();
+
+   /**
+* Gets the actor gateway that can be used to send messages to the 
TaskManager.
+* 
+* This method should be removed once the new interface-based RPC 
abstraction is in place
+*
+* @return The actor gateway that can be used to send messages to the 
TaskManager.
--- End diff --

The fact that `TaskManagerGateway` can be an *actor gateway* is not 
something that is relevant for the Javadoc.


---


[GitHub] flink pull request #5088: [FLINK-8087] Decouple Slot from AllocatedSlot

2017-12-06 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5088#discussion_r155258087
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotsTestImplContext.java
 ---
@@ -31,7 +30,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class AllocatedSlotsTest {
+public class SlotsTestImplContext {
--- End diff --

Why was this renamed?


---


[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...

2017-12-06 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5107#discussion_r155245759
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -507,6 +514,41 @@ public void handleError(final Exception exception) {
onFatalError(new DispatcherException("Received an error from 
the LeaderElectionService.", exception));
}
 
+   //--
+   // SubmittedJobGraphListener
+   //--
+
+   @Override
+   public void onAddedJobGraph(final JobID jobId) {
+   getRpcService().execute(() -> {
+   final SubmittedJobGraph submittedJobGraph;
+   try {
+   submittedJobGraph = 
submittedJobGraphStore.recoverJobGraph(jobId);
+   } catch (final Exception e) {
+   log.error("Could not recover job graph for job 
{}.", jobId, e);
+   return;
+   }
+   runAsync(() -> {
+   if (!jobManagerRunners.containsKey(jobId)) {
--- End diff --

Removed.


---


[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...

2017-12-06 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5107#discussion_r155245771
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -507,6 +514,41 @@ public void handleError(final Exception exception) {
onFatalError(new DispatcherException("Received an error from 
the LeaderElectionService.", exception));
}
 
+   //--
+   // SubmittedJobGraphListener
+   //--
+
+   @Override
+   public void onAddedJobGraph(final JobID jobId) {
+   getRpcService().execute(() -> {
+   final SubmittedJobGraph submittedJobGraph;
+   try {
+   submittedJobGraph = 
submittedJobGraphStore.recoverJobGraph(jobId);
+   } catch (final Exception e) {
+   log.error("Could not recover job graph for job 
{}.", jobId, e);
+   return;
+   }
+   runAsync(() -> {
+   if (!jobManagerRunners.containsKey(jobId)) {
+   
submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT);
+   }
+   });
+   });
+   }
+
+   @Override
+   public void onRemovedJobGraph(final JobID jobId) {
+   runAsync(() -> {
+   if (jobManagerRunners.containsKey(jobId)) {
--- End diff --

Removed.


---


[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...

2017-12-06 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5107#discussion_r155242826
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 ---
@@ -86,122 +125,143 @@ public static void teardown() {
}
}
 
-   /**
-* Tests that we can submit a job to the Dispatcher which then spawns a
-* new JobManagerRunner.
-*/
-   @Test
-   public void testJobSubmission() throws Exception {
-   TestingFatalErrorHandler fatalErrorHandler = new 
TestingFatalErrorHandler();
+   @Before
+   public void setUp() throws Exception {
+   MockitoAnnotations.initMocks(this);
 
-   TestingLeaderElectionService dispatcherLeaderElectionService = 
new TestingLeaderElectionService();
-   TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-   
haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
-   haServices.setSubmittedJobGraphStore(new 
StandaloneSubmittedJobGraphStore());
+   final JobVertex testVertex = new JobVertex("testVertex");
+   testVertex.setInvokableClass(NoOpInvokable.class);
+   jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex);
+   jobGraph.setAllowQueuedScheduling(true);
 
-   HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 1L);
-   JobManagerRunner jobManagerRunner = 
mock(JobManagerRunner.class);
+   fatalErrorHandler = new TestingFatalErrorHandler();
+   final HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 1L);
+   submittedJobGraphStore = spy(new 
InMemorySubmittedJobGraphStore());
 
-   final JobGraph jobGraph = mock(JobGraph.class);
-   final JobID jobId = new JobID();
-   when(jobGraph.getJobID()).thenReturn(jobId);
+   dispatcherLeaderElectionService = new 
TestingLeaderElectionService();
+   jobMasterLeaderElectionService = new 
TestingLeaderElectionService();
 
-   final TestingDispatcher dispatcher = new TestingDispatcher(
+   final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+   
haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
+   haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
+   haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, 
jobMasterLeaderElectionService);
+   haServices.setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory());
+   haServices.setResourceManagerLeaderRetriever(new 
TestingLeaderRetrievalService());
+   runningJobsRegistry = haServices.getRunningJobsRegistry();
+
+   final Configuration blobServerConfig = new Configuration();
+   blobServerConfig.setString(
+   BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+
+   dispatcher = new TestingDispatcher(
rpcService,
Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
new Configuration(),
haServices,
mock(ResourceManagerGateway.class),
-   mock(BlobServer.class),
+   new BlobServer(blobServerConfig, new VoidBlobStore()),
heartbeatServices,
-   mock(MetricRegistryImpl.class),
+   new NoOpMetricRegistry(),
fatalErrorHandler,
-   jobManagerRunner,
-   jobId);
+   TEST_JOB_ID);
 
-   try {
-   dispatcher.start();
+   dispatcher.start();
+   }
 
-   CompletableFuture leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
+   @After
+   public void tearDown() throws Exception {
+   try {
+   fatalErrorHandler.rethrowError();
+   } finally {
+   RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
+   }
+   }
 
-   // wait for the leader to be elected
-   leaderFuture.get();
+   /**
+* Tests that we can submit a job to the Dispatcher which then spawns a
+* new JobManagerRunner.
+*/
+   @Test
+   public void testJobSubmission() throws Exception {
+   CompletableFuture leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
 
-   Dispa

[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...

2017-12-06 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5107#discussion_r155242900
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 ---
@@ -86,122 +125,143 @@ public static void teardown() {
}
}
 
-   /**
-* Tests that we can submit a job to the Dispatcher which then spawns a
-* new JobManagerRunner.
-*/
-   @Test
-   public void testJobSubmission() throws Exception {
-   TestingFatalErrorHandler fatalErrorHandler = new 
TestingFatalErrorHandler();
+   @Before
+   public void setUp() throws Exception {
+   MockitoAnnotations.initMocks(this);
--- End diff --

Nice catch.


---


[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...

2017-12-06 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5107#discussion_r155242108
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * In-Memory implementation of {@link SubmittedJobGraphStore} for testing 
purposes.
+ */
+public class InMemorySubmittedJobGraphStore implements 
SubmittedJobGraphStore {
+
+   private final Map storedJobs = new 
HashMap<>();
+
+   private boolean started;
+
+   @Override
+   public synchronized void start(@Nullable SubmittedJobGraphListener 
jobGraphListener) throws Exception {
--- End diff --

Not sure if it is actually applicable. `onRemovedJobGraph` explicitly 
demands that the graph is removed by a different `SubmittedJobGraphStore` 
instance:

```
/**
 * Callback for {@link SubmittedJobGraph} instances removed by 
a different {@link
 * SubmittedJobGraphStore} instance.
 *
 * @param jobId The {@link JobID} of the removed job graph
 */
void onRemovedJobGraph(JobID jobId);
```


---


[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...

2017-12-06 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5107#discussion_r155240139
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * In-Memory implementation of {@link SubmittedJobGraphStore} for testing 
purposes.
+ */
+public class InMemorySubmittedJobGraphStore implements 
SubmittedJobGraphStore {
+
+   private final Map storedJobs = new 
HashMap<>();
+
+   private boolean started;
+
+   @Override
+   public synchronized void start(@Nullable SubmittedJobGraphListener 
jobGraphListener) throws Exception {
--- End diff --

Good point.


---


[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface

2017-12-06 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5087#discussion_r155227397
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java 
---
@@ -208,27 +209,61 @@ public void setLocality(Locality locality) {
// 

 
@Override
-   public void releaseSlot() {
+   public void releaseInstanceSlot() {
+   releaseSlot();
+   }
+
+   @Override
+   public CompletableFuture releaseSlot() {
if (!isCanceled()) {
+   final CompletableFuture terminationFuture;
 
-   // kill all tasks currently running in this slot
-   Execution exec = this.executedTask;
-   if (exec != null && !exec.isFinished()) {
-   exec.fail(new Exception("TaskManager was 
lost/killed: " + getTaskManagerLocation()));
-   }
+   if (payload != null) {
+   // trigger the failure of the slot payload
+   payload.fail(new FlinkException("TaskManager 
was lost/killed: " + getTaskManagerLocation()));
 
-   // release directly (if we are directly allocated),
-   // otherwise release through the parent shared slot
-   if (getParent() == null) {
-   // we have to give back the slot to the owning 
instance
-   if (markCancelled()) {
-   getOwner().returnAllocatedSlot(this);
-   }
+   // wait for the termination of the payload 
before releasing the slot
+   terminationFuture = 
payload.getTerminalStateFuture();
} else {
-   // we have to ask our parent to dispose us
-   getParent().releaseChild(this);
+   terminationFuture = 
CompletableFuture.completedFuture(null);
}
+
+   terminationFuture.whenComplete(
+   (Object ignored, Throwable throwable) -> {
+   // release directly (if we are directly 
allocated),
+   // otherwise release through the parent 
shared slot
+   if (getParent() == null) {
+   // we have to give back the 
slot to the owning instance
+   if (markCancelled()) {
--- End diff --

If `markCancelled` returns `false`, `releaseFuture` will never be 
completed. Is that intended? 


---


[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface

2017-12-06 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5087#discussion_r155225733
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java 
---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A logical slot represents a resource on a TaskManager into
+ * which a single task can be deployed.
+ */
+public interface LogicalSlot {
+
+   /**
+* Return the TaskManager location of this slot
+*
+* @return TaskManager location of this slot
+*/
+   TaskManagerLocation getTaskManagerLocation();
+
+   /**
+* Return the TaskManager gateway to talk to the TaskManager.
+*
+* @return TaskManager gateway to talk to the TaskManager
+*/
+   TaskManagerGateway getTaskManagerGateway();
+
+   /**
+* True if the slot is still alive.
+*
+* @return True if the slot is still alive, otherwise false
+*/
+   boolean isAlive();
+
+   /**
+* Tries to assign a payload to this slot. This can only happens
+* exactly once.
+*
+* @param payload to be assigned to this slot.
+* @return true if the payload could be set, otherwise false
+*/
+   boolean tryAssignPayload(Payload payload);
+
+   /**
+* Returns the set payload or null if none.
+*
+* @return Payload of this slot of null if none
+*/
+   @Nullable
+   Payload getPayload();
+
+   /**
+* Releases this slot.
+*
+* @return Future which is completed once the slot has been released,
+*  in case of a failure it is completed exceptionally
+*/
+   CompletableFuture releaseSlot();
+
+   /**
+* Gets the slot number on the TaskManager.
+*
+* @return slot number
+*/
+   int getPhysicalSlotNumber();
+
+   /**
+* Gets the allocation id of this slot.
+*
+* @return allocation id of this slot
+*/
+   AllocationID getAllocationId();
+
+   /**
+* Payload for a logical slot.
+*/
+   interface Payload {
+
+   /**
+* Fail the payload with the given cause.
+*
+* @param cause of the failure
+*/
+   void fail(Throwable cause);
--- End diff --

`Execution#fail(Throwable)` is not annotated with `@Override`.


---


[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface

2017-12-06 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5087#discussion_r155208534
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java 
---
@@ -37,16 +41,18 @@
  * If this slot is part of a {@link SharedSlot}, then the parent 
attribute will point to that shared slot.
  * If not, then the parent attribute is null.
  */
-public class SimpleSlot extends Slot {
+public class SimpleSlot extends Slot implements LogicalSlot {
 
/** The updater used to atomically swap in the execution */
--- End diff --

Will the payload always be the `execution`?


---


[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface

2017-12-06 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5087#discussion_r155207330
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -965,11 +965,20 @@ public void cancel() {
 
// we build a future that is complete 
once all vertices have reached a terminal state
final ConjunctFuture allTerminal 
= FutureUtils.waitForAll(futures);
-   allTerminal.thenAccept(
-   (Void value) -> {
-   // cancellations may 
currently be overridden by failures which trigger
-   // restarts, so we need 
to pass a proper restart global version here
-   
allVerticesInTerminalState(globalVersionForRestart);
+   allTerminal.whenCompleteAsync(
--- End diff --

Does it have to run asynchronously? If yes, does it make sense to specify a 
thread pool? Now it can run on `ForkJoinPool.commonPool()`.


---


[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface

2017-12-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5087#discussion_r155018207
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 ---
@@ -267,10 +270,77 @@ public void testAnyPreferredLocationCalculation() 
throws ExecutionException, Int
assertThat(preferredLocations, 
containsInAnyOrder(taskManagerLocation1, taskManagerLocation3));
}
 
+   /**
+* Checks that the {@link Execution} termination future is only 
completed after the
+* assigned slot has been released.
+*
+* NOTE: This test only fails spuriously without the fix of this 
commit. Thus, one has
+* to execute this test multiple times to see the failure.
+*/
+   @Test
+   public void testTerminationFutureIsCompletedAfterSlotRelease() throws 
Exception {
+   final JobVertexID jobVertexId = new JobVertexID();
+   final JobVertex jobVertex = new JobVertex("Test vertex", 
jobVertexId);
+   jobVertex.setInvokableClass(NoOpInvokable.class);
+
+   final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
+
+   final SimpleSlot slot = new SimpleSlot(
+   new JobID(),
+   slotOwner,
+   new LocalTaskManagerLocation(),
+   0,
+   new SimpleAckingTaskManagerGateway());
+
+   final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
+   slotProvider.addSlot(jobVertexId, 0, 
CompletableFuture.completedFuture(slot));
+
+   ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
+   new JobID(),
+   slotProvider,
+   new NoRestartStrategy(),
+   jobVertex);
+
+   ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
+
+   ExecutionVertex executionVertex = 
executionJobVertex.getTaskVertices()[0];
+
+   assertTrue(executionVertex.scheduleForExecution(slotProvider, 
false, LocationPreferenceConstraint.ANY));
+
+   Execution currentExecutionAttempt = 
executionVertex.getCurrentExecutionAttempt();
+
+   CompletableFuture returnedSlotFuture = 
slotOwner.getReturnedSlotFuture();
+   CompletableFuture terminationFuture = 
executionVertex.cancel();
+
+   // run canceling in a separate thread to allow an interleaving 
between termination
+   // future callback registrations
+   CompletableFuture.runAsync(
+   () -> currentExecutionAttempt.cancelingComplete(),
+   TestingUtils.defaultExecutor());
--- End diff --

If an executor is created, where is it shut down afterwards? I think this 
should be just fine:

```
new Thread() {
@Override
public void run() {
currentExecutionAttempt.cancelingComplete();
}
}.start();
```


---


[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface

2017-12-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5087#discussion_r155016510
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 ---
@@ -267,10 +270,77 @@ public void testAnyPreferredLocationCalculation() 
throws ExecutionException, Int
assertThat(preferredLocations, 
containsInAnyOrder(taskManagerLocation1, taskManagerLocation3));
}
 
+   /**
+* Checks that the {@link Execution} termination future is only 
completed after the
+* assigned slot has been released.
+*
+* NOTE: This test only fails spuriously without the fix of this 
commit. Thus, one has
+* to execute this test multiple times to see the failure.
+*/
+   @Test
+   public void testTerminationFutureIsCompletedAfterSlotRelease() throws 
Exception {
+   final JobVertexID jobVertexId = new JobVertexID();
+   final JobVertex jobVertex = new JobVertex("Test vertex", 
jobVertexId);
+   jobVertex.setInvokableClass(NoOpInvokable.class);
+
+   final SingleSlotTestingSlotOwner slotOwner = new 
SingleSlotTestingSlotOwner();
+
+   final SimpleSlot slot = new SimpleSlot(
+   new JobID(),
+   slotOwner,
+   new LocalTaskManagerLocation(),
+   0,
+   new SimpleAckingTaskManagerGateway());
+
+   final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
+   slotProvider.addSlot(jobVertexId, 0, 
CompletableFuture.completedFuture(slot));
+
+   ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
+   new JobID(),
+   slotProvider,
+   new NoRestartStrategy(),
+   jobVertex);
+
+   ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
+
+   ExecutionVertex executionVertex = 
executionJobVertex.getTaskVertices()[0];
+
+   assertTrue(executionVertex.scheduleForExecution(slotProvider, 
false, LocationPreferenceConstraint.ANY));
+
+   Execution currentExecutionAttempt = 
executionVertex.getCurrentExecutionAttempt();
+
+   CompletableFuture returnedSlotFuture = 
slotOwner.getReturnedSlotFuture();
+   CompletableFuture terminationFuture = 
executionVertex.cancel();
+
+   // run canceling in a separate thread to allow an interleaving 
between termination
+   // future callback registrations
+   CompletableFuture.runAsync(
+   () -> currentExecutionAttempt.cancelingComplete(),
+   TestingUtils.defaultExecutor());
+
+   // to increase probability for problematic interleaving, let 
the current thread yield the processor
+   Thread.yield();
+
+   CompletableFuture restartFuture = 
terminationFuture.thenApply(
+   ignored -> {
+   try {
+   assertTrue(returnedSlotFuture.isDone());
+   } catch (Exception e) {
+   throw new CompletionException(e);
--- End diff --

`isDone` does not throw an exception. `assertTrue` throws an `Error`, which 
is not an `Exception`. Is it possible to get into this code path?


---


[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface

2017-12-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5087#discussion_r155015331
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
 ---
@@ -294,11 +297,11 @@ public void testScheduleWithDyingInstances() {

i2.markDead();

-   for (SimpleSlot slot : slots) {
-   if (slot.getOwner() == i2) {
-   assertTrue(slot.isCanceled());
+   for (LogicalSlot slot : slots) {
+   if 
(Objects.equals(slot.getTaskManagerLocation().getResourceID(), 
i2.getTaskManagerID())) {
--- End diff --

I think `slot.getTaskManagerLocation().getResourceID()` cannot return null. 
Is there a need to use `Objects.equals`?


---


[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface

2017-12-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5087#discussion_r155012983
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -331,8 +334,19 @@ public void setInitialState(TaskStateSnapshot 
checkpointStateHandles) {
 *
 * @return A future for the execution's termination
 */
-   public CompletableFuture getTerminationFuture() {
-   return terminationFuture;
+   @Override
+   public CompletableFuture getTerminalStateFuture() {
+   return terminalStateFuture;
+   }
+
+   /**
+* Gets the release future which is completed once the execution 
reaches a terminal
+* state and the assigned resource has been released.
+*
+* @return
--- End diff --

`@return` tag can be removed.


---


[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface

2017-12-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5087#discussion_r154954054
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java 
---
@@ -37,16 +41,18 @@
  * If this slot is part of a {@link SharedSlot}, then the parent 
attribute will point to that shared slot.
  * If not, then the parent attribute is null.
  */
-public class SimpleSlot extends Slot {
+public class SimpleSlot extends Slot implements LogicalSlot {
 
/** The updater used to atomically swap in the execution */
--- End diff --

nit: _to swap in the payload_


---


[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...

2017-12-04 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5107#discussion_r154748301
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 ---
@@ -244,7 +302,32 @@ protected JobManagerRunner createJobManagerRunner(
FatalErrorHandler fatalErrorHandler) throws 
Exception {
assertEquals(expectedJobId, jobGraph.getJobID());
 
-   return jobManagerRunner;
+   return new JobManagerRunner(resourceId, jobGraph, 
configuration, rpcService,
+   highAvailabilityServices, heartbeatServices, 
jobManagerServices, metricRegistry,
+   onCompleteActions, fatalErrorHandler);
+   }
+
+   @Override
+   public CompletableFuture submitJob(final JobGraph 
jobGraph, final Time timeout) {
+   final CompletableFuture submitJobFuture = 
super.submitJob(jobGraph, timeout);
+
+   try {
+   submitJobFuture.get();
+   } catch (InterruptedException e) {
+   Thread.currentThread().interrupt();
+   } catch (Exception e) {
+   throw new RuntimeException(e);
+   }
+
+   submitJobLatch.countDown();
+   return submitJobFuture;
+   }
+
+   @Override
+   void recoverJobs() {
+   if (recoverJobsEnabled.get()) {
--- End diff --

Without this I do not see how I can verify whether a job was submitted 
regularly or via `recoverJobs`.


---


[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...

2017-12-04 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5107#discussion_r154747386
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -534,6 +536,40 @@ public void handleError(final Exception exception) {
onFatalError(new DispatcherException("Received an error from 
the LeaderElectionService.", exception));
}
 
+   //--
+   // SubmittedJobGraphListener
+   //--
+
+   @Override
+   public void onAddedJobGraph(final JobID jobId) {
+   runAsync(() -> {
+   final SubmittedJobGraph submittedJobGraph;
+   try {
+   submittedJobGraph = 
submittedJobGraphStore.recoverJobGraph(jobId);
+   } catch (final Exception e) {
+   log.error("Could not submit job {}.", jobId, e);
--- End diff --

Changed it.


---


[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...

2017-12-04 Thread GJL
GitHub user GJL reopened a pull request:

https://github.com/apache/flink/pull/5107

[FLINK-8176][flip6] Start SubmittedJobGraphStore in Dispatcher

## What is the purpose of the change

The FLIP-6 dispatcher never calls `start()` on its `SubmittedJobGraphStore`
instance. Hence, when a job is submitted (YARN session mode with HA enabled),
an `IllegalStateException` is thrown. This pull request adds the necessary
changes so that jobs can be submitted.

## Brief change log

  - *Implement SubmittedJobGraphListener interface in Dispatcher*
 
## Verifying this change

  - *Added unit tests for new methods in Dispatcher class*
  - *Verified that jobs can be submitted in FLIP-6 YARN session mode with 
HA. Did not verify anything else.*


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8176

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5107.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5107


commit d238ef0c23eea585974929eafdff33af916d19ba
Author: gyao 
Date:   2017-11-30T14:37:30Z

[hotfix][tests] Extract SubmittedJobGraphStore implementation from 
JobManagerHARecoveryTest

commit 33b9d2848c767088f43fed2d03e6402695827221
Author: gyao 
Date:   2017-11-30T14:44:23Z

[FLINK-8176][flip6] Implement SubmittedJobGraphListener interface in 
Dispatcher

Call start() on SubmittedJobGraphStore with Dispatcher as listener. To enable
this, the dispatcher must implement the SubmittedJobGraphListener interface.
Add simple unit tests for the new methods. Refactor DispatcherTest to remove
redundancy.

commit 88359172e23413aa195177993551613349056b68
Author: gyao 
Date:   2017-12-04T18:57:29Z

[FLINK-8176][flip6] Make InMemorySubmittedJobGraphStore thread-safe

commit 9cdb29604e9915c7d6ea60ed6fcee06c9bad57b9
Author: gyao 
Date:   2017-12-04T18:58:26Z

[hotfix][Javadoc] Make first sentence in JobSubmissionException Javadoc end 
with period

commit 53ad1771e8bec063157d69c0f7a187ccb5fb340e
Author: gyao 
Date:   2017-12-04T19:04:52Z

[FLINK-8176][flip6] Add method isStarted() to TestingLeaderElectionService

commit 0c030fb19d7b5b9dba4df5811a69086906e20ca0
Author: gyao 
Date:   2017-12-04T19:05:47Z

[FLINK-8176][flip6] Return same RunningJobsRegistry instance from 
TestingHighAvailabilityServices

commit 7a04cbe54bcf380684c4e79a4f999b31b650570e
Author: gyao 
Date:   2017-12-04T19:09:36Z

[FLINK-8176][flip6] Fix race conditions in Dispatcher and DispatcherTest

Check whether a JobManagerRunner exists before submitting a job.
Replace the JobManagerRunner mock used in tests with a real instance.
Do not run job graph recovery in the actor main thread when the job graph is
recovered from SubmittedJobGraphListener#onAddedJobGraph(JobID).




---


[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...

2017-12-01 Thread GJL
Github user GJL closed the pull request at:

https://github.com/apache/flink/pull/5107


---


[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...

2017-12-01 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5107#discussion_r154320952
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 ---
@@ -117,84 +135,78 @@ public void testJobSubmission() throws Exception {
heartbeatServices,
mock(MetricRegistryImpl.class),
fatalErrorHandler,
-   jobManagerRunner,
-   jobId);
+   mockJobManagerRunner,
+   TEST_JOB_ID);
 
-   try {
-   dispatcher.start();
+   dispatcher.start();
+   }
 
-   CompletableFuture leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
+   @After
+   public void tearDown() throws Exception {
+   try {
+   fatalErrorHandler.rethrowError();
+   } finally {
+   RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
+   }
+   }
 
-   // wait for the leader to be elected
-   leaderFuture.get();
+   /**
+* Tests that we can submit a job to the Dispatcher which then spawns a
+* new JobManagerRunner.
+*/
+   @Test
+   public void testJobSubmission() throws Exception {
+   CompletableFuture leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
 
-   DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+   // wait for the leader to be elected
+   leaderFuture.get();
 
-   CompletableFuture acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, timeout);
+   DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
-   acknowledgeFuture.get();
+   CompletableFuture acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, timeout);
 
-   verify(jobManagerRunner, 
Mockito.timeout(timeout.toMilliseconds())).start();
+   acknowledgeFuture.get();
 
-   // check that no error has occurred
-   fatalErrorHandler.rethrowError();
-   } finally {
-   RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
-   }
+   verify(mockJobManagerRunner, 
Mockito.timeout(timeout.toMilliseconds())).start();
}
 
/**
 * Tests that the dispatcher takes part in the leader election.
 */
@Test
public void testLeaderElection() throws Exception {
-   TestingFatalErrorHandler fatalErrorHandler = new 
TestingFatalErrorHandler();
-   TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-
UUID expectedLeaderSessionId = UUID.randomUUID();
-   CompletableFuture leaderSessionIdFuture = new 
CompletableFuture<>();
-   SubmittedJobGraphStore mockSubmittedJobGraphStore = 
mock(SubmittedJobGraphStore.class);
-   TestingLeaderElectionService testingLeaderElectionService = new 
TestingLeaderElectionService() {
-   @Override
-   public void confirmLeaderSessionID(UUID 
leaderSessionId) {
-   super.confirmLeaderSessionID(leaderSessionId);
-   leaderSessionIdFuture.complete(leaderSessionId);
-   }
-   };
-
-   
haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore);
-   
haServices.setDispatcherLeaderElectionService(testingLeaderElectionService);
-   HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 1000L);
-   final JobID jobId = new JobID();
-
-   final TestingDispatcher dispatcher = new TestingDispatcher(
-   rpcService,
-   Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
-   new Configuration(),
-   haServices,
-   mock(ResourceManagerGateway.class),
-   mock(BlobServer.class),
-   heartbeatServices,
-   mock(MetricRegistryImpl.class),
-   fatalErrorHandler,
-   mock(JobManagerRunner.class),
-   jobId);
 
-   try {
-   dispatcher.start();
+   
assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
 
-   assertFalse(leaderSessionIdFuture.isDone());
+   
dispatcherLeaderElectionService.isLeader(expecte

[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...

2017-12-01 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5107#discussion_r154320903
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 ---
@@ -117,84 +135,78 @@ public void testJobSubmission() throws Exception {
heartbeatServices,
mock(MetricRegistryImpl.class),
fatalErrorHandler,
-   jobManagerRunner,
-   jobId);
+   mockJobManagerRunner,
+   TEST_JOB_ID);
 
-   try {
-   dispatcher.start();
+   dispatcher.start();
+   }
 
-   CompletableFuture leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
+   @After
+   public void tearDown() throws Exception {
+   try {
+   fatalErrorHandler.rethrowError();
+   } finally {
+   RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
+   }
+   }
 
-   // wait for the leader to be elected
-   leaderFuture.get();
+   /**
+* Tests that we can submit a job to the Dispatcher which then spawns a
+* new JobManagerRunner.
+*/
+   @Test
+   public void testJobSubmission() throws Exception {
+   CompletableFuture leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
 
-   DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+   // wait for the leader to be elected
+   leaderFuture.get();
 
-   CompletableFuture acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, timeout);
+   DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
-   acknowledgeFuture.get();
+   CompletableFuture acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, timeout);
 
-   verify(jobManagerRunner, 
Mockito.timeout(timeout.toMilliseconds())).start();
+   acknowledgeFuture.get();
 
-   // check that no error has occurred
-   fatalErrorHandler.rethrowError();
-   } finally {
-   RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
-   }
+   verify(mockJobManagerRunner, 
Mockito.timeout(timeout.toMilliseconds())).start();
}
 
/**
 * Tests that the dispatcher takes part in the leader election.
 */
@Test
public void testLeaderElection() throws Exception {
-   TestingFatalErrorHandler fatalErrorHandler = new 
TestingFatalErrorHandler();
-   TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-
UUID expectedLeaderSessionId = UUID.randomUUID();
-   CompletableFuture leaderSessionIdFuture = new 
CompletableFuture<>();
-   SubmittedJobGraphStore mockSubmittedJobGraphStore = 
mock(SubmittedJobGraphStore.class);
-   TestingLeaderElectionService testingLeaderElectionService = new 
TestingLeaderElectionService() {
-   @Override
-   public void confirmLeaderSessionID(UUID 
leaderSessionId) {
-   super.confirmLeaderSessionID(leaderSessionId);
-   leaderSessionIdFuture.complete(leaderSessionId);
-   }
-   };
-
-   
haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore);
-   
haServices.setDispatcherLeaderElectionService(testingLeaderElectionService);
-   HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 1000L);
-   final JobID jobId = new JobID();
-
-   final TestingDispatcher dispatcher = new TestingDispatcher(
-   rpcService,
-   Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
-   new Configuration(),
-   haServices,
-   mock(ResourceManagerGateway.class),
-   mock(BlobServer.class),
-   heartbeatServices,
-   mock(MetricRegistryImpl.class),
-   fatalErrorHandler,
-   mock(JobManagerRunner.class),
-   jobId);
 
-   try {
-   dispatcher.start();
+   
assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
 
-   assertFalse(leaderSessionIdFuture.isDone());
+   
dispatcherLeaderElectionService.isLeader(expecte

<    1   2   3   4   5   6   7   >