[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159227619 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -717,7 +645,26 @@ public int run( yarnCluster.waitForClusterToBeReady(); yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster, acceptInteractiveInput); + + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + + try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor( + yarnDescriptor.getYarnClient(), + yarnCluster.getApplicationId(), + new ScheduledExecutorServiceAdapter(scheduledExecutorService))){ + runInteractiveCli( + yarnCluster, + yarnApplicationStatusMonitor, + acceptInteractiveInput); + } catch (Exception e) { + LOG.info("Could not properly close the Yarn application status monitor.", e); --- End diff -- Same here. Catch block could be avoided. ---
[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159226314 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -660,7 +570,25 @@ public int run( "yarn application -kill " + applicationId.getOpt()); yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster, true); + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + + try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor( + yarnDescriptor.getYarnClient(), + yarnCluster.getApplicationId(), + new ScheduledExecutorServiceAdapter(scheduledExecutorService))) { + runInteractiveCli( + yarnCluster, + yarnApplicationStatusMonitor, + true); + } catch (Exception e) { --- End diff -- Closing `YarnApplicationStatusMonitor` should not throw any checked exceptions. If you change the signature, this catch block won't be needed. ---
[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159224695 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -743,6 +690,142 @@ private void logAndSysout(String message) { System.out.println(message); } + public static void main(final String[] args) throws Exception { + final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session + + final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv(); + + final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(); + SecurityUtils.install(new SecurityConfiguration(flinkConfiguration)); + int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable() { + @Override + public Integer call() { + return cli.run(args, flinkConfiguration, configurationDirectory); + } + }); + System.exit(retCode); + } + + private static void runInteractiveCli( + YarnClusterClient clusterClient, + YarnApplicationStatusMonitor yarnApplicationStatusMonitor, + boolean readConsoleInput) { + try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) { + boolean continueRepl = true; + int numTaskmanagers = 0; + long unknownStatusSince = System.currentTimeMillis(); + + while (continueRepl) { + + final ApplicationStatus applicationStatus = yarnApplicationStatusMonitor.getApplicationStatusNow(); + + switch (applicationStatus) { + case FAILED: + case CANCELED: + System.err.println("The Flink Yarn cluster has failed."); + continueRepl = false; + break; + case UNKNOWN: + if (unknownStatusSince < 0L) { + unknownStatusSince = System.currentTimeMillis(); + } + + if ((System.currentTimeMillis() - unknownStatusSince) > CLIENT_POLLING_INTERVAL_MS) { + System.err.println("The Flink Yarn cluster is in an unknown state. 
Please check the Yarn cluster."); + continueRepl = false; + } else { + continueRepl = repStep(in, readConsoleInput); + } + break; + case SUCCEEDED: + if (unknownStatusSince > 0L) { + unknownStatusSince = -1L; + } + + // -- check if there are updates by the cluster --- + try { + final GetClusterStatusResponse status = clusterClient.getClusterStatus(); + + if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) { + System.err.println("Number of connected TaskManagers changed to " + + status.numRegisteredTaskManagers() + ". " + + "Slots available: " + status.totalNumberOfSlots()); + numTaskmanagers = status.numRegisteredTaskManagers(); + } + } catch (Exception e) { + LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", e); + } + + printClusterMessages(clusterClient);
[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159228367 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -717,7 +645,26 @@ public int run( yarnCluster.waitForClusterToBeReady(); yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster, acceptInteractiveInput); + + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + + try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor( + yarnDescriptor.getYarnClient(), + yarnCluster.getApplicationId(), + new ScheduledExecutorServiceAdapter(scheduledExecutorService))){ + runInteractiveCli( + yarnCluster, + yarnApplicationStatusMonitor, + acceptInteractiveInput); --- End diff -- The code block looks duplicated except for this flag. ---
[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159230885 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -743,6 +690,142 @@ private void logAndSysout(String message) { System.out.println(message); } + public static void main(final String[] args) throws Exception { + final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session + + final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv(); + + final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(); + SecurityUtils.install(new SecurityConfiguration(flinkConfiguration)); + int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable() { + @Override + public Integer call() { + return cli.run(args, flinkConfiguration, configurationDirectory); + } + }); + System.exit(retCode); + } + + private static void runInteractiveCli( + YarnClusterClient clusterClient, + YarnApplicationStatusMonitor yarnApplicationStatusMonitor, + boolean readConsoleInput) { + try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) { + boolean continueRepl = true; + int numTaskmanagers = 0; + long unknownStatusSince = System.currentTimeMillis(); --- End diff -- nit: `System.nanoTime()` should be preferred to measure elapsed time because it does not depend on wall clock, i.e., it is not affected by the user changing the system's time: https://stackoverflow.com/a/351571 However, if you use `nanoTime()`, the trick in line `729` with negative `unknownStatusSince` won't work. ---
[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159225871 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.cli; + +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Utility class which monitors the specified yarn application status periodically. 
+ */ +public class YarnApplicationStatusMonitor implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationStatusMonitor.class); + + private static final long UPDATE_INTERVAL = 1000L; + + private final YarnClient yarnClient; + + private final ApplicationId yarnApplicationId; + + private final ScheduledFuture applicationStatusUpdateFuture; + + private volatile ApplicationStatus applicationStatus; + + public YarnApplicationStatusMonitor( + YarnClient yarnClient, + ApplicationId yarnApplicationId, + ScheduledExecutor scheduledExecutor) { + this.yarnClient = Preconditions.checkNotNull(yarnClient); + this.yarnApplicationId = Preconditions.checkNotNull(yarnApplicationId); + + applicationStatusUpdateFuture = scheduledExecutor.scheduleWithFixedDelay( + this::updateApplicationStatus, + UPDATE_INTERVAL, + UPDATE_INTERVAL, + TimeUnit.MILLISECONDS); + + applicationStatus = ApplicationStatus.UNKNOWN; + } + + public ApplicationStatus getApplicationStatusNow() { + return applicationStatus; + } + + @Override + public void close() throws Exception { + applicationStatusUpdateFuture.cancel(false); --- End diff -- There is no need to declare `throws Exception` here because `cancel()` does not throw any checked exceptions. ---
[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159227955 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -660,7 +570,25 @@ public int run( "yarn application -kill " + applicationId.getOpt()); yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster, true); + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); --- End diff -- I think the executor could as well be in the Monitor. If needed in the future, one could provide a constructor that accepts an external executor (e.g., for unit tests). ---
[GitHub] flink pull request #5215: [FLINK-8328] [flip6] Move Yarn ApplicationStatus p...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159227421 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -660,7 +570,25 @@ public int run( "yarn application -kill " + applicationId.getOpt()); yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster, true); + ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + + try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor( + yarnDescriptor.getYarnClient(), + yarnCluster.getApplicationId(), + new ScheduledExecutorServiceAdapter(scheduledExecutorService))) { --- End diff -- Why do we need to use the `ScheduledExecutor` interface from Flink? Why not use Java's `ScheduledExecutorService` directly? ---
[GitHub] flink pull request #5223: [FLINK-8317][flip6] Implement Triggering of Savepo...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5223 [FLINK-8317][flip6] Implement Triggering of Savepoints ## What is the purpose of the change *Implement triggering of savepoints through HTTP and through command line in FLIP-6 mode. This PR is based on #5207.* CC: @tillrohrmann ## Brief change log - *Allow triggering of savepoints through RestfulGateway.* - *Implement REST handlers to trigger and query the status of savepoints.* - *Implement savepoint command in RestClusterClient.* ## Verifying this change This change added tests and can be verified as follows: - *Added unit tests for REST handlers and `RestClusterClient`* - *Manually deployed the `SocketWindowWordCount` job and triggered a savepoint using Flink's command line client and `curl`* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8317-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5223.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5223 commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d Author: gyao Date: 2017-12-19T17:58:53Z [FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher - Introduce new JobExecutionResult used by JobMaster to forward the information in the already existing JobExecutionResult. - Always cache a JobExecutionResult. Even in case of job failures. In case of job failures, the serialized exception is stored additionally. - Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults commit 748745ac3521a20040cbda4056dfd9c53bc24a82 Author: gyao Date: 2017-12-20T13:44:03Z [FLINK-8233][flip6] Add JobExecutionResultHandler - Allow retrieval of the JobExecutionResult cached in Dispatcher. - Implement serializer and deserializer for JobExecutionResult. commit adf091a2770f42d6f8a0c19ab88cc7a208943a32 Author: gyao Date: 2017-12-20T13:44:26Z [hotfix] Clean up ExecutionGraph - Remove unnecessary throws clause. - Format whitespace. commit f5c28527b3a1a0c8ec52f2a5616ebb634397b69c Author: gyao Date: 2017-12-22T23:02:10Z [FLINK-8299][flip6] Retrieve JobExecutionResult after job submission commit 55d920f628d7ef3f5b0db7fd843dfdd2d96a3917 Author: gyao Date: 2018-01-01T17:59:42Z [FLINK-8317][flip6] Implement savepoints in RestClusterClient Allow triggering of savepoints through RestfulGateway. Implement REST handlers to trigger and query the status of savepoints. Implement savepoint command in RestClusterClient. ---
[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r159153854 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/ContentDump.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Utility class to simulate in memory file like writes, flushes and closing. + */ +public class ContentDump { + private boolean writable = true; + private Map> filesContent = new HashMap<>(); + + public Set listFiles() { + return filesContent.keySet(); --- End diff -- Maybe return a copy here because the key set will reflect changes in the map. ---
[GitHub] flink pull request #5193: [FLINK-8268][tests] Improve tests stability
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r159154132 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java --- @@ -492,6 +503,10 @@ public void close() throws Exception { processingTimeService.shutdownService(); } setupCalled = false; + + if (internalEnvironment.isPresent()) { --- End diff -- I think to enable this, `Environment` must implement `AutoCloseable` as well. Maybe an empty default `close()` method? If you decide to stick with `Optional`, maybe change this line to: `internalEnvironment.ifPresent(MockEnvironment::close);` ---
[GitHub] flink issue #5216: [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClust...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/5216 There are some checkstyle violations: ``` [ERROR] src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java:[33,8] (imports) UnusedImports: Unused import: java.util.concurrent.ScheduledExecutorService. [ERROR] src/main/java/org/apache/flink/yarn/YarnClusterClient.java:[46,8] (imports) UnusedImports: Unused import: org.apache.hadoop.yarn.client.api.YarnClient. ``` ---
[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5207 [FLINK-8299][flip6] Poll JobExecutionResult after job submission ## What is the purpose of the change *Poll JobExecutionResult after job submission. This is needed, for example, to enable `collect()` calls from the job in FLIP-6 mode. This PR is based on #5194.* CC: @tillrohrmann ## Brief change log - *Retrieve JobExecutionResult after job submission in `RestClusterClient`* ## Verifying this change This change added tests and can be verified as follows: - *Added unit tests for all new classes and changed classes.* - *Manually run job in examples/batch/WordCount.jar and verified that the results are correctly collected/printed.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8299 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5207.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5207 commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d Author: gyao Date: 2017-12-19T17:58:53Z [FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher - Introduce new JobExecutionResult used by JobMaster to forward the information in the already existing JobExecutionResult. - Always cache a JobExecutionResult. Even in case of job failures. In case of job failures, the serialized exception is stored additionally. - Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults commit 748745ac3521a20040cbda4056dfd9c53bc24a82 Author: gyao Date: 2017-12-20T13:44:03Z [FLINK-8233][flip6] Add JobExecutionResultHandler - Allow retrieval of the JobExecutionResult cached in Dispatcher. - Implement serializer and deserializer for JobExecutionResult. commit adf091a2770f42d6f8a0c19ab88cc7a208943a32 Author: gyao Date: 2017-12-20T13:44:26Z [hotfix] Clean up ExecutionGraph - Remove unnecessary throws clause. - Format whitespace. commit f5c28527b3a1a0c8ec52f2a5616ebb634397b69c Author: gyao Date: 2017-12-22T23:02:10Z [FLINK-8299][flip6] Retrieve JobExecutionResult after job submission ---
[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4896#discussion_r158265225 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java --- @@ -19,81 +19,61 @@ package org.apache.flink.test.util; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.util.FileUtils; import org.junit.ClassRule; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.util.Objects; - -import scala.concurrent.duration.FiniteDuration; /** - * A base class for tests that run test programs in a Flink mini cluster. + * Base class for unit tests that run multiple tests and want to reuse the same + * Flink cluster. This saves a significant amount of time, since the startup and + * shutdown of the Flink clusters (including actor systems, etc) usually dominates + * the execution of the actual tests. + * + * To write a unit test against this test base, simply extend it and add + * one or more regular test methods and retrieve the StreamExecutionEnvironment from + * the context: + * + * + * {@literal @}Test + * public void someTest() { + * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * {@literal @}Test + * public void anotherTest() { + * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * */ public abstract class AbstractTestBase extends TestBaseUtils { - /** Configuration to start the testing cluster with. 
*/ - protected final Configuration config; + protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class); --- End diff -- Maybe `protected final Logger log = LoggerFactory.getLogger(getClass());` so that the class name of the implementation is logged. ---
[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4896#discussion_r158264752 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java --- @@ -66,47 +47,34 @@ protected void postSubmit() throws Exception {} @Test public void testJob() throws Exception { + // pre-submit try { - // pre-submit - try { - preSubmit(); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Pre-submit work caused an error: " + e.getMessage()); - } - - // prepare the test environment - startCluster(); - - TestStreamEnvironment.setAsContext(this.executor, getParallelism()); + preSubmit(); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Pre-submit work caused an error: " + e.getMessage()); --- End diff -- nit: The test should fail on an exception anyway. If you want to leave it as is to keep the diff smaller, that is also ok. ---
[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4896#discussion_r158261437 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala --- @@ -18,8 +18,8 @@ package org.apache.flink.table.runtime.stream -import java.math.BigDecimal import java.lang.{Integer => JInt, Long => JLong} +import java.math.BigDecimal --- End diff -- nit: Are we following import orders for Scala as described here for Java: http://flink.apache.org/contribute-code.html#imports? ---
[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4896#discussion_r158260663 --- Diff: flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala --- @@ -23,8 +23,7 @@ import java.io.File import org.apache.commons.io.FileUtils import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} --- End diff -- nit: The import looks strange. I think ``` import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment ``` is enough. ---
[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4896#discussion_r158259995 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java --- @@ -55,6 +55,7 @@ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setGlobalJobParameters(params); + env.setParallelism(1); --- End diff -- Is this strictly needed? It's not a Unit or ITCase and the example seems to work without this line. ---
[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4896#discussion_r158259149 --- Diff: flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java --- @@ -215,23 +211,15 @@ private static void startSecureFlinkClusterWithRecoveryModeEnabled() { dfs.mkdirs(new Path("/flink/checkpoints")); dfs.mkdirs(new Path("/flink/recovery")); - org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration(); - - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM); - config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false); - config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3); - config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); - config.setString(CoreOptions.STATE_BACKEND, "filesystem"); - config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints"); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery"); - config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints"); - - SecureTestEnvironment.populateFlinkSecureConfigurations(config); - - cluster = TestBaseUtils.startCluster(config, false); - TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM); + MINICLUSTER_CONFIGURATION.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false); --- End diff -- Because of this: ``` private static void skipIfHadoopVersionIsNotAppropriate() { // Skips all tests if the Hadoop version doesn't match String hadoopVersionString = VersionInfo.getVersion(); String[] split = hadoopVersionString.split("\\."); if (split.length != 3) { throw new IllegalStateException("Hadoop version was not of format 'X.X.X': " + hadoopVersionString); } Assume.assumeTrue( // check whether we're running Hadoop version >= 3.x.x 
Integer.parseInt(split[0]) >= 3 ); } ``` I assume that the test will never run. I wonder if the test has ever worked correctly. ---
[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4896#discussion_r158257284 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java --- @@ -19,81 +19,61 @@ package org.apache.flink.test.util; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.util.FileUtils; import org.junit.ClassRule; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.util.Objects; - -import scala.concurrent.duration.FiniteDuration; /** - * A base class for tests that run test programs in a Flink mini cluster. + * Base class for unit tests that run multiple tests and want to reuse the same + * Flink cluster. This saves a significant amount of time, since the startup and + * shutdown of the Flink clusters (including actor systems, etc) usually dominates + * the execution of the actual tests. + * + * To write a unit test against this test base, simply extend it and add + * one or more regular test methods and retrieve the StreamExecutionEnvironment from + * the context: + * + * + * {@literal @}Test + * public void someTest() { + * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * {@literal @}Test + * public void anotherTest() { + * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * */ public abstract class AbstractTestBase extends TestBaseUtils { - /** Configuration to start the testing cluster with. 
*/ - protected final Configuration config; + protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class); - private final FiniteDuration timeout; + private static final int DEFAULT_PARALLELISM = 4; - protected int taskManagerNumSlots = 1; + protected static final Configuration MINICLUSTER_CONFIGURATION = new Configuration(); - protected int numTaskManagers = 1; + @ClassRule --- End diff -- `miniClusterResource` will be initialized before `MINICLUSTER_CONFIGURATION` can be modified by a `@BeforeClass` method, i.e., for some cases the configuration cannot be supplied in time. ---
[GitHub] flink pull request #4896: [FLINK-7909] Unify Flink test bases
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4896#discussion_r158254745 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java --- @@ -19,81 +19,61 @@ package org.apache.flink.test.util; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.util.FileUtils; import org.junit.ClassRule; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.util.Objects; - -import scala.concurrent.duration.FiniteDuration; /** - * A base class for tests that run test programs in a Flink mini cluster. + * Base class for unit tests that run multiple tests and want to reuse the same + * Flink cluster. This saves a significant amount of time, since the startup and + * shutdown of the Flink clusters (including actor systems, etc) usually dominates + * the execution of the actual tests. + * + * To write a unit test against this test base, simply extend it and add + * one or more regular test methods and retrieve the StreamExecutionEnvironment from + * the context: + * + * + * {@literal @}Test + * public void someTest() { + * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * {@literal @}Test + * public void anotherTest() { + * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * */ public abstract class AbstractTestBase extends TestBaseUtils { - /** Configuration to start the testing cluster with. 
*/ - protected final Configuration config; + protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class); - private final FiniteDuration timeout; + private static final int DEFAULT_PARALLELISM = 4; - protected int taskManagerNumSlots = 1; + protected static final Configuration MINICLUSTER_CONFIGURATION = new Configuration(); --- End diff -- I think it's dangerous to have mutable global state. For example if I have the following two tests: ``` public class TestTest extends AbstractTestBase { @BeforeClass public static void setUp() throws Exception { MINICLUSTER_CONFIGURATION.setString("foo", "bar"); } @Test public void name() throws Exception { System.out.println(MINICLUSTER_CONFIGURATION); } } ``` ``` public class TestTest2 extends AbstractTestBase { @Test public void name() throws Exception { System.out.println(MINICLUSTER_CONFIGURATION); } } ``` and run them both from IntelliJ, `{foo=bar}` is printed twice. `MINICLUSTER_CONFIGURATION` is never cleaned up. ---
[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5194#discussion_r158081176 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages.json; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmaster.JobExecutionResult; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.SerializedValue; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * JSON deserializer for {@link JobExecutionResult}. 
+ * + * @see JobExecutionResultSerializer + */ +public class JobExecutionResultDeserializer extends StdDeserializer { + + private static final long serialVersionUID = 1L; + + private final JobIDDeserializer jobIdDeserializer = new JobIDDeserializer(); + + private final SerializedThrowableDeserializer serializedThrowableDeserializer = + new SerializedThrowableDeserializer(); + + private final SerializedValueDeserializer serializedValueDeserializer; + + public JobExecutionResultDeserializer() { + super(JobExecutionResult.class); + final JavaType objectSerializedValueType = TypeFactory.defaultInstance() + .constructType(new TypeReference>() { + }); + serializedValueDeserializer = new SerializedValueDeserializer(objectSerializedValueType); + } + + @Override + public JobExecutionResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { + JobID jobId = null; + long netRuntime = -1; + SerializedThrowable serializedThrowable = null; + Map> accumulatorResults = null; + + while (true) { + final JsonToken jsonToken = p.nextToken(); + assertNotEndOfInput(p, jsonToken); + if (jsonToken == JsonToken.END_OBJECT) { + break; + } + + final String fieldName = p.getValueAsString(); + switch (fieldName) { + case JobExecutionResultSerializer.FIELD_NAME_JOB_ID: + assertNextToken(p, JsonToken.VALUE_STRING); + jobId = jobIdDeserializer.deserialize(p, ctxt); + break; + case JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME: + assertNextToken(p, JsonToken.VALUE_NUMBER_INT); + netRuntime = p.getLongValue(); + break; + case JobExecutionResultSerializer.FIELD_NAME_ACCUMULATOR_RESULTS: + assertNextToken(p, JsonToken.START_OBJECT); + accumulatorResults = parse
[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5194#discussion_r158078273 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobExecutionResultDeserializer.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages.json; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmaster.JobExecutionResult; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.SerializedValue; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * JSON deserializer for {@link JobExecutionResult}. 
+ * + * @see JobExecutionResultSerializer + */ +public class JobExecutionResultDeserializer extends StdDeserializer { + + private static final long serialVersionUID = 1L; + + private final JobIDDeserializer jobIdDeserializer = new JobIDDeserializer(); + + private final SerializedThrowableDeserializer serializedThrowableDeserializer = + new SerializedThrowableDeserializer(); + + private final SerializedValueDeserializer serializedValueDeserializer; + + public JobExecutionResultDeserializer() { + super(JobExecutionResult.class); + final JavaType objectSerializedValueType = TypeFactory.defaultInstance() + .constructType(new TypeReference>() { + }); + serializedValueDeserializer = new SerializedValueDeserializer(objectSerializedValueType); + } + + @Override + public JobExecutionResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { + JobID jobId = null; + long netRuntime = -1; + SerializedThrowable serializedThrowable = null; + Map> accumulatorResults = null; + + while (true) { + final JsonToken jsonToken = p.nextToken(); + assertNotEndOfInput(p, jsonToken); + if (jsonToken == JsonToken.END_OBJECT) { + break; + } + + final String fieldName = p.getValueAsString(); + switch (fieldName) { + case JobExecutionResultSerializer.FIELD_NAME_JOB_ID: + assertNextToken(p, JsonToken.VALUE_STRING); + jobId = jobIdDeserializer.deserialize(p, ctxt); + break; + case JobExecutionResultSerializer.FIELD_NAME_NET_RUNTIME: + assertNextToken(p, JsonToken.VALUE_NUMBER_INT); + netRuntime = p.getLongValue(); + break; + case JobExecutionResultSerializer.FIELD_NAME_ACCUMULATOR_RESULTS: + assertNextToken(p, JsonToken.START_OBJECT); + accumulatorResults = parseAccumulatorResults(
[GitHub] flink pull request #5194: [FLINK-8233][flip6] Add JobExecutionResultHandler
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5194 [FLINK-8233][flip6] Add JobExecutionResultHandler ## What is the purpose of the change *Allow retrieval of the JobExecutionResult cached in Dispatcher via HTTP. This will be needed so that accumulator results can be transmitted to the client.* This PR is based on #5184. ## Brief change log - *Add `JobExecutionResultHandler` to enable retrieval of `JobExecutionResult`.* - *Add serializer and deserializer for `JobExecutionResult`* ## Verifying this change This change added tests and can be verified as follows: - *Added unit tests for all new and changed classes.* - *Manually ran the WordCount example job and fetched the `JobExecutionResult` with `curl`* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8233-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5194.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5194 commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d Author: gyao Date: 2017-12-19T17:58:53Z [FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher - Introduce new JobExecutionResult used by JobMaster to forward the information in the already existing JobExecutionResult. - Always cache a JobExecutionResult. Even in case of job failures. In case of job failures, the serialized exception is stored additionally. - Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults commit 748745ac3521a20040cbda4056dfd9c53bc24a82 Author: gyao Date: 2017-12-20T13:44:03Z [FLINK-8233][flip6] Add JobExecutionResultHandler - Allow retrieval of the JobExecutionResult cached in Dispatcher. - Implement serializer and deserializer for JobExecutionResult. commit adf091a2770f42d6f8a0c19ab88cc7a208943a32 Author: gyao Date: 2017-12-20T13:44:26Z [hotfix] Clean up ExecutionGraph - Remove unnecessary throws clause. - Format whitespace. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157963387 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java --- @@ -405,22 +407,27 @@ private void decrementCheckAndCleanup() { private volatile Throwable runnerException; - private volatile JobExecutionResult result; + private volatile org.apache.flink.runtime.jobmaster.JobExecutionResult result; BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) { this.jobId = jobId; this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor); } @Override - public void jobFinished(JobExecutionResult jobResult) { - this.result = jobResult; + public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) { + this.result = result; jobMastersToWaitFor.countDown(); } @Override - public void jobFailed(Throwable cause) { - jobException = cause; + public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) { + checkArgument(result.getSerializedThrowable().isPresent()); + + jobException = result --- End diff -- Actually, there is no need to store the exception separately because the `JobExecutionResult` already contains the exception. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157962431 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java --- @@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws JobExecutionException, InterruptedE } } else if (result != null) { - return result; + try { + return new SerializedJobExecutionResult( + jobId, + result.getNetRuntime(), + result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader()); --- End diff -- Because the exception is serialized in `OnCompletionActions#jobFailed(JobExecutionResult);`, I have to deserialize it here again. I wonder if this is sane? ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157961376 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.messages; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.util.FlinkException; + +import static java.util.Objects.requireNonNull; + +/** + * Exception indicating that we could not find a + * {@link org.apache.flink.api.common.JobExecutionResult} under the given {@link JobID}. + */ +public class JobExecutionResultNotFoundException extends FlinkException { + + private final JobID jobId; + + private static final long serialVersionUID = 1L; --- End diff -- Fixed. ---
[GitHub] flink issue #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on the issue: https://github.com/apache/flink/pull/5168 New PR #5184 ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157961174 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.messages; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.util.FlinkException; + +import static java.util.Objects.requireNonNull; + +/** + * Exception indicating that we could not find a + * {@link org.apache.flink.api.common.JobExecutionResult} under the given {@link JobID}. + */ +public class JobExecutionResultNotFoundException extends FlinkException { --- End diff -- I think it is not an *is-a* relationship. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157960760 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java --- @@ -55,6 +56,26 @@ public SerializedJobExecutionResult(JobID jobID, long netRuntime, this.accumulatorResults = accumulators; } + /** +* Creates an instance from {@link JobExecutionResult}. +*/ + public static SerializedJobExecutionResult from(final JobExecutionResult jobExecutionResult) { + final Map accumulatorResults = jobExecutionResult.getAllAccumulatorResults(); + + final Map> serializedAccumulatorResults = new HashMap<>(accumulatorResults.size()); + for (final Map.Entry entry : accumulatorResults.entrySet()) { + try { + serializedAccumulatorResults.put(entry.getKey(), new SerializedValue<>(entry.getValue())); + } catch (final IOException e) { + throw new RuntimeException(e); --- End diff -- Not needed anymore. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157960722 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -357,6 +362,28 @@ public void start() throws Exception { return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort()); } + @Override + public CompletableFuture> getJobExecutionResult( --- End diff -- Done. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157960666 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.types.Either; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Caches {@link SerializedJobExecutionResult}s. 
+ * + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler + */ +class JobExecutionResultCache { + + private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300; + + private final Cache> + jobExecutionResultCache = + CacheBuilder.newBuilder() + .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS) + .build(); + + public void put(final SerializedJobExecutionResult result) { + assertJobExecutionResultNotCached(result.getJobId()); --- End diff -- It's just being strict. Can remove if it is wrong. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157878862 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java --- @@ -358,12 +360,12 @@ private DetachedFinalizer(JobID jobID, int numJobManagersToWaitFor) { } @Override - public void jobFinished(JobExecutionResult result) { + public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) { decrementCheckAndCleanup(); } @Override - public void jobFailed(Throwable cause) { + public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) { --- End diff -- Maybe rename to `JobResult` after all to avoid having to use the fully qualified name. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157877969 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.SerializedValue; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional + * {@link SerializedThrowable} when the job failed. + * + * This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}. 
+ */ +public class JobExecutionResult { + + private final JobID jobId; + + private final Map> accumulatorResults; + + private final long netRuntime; + + private final SerializedThrowable serializedThrowable; + + private JobExecutionResult( + final JobID jobId, + final Map> accumulatorResults, + final long netRuntime, + @Nullable final SerializedThrowable serializedThrowable) { + + checkArgument(netRuntime >= 0, "netRuntime must be greater than or equals 0"); + + this.jobId = requireNonNull(jobId); + this.accumulatorResults = requireNonNull(accumulatorResults); + this.netRuntime = netRuntime; + this.serializedThrowable = serializedThrowable; + } + + public boolean isSuccess() { --- End diff -- Javadocs are missing. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157877406 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java --- @@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws JobExecutionException, InterruptedE } } else if (result != null) { - return result; + try { + return new SerializedJobExecutionResult( + jobId, + result.getNetRuntime(), + result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader()); --- End diff -- Because the exception is serialized in `OnCompletionActions#jobFailed(JobExecutionResult);`, I have to deserialize it here again. I wonder if this is sane? CC: @tillrohrmann ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157876761 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java --- @@ -92,4 +94,42 @@ * @return Future containing the collection of instance ids and the corresponding metric query service path */ CompletableFuture>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout); + + /** +* Returns the {@link SerializedJobExecutionResult} for a job, or in case the job failed, the --- End diff -- Javadoc needs to be updated. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157876793 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java --- @@ -92,4 +94,42 @@ * @return Future containing the collection of instance ids and the corresponding metric query service path */ CompletableFuture>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout); + + /** +* Returns the {@link SerializedJobExecutionResult} for a job, or in case the job failed, the +* failure cause. +* +* @param jobId ID of the job that we are interested in. +* @param timeout Timeout for the asynchronous operation. +* +* @see #isJobExecutionResultPresent(JobID, Time) +* +* @return {@link CompletableFuture} containing the {@link JobExecutionResult} or a +* {@link Throwable} which represents the failure cause. If there is no result, the future will +* be completed exceptionally with +* {@link org.apache.flink.runtime.messages.JobExecutionResultNotFoundException} +*/ + default CompletableFuture getJobExecutionResult( + JobID jobId, + @RpcTimeout Time timeout) { + throw new UnsupportedOperationException(); + } + + /** +* Tests if the {@link SerializedJobExecutionResult} is present. --- End diff -- Javadoc needs to be updated. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5184 [FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher ## What is the purpose of the change Cache `JobExecutionResult` in `Dispatcher`, and add methods to `RestfulGateway` to enable retrieval of results through HTTP (not yet implemented). This will be needed so that accumulator results can be transmitted to the client. ## Brief change log - *Introduce new JobExecutionResult used by JobMaster to forward the information in the already existing JobExecutionResult.* - *Always cache a JobExecutionResult. Even in case of job failures. In case of job failures, the serialized exception is stored additionally.* - *Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults.* ## Verifying this change This change added tests and can be verified as follows: - *Added unit tests to verify that the Dispatcher caches the job results when the job finishes successfully or by failure.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8234 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5184.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5184 commit d05c76e621106810c32bc17aa0576923ba6be401 Author: gyao Date: 2017-12-19T17:58:53Z [FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher - Introduce new JobExecutionResult used by JobMaster to forward the information in the already existing JobExecutionResult. - Always cache a JobExecutionResult. Even in case of job failures. In case of job failures, the serialized exception is stored additionally. - Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL closed the pull request at: https://github.com/apache/flink/pull/5168 ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157489280 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -357,6 +362,28 @@ public void start() throws Exception { return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort()); } + @Override + public CompletableFuture<Either<Throwable, SerializedJobExecutionResult>> getJobExecutionResult( + final JobID jobId, + final Time timeout) { + final Either<Throwable, SerializedJobExecutionResult> jobExecutionResult = + jobExecutionResultCache.get(jobId); + if (jobExecutionResult == null) { + return FutureUtils.completedExceptionally(new JobExecutionResultNotFoundException(jobId)); + } else { + return CompletableFuture.completedFuture(jobExecutionResult); + } + } + + @Override + public CompletableFuture<Boolean> isJobExecutionResultPresent(final JobID jobId, final Time timeout) { + final boolean jobExecutionResultPresent = jobExecutionResultCache.contains(jobId); + if (!jobManagerRunners.containsKey(jobId) && !jobExecutionResultPresent) { + return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + } + return CompletableFuture.completedFuture(jobExecutionResultPresent); --- End diff -- But this would never return a future containing `false`. ---
[GitHub] flink pull request #4889: [FLINK-7903] [tests] Add flip6 build profile
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4889#discussion_r157481924 --- Diff: pom.xml --- @@ -125,6 +125,9 @@ under the License. 1.6.5 1.3 false + --- End diff -- nit: I don't understand this comment. Shouldn't it be something like `` ? ---
[GitHub] flink pull request #4889: [FLINK-7903] [tests] Add flip6 build profile
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4889#discussion_r157468208 --- Diff: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/category/Flip6.java --- @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.testutils.category; + +/** + * Category marker interface for Junit. + */ +public interface Flip6 { --- End diff -- nit: This is an interface, but `OldAndFlip6` is a class. nit: Maybe include *flip6* in the javadoc, e.g., *Category marker interface for Flip6 Junit tests.* ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157462859 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResult.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer; +import org.apache.flink.runtime.rest.messages.json.JobIDSerializer; +import org.apache.flink.util.SerializedValue; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.annotation.Nullable; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * This class is used to represent the information in {@link JobExecutionResult} as JSON. In case + * of a job failure, no {@link JobExecutionResult} will be available. In this case instances of this + * class will only store a {@link Throwable}. 
+ */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class JobExecutionResult { + + private static final String FIELD_NAME_JOB_ID = "id"; + + private static final String FIELD_NAME_NET_RUNTIME = "net-runtime"; + + private static final String FIELD_NAME_ACCUMULATOR_RESULTS = "accumulator-results"; + + private static final String FIELD_NAME_FAILURE_CAUSE = "failure-cause"; + + @JsonSerialize(using = JobIDSerializer.class) + @JsonDeserialize(using = JobIDDeserializer.class) + @JsonProperty(value = FIELD_NAME_JOB_ID, required = true) + private final JobID jobId; + + @JsonProperty(FIELD_NAME_NET_RUNTIME) + private final Long netRuntime; + + @JsonProperty(FIELD_NAME_ACCUMULATOR_RESULTS) + private final Map> accumulatorResults; + + @JsonProperty(FIELD_NAME_FAILURE_CAUSE) + private final Throwable throwable; --- End diff -- Maybe only include the errorMessage to avoid ser/des issues. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157027881 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.types.Either; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Caches {@link SerializedJobExecutionResult}s. 
+ * + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler + */ +class JobExecutionResultCache { + + private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300; + + private final Cache<JobID, Either<Throwable, SerializedJobExecutionResult>> + jobExecutionResultCache = + CacheBuilder.newBuilder() + .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS) + .build(); + + public void put(final SerializedJobExecutionResult result) { + assertJobExecutionResultNotCached(result.getJobId()); + jobExecutionResultCache.put(result.getJobId(), Either.Right(result)); + } + + public void put(final JobID jobId, Throwable throwable) { + assertJobExecutionResultNotCached(jobId); + jobExecutionResultCache.put(jobId, Either.Left(throwable)); + } + + public boolean contains(final JobID jobId) { + return jobExecutionResultCache.getIfPresent(jobId) != null; + } + + @Nullable + public Either<Throwable, SerializedJobExecutionResult> get(final JobID jobId) { --- End diff -- Not sure if I am abusing Flink's `Either` here. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157027633 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.messages; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.util.FlinkException; + +import static java.util.Objects.requireNonNull; + +/** + * Exception indicating that we could not find a + * {@link org.apache.flink.api.common.JobExecutionResult} under the given {@link JobID}. + */ +public class JobExecutionResultNotFoundException extends FlinkException { + + private final JobID jobId; + + private static final long serialVersionUID = 1L; --- End diff -- Should be on top. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157027510 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.types.Either; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Caches {@link SerializedJobExecutionResult}s. 
+ * + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler + */ +class JobExecutionResultCache { + + private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300; + + private final Cache<JobID, Either<Throwable, SerializedJobExecutionResult>> + jobExecutionResultCache = + CacheBuilder.newBuilder() + .expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS) + .build(); + + public void put(final SerializedJobExecutionResult result) { --- End diff -- Javadocs are missing. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157027178 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java --- @@ -33,10 +40,23 @@ objectMapper.enable( DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, - DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY, - DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES); --- End diff -- I had to remove `FAIL_ON_MISSING_CREATOR_PROPERTIES` because `null` fields are not always represented in the JSON. The `RestClient` would otherwise run into problems. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157026590 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.JobExecutionResultNotFoundException; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.job.JobExecutionResult; +import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; +import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** + * Returns the {@link org.apache.flink.api.common.JobExecutionResult} for a given {@link JobID}. 
+ */ +public class JobExecutionResultHandler --- End diff -- Sample response after running batch WordCount example: ``` { "status": { "id": "CREATED" }, "job-execution-result": { "id": "533a165a6de7f70919a54b1d6f36d3b3", "net-runtime": 0, "accumulator-results": { "94a58184eb17398571f35da42b714517": "rO0ABXNyABNqYXZhLnV0aWwuQXJyYXlMaXN0eIHSHZnHYZ0DAAFJAARzaXpleHCqdwQAAACqdXIAAltCrPMX+AYIVOACAAB4cAYCYQV1cQB+AAILB2FjdGlvbgF1cQB+AAIKBmFmdGVyAXVxAH4AAgwIYWdhaW5zdAF1cQB+AAIIBGFsbAJ1cQB+AAIIBGFuZAx1cQB+AAIJBWFybXMBdXEAfgACCwdhcnJvd3MBdXEAfgACCQVhd3J5AXVxAH4AAgcDYXkBdXEAfgACCQViYXJlAXVxAH4AAgcDYmUEdXEAfgACCQViZWFyA3VxAH4AAgsHYm9ka2luAXVxAH4AAgoGYm91cm4BdXEAfgACCARidXQBdXEAfgACBwNieQJ1cQB+AAINCWNhbGFtaXR5AXVxAH4AAgkFY2FzdAF1cQB+AAIJBWNvaWwBdXEAfgACCQVjb21lAXVxAH4AAg8LY29uc2NpZW5jZQF1cQB+AAIRDWNvbnN1bW1hdGlvbgF1cQB+AAIOCmNvbnR1bWVseQF1cQB+AAIMCGNvdW50cnkBdXEAfgACDAhjb3dhcmRzAXVxAH4AAg0JY3VycmVudHMBdXEAfgACBgJkBHVxAH4AAgoGZGVhdGgCdXEAfgACCgZkZWxheQF1cQB+AAILB2Rlc3BpcwF1cQB+AAINCWRldm91dGx5AAA AAXVxAH4AAggEZGllAnVxAH4AAgkFZG9lcwF1cQB+AAIKBmRyZWFkAXVxAH4AAgoGZHJlYW0BdXEAfgACCwdkcmVhbXMBdXEAfgACCARlbmQCdXEAfgACEAxlbnRlcnByaXNlcwF1cQB+AAIHA2VyAXVxAH4AAgkFZmFpcgF1cQB+AAIMCGZhcmRlbHMBdXEAfgACCgZmbGVzaAF1cQB+AAIIBGZseQF1cQB+AAIIBGZvcgJ1cQB+AAIMCGZvcnR1bmUBdXEAfgACCQVmcm9tAXVxAH4AAgkFZ2l2ZQF1cQB+AAIKBmdyZWF0AXVxAH4AAgoGZ3J1bnQBdXEAfgACCQVoYXZlAnVxAH4AAgcDaGUBdXEAfgACDgpoZWFydGFjaGUBdXEAfgACCQVoZWlyAXVxAH4AAgwIaGltc2VsZgF1cQB+AAIIBGhpcwF1cQB+AAIIBGh1ZQF1cQB+AAIJBWlsbHMBdXEAfgACBwNpbgN1cQB+AAIOCmluc29sZW5jZQF1cQB+AAIHA2lzA3VxAH4AAgkF
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157026313 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java --- @@ -33,10 +40,23 @@ objectMapper.enable( DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, - DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY, - DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES); + DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY); objectMapper.disable( SerializationFeature.FAIL_ON_EMPTY_BEANS); + + final SimpleModule jacksonFlinkModule = new SimpleModule(); + + final JavaType serializedValueWildcardType = objectMapper + .getTypeFactory() + .constructType(new TypeReference<SerializedValue<?>>() { + }); + + jacksonFlinkModule.addSerializer(new SerializedValueSerializer(serializedValueWildcardType)); --- End diff -- Could also be done using the `@JsonSerialize` annotation. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157025791 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.types.Either; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Caches {@link SerializedJobExecutionResult}s. + * + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler + */ +class JobExecutionResultCache { + + private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300; + + private final Cache<JobID, Either<Throwable, SerializedJobExecutionResult>> --- End diff -- Cache isn't size limited. ---
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5168 [FLINK-8234][flip6] WIP WIP @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8234 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5168.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5168 commit cc969846791bf818fbc81feb241a188410431ae5 Author: gyao Date: 2017-12-14T16:27:16Z [FLINK-8234][flip6] WIP ---
[GitHub] flink pull request #4987: [FLINK-8029] Create WebMonitorEndpoint
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4987#discussion_r155804733 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -757,6 +775,64 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { return CompletableFuture.completedFuture(executionGraph.getState()); } + //-- + // RestfulGateway RPC methods + //-- + + @Override + public CompletableFuture requestRestAddress(Time timeout) { + return restAddressFuture; + } + + @Override + public CompletableFuture requestJob(JobID jobId, Time timeout) { + if (Objects.equals(jobGraph.getJobID(), jobId)) { --- End diff -- When I see `Objects.equals`, I am assuming that it's possible that both arguments can be null. However, `jobGraph.getJobID()` is always non-null. ---
[GitHub] flink pull request #4987: [FLINK-8029] Create WebMonitorEndpoint
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4987#discussion_r155802027 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java --- @@ -27,8 +27,6 @@ /** * Interface for a metric registry. - - LOG.debug("Started MetricQueryService under {}.", metricQueryServicePath); --- End diff -- beautiful ---
[GitHub] flink pull request #4988: [FLINK-8030] Instantiate JobMasterRestEndpoint in ...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4988#discussion_r155797999 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java --- @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rest.RestServerEndpoint; +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler; +import org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler; +import org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler; +import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler; +import org.apache.flink.runtime.rest.handler.job.JobConfigHandler; +import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler; +import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler; +import org.apache.flink.runtime.rest.handler.job.JobIdsHandler; +import org.apache.flink.runtime.rest.handler.job.JobPlanHandler; +import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler; +import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler; +import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler; +import org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler; +import 
org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler; +import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler; +import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler; +import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler; +import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler; +import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler; +import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders; +import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; +import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders; +import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders; +import org.apache.flink.runtime.rest.messages.JobConfigHeaders; +import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; +import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders; +import org.apache.flink.runtime.rest.messages.JobPlanHeaders; +import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders; +import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; +import org.apache.flink.runtime.rest.messages.SubtasksTimes
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155770364 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java --- @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Collections; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for the {@link SlotSharingManager}. + */ +public class SlotSharingManagerTest extends TestLogger { + + private static final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId(); + + private static final DummySlotOwner slotOwner = new DummySlotOwner(); + + private static final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions(); --- End diff -- This instance is mutable... should not be `static` ---
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155770104 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java --- @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Collections; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for the {@link SlotSharingManager}. + */ +public class SlotSharingManagerTest extends TestLogger { + + private static final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId(); --- End diff -- Should be `SLOT_SHARING_GROUP_ID` since it is a constant. ---
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155768880 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released + private final AllocatedSlotActions allocatedSlotActions; + + // owner of the slots to which to return them when they are released from the outside + private final SlotOwner slotOwner; + + private final Map allTaskSlots; + + // Root nodes which have not been completed because the allocated slot is still pending + private final Map unresolvedRootSlots; + + // Root nodes which have been completed (the underlying allocated slot has been assigned) + private final Map> resolvedRootSlots; + +
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155758219 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. --- End diff -- nit: *leaf nodes* ---
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155754738 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -266,104 +279,367 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - SlotRequestID requestId, - ScheduledUnit task, - ResourceProfile resources, - Iterable locationPreferences, + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling, Time timeout) { - return internalAllocateSlot(requestId, task, resources, locationPreferences); + return internalAllocateSlot( + slotRequestId, + scheduledUnit, + resourceProfile, + locationPreferences, + allowQueuedScheduling); } - @Override - public void returnAllocatedSlot(Slot slot) { - internalReturnAllocatedSlot(slot); + private CompletableFuture internalAllocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit task, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling) { + + final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); + + if (slotSharingGroupId != null) { + // allocate slot with slot sharing + final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( + slotSharingGroupId, + id -> new SlotSharingManager( + id, + this, + providerAndOwner)); + + final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture; + + try { + if (task.getCoLocationConstraint() != null) { + multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot( + task.getCoLocationConstraint(), + multiTaskSlotManager, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + } else { + multiTaskSlotFuture = allocateMultiTaskSlot( + task.getJobVertexId(), multiTaskSlotManager, + resourceProfile, + locationPreferences, + 
allowQueuedScheduling); + } + } catch (NoResourceAvailableException noResourceException) { + return FutureUtils.completedExceptionally(noResourceException); + } + + // sanity check + Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId())); + + final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot( + slotRequestId, + task.getJobVertexId(), + multiTaskSlotFuture.getLocality()); + + return leave.getLogicalSlotFuture(); + } else { + // request an allocated slot to assign a single logical slot to + CompletableFuture slotAndLocalityFuture = requestAllocatedSlot( + slotRequestId, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + + return slotAndLocalityFuture.thenApply( + (SlotAndLocality slotAndLocality) -> { + final AllocatedSlot allocatedSlot = slotAndLocality.g
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155751694 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -266,104 +279,367 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - SlotRequestID requestId, - ScheduledUnit task, - ResourceProfile resources, - Iterable locationPreferences, + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling, Time timeout) { - return internalAllocateSlot(requestId, task, resources, locationPreferences); + return internalAllocateSlot( + slotRequestId, + scheduledUnit, + resourceProfile, + locationPreferences, + allowQueuedScheduling); } - @Override - public void returnAllocatedSlot(Slot slot) { - internalReturnAllocatedSlot(slot); + private CompletableFuture internalAllocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit task, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling) { + + final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); + + if (slotSharingGroupId != null) { + // allocate slot with slot sharing + final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( + slotSharingGroupId, + id -> new SlotSharingManager( + id, + this, + providerAndOwner)); + + final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture; + + try { + if (task.getCoLocationConstraint() != null) { + multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot( + task.getCoLocationConstraint(), + multiTaskSlotManager, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + } else { + multiTaskSlotFuture = allocateMultiTaskSlot( + task.getJobVertexId(), multiTaskSlotManager, + resourceProfile, + locationPreferences, + 
allowQueuedScheduling); + } + } catch (NoResourceAvailableException noResourceException) { + return FutureUtils.completedExceptionally(noResourceException); + } + + // sanity check + Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId())); + + final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot( + slotRequestId, + task.getJobVertexId(), + multiTaskSlotFuture.getLocality()); + + return leave.getLogicalSlotFuture(); + } else { + // request an allocated slot to assign a single logical slot to + CompletableFuture slotAndLocalityFuture = requestAllocatedSlot( + slotRequestId, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + + return slotAndLocalityFuture.thenApply( + (SlotAndLocality slotAndLocality) -> { + final AllocatedSlot allocatedSlot = slotAndLocality.g
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155604499 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -266,104 +279,367 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - SlotRequestID requestId, - ScheduledUnit task, - ResourceProfile resources, - Iterable locationPreferences, + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling, Time timeout) { - return internalAllocateSlot(requestId, task, resources, locationPreferences); + return internalAllocateSlot( + slotRequestId, + scheduledUnit, + resourceProfile, + locationPreferences, + allowQueuedScheduling); } - @Override - public void returnAllocatedSlot(Slot slot) { - internalReturnAllocatedSlot(slot); + private CompletableFuture internalAllocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit task, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling) { + + final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); + + if (slotSharingGroupId != null) { + // allocate slot with slot sharing + final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( + slotSharingGroupId, + id -> new SlotSharingManager( + id, + this, + providerAndOwner)); + + final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture; --- End diff -- The variable name is confusing. `multiTaskSlotFuture` is not of type `Future`. ---
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155605251 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java --- @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; + +/** + * A logical slot represents a resource on a TaskManager into + * which a single task can be deployed. + */ +public interface LogicalSlot { + +Payload TERMINATED_PAYLOAD = new Payload() { + + private final CompletableFuture COMPLETED_TERMINATION_FUTURE = CompletableFuture.completedFuture(null); --- End diff -- nit: `COMPLETED_TERMINATION_FUTURE` should be camel cased because it is not actually a constant (not static). ---
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155590317 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -266,104 +279,367 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - SlotRequestID requestId, - ScheduledUnit task, - ResourceProfile resources, - Iterable locationPreferences, + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling, Time timeout) { - return internalAllocateSlot(requestId, task, resources, locationPreferences); + return internalAllocateSlot( + slotRequestId, + scheduledUnit, + resourceProfile, + locationPreferences, + allowQueuedScheduling); } - @Override - public void returnAllocatedSlot(Slot slot) { - internalReturnAllocatedSlot(slot); + private CompletableFuture internalAllocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit task, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling) { + + final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); + + if (slotSharingGroupId != null) { + // allocate slot with slot sharing + final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( + slotSharingGroupId, + id -> new SlotSharingManager( + id, + this, + providerAndOwner)); + + final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture; + + try { + if (task.getCoLocationConstraint() != null) { + multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot( + task.getCoLocationConstraint(), + multiTaskSlotManager, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + } else { + multiTaskSlotFuture = allocateMultiTaskSlot( + task.getJobVertexId(), multiTaskSlotManager, + resourceProfile, + locationPreferences, + 
allowQueuedScheduling); + } + } catch (NoResourceAvailableException noResourceException) { + return FutureUtils.completedExceptionally(noResourceException); + } + + // sanity check + Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId())); + + final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot( --- End diff -- nit: variable name should be *leaf* https://www.dict.cc/?s=leaf ---
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155549755 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java --- @@ -32,6 +34,20 @@ */ public interface LogicalSlot { +Payload TERMINATED_PAYLOAD = new Payload() { + + private final CompletableFuture COMPLETED_TERMINATION_FUTURE = CompletableFuture.completedFuture(null); --- End diff -- nit: `COMPLETED_TERMINATION_FUTURE` should be camel cased because it is not actually a constant (not static). ---
[GitHub] flink issue #5086: [FLINK-8078] Introduce LogicalSlot interface
Github user GJL commented on the issue: https://github.com/apache/flink/pull/5086 LGTM ---
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155519870 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released + private final AllocatedSlotActions allocatedSlotActions; + + // owner of the slots to which to return them when they are released from the outside + private final SlotOwner slotOwner; + + private final Map allTaskSlots; + + // Root nodes which have not been completed because the allocated slot is still pending + private final Map unresolvedRootSlots; + + // Root nodes which have been completed (the underlying allocated slot has been assigned) + private final Map> resolvedRootSlots; + +
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155528224 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released + private final AllocatedSlotActions allocatedSlotActions; + + // owner of the slots to which to return them when they are released from the outside + private final SlotOwner slotOwner; + + private final Map allTaskSlots; + + // Root nodes which have not been completed because the allocated slot is still pending + private final Map unresolvedRootSlots; + + // Root nodes which have been completed (the underlying allocated slot has been assigned) + private final Map> resolvedRootSlots; + +
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155520946 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released --- End diff -- nit: All fields are commented with non-javadoc comments. Normally comments on fields are also done in Javadoc style, e.g., `SlotPool`. Javadoc comments on fields are displayed by IntelliJ (`Ctrl + J`). ---
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155507607 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; --- End diff -- nit: wrong import order (not sorted lexicographically) ``` import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmanager.scheduler.Locality; ``` items should appear before `LogicalSlot` ---
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155507294 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released + private final AllocatedSlotActions allocatedSlotActions; + + // owner of the slots to which to return them when they are released from the outside + private final SlotOwner slotOwner; + + private final Map allTaskSlots; + + // Root nodes which have not been completed because the allocated slot is still pending + private final Map unresolvedRootSlots; + + // Root nodes which have been completed (the underlying allocated slot has been assigned) + private final Map> resolvedRootSlots; + +
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155503866 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager.scheduler; + +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +public class SchedulerTest extends TestLogger { + + @Test + public void testAddAndRemoveInstance() { + try { + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + + Instance i1 = getRandomInstance(2); + Instance i2 = getRandomInstance(2); + Instance i3 = getRandomInstance(2); + + assertEquals(0, scheduler.getNumberOfAvailableInstances()); + assertEquals(0, scheduler.getNumberOfAvailableSlots()); + scheduler.newInstanceAvailable(i1); + assertEquals(1, scheduler.getNumberOfAvailableInstances()); + assertEquals(2, scheduler.getNumberOfAvailableSlots()); + scheduler.newInstanceAvailable(i2); + assertEquals(2, scheduler.getNumberOfAvailableInstances()); + assertEquals(4, scheduler.getNumberOfAvailableSlots()); + scheduler.newInstanceAvailable(i3); + assertEquals(3, scheduler.getNumberOfAvailableInstances()); + assertEquals(6, scheduler.getNumberOfAvailableSlots()); + + // cannot add available instance again + try { + scheduler.newInstanceAvailable(i2); + fail("Scheduler accepted instance twice"); + } + catch (IllegalArgumentException e) { + // bueno! 
+ } + + // some instances die + assertEquals(3, scheduler.getNumberOfAvailableInstances()); + assertEquals(6, scheduler.getNumberOfAvailableSlots()); + scheduler.instanceDied(i2); + assertEquals(2, scheduler.getNumberOfAvailableInstances()); + assertEquals(4, scheduler.getNumberOfAvailableSlots()); + + // try to add a dead instance + try { + scheduler.newInstanceAvailable(i2); + fail("Scheduler accepted dead instance"); + } + catch (IllegalArgumentException e) { + // stimmt --- End diff -- 😃 ---
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155503994 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager.scheduler; + +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +public class SchedulerTest extends TestLogger { + + @Test + public void testAddAndRemoveInstance() { + try { + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + + Instance i1 = getRandomInstance(2); + Instance i2 = getRandomInstance(2); + Instance i3 = getRandomInstance(2); + + assertEquals(0, scheduler.getNumberOfAvailableInstances()); + assertEquals(0, scheduler.getNumberOfAvailableSlots()); + scheduler.newInstanceAvailable(i1); + assertEquals(1, scheduler.getNumberOfAvailableInstances()); + assertEquals(2, scheduler.getNumberOfAvailableSlots()); + scheduler.newInstanceAvailable(i2); + assertEquals(2, scheduler.getNumberOfAvailableInstances()); + assertEquals(4, scheduler.getNumberOfAvailableSlots()); + scheduler.newInstanceAvailable(i3); + assertEquals(3, scheduler.getNumberOfAvailableInstances()); + assertEquals(6, scheduler.getNumberOfAvailableSlots()); + + // cannot add available instance again + try { + scheduler.newInstanceAvailable(i2); + fail("Scheduler accepted instance twice"); + } + catch (IllegalArgumentException e) { + // bueno! 
+ } + + // some instances die + assertEquals(3, scheduler.getNumberOfAvailableInstances()); + assertEquals(6, scheduler.getNumberOfAvailableSlots()); + scheduler.instanceDied(i2); + assertEquals(2, scheduler.getNumberOfAvailableInstances()); + assertEquals(4, scheduler.getNumberOfAvailableSlots()); + + // try to add a dead instance + try { + scheduler.newInstanceAvailable(i2); + fail("Scheduler accepted dead instance"); + } + catch (IllegalArgumentException e) { + // stimmt + + } + + scheduler.instanceDied(i1); + assertEquals(1, scheduler.getNumberOfAvailableInstances()); + assertEquals(2, scheduler.getNumberOfAvailableSlots()); + scheduler.instanceDied(i3); + assertEquals(0, scheduler.getNumberOfAvailableInstances()); + assertEquals(0, scheduler.getNumberOfAvailableSlots()); + + assertFalse(i1.isAlive()); + assertFalse(i2.isAlive()); + assertFalse(i3.isAlive()); + } + catch (Exception e) { --- End diff -- It would be better to propagate the exception, but I guess this file was copy-pasted. ---
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155502971 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingAllocatedSlotActions.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +/** + * Simple {@link AllocatedSlotActions} implementations for testing purposes. 
+ */ +public class TestingAllocatedSlotActions implements AllocatedSlotActions { + + private volatile Consumer> releaseSlotConsumer; + + public void setReleaseSlotConsumer(Consumer> releaseSlotConsumer) { + this.releaseSlotConsumer = Preconditions.checkNotNull(releaseSlotConsumer); + } + + @Override + public CompletableFuture releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, @Nullable Throwable cause) { + Consumer> currentReleaseSlotConsumer = this.releaseSlotConsumer; + + if (currentReleaseSlotConsumer != null) { + currentReleaseSlotConsumer.accept(Tuple3.of(slotRequestId, slotSharingGroupId, cause)); --- End diff -- nit: whitespace after `cause` ``` ... cause )); ``` ---
[GitHub] flink pull request #5090: [FLINK-8089] Also check for other pending slot req...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5090#discussion_r155496482 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java --- @@ -383,6 +386,76 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception { } } + /** +* Tests that unused offered slots are directly used to fulfil pending slot +* requests. +* +* See FLINK-8089 +*/ + @Test + public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception { + final SlotPool slotPool = new SlotPool(rpcService, jobId); + + final JobMasterId jobMasterId = JobMasterId.generate(); + final String jobMasterAddress = "foobar"; + final CompletableFuture allocationIdFuture = new CompletableFuture<>(); + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + + resourceManagerGateway.setRequestSlotConsumer( + (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId())); + + final SlotRequestID slotRequestId1 = new SlotRequestID(); + final SlotRequestID slotRequestId2 = new SlotRequestID(); + + try { + slotPool.start(jobMasterId, jobMasterAddress); + + final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class); + + final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class)); + + slotPoolGateway.connectToResourceManager(resourceManagerGateway); + + CompletableFuture slotFuture1 = slotPoolGateway.allocateSlot( + slotRequestId1, + scheduledUnit, + ResourceProfile.UNKNOWN, + Collections.emptyList(), + timeout); + + // wait for the first slot request + final AllocationID allocationId = allocationIdFuture.get(); + + CompletableFuture slotFuture2 = slotPoolGateway.allocateSlot( + slotRequestId2, + scheduledUnit, + ResourceProfile.UNKNOWN, + Collections.emptyList(), + timeout); + + slotPoolGateway.cancelSlotRequest(slotRequestId1); + + try { + // this should fail with a CancellationException + 
slotFuture1.get(); + fail("The first slot future should have failed because it was cancelled."); + } catch (ExecutionException ee) { + assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof CancellationException); + } + + final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); + + slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(); + + assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get()); + + // the slot offer should fulfil the second slot request --- End diff -- nit: same here (*fulfill* instead of *fulfil*) ---
[GitHub] flink pull request #5090: [FLINK-8089] Also check for other pending slot req...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5090#discussion_r155495322 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java --- @@ -383,6 +386,76 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception { } } + /** +* Tests that unused offered slots are directly used to fulfil pending slot --- End diff -- nit: *fulfill* instead of *fulfil* ---
[GitHub] flink pull request #5088: [FLINK-8087] Decouple Slot from AllocatedSlot
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5088#discussion_r155296898 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java --- @@ -144,6 +144,78 @@ public TaskManagerGateway getTaskManagerGateway() { return taskManagerGateway; } + /** +* Triggers the release of the logical slot. +*/ + public void triggerLogicalSlotRelease() { + final LogicalSlot logicalSlot = logicalSlotReference.get(); + + if (logicalSlot != null) { + logicalSlot.releaseSlot(); + } + } + + /** +* Releases the logical slot. +* +* @return true if the logical slot could be released, false otherwise. +*/ + public boolean releaseLogicalSlot() { + final LogicalSlot logicalSlot = logicalSlotReference.get(); + + if (logicalSlot != null) { + if (logicalSlot instanceof Slot) { + final Slot slot = (Slot) logicalSlot; + if (slot.markReleased()) { + logicalSlotReference.set(null); + return true; + } + } else { + throw new RuntimeException("Unsupported logical slot type encounterd " + logicalSlot.getClass()); --- End diff -- Typo: *encounterd* (should be *encountered*) ---
[GitHub] flink pull request #5088: [FLINK-8087] Decouple Slot from AllocatedSlot
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5088#discussion_r155279729 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java --- @@ -312,24 +312,36 @@ public void returnAllocatedSlot(Slot slot) { // (1) do we have a slot available already? SlotAndLocality slotFromPool = availableSlots.poll(resources, locationPreferences); if (slotFromPool != null) { - SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), slotFromPool.locality()); --- End diff -- Method `createSimpleSlot` is no longer in use. ---
[GitHub] flink pull request #5088: [FLINK-8087] Decouple Slot from AllocatedSlot
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5088#discussion_r155276432 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.slots; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +/** + * Interface for the context of a logical {@link Slot}. This context contains information + * about the underlying allocated slot and how to communicate with the TaskManager on which + * it was allocated. + */ +public interface SlotContext { + + /** +* Gets the ID under which the slot is allocated, which uniquely identifies the slot. +* +* @return The ID under which the slot is allocated +*/ + AllocationID getAllocationId(); + + /** +* Gets the location info of the TaskManager that offers this slot. +* +* @return The location info of the TaskManager that offers this slot +*/ + TaskManagerLocation getTaskManagerLocation(); + + /** +* Gets the number of the slot. 
+* +* @return The number of the slot on the TaskManager. +*/ + int getPhysicalSlotNumber(); + + /** +* Gets the actor gateway that can be used to send messages to the TaskManager. +* +* This method should be removed once the new interface-based RPC abstraction is in place +* +* @return The actor gateway that can be used to send messages to the TaskManager. --- End diff -- The fact that `TaskManagerGateway` can be an *actor gateway* is not something that is relevant for the Javadoc. ---
[GitHub] flink pull request #5088: [FLINK-8087] Decouple Slot from AllocatedSlot
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5088#discussion_r155258087 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotsTestImplContext.java --- @@ -31,7 +30,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class AllocatedSlotsTest { +public class SlotsTestImplContext { --- End diff -- Why was this renamed? ---
[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r155245759 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -507,6 +514,41 @@ public void handleError(final Exception exception) { onFatalError(new DispatcherException("Received an error from the LeaderElectionService.", exception)); } + //-- + // SubmittedJobGraphListener + //-- + + @Override + public void onAddedJobGraph(final JobID jobId) { + getRpcService().execute(() -> { + final SubmittedJobGraph submittedJobGraph; + try { + submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId); + } catch (final Exception e) { + log.error("Could not recover job graph for job {}.", jobId, e); + return; + } + runAsync(() -> { + if (!jobManagerRunners.containsKey(jobId)) { --- End diff -- Removed. ---
[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r155245771 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -507,6 +514,41 @@ public void handleError(final Exception exception) { onFatalError(new DispatcherException("Received an error from the LeaderElectionService.", exception)); } + //-- + // SubmittedJobGraphListener + //-- + + @Override + public void onAddedJobGraph(final JobID jobId) { + getRpcService().execute(() -> { + final SubmittedJobGraph submittedJobGraph; + try { + submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId); + } catch (final Exception e) { + log.error("Could not recover job graph for job {}.", jobId, e); + return; + } + runAsync(() -> { + if (!jobManagerRunners.containsKey(jobId)) { + submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT); + } + }); + }); + } + + @Override + public void onRemovedJobGraph(final JobID jobId) { + runAsync(() -> { + if (jobManagerRunners.containsKey(jobId)) { --- End diff -- Removed. ---
[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r155242826 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java --- @@ -86,122 +125,143 @@ public static void teardown() { } } - /** -* Tests that we can submit a job to the Dispatcher which then spawns a -* new JobManagerRunner. -*/ - @Test - public void testJobSubmission() throws Exception { - TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler(); + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); - TestingLeaderElectionService dispatcherLeaderElectionService = new TestingLeaderElectionService(); - TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); - haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService); - haServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore()); + final JobVertex testVertex = new JobVertex("testVertex"); + testVertex.setInvokableClass(NoOpInvokable.class); + jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex); + jobGraph.setAllowQueuedScheduling(true); - HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1L); - JobManagerRunner jobManagerRunner = mock(JobManagerRunner.class); + fatalErrorHandler = new TestingFatalErrorHandler(); + final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1L); + submittedJobGraphStore = spy(new InMemorySubmittedJobGraphStore()); - final JobGraph jobGraph = mock(JobGraph.class); - final JobID jobId = new JobID(); - when(jobGraph.getJobID()).thenReturn(jobId); + dispatcherLeaderElectionService = new TestingLeaderElectionService(); + jobMasterLeaderElectionService = new TestingLeaderElectionService(); - final TestingDispatcher dispatcher = new TestingDispatcher( + final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + 
haServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService); + haServices.setSubmittedJobGraphStore(submittedJobGraphStore); + haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, jobMasterLeaderElectionService); + haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); + haServices.setResourceManagerLeaderRetriever(new TestingLeaderRetrievalService()); + runningJobsRegistry = haServices.getRunningJobsRegistry(); + + final Configuration blobServerConfig = new Configuration(); + blobServerConfig.setString( + BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + + dispatcher = new TestingDispatcher( rpcService, Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(), new Configuration(), haServices, mock(ResourceManagerGateway.class), - mock(BlobServer.class), + new BlobServer(blobServerConfig, new VoidBlobStore()), heartbeatServices, - mock(MetricRegistryImpl.class), + new NoOpMetricRegistry(), fatalErrorHandler, - jobManagerRunner, - jobId); + TEST_JOB_ID); - try { - dispatcher.start(); + dispatcher.start(); + } - CompletableFuture leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); + @After + public void tearDown() throws Exception { + try { + fatalErrorHandler.rethrowError(); + } finally { + RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT); + } + } - // wait for the leader to be elected - leaderFuture.get(); + /** +* Tests that we can submit a job to the Dispatcher which then spawns a +* new JobManagerRunner. +*/ + @Test + public void testJobSubmission() throws Exception { + CompletableFuture leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); - Dispa
[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r155242900 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java --- @@ -86,122 +125,143 @@ public static void teardown() { } } - /** -* Tests that we can submit a job to the Dispatcher which then spawns a -* new JobManagerRunner. -*/ - @Test - public void testJobSubmission() throws Exception { - TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler(); + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); --- End diff -- Nice catch. ---
[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r155242108 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testutils; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * In-Memory implementation of {@link SubmittedJobGraphStore} for testing purposes. + */ +public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore { + + private final Map storedJobs = new HashMap<>(); + + private boolean started; + + @Override + public synchronized void start(@Nullable SubmittedJobGraphListener jobGraphListener) throws Exception { --- End diff -- Not sure if it is actually applicable. 
`onRemovedJobGraph` explicitly demands that the graph is removed by a different `SubmittedJobGraphStore` instance: ``` /** * Callback for {@link SubmittedJobGraph} instances removed by a different {@link * SubmittedJobGraphStore} instance. * * @param jobId The {@link JobID} of the removed job graph */ void onRemovedJobGraph(JobID jobId); ``` ---
[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r155240139 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testutils; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * In-Memory implementation of {@link SubmittedJobGraphStore} for testing purposes. + */ +public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore { + + private final Map storedJobs = new HashMap<>(); + + private boolean started; + + @Override + public synchronized void start(@Nullable SubmittedJobGraphListener jobGraphListener) throws Exception { --- End diff -- Good point. ---
[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5087#discussion_r155227397 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java --- @@ -208,27 +209,61 @@ public void setLocality(Locality locality) { // @Override - public void releaseSlot() { + public void releaseInstanceSlot() { + releaseSlot(); + } + + @Override + public CompletableFuture releaseSlot() { if (!isCanceled()) { + final CompletableFuture terminationFuture; - // kill all tasks currently running in this slot - Execution exec = this.executedTask; - if (exec != null && !exec.isFinished()) { - exec.fail(new Exception("TaskManager was lost/killed: " + getTaskManagerLocation())); - } + if (payload != null) { + // trigger the failure of the slot payload + payload.fail(new FlinkException("TaskManager was lost/killed: " + getTaskManagerLocation())); - // release directly (if we are directly allocated), - // otherwise release through the parent shared slot - if (getParent() == null) { - // we have to give back the slot to the owning instance - if (markCancelled()) { - getOwner().returnAllocatedSlot(this); - } + // wait for the termination of the payload before releasing the slot + terminationFuture = payload.getTerminalStateFuture(); } else { - // we have to ask our parent to dispose us - getParent().releaseChild(this); + terminationFuture = CompletableFuture.completedFuture(null); } + + terminationFuture.whenComplete( + (Object ignored, Throwable throwable) -> { + // release directly (if we are directly allocated), + // otherwise release through the parent shared slot + if (getParent() == null) { + // we have to give back the slot to the owning instance + if (markCancelled()) { --- End diff -- If `markCancelled` returns `false`, `releaseFuture` will never be completed. Is that intended? ---
[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5087#discussion_r155225733 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; + +/** + * A logical slot represents a resource on a TaskManager into + * which a single task can be deployed. + */ +public interface LogicalSlot { + + /** +* Return the TaskManager location of this slot +* +* @return TaskManager location of this slot +*/ + TaskManagerLocation getTaskManagerLocation(); + + /** +* Return the TaskManager gateway to talk to the TaskManager. +* +* @return TaskManager gateway to talk to the TaskManager +*/ + TaskManagerGateway getTaskManagerGateway(); + + /** +* True if the slot is still alive. 
+* +* @return True if the slot is still alive, otherwise false +*/ + boolean isAlive(); + + /** +* Tries to assign a payload to this slot. This can only happens +* exactly once. +* +* @param payload to be assigned to this slot. +* @return true if the payload could be set, otherwise false +*/ + boolean tryAssignPayload(Payload payload); + + /** +* Returns the set payload or null if none. +* +* @return Payload of this slot of null if none +*/ + @Nullable + Payload getPayload(); + + /** +* Releases this slot. +* +* @return Future which is completed once the slot has been released, +* in case of a failure it is completed exceptionally +*/ + CompletableFuture releaseSlot(); + + /** +* Gets the slot number on the TaskManager. +* +* @return slot number +*/ + int getPhysicalSlotNumber(); + + /** +* Gets the allocation id of this slot. +* +* @return allocation id of this slot +*/ + AllocationID getAllocationId(); + + /** +* Payload for a logical slot. +*/ + interface Payload { + + /** +* Fail the payload with the given cause. +* +* @param cause of the failure +*/ + void fail(Throwable cause); --- End diff -- `Execution#fail(Throwable)` is not annotated with `@Override`. ---
[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5087#discussion_r155208534 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java --- @@ -37,16 +41,18 @@ * If this slot is part of a {@link SharedSlot}, then the parent attribute will point to that shared slot. * If not, then the parent attribute is null. */ -public class SimpleSlot extends Slot { +public class SimpleSlot extends Slot implements LogicalSlot { /** The updater used to atomically swap in the execution */ --- End diff -- Will the payload always be the `execution`? ---
[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5087#discussion_r155207330 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -965,11 +965,20 @@ public void cancel() { // we build a future that is complete once all vertices have reached a terminal state final ConjunctFuture allTerminal = FutureUtils.waitForAll(futures); - allTerminal.thenAccept( - (Void value) -> { - // cancellations may currently be overridden by failures which trigger - // restarts, so we need to pass a proper restart global version here - allVerticesInTerminalState(globalVersionForRestart); + allTerminal.whenCompleteAsync( --- End diff -- Does it have to run asynchronously? If yes, does it make sense to specify a thread pool? Now it can run on `ForkJoinPool.commonPool()`. ---
[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5087#discussion_r155018207 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java --- @@ -267,10 +270,77 @@ public void testAnyPreferredLocationCalculation() throws ExecutionException, Int assertThat(preferredLocations, containsInAnyOrder(taskManagerLocation1, taskManagerLocation3)); } + /** +* Checks that the {@link Execution} termination future is only completed after the +* assigned slot has been released. +* +* NOTE: This test only fails spuriously without the fix of this commit. Thus, one has +* to execute this test multiple times to see the failure. +*/ + @Test + public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception { + final JobVertexID jobVertexId = new JobVertexID(); + final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId); + jobVertex.setInvokableClass(NoOpInvokable.class); + + final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner(); + + final SimpleSlot slot = new SimpleSlot( + new JobID(), + slotOwner, + new LocalTaskManagerLocation(), + 0, + new SimpleAckingTaskManagerGateway()); + + final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1); + slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot)); + + ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph( + new JobID(), + slotProvider, + new NoRestartStrategy(), + jobVertex); + + ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0]; + + assertTrue(executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY)); + + Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt(); + + CompletableFuture returnedSlotFuture = slotOwner.getReturnedSlotFuture(); + CompletableFuture terminationFuture = 
executionVertex.cancel(); + + // run canceling in a separate thread to allow an interleaving between termination + // future callback registrations + CompletableFuture.runAsync( + () -> currentExecutionAttempt.cancelingComplete(), + TestingUtils.defaultExecutor()); --- End diff -- If an executor is created, where is it shut down afterwards? I think this should be just fine: ``` new Thread() { @Override public void run() { currentExecutionAttempt.cancelingComplete(); } }.start(); ``` ---
[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5087#discussion_r155016510 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java --- @@ -267,10 +270,77 @@ public void testAnyPreferredLocationCalculation() throws ExecutionException, Int assertThat(preferredLocations, containsInAnyOrder(taskManagerLocation1, taskManagerLocation3)); } + /** +* Checks that the {@link Execution} termination future is only completed after the +* assigned slot has been released. +* +* NOTE: This test only fails spuriously without the fix of this commit. Thus, one has +* to execute this test multiple times to see the failure. +*/ + @Test + public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception { + final JobVertexID jobVertexId = new JobVertexID(); + final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId); + jobVertex.setInvokableClass(NoOpInvokable.class); + + final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner(); + + final SimpleSlot slot = new SimpleSlot( + new JobID(), + slotOwner, + new LocalTaskManagerLocation(), + 0, + new SimpleAckingTaskManagerGateway()); + + final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1); + slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot)); + + ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph( + new JobID(), + slotProvider, + new NoRestartStrategy(), + jobVertex); + + ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0]; + + assertTrue(executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY)); + + Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt(); + + CompletableFuture returnedSlotFuture = slotOwner.getReturnedSlotFuture(); + CompletableFuture terminationFuture = 
executionVertex.cancel(); + + // run canceling in a separate thread to allow an interleaving between termination + // future callback registrations + CompletableFuture.runAsync( + () -> currentExecutionAttempt.cancelingComplete(), + TestingUtils.defaultExecutor()); + + // to increase probability for problematic interleaving, let the current thread yield the processor + Thread.yield(); + + CompletableFuture restartFuture = terminationFuture.thenApply( + ignored -> { + try { + assertTrue(returnedSlotFuture.isDone()); + } catch (Exception e) { + throw new CompletionException(e); --- End diff -- `isDone` does not throw an exception. `assertTrue` throws an `Error`, which is not an `Exception`. Is it possible to get into this code path? ---
[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5087#discussion_r155015331 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java --- @@ -294,11 +297,11 @@ public void testScheduleWithDyingInstances() { i2.markDead(); - for (SimpleSlot slot : slots) { - if (slot.getOwner() == i2) { - assertTrue(slot.isCanceled()); + for (LogicalSlot slot : slots) { + if (Objects.equals(slot.getTaskManagerLocation().getResourceID(), i2.getTaskManagerID())) { --- End diff -- I think `slot.getTaskManagerLocation().getResourceID()` cannot return null. Is there a need to use `Objects.equals`? ---
[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5087#discussion_r155012983 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java --- @@ -331,8 +334,19 @@ public void setInitialState(TaskStateSnapshot checkpointStateHandles) { * * @return A future for the execution's termination */ - public CompletableFuture getTerminationFuture() { - return terminationFuture; + @Override + public CompletableFuture getTerminalStateFuture() { + return terminalStateFuture; + } + + /** +* Gets the release future which is completed once the execution reaches a terminal +* state and the assigned resource has been released. +* +* @return --- End diff -- `@return` tag can be removed. ---
[GitHub] flink pull request #5087: [FLINK-8085] Thin out LogicalSlot interface
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5087#discussion_r154954054 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java --- @@ -37,16 +41,18 @@ * If this slot is part of a {@link SharedSlot}, then the parent attribute will point to that shared slot. * If not, then the parent attribute is null. */ -public class SimpleSlot extends Slot { +public class SimpleSlot extends Slot implements LogicalSlot { /** The updater used to atomically swap in the execution */ --- End diff -- nit: _to swap in the payload_ ---
[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r154748301 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java --- @@ -244,7 +302,32 @@ protected JobManagerRunner createJobManagerRunner( FatalErrorHandler fatalErrorHandler) throws Exception { assertEquals(expectedJobId, jobGraph.getJobID()); - return jobManagerRunner; + return new JobManagerRunner(resourceId, jobGraph, configuration, rpcService, + highAvailabilityServices, heartbeatServices, jobManagerServices, metricRegistry, + onCompleteActions, fatalErrorHandler); + } + + @Override + public CompletableFuture submitJob(final JobGraph jobGraph, final Time timeout) { + final CompletableFuture submitJobFuture = super.submitJob(jobGraph, timeout); + + try { + submitJobFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + submitJobLatch.countDown(); + return submitJobFuture; + } + + @Override + void recoverJobs() { + if (recoverJobsEnabled.get()) { --- End diff -- Without this I do not see how I can verify whether a job was submitted regularly or via `recoverJobs`. ---
[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r154747386 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -534,6 +536,40 @@ public void handleError(final Exception exception) { onFatalError(new DispatcherException("Received an error from the LeaderElectionService.", exception)); } + //-- + // SubmittedJobGraphListener + //-- + + @Override + public void onAddedJobGraph(final JobID jobId) { + runAsync(() -> { + final SubmittedJobGraph submittedJobGraph; + try { + submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId); + } catch (final Exception e) { + log.error("Could not submit job {}.", jobId, e); --- End diff -- Changed it. ---
[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...
GitHub user GJL reopened a pull request: https://github.com/apache/flink/pull/5107 [FLINK-8176][flip6] Start SubmittedJobGraphStore in Dispatcher ## What is the purpose of the change The FLIP-6 dispatcher never calls `start()` on its SubmittedJobGraphStore instance. Hence, when a Job is submitted (YARN session mode with HA enabled), an IllegalStateException is thrown. This pull request adds the necessary changes so that jobs can be submitted. ## Brief change log - *Implement SubmittedJobGraphListener interface in Dispatcher* ## Verifying this change - *Added unit tests for new methods in Dispatcher class* - *Verified that jobs can be submitted in FLIP-6 YARN session mode with HA. Did not verify anything else.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8176 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5107.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5107 commit d238ef0c23eea585974929eafdff33af916d19ba Author: gyao Date: 2017-11-30T14:37:30Z [hotfix][tests] Extract SubmittedJobGraphStore implementation from JobManagerHARecoveryTest commit 33b9d2848c767088f43fed2d03e6402695827221 Author: gyao Date: 2017-11-30T14:44:23Z [FLINK-8176][flip6] Implement SubmittedJobGraphListener interface in Dispatcher Call start() on SubmittedJobGraphStore with Dispatcher as listener. To enable this, the dispatcher must implement the SubmittedJobGraphListener interface. Add simple unit tests for the new methods. Refactor DispatcherTest to remove redundancy. commit 88359172e23413aa195177993551613349056b68 Author: gyao Date: 2017-12-04T18:57:29Z [FLINK-8176][flip6] Make InMemorySubmittedJobGraphStore thread-safe commit 9cdb29604e9915c7d6ea60ed6fcee06c9bad57b9 Author: gyao Date: 2017-12-04T18:58:26Z [hotfix][Javadoc] Make first sentence in JobSubmissionException Javadoc end with period commit 53ad1771e8bec063157d69c0f7a187ccb5fb340e Author: gyao Date: 2017-12-04T19:04:52Z [FLINK-8176][flip6] Add method isStarted() to TestingLeaderElectionService commit 0c030fb19d7b5b9dba4df5811a69086906e20ca0 Author: gyao Date: 2017-12-04T19:05:47Z [FLINK-8176][flip6] Return same RunningJobsRegistry instance from TestingHighAvailabilityServices commit 7a04cbe54bcf380684c4e79a4f999b31b650570e Author: gyao Date: 2017-12-04T19:09:36Z [FLINK-8176][flip6] Fix race conditions in Dispatcher and DispatcherTest Check if jobManagerRunner exists before submitting job. 
Replace JobManagerRunner mock used in tests with real instance. Do not run job graph recovery in actor main thread when job graph is recovered from SubmittedJobGraphListener#onAddedJobGraph(JobID). ---
[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...
Github user GJL closed the pull request at: https://github.com/apache/flink/pull/5107 ---
[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r154320952 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java --- @@ -117,84 +135,78 @@ public void testJobSubmission() throws Exception { heartbeatServices, mock(MetricRegistryImpl.class), fatalErrorHandler, - jobManagerRunner, - jobId); + mockJobManagerRunner, + TEST_JOB_ID); - try { - dispatcher.start(); + dispatcher.start(); + } - CompletableFuture leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); + @After + public void tearDown() throws Exception { + try { + fatalErrorHandler.rethrowError(); + } finally { + RpcUtils.terminateRpcEndpoint(dispatcher, timeout); + } + } - // wait for the leader to be elected - leaderFuture.get(); + /** +* Tests that we can submit a job to the Dispatcher which then spawns a +* new JobManagerRunner. +*/ + @Test + public void testJobSubmission() throws Exception { + CompletableFuture leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); - DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + // wait for the leader to be elected + leaderFuture.get(); - CompletableFuture acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - acknowledgeFuture.get(); + CompletableFuture acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout); - verify(jobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start(); + acknowledgeFuture.get(); - // check that no error has occurred - fatalErrorHandler.rethrowError(); - } finally { - RpcUtils.terminateRpcEndpoint(dispatcher, timeout); - } + verify(mockJobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start(); } /** * Tests that the dispatcher takes part in the leader election. 
*/ @Test public void testLeaderElection() throws Exception { - TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler(); - TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); - UUID expectedLeaderSessionId = UUID.randomUUID(); - CompletableFuture leaderSessionIdFuture = new CompletableFuture<>(); - SubmittedJobGraphStore mockSubmittedJobGraphStore = mock(SubmittedJobGraphStore.class); - TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService() { - @Override - public void confirmLeaderSessionID(UUID leaderSessionId) { - super.confirmLeaderSessionID(leaderSessionId); - leaderSessionIdFuture.complete(leaderSessionId); - } - }; - - haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore); - haServices.setDispatcherLeaderElectionService(testingLeaderElectionService); - HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L); - final JobID jobId = new JobID(); - - final TestingDispatcher dispatcher = new TestingDispatcher( - rpcService, - Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(), - new Configuration(), - haServices, - mock(ResourceManagerGateway.class), - mock(BlobServer.class), - heartbeatServices, - mock(MetricRegistryImpl.class), - fatalErrorHandler, - mock(JobManagerRunner.class), - jobId); - try { - dispatcher.start(); + assertNull(dispatcherLeaderElectionService.getConfirmationFuture()); - assertFalse(leaderSessionIdFuture.isDone()); + dispatcherLeaderElectionService.isLeader(expecte
[GitHub] flink pull request #5107: [FLINK-8176][flip6] Start SubmittedJobGraphStore i...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r154320903 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java --- @@ -117,84 +135,78 @@ public void testJobSubmission() throws Exception { heartbeatServices, mock(MetricRegistryImpl.class), fatalErrorHandler, - jobManagerRunner, - jobId); + mockJobManagerRunner, + TEST_JOB_ID); - try { - dispatcher.start(); + dispatcher.start(); + } - CompletableFuture leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); + @After + public void tearDown() throws Exception { + try { + fatalErrorHandler.rethrowError(); + } finally { + RpcUtils.terminateRpcEndpoint(dispatcher, timeout); + } + } - // wait for the leader to be elected - leaderFuture.get(); + /** +* Tests that we can submit a job to the Dispatcher which then spawns a +* new JobManagerRunner. +*/ + @Test + public void testJobSubmission() throws Exception { + CompletableFuture leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); - DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + // wait for the leader to be elected + leaderFuture.get(); - CompletableFuture acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - acknowledgeFuture.get(); + CompletableFuture acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout); - verify(jobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start(); + acknowledgeFuture.get(); - // check that no error has occurred - fatalErrorHandler.rethrowError(); - } finally { - RpcUtils.terminateRpcEndpoint(dispatcher, timeout); - } + verify(mockJobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start(); } /** * Tests that the dispatcher takes part in the leader election. 
*/ @Test public void testLeaderElection() throws Exception { - TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler(); - TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); - UUID expectedLeaderSessionId = UUID.randomUUID(); - CompletableFuture leaderSessionIdFuture = new CompletableFuture<>(); - SubmittedJobGraphStore mockSubmittedJobGraphStore = mock(SubmittedJobGraphStore.class); - TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService() { - @Override - public void confirmLeaderSessionID(UUID leaderSessionId) { - super.confirmLeaderSessionID(leaderSessionId); - leaderSessionIdFuture.complete(leaderSessionId); - } - }; - - haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore); - haServices.setDispatcherLeaderElectionService(testingLeaderElectionService); - HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L); - final JobID jobId = new JobID(); - - final TestingDispatcher dispatcher = new TestingDispatcher( - rpcService, - Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(), - new Configuration(), - haServices, - mock(ResourceManagerGateway.class), - mock(BlobServer.class), - heartbeatServices, - mock(MetricRegistryImpl.class), - fatalErrorHandler, - mock(JobManagerRunner.class), - jobId); - try { - dispatcher.start(); + assertNull(dispatcherLeaderElectionService.getConfirmationFuture()); - assertFalse(leaderSessionIdFuture.isDone()); + dispatcherLeaderElectionService.isLeader(expecte