Repository: flink Updated Branches: refs/heads/release-1.5 3f1d43d5d -> 44506326f
http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index 4a2219a..302fe3e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -41,7 +41,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.testutils.category.New; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -62,7 +62,7 @@ import java.util.concurrent.TimeUnit; /** * Tests the availability of accumulator results during runtime. */ -@Category(Flip6.class) +@Category(New.class) public class AccumulatorLiveITCase extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(AccumulatorLiveITCase.class); http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java index dd1c398..074b721 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java @@ -81,7 +81,7 @@ public class AutoParallelismITCase extends TestLogger { assertEquals(PARALLELISM, resultCollection.size()); } catch (Exception ex) { - if (MINI_CLUSTER_RESOURCE.getMiniClusterType().equals(MiniClusterType.OLD)) { + if (MINI_CLUSTER_RESOURCE.getMiniClusterType().equals(MiniClusterType.LEGACY)) { throw ex; } assertTrue( http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java index aeff578..a3a551c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java @@ -78,7 +78,7 @@ public class RemoteEnvironmentITCase extends TestLogger { public static void setupCluster() throws Exception { configuration = new Configuration(); - if (CoreOptions.FLIP6_MODE.equals(configuration.getString(CoreOptions.MODE))) { + if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) { configuration.setInteger(WebOptions.PORT, 0); final MiniCluster miniCluster = new MiniCluster( new MiniClusterConfiguration.Builder() @@ -115,7 +115,7 @@ public class RemoteEnvironmentITCase extends TestLogger { */ @Test(expected = FlinkException.class) public void testInvalidAkkaConfiguration() throws Throwable { - assumeTrue(CoreOptions.OLD_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE))); + assumeTrue(CoreOptions.LEGACY_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE))); Configuration config = new Configuration(); config.setString(AkkaOptions.STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT); http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java index 2b97de8..d217a2a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java @@ -157,7 +157,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { */ public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception { Configuration config = new Configuration(); - config.setString(CoreOptions.MODE, CoreOptions.OLD_MODE); + config.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, FileStateBackendBasePath.getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index f76375d..b85a410 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -149,7 +149,7 @@ public class ProcessFailureCancelingITCase extends TestLogger { final Throwable[] errorRef = new Throwable[1]; final Configuration configuration = new Configuration(); - configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE); + configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE); // start the test program, which infinitely blocks Runnable programRunner = new Runnable() { http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java index 69fe7d6..7dc6f0c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java @@ -68,7 +68,7 @@ public class TaskManagerProcessFailureBatchRecoveryITCase extends AbstractTaskMa public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir) throws Exception { final Configuration configuration = new Configuration(); - configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE); + configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE); ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration); env.setParallelism(PARALLELISM); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000)); http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java index 1ecbff3..766a799 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java @@ -67,7 +67,7 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa final File tempCheckpointDir = tempFolder.newFolder(); final Configuration configuration = new Configuration(); - configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE); + configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE); StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( "localhost", jobManagerPort, http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java index 72f700a..b5c2aaf 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -144,8 +144,8 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { } catch (Exception e) { String exceptionString = ExceptionUtils.stringifyException(e); if (!(exceptionString.matches("(.*\n)*.*savepoint for the job .* failed(.*\n)*") // legacy - || exceptionString.matches("(.*\n)*.*Not all required tasks are currently running(.*\n)*") // flip6 - || exceptionString.matches("(.*\n)*.*Checkpoint was declined \\(tasks not ready\\)(.*\n)*"))) { // flip6 + || exceptionString.matches("(.*\n)*.*Not all required tasks are currently running(.*\n)*") // new + || exceptionString.matches("(.*\n)*.*Checkpoint was declined \\(tasks not ready\\)(.*\n)*"))) { // new throw e; } } http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java index 4d2aaa0..37b8d41 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java @@ -36,7 +36,7 @@ import java.util.List; * flink-yarn-tests-X-tests.jar and the flink-runtime-X-tests.jar to the set of files which * are shipped to the yarn cluster. This is necessary to load the testing classes. */ -public class TestingYarnClusterDescriptor extends YarnClusterDescriptor { +public class TestingYarnClusterDescriptor extends LegacyYarnClusterDescriptor { public TestingYarnClusterDescriptor( Configuration configuration, http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index f9c03f9..18bcfeb 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -106,7 +106,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { */ @Test public void testMultipleAMKill() throws Exception { - assumeTrue("This test only works with the old actor based code.", !flip6); + assumeTrue("This test only works with the old actor based code.", !isNewMode); final int numberKillingAttempts = numberApplicationAttempts - 1; String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); final Configuration configuration = GlobalConfiguration.loadConfiguration(); http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java index ef6706a..758a098 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java @@ -57,15 +57,15 @@ public class YARNITCase extends YarnTestBase { configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s"); final YarnClient yarnClient = getYarnClient(); - try (final Flip6YarnClusterDescriptor flip6YarnClusterDescriptor = new Flip6YarnClusterDescriptor( + try (final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor( configuration, getYarnConfiguration(), System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR), yarnClient, true)) { - flip6YarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); - flip6YarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles())); + yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); + yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles())); final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder() .setMasterMemoryMB(768) @@ -87,7 +87,7 @@ public class YARNITCase extends YarnTestBase { jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI())); - ClusterClient<ApplicationId> clusterClient = flip6YarnClusterDescriptor.deployJobCluster( + ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster( clusterSpecification, jobGraph, true); http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index d00a9c4..3767629 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -102,7 +102,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { */ @Test public void testClientStartup() throws IOException { - assumeTrue("Flip-6 does not start TMs upfront.", !flip6); + assumeTrue("The new mode does not start TMs upfront.", !isNewMode); LOG.info("Starting testClientStartup()"); runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-n", "1", @@ -192,7 +192,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { */ @Test(timeout = 100000) // timeout after 100 seconds public void testTaskManagerFailure() throws Exception { - assumeTrue("Flip-6 does not start TMs upfront.", !flip6); + assumeTrue("The new mode does not start TMs upfront.", !isNewMode); LOG.info("Starting testTaskManagerFailure()"); Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-n", "1", http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index 464e73c..d9b02fb 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -99,7 +99,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { runner.join(); checkForLogString("The Flink YARN client has been started in detached mode"); - if (!flip6) { + if (!isNewMode) { LOG.info("Waiting until two containers are running"); // wait until two containers are running while (getRunningContainers() < 2) { @@ -241,7 +241,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); Configuration configuration = GlobalConfiguration.loadConfiguration(); - try (final AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor( + try (final AbstractYarnClusterDescriptor clusterDescriptor = new LegacyYarnClusterDescriptor( configuration, getYarnConfiguration(), confDirPath, http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java index 2a1b099..c5d683e 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java @@ -90,7 +90,7 @@ public class YarnConfigurationITCase extends YarnTestBase { configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, (4L << 20)); final YarnConfiguration yarnConfiguration = getYarnConfiguration(); - final Flip6YarnClusterDescriptor clusterDescriptor = new Flip6YarnClusterDescriptor( + final YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor( configuration, yarnConfiguration, CliFrontend.getConfigurationDirectoryFromEnv(), http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 73abc87..421e4c0 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -81,7 +81,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.regex.Pattern; -import static org.apache.flink.configuration.CoreOptions.OLD_MODE; +import static org.apache.flink.configuration.CoreOptions.LEGACY_MODE; /** * This base class allows to use the MiniYARNCluster. @@ -153,7 +153,7 @@ public abstract class YarnTestBase extends TestLogger { protected org.apache.flink.configuration.Configuration flinkConfiguration; - protected boolean flip6; + protected boolean isNewMode; static { YARN_CONFIGURATION = new YarnConfiguration(); @@ -220,7 +220,7 @@ public abstract class YarnTestBase extends TestLogger { } flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration); - flip6 = CoreOptions.FLIP6_MODE.equalsIgnoreCase(flinkConfiguration.getString(CoreOptions.MODE)); + isNewMode = CoreOptions.NEW_MODE.equalsIgnoreCase(flinkConfiguration.getString(CoreOptions.MODE)); } @Nullable @@ -528,7 +528,7 @@ public abstract class YarnTestBase extends TestLogger { globalConfiguration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key(), keytab); globalConfiguration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key(), principal); - globalConfiguration.setString(CoreOptions.MODE.key(), OLD_MODE); + globalConfiguration.setString(CoreOptions.MODE.key(), LEGACY_MODE); BootstrapTools.writeConfiguration( globalConfiguration, http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index c80818a..ef266f0 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -1042,7 +1042,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor LOG.debug("Application State: {}", appState); switch(appState) { case FAILED: - case FINISHED: //TODO: the finished state may be valid in flip-6 + case FINISHED: case KILLED: throw new YarnDeploymentException("The YARN application unexpectedly switched to state " + appState + " during deployment. \n" + http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java deleted file mode 100644 index 1374ca2..0000000 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn; - -import org.apache.flink.client.deployment.ClusterDeploymentException; -import org.apache.flink.client.deployment.ClusterSpecification; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint; -import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint; - -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; - -/** - * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the - * new application master for a job under flip-6. - */ -public class Flip6YarnClusterDescriptor extends AbstractYarnClusterDescriptor { - - public Flip6YarnClusterDescriptor( - Configuration flinkConfiguration, - YarnConfiguration yarnConfiguration, - String configurationDirectory, - YarnClient yarnClient, - boolean sharedYarnClient) { - super( - flinkConfiguration, - yarnConfiguration, - configurationDirectory, - yarnClient, - sharedYarnClient); - } - - @Override - protected String getYarnSessionClusterEntrypoint() { - return YarnSessionClusterEntrypoint.class.getName(); - } - - @Override - protected String getYarnJobClusterEntrypoint() { - return YarnJobClusterEntrypoint.class.getName(); - } - - @Override - public ClusterClient<ApplicationId> deployJobCluster( - ClusterSpecification clusterSpecification, - JobGraph jobGraph, - boolean detached) throws ClusterDeploymentException { - - // this is required to work with Flip-6 because the slots are allocated - // lazily - jobGraph.setAllowQueuedScheduling(true); - - try { - return deployInternal( - clusterSpecification, - "Flink per-job cluster", - getYarnJobClusterEntrypoint(), - jobGraph, - detached); - } catch (Exception e) { - throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e); - } - } - - @Override - protected ClusterClient<ApplicationId> createYarnClusterClient( - AbstractYarnClusterDescriptor descriptor, - int numberTaskManagers, - int slotsPerTaskManager, - ApplicationReport report, - Configuration flinkConfiguration, - boolean perJobCluster) throws Exception { - return new RestClusterClient<>( - flinkConfiguration, - report.getApplicationId()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn/src/main/java/org/apache/flink/yarn/LegacyYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/LegacyYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/LegacyYarnClusterDescriptor.java new file mode 100644 index 0000000..443e6a5 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/LegacyYarnClusterDescriptor.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Legacy implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}. + */ +public class LegacyYarnClusterDescriptor extends AbstractYarnClusterDescriptor { + + public LegacyYarnClusterDescriptor( + Configuration flinkConfiguration, + YarnConfiguration yarnConfiguration, + String configurationDirectory, + YarnClient yarnClient, + boolean sharedYarnClient) { + super( + flinkConfiguration, + yarnConfiguration, + configurationDirectory, + yarnClient, + sharedYarnClient); + } + + @Override + protected String getYarnSessionClusterEntrypoint() { + return YarnApplicationMasterRunner.class.getName(); + } + + @Override + protected String getYarnJobClusterEntrypoint() { + throw new UnsupportedOperationException("The old Yarn descriptor does not support proper per-job mode."); + } + + @Override + public YarnClusterClient deployJobCluster( + ClusterSpecification clusterSpecification, + JobGraph jobGraph, + boolean detached) { + throw new UnsupportedOperationException("Cannot deploy a per-job yarn cluster yet."); + } + + @Override + protected ClusterClient<ApplicationId> createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster) throws Exception { + return new YarnClusterClient( + descriptor, + numberTaskManagers, + slotsPerTaskManager, + report, + flinkConfiguration, + perJobCluster); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 8625cee..3dff72f 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -18,10 +18,14 @@ package org.apache.flink.yarn; +import org.apache.flink.client.deployment.ClusterDeploymentException; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint; +import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -29,7 +33,8 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; /** - * Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}. + * Implementation of {@link AbstractYarnClusterDescriptor} which is used to start the + * application master. */ public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor { @@ -49,30 +54,45 @@ public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor { @Override protected String getYarnSessionClusterEntrypoint() { - return YarnApplicationMasterRunner.class.getName(); + return YarnSessionClusterEntrypoint.class.getName(); } @Override protected String getYarnJobClusterEntrypoint() { - throw new UnsupportedOperationException("The old Yarn descriptor does not support proper per-job mode."); + return YarnJobClusterEntrypoint.class.getName(); } @Override - public YarnClusterClient deployJobCluster( - ClusterSpecification clusterSpecification, - JobGraph jobGraph, - boolean detached) { - throw new UnsupportedOperationException("Cannot deploy a per-job yarn cluster yet."); + public ClusterClient<ApplicationId> deployJobCluster( + ClusterSpecification clusterSpecification, + JobGraph jobGraph, + boolean detached) throws ClusterDeploymentException { + + // this is required because the slots are allocated lazily + jobGraph.setAllowQueuedScheduling(true); + + try { + return deployInternal( + clusterSpecification, + "Flink per-job cluster", + getYarnJobClusterEntrypoint(), + jobGraph, + detached); + } catch (Exception e) { + throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e); + } } @Override - protected ClusterClient<ApplicationId> createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster) throws Exception { - return new YarnClusterClient( - descriptor, - numberTaskManagers, - slotsPerTaskManager, - report, + protected ClusterClient<ApplicationId> createYarnClusterClient( + AbstractYarnClusterDescriptor descriptor, + int numberTaskManagers, + int slotsPerTaskManager, + ApplicationReport report, + Configuration flinkConfiguration, + boolean perJobCluster) throws Exception { + return new RestClusterClient<>( flinkConfiguration, - perJobCluster); + report.getApplicationId()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 446377f..16abffa 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -40,7 +40,7 @@ import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; -import org.apache.flink.yarn.Flip6YarnClusterDescriptor; +import org.apache.flink.yarn.LegacyYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.configuration.YarnConfigOptions; @@ -158,7 +158,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId private final String yarnPropertiesFileLocation; - private final boolean flip6; + private final boolean isNewMode; private final YarnConfiguration yarnConfiguration; @@ -183,7 +183,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory); this.acceptInteractiveInput = acceptInteractiveInput; - this.flip6 = configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.FLIP6_MODE); + this.isNewMode = configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE); // Create the command line options @@ -366,7 +366,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId } private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) { - if (!flip6 && !cmd.hasOption(container.getOpt())) { // number of containers is required option! + if (!isNewMode && !cmd.hasOption(container.getOpt())) { // number of containers is required option! LOG.error("Missing required argument {}", container.getOpt()); printUsage(); throw new IllegalArgumentException("Missing required argument " + container.getOpt()); @@ -971,15 +971,15 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId yarnClient.init(yarnConfiguration); yarnClient.start(); - if (flip6) { - return new Flip6YarnClusterDescriptor( + if (isNewMode) { + return new YarnClusterDescriptor( configuration, yarnConfiguration, configurationDirectory, yarnClient, false); } else { - return new YarnClusterDescriptor( + return new LegacyYarnClusterDescriptor( configuration, yarnConfiguration, configurationDirectory, http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java index 52bf8bb..f206b66 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java @@ -57,7 +57,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; /** - * Tests for the {@link YarnClusterDescriptor}. + * Tests for the {@link LegacyYarnClusterDescriptor}. */ public class YarnClusterDescriptorTest extends TestLogger { @@ -95,7 +95,7 @@ public class YarnClusterDescriptorTest extends TestLogger { final Configuration flinkConfiguration = new Configuration(); flinkConfiguration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0); - YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor( + LegacyYarnClusterDescriptor clusterDescriptor = new LegacyYarnClusterDescriptor( flinkConfiguration, yarnConfiguration, temporaryFolder.getRoot().getAbsolutePath(), @@ -132,7 +132,7 @@ public class YarnClusterDescriptorTest extends TestLogger { configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE); configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0); - YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor( + LegacyYarnClusterDescriptor clusterDescriptor = new LegacyYarnClusterDescriptor( configuration, yarnConfiguration, temporaryFolder.getRoot().getAbsolutePath(), @@ -166,7 +166,7 @@ public class YarnClusterDescriptorTest extends TestLogger { @Test public void testSetupApplicationMasterContainer() { Configuration cfg = new Configuration(); - YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor( + LegacyYarnClusterDescriptor clusterDescriptor = new LegacyYarnClusterDescriptor( cfg, yarnConfiguration, temporaryFolder.getRoot().getAbsolutePath(), @@ -417,7 +417,7 @@ public class YarnClusterDescriptorTest extends TestLogger { */ @Test public void testExplicitLibShipping() throws Exception { - AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor( + AbstractYarnClusterDescriptor descriptor = new LegacyYarnClusterDescriptor( new Configuration(), yarnConfiguration, temporaryFolder.getRoot().getAbsolutePath(), @@ -460,7 +460,7 @@ public class YarnClusterDescriptorTest extends TestLogger { */ @Test public void testEnvironmentLibShipping() throws Exception { - AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor( + AbstractYarnClusterDescriptor descriptor = new LegacyYarnClusterDescriptor( new Configuration(), yarnConfiguration, temporaryFolder.getRoot().getAbsolutePath(), @@ -500,7 +500,7 @@ public class YarnClusterDescriptorTest extends TestLogger { */ @Test public void testYarnClientShutDown() { - YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor( + LegacyYarnClusterDescriptor yarnClusterDescriptor = new LegacyYarnClusterDescriptor( new Configuration(), yarnConfiguration, temporaryFolder.getRoot().getAbsolutePath(), @@ -515,7 +515,7 @@ public class YarnClusterDescriptorTest extends TestLogger { closableYarnClient.init(yarnConfiguration); closableYarnClient.start(); - yarnClusterDescriptor = new YarnClusterDescriptor( + yarnClusterDescriptor = new LegacyYarnClusterDescriptor( new Configuration(), yarnConfiguration, temporaryFolder.getRoot().getAbsolutePath(), http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8cdaf94..cca1527 100644 --- a/pom.xml +++ b/pom.xml @@ -127,9 +127,9 @@ under the License. <powermock.version>1.6.5</powermock.version> <hamcrest.version>1.3</hamcrest.version> <japicmp.skip>false</japicmp.skip> - <!-- run all groups except flip6 by default --> - <test.excludedGroups>org.apache.flink.testutils.category.Flip6</test.excludedGroups> - <codebase>old</codebase> + <!-- run all groups except new by default --> + <test.excludedGroups>org.apache.flink.testutils.category.New</test.excludedGroups> + <codebase>legacy</codebase> <!-- Keeping the MiniKDC version fixed instead of taking hadoop version dependency to support testing Kafka, ZK etc., modules that does not have Hadoop dependency @@ -593,16 +593,16 @@ under the License. <profiles> <profile> - <id>flip6</id> + <id>new</id> <activation> <property> - <name>flip6</name> + <name>new</name> </property> </activation> <properties> <!-- clear the excluded groups list --> <test.excludedGroups></test.excludedGroups> - <codebase>flip6</codebase> + <codebase>new</codebase> </properties> </profile>