Repository: flink
Updated Branches:
  refs/heads/release-1.5 3f1d43d5d -> 44506326f


http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 4a2219a..302fe3e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.testutils.category.New;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -62,7 +62,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * Tests the availability of accumulator results during runtime.
  */
-@Category(Flip6.class)
+@Category(New.class)
 public class AccumulatorLiveITCase extends TestLogger {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(AccumulatorLiveITCase.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index dd1c398..074b721 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -81,7 +81,7 @@ public class AutoParallelismITCase extends TestLogger {
                        assertEquals(PARALLELISM, resultCollection.size());
                }
                catch (Exception ex) {
-                       if 
(MINI_CLUSTER_RESOURCE.getMiniClusterType().equals(MiniClusterType.OLD)) {
+                       if 
(MINI_CLUSTER_RESOURCE.getMiniClusterType().equals(MiniClusterType.LEGACY)) {
                                throw ex;
                        }
                        assertTrue(

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index aeff578..a3a551c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -78,7 +78,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
        public static void setupCluster() throws Exception {
                configuration = new Configuration();
 
-               if 
(CoreOptions.FLIP6_MODE.equals(configuration.getString(CoreOptions.MODE))) {
+               if 
(CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
                        configuration.setInteger(WebOptions.PORT, 0);
                        final MiniCluster miniCluster = new MiniCluster(
                                new MiniClusterConfiguration.Builder()
@@ -115,7 +115,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
         */
        @Test(expected = FlinkException.class)
        public void testInvalidAkkaConfiguration() throws Throwable {
-               
assumeTrue(CoreOptions.OLD_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE)));
+               
assumeTrue(CoreOptions.LEGACY_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE)));
                Configuration config = new Configuration();
                config.setString(AkkaOptions.STARTUP_TIMEOUT, 
INVALID_STARTUP_TIMEOUT);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 2b97de8..d217a2a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -157,7 +157,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
         */
        public void testJobManagerFailure(String zkQuorum, final File 
coordinateDir) throws Exception {
                Configuration config = new Configuration();
-               config.setString(CoreOptions.MODE, CoreOptions.OLD_MODE);
+               config.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkQuorum);
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
FileStateBackendBasePath.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index f76375d..b85a410 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -149,7 +149,7 @@ public class ProcessFailureCancelingITCase extends 
TestLogger {
                        final Throwable[] errorRef = new Throwable[1];
 
                        final Configuration configuration = new Configuration();
-                       configuration.setString(CoreOptions.MODE, 
CoreOptions.OLD_MODE);
+                       configuration.setString(CoreOptions.MODE, 
CoreOptions.LEGACY_MODE);
 
                        // start the test program, which infinitely blocks
                        Runnable programRunner = new Runnable() {

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
index 69fe7d6..7dc6f0c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
@@ -68,7 +68,7 @@ public class TaskManagerProcessFailureBatchRecoveryITCase 
extends AbstractTaskMa
        public void testTaskManagerFailure(int jobManagerPort, final File 
coordinateDir) throws Exception {
 
                final Configuration configuration = new Configuration();
-               configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE);
+               configuration.setString(CoreOptions.MODE, 
CoreOptions.LEGACY_MODE);
                ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, 
configuration);
                env.setParallelism(PARALLELISM);
                env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
10000));

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
index 1ecbff3..766a799 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
@@ -67,7 +67,7 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase 
extends AbstractTa
                final File tempCheckpointDir = tempFolder.newFolder();
 
                final Configuration configuration = new Configuration();
-               configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE);
+               configuration.setString(CoreOptions.MODE, 
CoreOptions.LEGACY_MODE);
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
                        "localhost",
                        jobManagerPort,

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 72f700a..b5c2aaf 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -144,8 +144,8 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger {
                        } catch (Exception e) {
                                String exceptionString = 
ExceptionUtils.stringifyException(e);
                                if 
(!(exceptionString.matches("(.*\n)*.*savepoint for the job .* failed(.*\n)*") 
// legacy
-                                               || 
exceptionString.matches("(.*\n)*.*Not all required tasks are currently 
running(.*\n)*") // flip6
-                                               || 
exceptionString.matches("(.*\n)*.*Checkpoint was declined \\(tasks not 
ready\\)(.*\n)*"))) { // flip6
+                                               || 
exceptionString.matches("(.*\n)*.*Not all required tasks are currently 
running(.*\n)*") // new
+                                               || 
exceptionString.matches("(.*\n)*.*Checkpoint was declined \\(tasks not 
ready\\)(.*\n)*"))) { // new
                                        throw e;
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index 4d2aaa0..37b8d41 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -36,7 +36,7 @@ import java.util.List;
  * flink-yarn-tests-X-tests.jar and the flink-runtime-X-tests.jar to the set 
of files which
  * are shipped to the yarn cluster. This is necessary to load the testing 
classes.
  */
-public class TestingYarnClusterDescriptor extends YarnClusterDescriptor {
+public class TestingYarnClusterDescriptor extends LegacyYarnClusterDescriptor {
 
        public TestingYarnClusterDescriptor(
                        Configuration configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index f9c03f9..18bcfeb 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -106,7 +106,7 @@ public class YARNHighAvailabilityITCase extends 
YarnTestBase {
         */
        @Test
        public void testMultipleAMKill() throws Exception {
-               assumeTrue("This test only works with the old actor based 
code.", !flip6);
+               assumeTrue("This test only works with the old actor based 
code.", !isNewMode);
                final int numberKillingAttempts = numberApplicationAttempts - 1;
                String confDirPath = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
                final Configuration configuration = 
GlobalConfiguration.loadConfiguration();

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index ef6706a..758a098 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -57,15 +57,15 @@ public class YARNITCase extends YarnTestBase {
                configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
                final YarnClient yarnClient = getYarnClient();
 
-               try (final Flip6YarnClusterDescriptor 
flip6YarnClusterDescriptor = new Flip6YarnClusterDescriptor(
+               try (final YarnClusterDescriptor yarnClusterDescriptor = new 
YarnClusterDescriptor(
                        configuration,
                        getYarnConfiguration(),
                        System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR),
                        yarnClient,
                        true)) {
 
-                       flip6YarnClusterDescriptor.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
-                       
flip6YarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+                       yarnClusterDescriptor.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
+                       
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
 
                        final ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
                                .setMasterMemoryMB(768)
@@ -87,7 +87,7 @@ public class YARNITCase extends YarnTestBase {
 
                        jobGraph.addJar(new 
org.apache.flink.core.fs.Path(testingJar.toURI()));
 
-                       ClusterClient<ApplicationId> clusterClient = 
flip6YarnClusterDescriptor.deployJobCluster(
+                       ClusterClient<ApplicationId> clusterClient = 
yarnClusterDescriptor.deployJobCluster(
                                clusterSpecification,
                                jobGraph,
                                true);

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index d00a9c4..3767629 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -102,7 +102,7 @@ public class YARNSessionCapacitySchedulerITCase extends 
YarnTestBase {
         */
        @Test
        public void testClientStartup() throws IOException {
-               assumeTrue("Flip-6 does not start TMs upfront.", !flip6);
+               assumeTrue("The new mode does not start TMs upfront.", 
!isNewMode);
                LOG.info("Starting testClientStartup()");
                runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), 
"-t", flinkLibFolder.getAbsolutePath(),
                                                "-n", "1",
@@ -192,7 +192,7 @@ public class YARNSessionCapacitySchedulerITCase extends 
YarnTestBase {
         */
        @Test(timeout = 100000) // timeout after 100 seconds
        public void testTaskManagerFailure() throws Exception {
-               assumeTrue("Flip-6 does not start TMs upfront.", !flip6);
+               assumeTrue("The new mode does not start TMs upfront.", 
!isNewMode);
                LOG.info("Starting testTaskManagerFailure()");
                Runner runner = startWithArgs(new String[]{"-j", 
flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
                                "-n", "1",

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 464e73c..d9b02fb 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -99,7 +99,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                runner.join();
                checkForLogString("The Flink YARN client has been started in 
detached mode");
 
-               if (!flip6) {
+               if (!isNewMode) {
                        LOG.info("Waiting until two containers are running");
                        // wait until two containers are running
                        while (getRunningContainers() < 2) {
@@ -241,7 +241,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                String confDirPath = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
                Configuration configuration = 
GlobalConfiguration.loadConfiguration();
 
-               try (final AbstractYarnClusterDescriptor clusterDescriptor = 
new YarnClusterDescriptor(
+               try (final AbstractYarnClusterDescriptor clusterDescriptor = 
new LegacyYarnClusterDescriptor(
                        configuration,
                        getYarnConfiguration(),
                        confDirPath,

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 2a1b099..c5d683e 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -90,7 +90,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
                
configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, (4L << 
20));
 
                final YarnConfiguration yarnConfiguration = 
getYarnConfiguration();
-               final Flip6YarnClusterDescriptor clusterDescriptor = new 
Flip6YarnClusterDescriptor(
+               final YarnClusterDescriptor clusterDescriptor = new 
YarnClusterDescriptor(
                        configuration,
                        yarnConfiguration,
                        CliFrontend.getConfigurationDirectoryFromEnv(),

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 73abc87..421e4c0 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -81,7 +81,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
 
-import static org.apache.flink.configuration.CoreOptions.OLD_MODE;
+import static org.apache.flink.configuration.CoreOptions.LEGACY_MODE;
 
 /**
  * This base class allows to use the MiniYARNCluster.
@@ -153,7 +153,7 @@ public abstract class YarnTestBase extends TestLogger {
 
        protected org.apache.flink.configuration.Configuration 
flinkConfiguration;
 
-       protected boolean flip6;
+       protected boolean isNewMode;
 
        static {
                YARN_CONFIGURATION = new YarnConfiguration();
@@ -220,7 +220,7 @@ public abstract class YarnTestBase extends TestLogger {
                }
 
                flinkConfiguration = new 
org.apache.flink.configuration.Configuration(globalConfiguration);
-               flip6 = 
CoreOptions.FLIP6_MODE.equalsIgnoreCase(flinkConfiguration.getString(CoreOptions.MODE));
+               isNewMode = 
CoreOptions.NEW_MODE.equalsIgnoreCase(flinkConfiguration.getString(CoreOptions.MODE));
        }
 
        @Nullable
@@ -528,7 +528,7 @@ public abstract class YarnTestBase extends TestLogger {
 
                                
globalConfiguration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key(), 
keytab);
                                
globalConfiguration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key(), 
principal);
-                               
globalConfiguration.setString(CoreOptions.MODE.key(), OLD_MODE);
+                               
globalConfiguration.setString(CoreOptions.MODE.key(), LEGACY_MODE);
 
                                BootstrapTools.writeConfiguration(
                                        globalConfiguration,

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index c80818a..ef266f0 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -1042,7 +1042,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        LOG.debug("Application State: {}", appState);
                        switch(appState) {
                                case FAILED:
-                               case FINISHED: //TODO: the finished state may 
be valid in flip-6
+                               case FINISHED:
                                case KILLED:
                                        throw new YarnDeploymentException("The 
YARN application unexpectedly switched to state "
                                                + appState + " during 
deployment. \n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
deleted file mode 100644
index 1374ca2..0000000
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.client.deployment.ClusterDeploymentException;
-import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
-import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-/**
- * Implementation of {@link 
org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the
- * new application master for a job under flip-6.
- */
-public class Flip6YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
-
-       public Flip6YarnClusterDescriptor(
-                       Configuration flinkConfiguration,
-                       YarnConfiguration yarnConfiguration,
-                       String configurationDirectory,
-                       YarnClient yarnClient,
-                       boolean sharedYarnClient) {
-               super(
-                       flinkConfiguration,
-                       yarnConfiguration,
-                       configurationDirectory,
-                       yarnClient,
-                       sharedYarnClient);
-       }
-
-       @Override
-       protected String getYarnSessionClusterEntrypoint() {
-               return YarnSessionClusterEntrypoint.class.getName();
-       }
-
-       @Override
-       protected String getYarnJobClusterEntrypoint() {
-               return YarnJobClusterEntrypoint.class.getName();
-       }
-
-       @Override
-       public ClusterClient<ApplicationId> deployJobCluster(
-               ClusterSpecification clusterSpecification,
-               JobGraph jobGraph,
-               boolean detached) throws ClusterDeploymentException {
-
-               // this is required to work with Flip-6 because the slots are 
allocated
-               // lazily
-               jobGraph.setAllowQueuedScheduling(true);
-
-               try {
-                       return deployInternal(
-                               clusterSpecification,
-                               "Flink per-job cluster",
-                               getYarnJobClusterEntrypoint(),
-                               jobGraph,
-                               detached);
-               } catch (Exception e) {
-                       throw new ClusterDeploymentException("Could not deploy 
Yarn job cluster.", e);
-               }
-       }
-
-       @Override
-       protected ClusterClient<ApplicationId> createYarnClusterClient(
-                       AbstractYarnClusterDescriptor descriptor,
-                       int numberTaskManagers,
-                       int slotsPerTaskManager,
-                       ApplicationReport report,
-                       Configuration flinkConfiguration,
-                       boolean perJobCluster) throws Exception {
-               return new RestClusterClient<>(
-                       flinkConfiguration,
-                       report.getApplicationId());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn/src/main/java/org/apache/flink/yarn/LegacyYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/LegacyYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/LegacyYarnClusterDescriptor.java
new file mode 100644
index 0000000..443e6a5
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/LegacyYarnClusterDescriptor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Legacy implementation of {@link AbstractYarnClusterDescriptor} which starts 
an {@link YarnApplicationMasterRunner}.
+ */
+public class LegacyYarnClusterDescriptor extends AbstractYarnClusterDescriptor 
{
+
+       public LegacyYarnClusterDescriptor(
+                       Configuration flinkConfiguration,
+                       YarnConfiguration yarnConfiguration,
+                       String configurationDirectory,
+                       YarnClient yarnClient,
+                       boolean sharedYarnClient) {
+               super(
+                       flinkConfiguration,
+                       yarnConfiguration,
+                       configurationDirectory,
+                       yarnClient,
+                       sharedYarnClient);
+       }
+
+       @Override
+       protected String getYarnSessionClusterEntrypoint() {
+               return YarnApplicationMasterRunner.class.getName();
+       }
+
+       @Override
+       protected String getYarnJobClusterEntrypoint() {
+               throw new UnsupportedOperationException("The old Yarn 
descriptor does not support proper per-job mode.");
+       }
+
+       @Override
+       public YarnClusterClient deployJobCluster(
+                       ClusterSpecification clusterSpecification,
+                       JobGraph jobGraph,
+                       boolean detached) {
+               throw new UnsupportedOperationException("Cannot deploy a 
per-job yarn cluster yet.");
+       }
+
+       @Override
+       protected ClusterClient<ApplicationId> 
createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int 
numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, 
Configuration flinkConfiguration, boolean perJobCluster) throws Exception {
+               return new YarnClusterClient(
+                       descriptor,
+                       numberTaskManagers,
+                       slotsPerTaskManager,
+                       report,
+                       flinkConfiguration,
+                       perJobCluster);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 8625cee..3dff72f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -18,10 +18,14 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.client.deployment.ClusterDeploymentException;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
+import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -29,7 +33,8 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 /**
- * Default implementation of {@link AbstractYarnClusterDescriptor} which 
starts an {@link YarnApplicationMasterRunner}.
+ * Implementation of {@link AbstractYarnClusterDescriptor} which is used to 
start the
+ * application master.
  */
 public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
 
@@ -49,30 +54,45 @@ public class YarnClusterDescriptor extends 
AbstractYarnClusterDescriptor {
 
        @Override
        protected String getYarnSessionClusterEntrypoint() {
-               return YarnApplicationMasterRunner.class.getName();
+               return YarnSessionClusterEntrypoint.class.getName();
        }
 
        @Override
        protected String getYarnJobClusterEntrypoint() {
-               throw new UnsupportedOperationException("The old Yarn 
descriptor does not support proper per-job mode.");
+               return YarnJobClusterEntrypoint.class.getName();
        }
 
        @Override
-       public YarnClusterClient deployJobCluster(
-                       ClusterSpecification clusterSpecification,
-                       JobGraph jobGraph,
-                       boolean detached) {
-               throw new UnsupportedOperationException("Cannot deploy a 
per-job yarn cluster yet.");
+       public ClusterClient<ApplicationId> deployJobCluster(
+               ClusterSpecification clusterSpecification,
+               JobGraph jobGraph,
+               boolean detached) throws ClusterDeploymentException {
+
+               // this is required because the slots are allocated lazily
+               jobGraph.setAllowQueuedScheduling(true);
+
+               try {
+                       return deployInternal(
+                               clusterSpecification,
+                               "Flink per-job cluster",
+                               getYarnJobClusterEntrypoint(),
+                               jobGraph,
+                               detached);
+               } catch (Exception e) {
+                       throw new ClusterDeploymentException("Could not deploy 
Yarn job cluster.", e);
+               }
        }
 
        @Override
-       protected ClusterClient<ApplicationId> 
createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int 
numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, 
Configuration flinkConfiguration, boolean perJobCluster) throws Exception {
-               return new YarnClusterClient(
-                       descriptor,
-                       numberTaskManagers,
-                       slotsPerTaskManager,
-                       report,
+       protected ClusterClient<ApplicationId> createYarnClusterClient(
+                       AbstractYarnClusterDescriptor descriptor,
+                       int numberTaskManagers,
+                       int slotsPerTaskManager,
+                       ApplicationReport report,
+                       Configuration flinkConfiguration,
+                       boolean perJobCluster) throws Exception {
+               return new RestClusterClient<>(
                        flinkConfiguration,
-                       perJobCluster);
+                       report.getApplicationId());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 446377f..16abffa 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -40,7 +40,7 @@ import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
-import org.apache.flink.yarn.Flip6YarnClusterDescriptor;
+import org.apache.flink.yarn.LegacyYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -158,7 +158,7 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
 
        private final String yarnPropertiesFileLocation;
 
-       private final boolean flip6;
+       private final boolean isNewMode;
 
        private final YarnConfiguration yarnConfiguration;
 
@@ -183,7 +183,7 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
                this.configurationDirectory = 
Preconditions.checkNotNull(configurationDirectory);
                this.acceptInteractiveInput = acceptInteractiveInput;
 
-               this.flip6 = 
configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.FLIP6_MODE);
+               this.isNewMode = 
configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE);
 
                // Create the command line options
 
@@ -366,7 +366,7 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
        }
 
        private ClusterSpecification createClusterSpecification(Configuration 
configuration, CommandLine cmd) {
-               if (!flip6 && !cmd.hasOption(container.getOpt())) { // number 
of containers is required option!
+               if (!isNewMode && !cmd.hasOption(container.getOpt())) { // 
number of containers is required option!
                        LOG.error("Missing required argument {}", 
container.getOpt());
                        printUsage();
                        throw new IllegalArgumentException("Missing required 
argument " + container.getOpt());
@@ -971,15 +971,15 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine<ApplicationId
                yarnClient.init(yarnConfiguration);
                yarnClient.start();
 
-               if (flip6) {
-                       return new Flip6YarnClusterDescriptor(
+               if (isNewMode) {
+                       return new YarnClusterDescriptor(
                                configuration,
                                yarnConfiguration,
                                configurationDirectory,
                                yarnClient,
                                false);
                } else {
-                       return new YarnClusterDescriptor(
+                       return new LegacyYarnClusterDescriptor(
                                configuration,
                                yarnConfiguration,
                                configurationDirectory,

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 52bf8bb..f206b66 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -57,7 +57,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for the {@link YarnClusterDescriptor}.
+ * Tests for the {@link LegacyYarnClusterDescriptor}.
  */
 public class YarnClusterDescriptorTest extends TestLogger {
 
@@ -95,7 +95,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                final Configuration flinkConfiguration = new Configuration();
                
flinkConfiguration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN,
 0);
 
-               YarnClusterDescriptor clusterDescriptor = new 
YarnClusterDescriptor(
+               LegacyYarnClusterDescriptor clusterDescriptor = new 
LegacyYarnClusterDescriptor(
                        flinkConfiguration,
                        yarnConfiguration,
                        temporaryFolder.getRoot().getAbsolutePath(),
@@ -132,7 +132,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                configuration.setInteger(YarnConfigOptions.VCORES, 
Integer.MAX_VALUE);
                
configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 
0);
 
-               YarnClusterDescriptor clusterDescriptor = new 
YarnClusterDescriptor(
+               LegacyYarnClusterDescriptor clusterDescriptor = new 
LegacyYarnClusterDescriptor(
                        configuration,
                        yarnConfiguration,
                        temporaryFolder.getRoot().getAbsolutePath(),
@@ -166,7 +166,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
        @Test
        public void testSetupApplicationMasterContainer() {
                Configuration cfg = new Configuration();
-               YarnClusterDescriptor clusterDescriptor = new 
YarnClusterDescriptor(
+               LegacyYarnClusterDescriptor clusterDescriptor = new 
LegacyYarnClusterDescriptor(
                        cfg,
                        yarnConfiguration,
                        temporaryFolder.getRoot().getAbsolutePath(),
@@ -417,7 +417,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
         */
        @Test
        public void testExplicitLibShipping() throws Exception {
-               AbstractYarnClusterDescriptor descriptor = new 
YarnClusterDescriptor(
+               AbstractYarnClusterDescriptor descriptor = new 
LegacyYarnClusterDescriptor(
                        new Configuration(),
                        yarnConfiguration,
                        temporaryFolder.getRoot().getAbsolutePath(),
@@ -460,7 +460,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
         */
        @Test
        public void testEnvironmentLibShipping() throws Exception {
-               AbstractYarnClusterDescriptor descriptor = new 
YarnClusterDescriptor(
+               AbstractYarnClusterDescriptor descriptor = new 
LegacyYarnClusterDescriptor(
                        new Configuration(),
                        yarnConfiguration,
                        temporaryFolder.getRoot().getAbsolutePath(),
@@ -500,7 +500,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
         */
        @Test
        public void testYarnClientShutDown() {
-               YarnClusterDescriptor yarnClusterDescriptor = new 
YarnClusterDescriptor(
+               LegacyYarnClusterDescriptor yarnClusterDescriptor = new 
LegacyYarnClusterDescriptor(
                        new Configuration(),
                        yarnConfiguration,
                        temporaryFolder.getRoot().getAbsolutePath(),
@@ -515,7 +515,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                closableYarnClient.init(yarnConfiguration);
                closableYarnClient.start();
 
-               yarnClusterDescriptor = new YarnClusterDescriptor(
+               yarnClusterDescriptor = new LegacyYarnClusterDescriptor(
                        new Configuration(),
                        yarnConfiguration,
                        temporaryFolder.getRoot().getAbsolutePath(),

http://git-wip-us.apache.org/repos/asf/flink/blob/143f37df/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8cdaf94..cca1527 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,9 +127,9 @@ under the License.
                <powermock.version>1.6.5</powermock.version>
                <hamcrest.version>1.3</hamcrest.version>
                <japicmp.skip>false</japicmp.skip>
-               <!-- run all groups except flip6 by default -->
-               
<test.excludedGroups>org.apache.flink.testutils.category.Flip6</test.excludedGroups>
-               <codebase>old</codebase>
+               <!-- run all groups except new by default -->
+               
<test.excludedGroups>org.apache.flink.testutils.category.New</test.excludedGroups>
+               <codebase>legacy</codebase>
                <!--
                        Keeping the MiniKDC version fixed instead of taking 
hadoop version dependency
                        to support testing Kafka, ZK etc., modules that does 
not have Hadoop dependency
@@ -593,16 +593,16 @@ under the License.
        <profiles>
 
                <profile>
-                       <id>flip6</id>
+                       <id>new</id>
                        <activation>
                                <property>
-                                       <name>flip6</name>
+                                       <name>new</name>
                                </property>
                        </activation>
                        <properties>
                                <!-- clear the excluded groups list -->
                                <test.excludedGroups></test.excludedGroups>
-                               <codebase>flip6</codebase>
+                               <codebase>new</codebase>
                        </properties>
                </profile>
 

Reply via email to