This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd9b5de41eb9935c7ff1350f3b6e5d818104c677
Author: Till Rohrmann <[email protected]>
AuthorDate: Sat Sep 22 19:54:07 2018 +0200

    [FLINK-10396] Remove CodebaseType
    
    CodebaseType was used to distinguish between the legacy and new mode.
    This commit removes the CodebaseType and the codebase switch in the
    MiniClusterResource.
    
    This closes #6748.
---
 .../runtime/webmonitor/WebFrontendITCase.java      | 36 +++-------
 .../webmonitor/handlers/JarRunHandlerTest.java     |  2 -
 .../webmonitor/history/HistoryServerTest.java      |  2 -
 .../apache/flink/api/scala/ScalaShellITCase.scala  | 44 +++++--------
 .../api/scala/ScalaShellLocalStartupITCase.scala   |  9 +--
 .../flink/test/util/MiniClusterResource.java       | 76 +---------------------
 .../util/MiniClusterResourceConfiguration.java     | 24 +------
 .../org/apache/flink/test/util/TestBaseUtils.java  | 22 -------
 .../test/example/client/JobRetrievalITCase.java    |  2 -
 .../flink/test/misc/AutoParallelismITCase.java     |  4 --
 .../java/org/apache/flink/yarn/YarnTestBase.java   |  9 +--
 11 files changed, 28 insertions(+), 202 deletions(-)

diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index b90277f..3ae830d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -152,11 +152,7 @@ public class WebFrontendITCase extends TestLogger {
                if (notFoundJobConnection.getResponseCode() >= 400) {
                        // we don't set the content-encoding header
                        
Assert.assertNull(notFoundJobConnection.getContentEncoding());
-                       if (CLUSTER.getCodebaseType() == 
TestBaseUtils.CodebaseType.NEW) {
-                               Assert.assertEquals("application/json; 
charset=UTF-8", notFoundJobConnection.getContentType());
-                       } else {
-                               Assert.assertEquals("text/plain; 
charset=UTF-8", notFoundJobConnection.getContentType());
-                       }
+                       Assert.assertEquals("application/json; charset=UTF-8", 
notFoundJobConnection.getContentType());
                } else {
                        throw new RuntimeException("Request for non-existing 
job did not return an error.");
                }
@@ -280,23 +276,13 @@ public class WebFrontendITCase extends TestLogger {
                final Deadline deadline = testTimeout.fromNow();
 
                try (HttpTestClient client = new HttpTestClient("localhost", 
CLUSTER.getWebUIPort())) {
-                       if (CLUSTER.getCodebaseType() == 
TestBaseUtils.CodebaseType.NEW) {
-                               // stop the job
-                               client.sendPatchRequest("/jobs/" + jid + 
"/?mode=stop", deadline.timeLeft());
-                               HttpTestClient.SimpleHttpResponse response = 
client.getNextResponse(deadline.timeLeft());
-
-                               assertEquals(HttpResponseStatus.ACCEPTED, 
response.getStatus());
-                               assertEquals("application/json; charset=UTF-8", 
response.getType());
-                               assertEquals("{}", response.getContent());
-                       } else {
-                               // stop the job
-                               client.sendDeleteRequest("/jobs/" + jid + 
"/stop", deadline.timeLeft());
-                               HttpTestClient.SimpleHttpResponse response = 
client.getNextResponse(deadline.timeLeft());
-
-                               assertEquals(HttpResponseStatus.OK, 
response.getStatus());
-                               assertEquals("application/json; charset=UTF-8", 
response.getType());
-                               assertEquals("{}", response.getContent());
-                       }
+                       // stop the job
+                       client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", 
deadline.timeLeft());
+                       HttpTestClient.SimpleHttpResponse response = 
client.getNextResponse(deadline.timeLeft());
+
+                       assertEquals(HttpResponseStatus.ACCEPTED, 
response.getStatus());
+                       assertEquals("application/json; charset=UTF-8", 
response.getType());
+                       assertEquals("{}", response.getContent());
                }
 
                // wait for cancellation to finish
@@ -355,11 +341,7 @@ public class WebFrontendITCase extends TestLogger {
                        HttpTestClient.SimpleHttpResponse response = client
                                .getNextResponse(deadline.timeLeft());
 
-                       if (CLUSTER.getCodebaseType() == 
TestBaseUtils.CodebaseType.NEW) {
-                               assertEquals(HttpResponseStatus.ACCEPTED, 
response.getStatus());
-                       } else {
-                               assertEquals(HttpResponseStatus.OK, 
response.getStatus());
-                       }
+                       assertEquals(HttpResponseStatus.ACCEPTED, 
response.getStatus());
                        assertEquals("application/json; charset=UTF-8", 
response.getType());
                        assertEquals("{}", response.getContent());
                }
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index 6427f4d..a2138e1 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.ClassRule;
@@ -67,7 +66,6 @@ public class JarRunHandlerTest {
                                .setConfiguration(config)
                                .setNumberTaskManagers(1)
                                .setNumberSlotsPerTaskManager(1)
-                               .setCodebaseType(TestBaseUtils.CodebaseType.NEW)
                                .build());
                clusterResource.before();
 
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 4451e6d..18dd76e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -81,7 +80,6 @@ public class HistoryServerTest extends TestLogger {
                                .setConfiguration(clusterConfig)
                                .setNumberTaskManagers(1)
                                .setNumberSlotsPerTaskManager(1)
-                               .setCodebaseType(TestBaseUtils.CodebaseType.NEW)
                                .build());
                cluster.before();
        }
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index c6b43c2..6a41793 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -19,13 +19,11 @@
 package org.apache.flink.api.scala
 
 import java.io._
-import java.util.Objects
 
 import org.apache.flink.configuration.{Configuration, CoreOptions, 
RestOptions, TaskManagerOptions}
 import org.apache.flink.runtime.clusterframework.BootstrapTools
 import org.apache.flink.runtime.minicluster.{MiniCluster, 
MiniClusterConfiguration, StandaloneMiniCluster}
-import org.apache.flink.test.util.{MiniClusterResource, TestBaseUtils}
-import org.apache.flink.test.util.TestBaseUtils.CodebaseType
+import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.util.TestLogger
 import org.junit._
 import org.junit.rules.TemporaryFolder
@@ -322,32 +320,20 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-    val isNew = TestBaseUtils.isNewCodebase()
-    if (isNew) {
-      configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
-      // set to different than default so not to interfere with 
ScalaShellLocalStartupITCase
-      configuration.setInteger(RestOptions.PORT, 8082)
-      val miniConfig = new MiniClusterConfiguration.Builder()
-        .setConfiguration(configuration)
-        .setNumSlotsPerTaskManager(parallelism)
-        .build()
-
-      val miniCluster = new MiniCluster(miniConfig)
-      miniCluster.start()
-      port = miniCluster.getRestAddress.getPort
-      hostname = miniCluster.getRestAddress.getHost
-
-      cluster = Some(Left(miniCluster))
-    } else {
-      configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
-      configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
-      val standaloneCluster = new StandaloneMiniCluster(configuration)
-
-      hostname = standaloneCluster.getHostname
-      port = standaloneCluster.getPort
-
-      cluster = Some(Right(standaloneCluster))
-    }
+    configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
+    // set to different than default so not to interfere with 
ScalaShellLocalStartupITCase
+    configuration.setInteger(RestOptions.PORT, 8082)
+    val miniConfig = new MiniClusterConfiguration.Builder()
+      .setConfiguration(configuration)
+      .setNumSlotsPerTaskManager(parallelism)
+      .build()
+
+    val miniCluster = new MiniCluster(miniConfig)
+    miniCluster.start()
+    port = miniCluster.getRestAddress.getPort
+    hostname = miniCluster.getRestAddress.getHost
+
+    cluster = Some(Left(miniCluster))
   }
 
   @AfterClass
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
index f13f57b..3952a0f 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -20,9 +20,8 @@ package org.apache.flink.api.scala
 
 import java.io._
 
-import org.apache.flink.configuration.{Configuration, CoreOptions}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.clusterframework.BootstrapTools
-import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.util.TestLogger
 import org.junit.rules.TemporaryFolder
 import org.junit.{Assert, Rule, Test}
@@ -86,12 +85,6 @@ class ScalaShellLocalStartupITCase extends TestLogger {
     System.setOut(new PrintStream(baos))
 
     val configuration = new Configuration()
-    val mode = if (TestBaseUtils.isNewCodebase()) {
-      CoreOptions.NEW_MODE
-    } else {
-      CoreOptions.LEGACY_MODE
-    }
-    configuration.setString(CoreOptions.MODE, mode)
 
     val dir = temporaryFolder.newFolder()
     BootstrapTools.writeConfiguration(configuration, new File(dir, 
"flink-conf.yaml"))
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 1835fc6..9140bb4 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -19,10 +19,7 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.DefaultActorSystemLoader;
 import org.apache.flink.client.program.MiniClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -30,17 +27,12 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
-import akka.actor.ActorSystem;
-import org.junit.Assume;
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -49,11 +41,6 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-
 /**
  * Starts a Flink mini cluster as a resource and registers the respective
  * ExecutionEnvironment and StreamExecutionEnvironment.
@@ -66,8 +53,6 @@ public class MiniClusterResource extends ExternalResource {
 
        private final MiniClusterResourceConfiguration 
miniClusterResourceConfiguration;
 
-       private final TestBaseUtils.CodebaseType codebaseType;
-
        private JobExecutorService jobExecutorService;
 
        private ClusterClient<?> clusterClient;
@@ -82,11 +67,6 @@ public class MiniClusterResource extends ExternalResource {
 
        public MiniClusterResource(final MiniClusterResourceConfiguration 
miniClusterResourceConfiguration) {
                this.miniClusterResourceConfiguration = 
Preconditions.checkNotNull(miniClusterResourceConfiguration);
-               this.codebaseType = 
miniClusterResourceConfiguration.getCodebaseType();
-       }
-
-       public TestBaseUtils.CodebaseType getCodebaseType() {
-               return codebaseType;
        }
 
        public int getNumberSlots() {
@@ -111,12 +91,9 @@ public class MiniClusterResource extends ExternalResource {
 
        @Override
        public void before() throws Exception {
-               // verify that we are running in the correct test profile
-               Assume.assumeThat(TestBaseUtils.getCodebaseType(), 
is(equalTo(codebaseType)));
-
                temporaryFolder.create();
 
-               startJobExecutorService(codebaseType);
+               startMiniCluster();
 
                numberSlots = 
miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * 
miniClusterResourceConfiguration.getNumberTaskManagers();
 
@@ -163,57 +140,6 @@ public class MiniClusterResource extends ExternalResource {
                }
        }
 
-       private void startJobExecutorService(TestBaseUtils.CodebaseType 
miniClusterType) throws Exception {
-               switch (miniClusterType) {
-                       case LEGACY:
-                               startLegacyMiniCluster();
-                               break;
-                       case NEW:
-                               startMiniCluster();
-                               break;
-                       default:
-                               throw new FlinkRuntimeException("Unknown 
MiniClusterType " + miniClusterType + '.');
-               }
-       }
-
-       private void startLegacyMiniCluster() throws Exception {
-               final Configuration configuration = new 
Configuration(miniClusterResourceConfiguration.getConfiguration());
-               
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
miniClusterResourceConfiguration.getNumberTaskManagers());
-               configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
-               configuration.setString(CoreOptions.TMP_DIRS, 
temporaryFolder.newFolder().getAbsolutePath());
-
-               final LocalFlinkMiniCluster flinkMiniCluster = 
TestBaseUtils.startCluster(
-                       configuration,
-                       miniClusterResourceConfiguration.getRpcServiceSharing() 
== RpcServiceSharing.SHARED);
-
-               jobExecutorService = flinkMiniCluster;
-
-               switch 
(miniClusterResourceConfiguration.getRpcServiceSharing()) {
-                       case SHARED:
-                               Option<ActorSystem> actorSystemOption = 
flinkMiniCluster.firstActorSystem();
-                               
Preconditions.checkState(actorSystemOption.isDefined());
-
-                               final ActorSystem actorSystem = 
actorSystemOption.get();
-                               clusterClient = new StandaloneClusterClient(
-                                       configuration,
-                                       
flinkMiniCluster.highAvailabilityServices(),
-                                       true,
-                                       new 
DefaultActorSystemLoader(actorSystem));
-                               break;
-                       case DEDICATED:
-                               clusterClient = new 
StandaloneClusterClient(configuration, 
flinkMiniCluster.highAvailabilityServices(), true);
-                               break;
-               }
-
-               Configuration restClientConfig = new Configuration();
-               restClientConfig.setInteger(JobManagerOptions.PORT, 
flinkMiniCluster.getLeaderRPCPort());
-               this.restClusterClientConfig = new 
UnmodifiableConfiguration(restClientConfig);
-
-               if (flinkMiniCluster.webMonitor().isDefined()) {
-                       webUIPort = 
flinkMiniCluster.webMonitor().get().getServerPort();
-               }
-       }
-
        private void startMiniCluster() throws Exception {
                final Configuration configuration = 
miniClusterResourceConfiguration.getConfiguration();
                configuration.setString(CoreOptions.TMP_DIRS, 
temporaryFolder.newFolder().getAbsolutePath());
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
index c938920..bd521a2 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
@@ -37,8 +37,6 @@ public class MiniClusterResourceConfiguration {
 
        private final Time shutdownTimeout;
 
-       private final TestBaseUtils.CodebaseType codebaseType;
-
        private final RpcServiceSharing rpcServiceSharing;
 
        MiniClusterResourceConfiguration(
@@ -46,13 +44,11 @@ public class MiniClusterResourceConfiguration {
                int numberTaskManagers,
                int numberSlotsPerTaskManager,
                Time shutdownTimeout,
-               TestBaseUtils.CodebaseType codebaseType,
                RpcServiceSharing rpcServiceSharing) {
                this.configuration = Preconditions.checkNotNull(configuration);
                this.numberTaskManagers = numberTaskManagers;
                this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
                this.shutdownTimeout = 
Preconditions.checkNotNull(shutdownTimeout);
-               this.codebaseType = Preconditions.checkNotNull(codebaseType);
                this.rpcServiceSharing = 
Preconditions.checkNotNull(rpcServiceSharing);
        }
 
@@ -72,14 +68,6 @@ public class MiniClusterResourceConfiguration {
                return shutdownTimeout;
        }
 
-       /**
-        * @deprecated Will be irrelevant once the legacy mode has been removed.
-        */
-       @Deprecated
-       public TestBaseUtils.CodebaseType getCodebaseType() {
-               return codebaseType;
-       }
-
        public RpcServiceSharing getRpcServiceSharing() {
                return rpcServiceSharing;
        }
@@ -93,7 +81,6 @@ public class MiniClusterResourceConfiguration {
                private int numberTaskManagers = 1;
                private int numberSlotsPerTaskManager = 1;
                private Time shutdownTimeout = 
AkkaUtils.getTimeoutAsTime(configuration);
-               private TestBaseUtils.CodebaseType codebaseType = 
TestBaseUtils.getCodebaseType();
 
                private RpcServiceSharing rpcServiceSharing = 
RpcServiceSharing.SHARED;
 
@@ -117,22 +104,13 @@ public class MiniClusterResourceConfiguration {
                        return this;
                }
 
-               /**
-                * @deprecated Will be irrelevant once the legacy mode has been 
removed.
-                */
-               @Deprecated
-               public Builder setCodebaseType(TestBaseUtils.CodebaseType 
codebaseType) {
-                       this.codebaseType = codebaseType;
-                       return this;
-               }
-
                public Builder setRpcServiceSharing(RpcServiceSharing 
rpcServiceSharing) {
                        this.rpcServiceSharing = rpcServiceSharing;
                        return this;
                }
 
                public MiniClusterResourceConfiguration build() {
-                       return new 
MiniClusterResourceConfiguration(configuration, numberTaskManagers, 
numberSlotsPerTaskManager, shutdownTimeout, codebaseType, rpcServiceSharing);
+                       return new 
MiniClusterResourceConfiguration(configuration, numberTaskManagers, 
numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing);
                }
        }
 }
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index a0f52f2..363327a 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -44,8 +44,6 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.File;
@@ -678,26 +676,6 @@ public class TestBaseUtils extends TestLogger {
                throw new TimeoutException("Could not get HTTP response in time 
since the service is still unavailable.");
        }
 
-       @Nonnull
-       public static CodebaseType getCodebaseType() {
-               return Objects.equals(NEW_CODEBASE, 
System.getProperty(CODEBASE_KEY)) ? CodebaseType.NEW : CodebaseType.LEGACY;
-       }
-
-       public static boolean isNewCodebase() {
-               return CodebaseType.NEW == getCodebaseType();
-       }
-
-       /**
-        * Type of the mini cluster to start.
-        *
-        * @deprecated Will be irrelevant once the legacy mode has been removed.
-        */
-       @Deprecated
-       public enum CodebaseType {
-               LEGACY,
-               NEW
-       }
-
        /**
         * Comparator for comparable Tuples.
         * @param <T> tuple type
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 66a894f..56df46e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -58,7 +57,6 @@ public class JobRetrievalITCase extends TestLogger {
                new MiniClusterResourceConfiguration.Builder()
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(4)
-                       .setCodebaseType(TestBaseUtils.CodebaseType.NEW)
                        .build());
 
        private RestClusterClient<StandaloneClusterId> client;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 4826a46..f62ccf7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -28,7 +28,6 @@ import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -80,9 +79,6 @@ public class AutoParallelismITCase extends TestLogger {
                        assertEquals(PARALLELISM, resultCollection.size());
                }
                catch (Exception ex) {
-                       if (MINI_CLUSTER_RESOURCE.getCodebaseType() == 
TestBaseUtils.CodebaseType.LEGACY) {
-                               throw ex;
-                       }
                        assertTrue(
                                ExceptionUtils.findThrowableWithMessage(ex, 
ExecutionGraphBuilder.PARALLELISM_AUTO_MAX_ERROR_MESSAGE).isPresent());
                }
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 1a0520f..3763f65 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.test.util.TestBaseUtils;
@@ -75,7 +74,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Scanner;
 import java.util.Set;
 import java.util.UUID;
@@ -153,7 +151,7 @@ public abstract class YarnTestBase extends TestLogger {
 
        protected org.apache.flink.configuration.Configuration 
flinkConfiguration;
 
-       protected boolean isNewMode;
+       protected final boolean isNewMode = true;
 
        static {
                YARN_CONFIGURATION = new YarnConfiguration();
@@ -198,8 +196,6 @@ public abstract class YarnTestBase extends TestLogger {
                }
 
                flinkConfiguration = new 
org.apache.flink.configuration.Configuration(globalConfiguration);
-
-               isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, 
TestBaseUtils.getCodebaseType());
        }
 
        /**
@@ -552,9 +548,6 @@ public abstract class YarnTestBase extends TestLogger {
 
                        FileUtils.copyDirectory(new File(confDirPath), 
tempConfPathForSecureRun);
 
-                       globalConfiguration.setString(CoreOptions.MODE,
-                               Objects.equals(TestBaseUtils.CodebaseType.NEW, 
TestBaseUtils.getCodebaseType()) ? CoreOptions.NEW_MODE : 
CoreOptions.LEGACY_MODE);
-
                        BootstrapTools.writeConfiguration(
                                globalConfiguration,
                                new File(tempConfPathForSecureRun, 
"flink-conf.yaml"));

Reply via email to