This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fd9b5de41eb9935c7ff1350f3b6e5d818104c677 Author: Till Rohrmann <[email protected]> AuthorDate: Sat Sep 22 19:54:07 2018 +0200 [FLINK-10396] Remove CodebaseType CodebaseType was used to distinguish between the legacy and new mode. This commit removes the CodebaseType and the codebase switch in the MiniClusterResource. This closes #6748. --- .../runtime/webmonitor/WebFrontendITCase.java | 36 +++------- .../webmonitor/handlers/JarRunHandlerTest.java | 2 - .../webmonitor/history/HistoryServerTest.java | 2 - .../apache/flink/api/scala/ScalaShellITCase.scala | 44 +++++-------- .../api/scala/ScalaShellLocalStartupITCase.scala | 9 +-- .../flink/test/util/MiniClusterResource.java | 76 +--------------------- .../util/MiniClusterResourceConfiguration.java | 24 +------ .../org/apache/flink/test/util/TestBaseUtils.java | 22 ------- .../test/example/client/JobRetrievalITCase.java | 2 - .../flink/test/misc/AutoParallelismITCase.java | 4 -- .../java/org/apache/flink/yarn/YarnTestBase.java | 9 +-- 11 files changed, 28 insertions(+), 202 deletions(-) diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java index b90277f..3ae830d 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java @@ -152,11 +152,7 @@ public class WebFrontendITCase extends TestLogger { if (notFoundJobConnection.getResponseCode() >= 400) { // we don't set the content-encoding header Assert.assertNull(notFoundJobConnection.getContentEncoding()); - if (CLUSTER.getCodebaseType() == TestBaseUtils.CodebaseType.NEW) { - Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType()); - } else { - Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType()); - } + Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType()); } else { throw new RuntimeException("Request for non-existing job did not return an error."); } @@ -280,23 +276,13 @@ public class WebFrontendITCase extends TestLogger { final Deadline deadline = testTimeout.fromNow(); try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) { - if (CLUSTER.getCodebaseType() == TestBaseUtils.CodebaseType.NEW) { - // stop the job - client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft()); - HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); - - assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); - assertEquals("application/json; charset=UTF-8", response.getType()); - assertEquals("{}", response.getContent()); - } else { - // stop the job - client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft()); - HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); - - assertEquals(HttpResponseStatus.OK, response.getStatus()); - assertEquals("application/json; charset=UTF-8", response.getType()); - assertEquals("{}", response.getContent()); - } + // stop the job + client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft()); + HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); + + assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); + assertEquals("application/json; charset=UTF-8", response.getType()); + assertEquals("{}", response.getContent()); } // wait for cancellation to finish @@ -355,11 +341,7 @@ public class WebFrontendITCase extends TestLogger { HttpTestClient.SimpleHttpResponse response = client .getNextResponse(deadline.timeLeft()); - if (CLUSTER.getCodebaseType() == TestBaseUtils.CodebaseType.NEW) { - assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); - } else { - assertEquals(HttpResponseStatus.OK, response.getStatus()); - } + assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); assertEquals("application/json; charset=UTF-8", response.getType()); assertEquals("{}", response.getContent()); } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java index 6427f4d..a2138e1 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.rest.util.RestClientException; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.test.util.MiniClusterResourceConfiguration; -import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.ExceptionUtils; import org.junit.ClassRule; @@ -67,7 +66,6 @@ public class JarRunHandlerTest { .setConfiguration(config) .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(1) - .setCodebaseType(TestBaseUtils.CodebaseType.NEW) .build()); clusterResource.before(); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java index 4451e6d..18dd76e 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.test.util.MiniClusterResourceConfiguration; -import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; @@ -81,7 +80,6 @@ public class HistoryServerTest extends TestLogger { .setConfiguration(clusterConfig) .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(1) - .setCodebaseType(TestBaseUtils.CodebaseType.NEW) .build()); cluster.before(); } diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala index c6b43c2..6a41793 100644 --- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala +++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala @@ -19,13 +19,11 @@ package org.apache.flink.api.scala import java.io._ -import java.util.Objects import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions} import org.apache.flink.runtime.clusterframework.BootstrapTools import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster} -import org.apache.flink.test.util.{MiniClusterResource, TestBaseUtils} -import org.apache.flink.test.util.TestBaseUtils.CodebaseType +import org.apache.flink.test.util.TestBaseUtils import org.apache.flink.util.TestLogger import org.junit._ import org.junit.rules.TemporaryFolder @@ -322,32 +320,20 @@ object ScalaShellITCase { @BeforeClass def beforeAll(): Unit = { - val isNew = TestBaseUtils.isNewCodebase() - if (isNew) { - configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE) - // set to different than default so not to interfere with ScalaShellLocalStartupITCase - configuration.setInteger(RestOptions.PORT, 8082) - val miniConfig = new MiniClusterConfiguration.Builder() - .setConfiguration(configuration) - .setNumSlotsPerTaskManager(parallelism) - .build() - - val miniCluster = new MiniCluster(miniConfig) - miniCluster.start() - port = miniCluster.getRestAddress.getPort - hostname = miniCluster.getRestAddress.getHost - - cluster = Some(Left(miniCluster)) - } else { - configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE) - configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism) - val standaloneCluster = new StandaloneMiniCluster(configuration) - - hostname = standaloneCluster.getHostname - port = standaloneCluster.getPort - - cluster = Some(Right(standaloneCluster)) - } + configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE) + // set to different than default so not to interfere with ScalaShellLocalStartupITCase + configuration.setInteger(RestOptions.PORT, 8082) + val miniConfig = new MiniClusterConfiguration.Builder() + .setConfiguration(configuration) + .setNumSlotsPerTaskManager(parallelism) + .build() + + val miniCluster = new MiniCluster(miniConfig) + miniCluster.start() + port = miniCluster.getRestAddress.getPort + hostname = miniCluster.getRestAddress.getHost + + cluster = Some(Left(miniCluster)) } @AfterClass diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala index f13f57b..3952a0f 100644 --- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala +++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala @@ -20,9 +20,8 @@ package org.apache.flink.api.scala import java.io._ -import org.apache.flink.configuration.{Configuration, CoreOptions} +import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.clusterframework.BootstrapTools -import org.apache.flink.test.util.TestBaseUtils import org.apache.flink.util.TestLogger import org.junit.rules.TemporaryFolder import org.junit.{Assert, Rule, Test} @@ -86,12 +85,6 @@ class ScalaShellLocalStartupITCase extends TestLogger { System.setOut(new PrintStream(baos)) val configuration = new Configuration() - val mode = if (TestBaseUtils.isNewCodebase()) { - CoreOptions.NEW_MODE - } else { - CoreOptions.LEGACY_MODE - } - configuration.setString(CoreOptions.MODE, mode) val dir = temporaryFolder.newFolder() BootstrapTools.writeConfiguration(configuration, new File(dir, "flink-conf.yaml")) diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java index 1835fc6..9140bb4 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java @@ -19,10 +19,7 @@ package org.apache.flink.test.util; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.DefaultActorSystemLoader; import org.apache.flink.client.program.MiniClusterClient; -import org.apache.flink.client.program.StandaloneClusterClient; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; @@ -30,17 +27,12 @@ import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.minicluster.JobExecutorService; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; -import org.apache.flink.runtime.minicluster.RpcServiceSharing; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; -import akka.actor.ActorSystem; -import org.junit.Assume; import org.junit.rules.ExternalResource; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; @@ -49,11 +41,6 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import scala.Option; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.is; - /** * Starts a Flink mini cluster as a resource and registers the respective * ExecutionEnvironment and StreamExecutionEnvironment. @@ -66,8 +53,6 @@ public class MiniClusterResource extends ExternalResource { private final MiniClusterResourceConfiguration miniClusterResourceConfiguration; - private final TestBaseUtils.CodebaseType codebaseType; - private JobExecutorService jobExecutorService; private ClusterClient<?> clusterClient; @@ -82,11 +67,6 @@ public class MiniClusterResource extends ExternalResource { public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) { this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration); - this.codebaseType = miniClusterResourceConfiguration.getCodebaseType(); - } - - public TestBaseUtils.CodebaseType getCodebaseType() { - return codebaseType; } public int getNumberSlots() { @@ -111,12 +91,9 @@ public class MiniClusterResource extends ExternalResource { @Override public void before() throws Exception { - // verify that we are running in the correct test profile - Assume.assumeThat(TestBaseUtils.getCodebaseType(), is(equalTo(codebaseType))); - temporaryFolder.create(); - startJobExecutorService(codebaseType); + startMiniCluster(); numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers(); @@ -163,57 +140,6 @@ public class MiniClusterResource extends ExternalResource { } } - private void startJobExecutorService(TestBaseUtils.CodebaseType miniClusterType) throws Exception { - switch (miniClusterType) { - case LEGACY: - startLegacyMiniCluster(); - break; - case NEW: - startMiniCluster(); - break; - default: - throw new FlinkRuntimeException("Unknown MiniClusterType " + miniClusterType + '.'); - } - } - - private void startLegacyMiniCluster() throws Exception { - final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration()); - configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers()); - configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager()); - configuration.setString(CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath()); - - final LocalFlinkMiniCluster flinkMiniCluster = TestBaseUtils.startCluster( - configuration, - miniClusterResourceConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED); - - jobExecutorService = flinkMiniCluster; - - switch (miniClusterResourceConfiguration.getRpcServiceSharing()) { - case SHARED: - Option<ActorSystem> actorSystemOption = flinkMiniCluster.firstActorSystem(); - Preconditions.checkState(actorSystemOption.isDefined()); - - final ActorSystem actorSystem = actorSystemOption.get(); - clusterClient = new StandaloneClusterClient( - configuration, - flinkMiniCluster.highAvailabilityServices(), - true, - new DefaultActorSystemLoader(actorSystem)); - break; - case DEDICATED: - clusterClient = new StandaloneClusterClient(configuration, flinkMiniCluster.highAvailabilityServices(), true); - break; - } - - Configuration restClientConfig = new Configuration(); - restClientConfig.setInteger(JobManagerOptions.PORT, flinkMiniCluster.getLeaderRPCPort()); - this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig); - - if (flinkMiniCluster.webMonitor().isDefined()) { - webUIPort = flinkMiniCluster.webMonitor().get().getServerPort(); - } - } - private void startMiniCluster() throws Exception { final Configuration configuration = miniClusterResourceConfiguration.getConfiguration(); configuration.setString(CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath()); diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java index c938920..bd521a2 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java @@ -37,8 +37,6 @@ public class MiniClusterResourceConfiguration { private final Time shutdownTimeout; - private final TestBaseUtils.CodebaseType codebaseType; - private final RpcServiceSharing rpcServiceSharing; MiniClusterResourceConfiguration( @@ -46,13 +44,11 @@ public class MiniClusterResourceConfiguration { int numberTaskManagers, int numberSlotsPerTaskManager, Time shutdownTimeout, - TestBaseUtils.CodebaseType codebaseType, RpcServiceSharing rpcServiceSharing) { this.configuration = Preconditions.checkNotNull(configuration); this.numberTaskManagers = numberTaskManagers; this.numberSlotsPerTaskManager = numberSlotsPerTaskManager; this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout); - this.codebaseType = Preconditions.checkNotNull(codebaseType); this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing); } @@ -72,14 +68,6 @@ public class MiniClusterResourceConfiguration { return shutdownTimeout; } - /** - * @deprecated Will be irrelevant once the legacy mode has been removed. - */ - @Deprecated - public TestBaseUtils.CodebaseType getCodebaseType() { - return codebaseType; - } - public RpcServiceSharing getRpcServiceSharing() { return rpcServiceSharing; } @@ -93,7 +81,6 @@ public class MiniClusterResourceConfiguration { private int numberTaskManagers = 1; private int numberSlotsPerTaskManager = 1; private Time shutdownTimeout = AkkaUtils.getTimeoutAsTime(configuration); - private TestBaseUtils.CodebaseType codebaseType = TestBaseUtils.getCodebaseType(); private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED; @@ -117,22 +104,13 @@ public class MiniClusterResourceConfiguration { return this; } - /** - * @deprecated Will be irrelevant once the legacy mode has been removed. - */ - @Deprecated - public Builder setCodebaseType(TestBaseUtils.CodebaseType codebaseType) { - this.codebaseType = codebaseType; - return this; - } - public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing) { this.rpcServiceSharing = rpcServiceSharing; return this; } public MiniClusterResourceConfiguration build() { - return new MiniClusterResourceConfiguration(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, codebaseType, rpcServiceSharing); + return new MiniClusterResourceConfiguration(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing); } } } diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index a0f52f2..363327a 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -44,8 +44,6 @@ import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; - import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.File; @@ -678,26 +676,6 @@ public class TestBaseUtils extends TestLogger { throw new TimeoutException("Could not get HTTP response in time since the service is still unavailable."); } - @Nonnull - public static CodebaseType getCodebaseType() { - return Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)) ? CodebaseType.NEW : CodebaseType.LEGACY; - } - - public static boolean isNewCodebase() { - return CodebaseType.NEW == getCodebaseType(); - } - - /** - * Type of the mini cluster to start. - * - * @deprecated Will be irrelevant once the legacy mode has been removed. - */ - @Deprecated - public enum CodebaseType { - LEGACY, - NEW - } - /** * Comparator for comparable Tuples. * @param <T> tuple type diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java index 66a894f..56df46e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java @@ -31,7 +31,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.test.util.MiniClusterResourceConfiguration; -import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -58,7 +57,6 @@ public class JobRetrievalITCase extends TestLogger { new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(4) - .setCodebaseType(TestBaseUtils.CodebaseType.NEW) .build()); private RestClusterClient<StandaloneClusterId> client; diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java index 4826a46..f62ccf7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java @@ -28,7 +28,6 @@ import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.test.util.MiniClusterResourceConfiguration; -import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -80,9 +79,6 @@ public class AutoParallelismITCase extends TestLogger { assertEquals(PARALLELISM, resultCollection.size()); } catch (Exception ex) { - if (MINI_CLUSTER_RESOURCE.getCodebaseType() == TestBaseUtils.CodebaseType.LEGACY) { - throw ex; - } assertTrue( ExceptionUtils.findThrowableWithMessage(ex, ExecutionGraphBuilder.PARALLELISM_AUTO_MAX_ERROR_MESSAGE).isPresent()); } diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 1a0520f..3763f65 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -21,7 +21,6 @@ package org.apache.flink.yarn; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.client.cli.CliFrontend; import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.test.util.TestBaseUtils; @@ -75,7 +74,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Scanner; import java.util.Set; import java.util.UUID; @@ -153,7 +151,7 @@ public abstract class YarnTestBase extends TestLogger { protected org.apache.flink.configuration.Configuration flinkConfiguration; - protected boolean isNewMode; + protected final boolean isNewMode = true; static { YARN_CONFIGURATION = new YarnConfiguration(); @@ -198,8 +196,6 @@ public abstract class YarnTestBase extends TestLogger { } flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration); - - isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType()); } /** @@ -552,9 +548,6 @@ public abstract class YarnTestBase extends TestLogger { FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun); - globalConfiguration.setString(CoreOptions.MODE, - Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType()) ? CoreOptions.NEW_MODE : CoreOptions.LEGACY_MODE); - BootstrapTools.writeConfiguration( globalConfiguration, new File(tempConfPathForSecureRun, "flink-conf.yaml"));
