TisonKun closed pull request #6716: [hotfix] [yarn-test] Clean up inactive test URL: https://github.com/apache/flink/pull/6716
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java deleted file mode 100644 index 785dff9c0c7..00000000000 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn; - -import org.apache.flink.runtime.jobmanager.JobManager; -import org.apache.flink.runtime.jobmanager.MemoryArchivist; -import org.apache.flink.runtime.taskmanager.TaskManager; -import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist; -import org.apache.flink.runtime.testutils.TestingResourceManager; -import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.runtime.util.JvmShutdownSafeguard; -import org.apache.flink.runtime.util.SignalHandler; - -/** - * Yarn application master which starts the {@link TestingYarnJobManager}, - * {@link TestingResourceManager}, and the {@link TestingMemoryArchivist}. - */ -public class TestingApplicationMaster extends YarnApplicationMasterRunner { - - @Override - public Class<? extends JobManager> getJobManagerClass() { - return TestingYarnJobManager.class; - } - - @Override - public Class<? extends MemoryArchivist> getArchivistClass() { - return TestingMemoryArchivist.class; - } - - @Override - protected Class<? extends TaskManager> getTaskManagerClass() { - return TestingYarnTaskManager.class; - } - - @Override - public Class<? extends YarnFlinkResourceManager> getResourceManagerClass() { - return TestingYarnFlinkResourceManager.class; - } - - public static void main(String[] args) { - EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args); - SignalHandler.register(LOG); - JvmShutdownSafeguard.installAsShutdownHook(LOG); - - // run and exit with the proper return code - int returnCode = new TestingApplicationMaster().run(args); - System.exit(returnCode); - } - -} diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java deleted file mode 100644 index 37b8d410a5d..00000000000 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn; - -import org.apache.flink.client.deployment.ClusterSpecification; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.util.Preconditions; - -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; - -import java.io.File; -import java.io.FilenameFilter; -import java.util.ArrayList; -import java.util.List; - -/** - * Yarn client which starts a {@link TestingApplicationMaster}. Additionally the client adds the - * flink-yarn-tests-X-tests.jar and the flink-runtime-X-tests.jar to the set of files which - * are shipped to the yarn cluster. This is necessary to load the testing classes. - */ -public class TestingYarnClusterDescriptor extends LegacyYarnClusterDescriptor { - - public TestingYarnClusterDescriptor( - Configuration configuration, - YarnConfiguration yarnConfiguration, - String configurationDirectory, - YarnClient yarnClient, - boolean sharedYarnClient) { - super( - configuration, - yarnConfiguration, - configurationDirectory, - yarnClient, - sharedYarnClient); - List<File> filesToShip = new ArrayList<>(); - - File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests")); - Preconditions.checkNotNull(testingJar, "Could not find the flink-yarn-tests tests jar. " + - "Make sure to package the flink-yarn-tests module."); - - File testingRuntimeJar = YarnTestBase.findFile("..", new TestJarFinder("flink-runtime")); - Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " + - "jar. Make sure to package the flink-runtime module."); - - File testingYarnJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn")); - Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-yarn tests " + - "jar. Make sure to package the flink-yarn module."); - - filesToShip.add(testingJar); - filesToShip.add(testingRuntimeJar); - filesToShip.add(testingYarnJar); - - addShipFiles(filesToShip); - } - - @Override - protected String getYarnSessionClusterEntrypoint() { - return TestingApplicationMaster.class.getName(); - } - - @Override - protected String getYarnJobClusterEntrypoint() { - throw new UnsupportedOperationException("Does not support Yarn per-job clusters."); - } - - @Override - public YarnClusterClient deployJobCluster( - ClusterSpecification clusterSpecification, - JobGraph jobGraph, - boolean detached) { - throw new UnsupportedOperationException("Cannot deploy a per-job cluster yet."); - } - - static class TestJarFinder implements FilenameFilter { - - private final String jarName; - - TestJarFinder(final String jarName) { - this.jarName = jarName; - } - - @Override - public boolean accept(File dir, String name) { - return name.startsWith(jarName) && name.endsWith("-tests.jar") && - dir.getAbsolutePath().contains(dir.separator + jarName + dir.separator); - } - } -} diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java deleted file mode 100644 index ff030bec284..00000000000 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn; - -import org.apache.flink.runtime.security.SecurityUtils; - -import java.io.IOException; - -/** - * Yarn TaskManager runner which starts a {@link TestingYarnTaskManager}. - */ -public class TestingYarnTaskManagerRunner { - public static void main(String[] args) throws IOException { - YarnTaskManagerRunnerFactory.Runner tmRunner = YarnTaskManagerRunnerFactory.create( - args, TestingYarnTaskManager.class, System.getenv()); - - try { - SecurityUtils.getInstalledContext().runSecured(tmRunner); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java deleted file mode 100644 index 9a8f5033f3f..00000000000 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn; - -import org.apache.flink.client.deployment.ClusterSpecification; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; -import org.apache.flink.runtime.util.LeaderRetrievalUtils; - -import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import akka.testkit.JavaTestKit; -import org.apache.curator.test.TestingServer; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.File; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; - -import scala.concurrent.duration.FiniteDuration; - -import static org.junit.Assume.assumeTrue; - -/** - * Tests that verify correct HA behavior. - */ -public class YARNHighAvailabilityITCase extends YarnTestBase { - - private static TestingServer zkServer; - - private static ActorSystem actorSystem; - - private static final int numberApplicationAttempts = 3; - - @Rule - public TemporaryFolder temp = new TemporaryFolder(); - - @BeforeClass - public static void setup() { - actorSystem = AkkaUtils.createDefaultActorSystem(); - - try { - zkServer = new TestingServer(); - zkServer.start(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Could not start ZooKeeper testing cluster."); - } - - YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha"); - YARN_CONFIGURATION.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" + numberApplicationAttempts); - - startYARNWithConfig(YARN_CONFIGURATION); - } - - @AfterClass - public static void teardown() throws Exception { - if (zkServer != null) { - zkServer.stop(); - } - - JavaTestKit.shutdownActorSystem(actorSystem); - actorSystem = null; - } - - /** - * Tests that the application master can be killed multiple times and that the surviving - * TaskManager successfully reconnects to the newly started JobManager. - * @throws Exception - */ - @Test - public void testMultipleAMKill() throws Exception { - assumeTrue("This test only works with the old actor based code.", !isNewMode); - final int numberKillingAttempts = numberApplicationAttempts - 1; - String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); - final Configuration configuration = GlobalConfiguration.loadConfiguration(); - TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor( - configuration, - getYarnConfiguration(), - confDirPath, - getYarnClient(), - true); - - Assert.assertNotNull("unable to get yarn client", flinkYarnClient); - flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); - flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles())); - - String fsStateHandlePath = temp.getRoot().getPath(); - - // load the configuration - File configDirectory = new File(confDirPath); - GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); - - flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" + - zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts + - "@@" + CheckpointingOptions.STATE_BACKEND.key() + "=FILESYSTEM" + - "@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "=" + fsStateHandlePath + "/checkpoints" + - "@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery"); - - ClusterClient<ApplicationId> yarnClusterClient = null; - - final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES); - - HighAvailabilityServices highAvailabilityServices = null; - - final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder() - .setMasterMemoryMB(768) - .setTaskManagerMemoryMB(1024) - .setNumberTaskManagers(1) - .setSlotsPerTaskManager(1) - .createClusterSpecification(); - - try { - yarnClusterClient = flinkYarnClient.deploySessionCluster(clusterSpecification); - - highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( - yarnClusterClient.getFlinkConfiguration(), - Executors.directExecutor(), - HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION); - - final HighAvailabilityServices finalHighAvailabilityServices = highAvailabilityServices; - - new JavaTestKit(actorSystem) {{ - for (int attempt = 0; attempt < numberKillingAttempts; attempt++) { - new Within(timeout) { - @Override - protected void run() { - try { - ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway( - finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), - actorSystem, - timeout); - ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID()); - - gateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway); - - expectMsgEquals(Acknowledge.get()); - - gateway.tell(PoisonPill.getInstance()); - } catch (Exception e) { - throw new AssertionError("Could not complete test.", e); - } - } - }; - } - - new Within(timeout) { - @Override - protected void run() { - try { - ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway( - finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), - actorSystem, - timeout); - ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID()); - - gateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway); - - expectMsgEquals(Acknowledge.get()); - } catch (Exception e) { - throw new AssertionError("Could not complete test.", e); - } - } - }; - - }}; - } finally { - if (yarnClusterClient != null) { - log.info("Shutting down the Flink Yarn application."); - yarnClusterClient.shutDownCluster(); - yarnClusterClient.shutdown(); - } - - if (highAvailabilityServices != null) { - highAvailabilityServices.closeAndCleanupAllData(); - } - } - } -} diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java deleted file mode 100644 index 758a09866d0..00000000000 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn; - -import org.apache.flink.client.deployment.ClusterSpecification; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.AkkaOptions; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; -import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import java.io.File; -import java.util.Arrays; -import java.util.Random; - -/** - * Test cases for the deployment of Yarn Flink clusters. - */ -public class YARNITCase extends YarnTestBase { - - @BeforeClass - public static void setup() { - YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha"); - startYARNWithConfig(YARN_CONFIGURATION); - } - - @Ignore("The cluster cannot be stopped yet.") - @Test - public void testPerJobMode() throws Exception { - Configuration configuration = new Configuration(); - configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s"); - final YarnClient yarnClient = getYarnClient(); - - try (final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor( - configuration, - getYarnConfiguration(), - System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR), - yarnClient, - true)) { - - yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); - yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles())); - - final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder() - .setMasterMemoryMB(768) - .setTaskManagerMemoryMB(1024) - .setSlotsPerTaskManager(1) - .setNumberTaskManagers(1) - .createClusterSpecification(); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(2); - - env.addSource(new InfiniteSource()) - .shuffle() - .addSink(new DiscardingSink<Integer>()); - - final JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - - File testingJar = YarnTestBase.findFile("..", new TestingYarnClusterDescriptor.TestJarFinder("flink-yarn-tests")); - - jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI())); - - ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster( - clusterSpecification, - jobGraph, - true); - - clusterClient.shutdown(); - } - } - - private static class InfiniteSource implements ParallelSourceFunction<Integer> { - - private static final long serialVersionUID = 1642561062000662861L; - private volatile boolean running; - private final Random random; - - InfiniteSource() { - running = true; - random = new Random(); - } - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - while (running) { - synchronized (ctx.getCheckpointLock()) { - ctx.collect(random.nextInt()); - } - - Thread.sleep(5L); - } - } - - @Override - public void cancel() { - running = false; - } - } -} diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala deleted file mode 100644 index d0084b6cf8c..00000000000 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn - -import java.util.concurrent.{Executor, ScheduledExecutorService} - -import akka.actor.ActorRef -import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.blob.BlobServer -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory -import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager -import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory -import org.apache.flink.runtime.instance.InstanceManager -import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler -import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup -import org.apache.flink.runtime.testingUtils.TestingJobManagerLike - -import scala.concurrent.duration.FiniteDuration - -/** [[YarnJobManager]] implementation which mixes in the [[TestingJobManagerLike]] mixin. - * - * This actor class is used for testing purposes on Yarn. Here we use an explicit class definition - * instead of an anonymous class with the respective mixin to obtain a more readable logger name. - * - * @param flinkConfiguration Configuration object for the actor - * @param futureExecutor Execution context which is used to execute concurrent tasks in the - * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] - * @param ioExecutor for blocking io operations - * @param instanceManager Instance manager to manage the registered - * [[org.apache.flink.runtime.taskmanager.TaskManager]] - * @param scheduler Scheduler to schedule Flink jobs - * @param libraryCacheManager Manager to manage uploaded jar files - * @param archive Archive for finished Flink jobs - * @param restartStrategyFactory Default restart strategy for job restarts - * @param timeout Timeout for futures - * @param leaderElectionService LeaderElectionService to participate in the leader election - */ -class TestingYarnJobManager( - flinkConfiguration: Configuration, - futureExecutor: ScheduledExecutorService, - ioExecutor: Executor, - instanceManager: InstanceManager, - scheduler: Scheduler, - blobServer: BlobServer, - libraryCacheManager: BlobLibraryCacheManager, - archive: ActorRef, - restartStrategyFactory: RestartStrategyFactory, - timeout: FiniteDuration, - leaderElectionService: LeaderElectionService, - submittedJobGraphs : SubmittedJobGraphStore, - checkpointRecoveryFactory : CheckpointRecoveryFactory, - jobRecoveryTimeout: FiniteDuration, - jobManagerMetricGroup : JobManagerMetricGroup, - optRestAddress: Option[String]) - extends YarnJobManager( - flinkConfiguration, - futureExecutor, - ioExecutor, - instanceManager, - scheduler, - blobServer, - libraryCacheManager, - archive, - restartStrategyFactory, - timeout, - leaderElectionService, - submittedJobGraphs, - checkpointRecoveryFactory, - jobRecoveryTimeout, - jobManagerMetricGroup, - optRestAddress) - with TestingJobManagerLike {} diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala deleted file mode 100644 index 9dd0b282119..00000000000 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn - -import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.highavailability.HighAvailabilityServices -import org.apache.flink.runtime.io.disk.iomanager.IOManager -import org.apache.flink.runtime.io.network.NetworkEnvironment -import org.apache.flink.runtime.memory.MemoryManager -import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup -import org.apache.flink.runtime.security.SecurityUtils -import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager -import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration -import org.apache.flink.runtime.taskmanager.TaskManagerLocation -import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike - -/** [[YarnTaskManager]] implementation which mixes in the [[TestingTaskManagerLike]] mixin. - * - * This actor class is used for testing purposes on Yarn. Here we use an explicit class definition - * instead of an anonymous class with the respective mixin to obtain a more readable logger name. - * - * @param config Configuration object for the actor - * @param resourceID The Yarn container id - * @param connectionInfo Connection information of this actor - * @param memoryManager MemoryManager which is responsible for Flink's managed memory allocation - * @param ioManager IOManager responsible for I/O - * @param network NetworkEnvironment for this actor - * @param taskManagerLocalStateStoresManager Task manager state store manager for this actor - * @param numberOfSlots Number of slots for this TaskManager - * @param highAvailabilityServices [[HighAvailabilityServices]] to create a leader retrieval - * service for retrieving the leading JobManager - */ -class TestingYarnTaskManager( - config: TaskManagerConfiguration, - resourceID: ResourceID, - connectionInfo: TaskManagerLocation, - memoryManager: MemoryManager, - ioManager: IOManager, - network: NetworkEnvironment, - taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager, - numberOfSlots: Int, - highAvailabilityServices: HighAvailabilityServices, - taskManagerMetricGroup : TaskManagerMetricGroup) - extends YarnTaskManager( - config, - resourceID, - connectionInfo, - memoryManager, - ioManager, - network, - taskManagerLocalStateStoresManager, - numberOfSlots, - highAvailabilityServices, - taskManagerMetricGroup) - with TestingTaskManagerLike { - - object YarnTaskManager { - - /** Entry point (main method) to run the TaskManager on YARN. - * - * @param args The command line arguments. - */ - def main(args: Array[String]): Unit = { - val tmRunner = YarnTaskManagerRunnerFactory.create( - args, classOf[TestingYarnTaskManager], System.getenv()) - - try { - SecurityUtils.getInstalledContext.runSecured(tmRunner) - } catch { - case e: Exception => - throw new RuntimeException(e) - } - } - - } -} - ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services