TisonKun closed pull request #6716: [hotfix] [yarn-test] Clean up inactive test
URL: https://github.com/apache/flink/pull/6716
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign pull request (one opened from a fork) once it
has been merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
deleted file mode 100644
index 785dff9c0c7..00000000000
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
-import org.apache.flink.runtime.testutils.TestingResourceManager;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.JvmShutdownSafeguard;
-import org.apache.flink.runtime.util.SignalHandler;
-
-/**
- * Yarn application master which starts the {@link TestingYarnJobManager},
- * {@link TestingResourceManager}, and the {@link TestingMemoryArchivist}.
- */
-public class TestingApplicationMaster extends YarnApplicationMasterRunner {
-
-       @Override
-       public Class<? extends JobManager> getJobManagerClass() {
-               return TestingYarnJobManager.class;
-       }
-
-       @Override
-       public Class<? extends MemoryArchivist> getArchivistClass() {
-               return TestingMemoryArchivist.class;
-       }
-
-       @Override
-       protected Class<? extends TaskManager> getTaskManagerClass() {
-               return TestingYarnTaskManager.class;
-       }
-
-       @Override
-       public Class<? extends YarnFlinkResourceManager> 
getResourceManagerClass() {
-               return TestingYarnFlinkResourceManager.class;
-       }
-
-       public static void main(String[] args) {
-               EnvironmentInformation.logEnvironmentInfo(LOG, "YARN 
ApplicationMaster / JobManager", args);
-               SignalHandler.register(LOG);
-               JvmShutdownSafeguard.installAsShutdownHook(LOG);
-
-               // run and exit with the proper return code
-               int returnCode = new TestingApplicationMaster().run(args);
-               System.exit(returnCode);
-       }
-
-}
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
deleted file mode 100644
index 37b8d410a5d..00000000000
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Yarn client which starts a {@link TestingApplicationMaster}. Additionally 
the client adds the
- * flink-yarn-tests-X-tests.jar and the flink-runtime-X-tests.jar to the set 
of files which
- * are shipped to the yarn cluster. This is necessary to load the testing 
classes.
- */
-public class TestingYarnClusterDescriptor extends LegacyYarnClusterDescriptor {
-
-       public TestingYarnClusterDescriptor(
-                       Configuration configuration,
-                       YarnConfiguration yarnConfiguration,
-                       String configurationDirectory,
-                       YarnClient yarnClient,
-                       boolean sharedYarnClient) {
-               super(
-                       configuration,
-                       yarnConfiguration,
-                       configurationDirectory,
-                       yarnClient,
-                       sharedYarnClient);
-               List<File> filesToShip = new ArrayList<>();
-
-               File testingJar = YarnTestBase.findFile("..", new 
TestJarFinder("flink-yarn-tests"));
-               Preconditions.checkNotNull(testingJar, "Could not find the 
flink-yarn-tests tests jar. " +
-                       "Make sure to package the flink-yarn-tests module.");
-
-               File testingRuntimeJar = YarnTestBase.findFile("..", new 
TestJarFinder("flink-runtime"));
-               Preconditions.checkNotNull(testingRuntimeJar, "Could not find 
the flink-runtime tests " +
-                       "jar. Make sure to package the flink-runtime module.");
-
-               File testingYarnJar = YarnTestBase.findFile("..", new 
TestJarFinder("flink-yarn"));
-               Preconditions.checkNotNull(testingRuntimeJar, "Could not find 
the flink-yarn tests " +
-                       "jar. Make sure to package the flink-yarn module.");
-
-               filesToShip.add(testingJar);
-               filesToShip.add(testingRuntimeJar);
-               filesToShip.add(testingYarnJar);
-
-               addShipFiles(filesToShip);
-       }
-
-       @Override
-       protected String getYarnSessionClusterEntrypoint() {
-               return TestingApplicationMaster.class.getName();
-       }
-
-       @Override
-       protected String getYarnJobClusterEntrypoint() {
-               throw new UnsupportedOperationException("Does not support Yarn 
per-job clusters.");
-       }
-
-       @Override
-       public YarnClusterClient deployJobCluster(
-                       ClusterSpecification clusterSpecification,
-                       JobGraph jobGraph,
-                       boolean detached) {
-               throw new UnsupportedOperationException("Cannot deploy a 
per-job cluster yet.");
-       }
-
-       static class TestJarFinder implements FilenameFilter {
-
-               private final String jarName;
-
-               TestJarFinder(final String jarName) {
-                       this.jarName = jarName;
-               }
-
-               @Override
-               public boolean accept(File dir, String name) {
-                       return name.startsWith(jarName) && 
name.endsWith("-tests.jar") &&
-                               dir.getAbsolutePath().contains(dir.separator + 
jarName + dir.separator);
-               }
-       }
-}
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
deleted file mode 100644
index ff030bec284..00000000000
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.runtime.security.SecurityUtils;
-
-import java.io.IOException;
-
-/**
- * Yarn TaskManager runner which starts a {@link TestingYarnTaskManager}.
- */
-public class TestingYarnTaskManagerRunner {
-       public static void main(String[] args) throws IOException {
-               YarnTaskManagerRunnerFactory.Runner tmRunner = 
YarnTaskManagerRunnerFactory.create(
-                       args, TestingYarnTaskManager.class, System.getenv());
-
-               try {
-                       
SecurityUtils.getInstalledContext().runSecured(tmRunner);
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-       }
-}
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
deleted file mode 100644
index 9a8f5033f3f..00000000000
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.testkit.JavaTestKit;
-import org.apache.curator.test.TestingServer;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assume.assumeTrue;
-
-/**
- * Tests that verify correct HA behavior.
- */
-public class YARNHighAvailabilityITCase extends YarnTestBase {
-
-       private static TestingServer zkServer;
-
-       private static ActorSystem actorSystem;
-
-       private static final int numberApplicationAttempts = 3;
-
-       @Rule
-       public TemporaryFolder temp = new TemporaryFolder();
-
-       @BeforeClass
-       public static void setup() {
-               actorSystem = AkkaUtils.createDefaultActorSystem();
-
-               try {
-                       zkServer = new TestingServer();
-                       zkServer.start();
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("Could not start ZooKeeper testing 
cluster.");
-               }
-
-               YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, 
"flink-yarn-tests-ha");
-               YARN_CONFIGURATION.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" 
+ numberApplicationAttempts);
-
-               startYARNWithConfig(YARN_CONFIGURATION);
-       }
-
-       @AfterClass
-       public static void teardown() throws Exception {
-               if (zkServer != null) {
-                       zkServer.stop();
-               }
-
-               JavaTestKit.shutdownActorSystem(actorSystem);
-               actorSystem = null;
-       }
-
-       /**
-        * Tests that the application master can be killed multiple times and 
that the surviving
-        * TaskManager successfully reconnects to the newly started JobManager.
-        * @throws Exception
-        */
-       @Test
-       public void testMultipleAMKill() throws Exception {
-               assumeTrue("This test only works with the old actor based 
code.", !isNewMode);
-               final int numberKillingAttempts = numberApplicationAttempts - 1;
-               String confDirPath = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
-               final Configuration configuration = 
GlobalConfiguration.loadConfiguration();
-               TestingYarnClusterDescriptor flinkYarnClient = new 
TestingYarnClusterDescriptor(
-                       configuration,
-                       getYarnConfiguration(),
-                       confDirPath,
-                       getYarnClient(),
-                       true);
-
-               Assert.assertNotNull("unable to get yarn client", 
flinkYarnClient);
-               flinkYarnClient.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
-               
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-
-               String fsStateHandlePath = temp.getRoot().getPath();
-
-               // load the configuration
-               File configDirectory = new File(confDirPath);
-               
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
-
-               
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum="
 +
-                       zkServer.getConnectString() + 
"@@yarn.application-attempts=" + numberApplicationAttempts +
-                       "@@" + CheckpointingOptions.STATE_BACKEND.key() + 
"=FILESYSTEM" +
-                       "@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "=" 
+ fsStateHandlePath + "/checkpoints" +
-                       "@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + 
"=" + fsStateHandlePath + "/recovery");
-
-               ClusterClient<ApplicationId> yarnClusterClient = null;
-
-               final FiniteDuration timeout = new FiniteDuration(2, 
TimeUnit.MINUTES);
-
-               HighAvailabilityServices highAvailabilityServices = null;
-
-               final ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
-                       .setMasterMemoryMB(768)
-                       .setTaskManagerMemoryMB(1024)
-                       .setNumberTaskManagers(1)
-                       .setSlotsPerTaskManager(1)
-                       .createClusterSpecification();
-
-               try {
-                       yarnClusterClient = 
flinkYarnClient.deploySessionCluster(clusterSpecification);
-
-                       highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
-                               yarnClusterClient.getFlinkConfiguration(),
-                               Executors.directExecutor(),
-                               
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
-
-                       final HighAvailabilityServices 
finalHighAvailabilityServices = highAvailabilityServices;
-
-                       new JavaTestKit(actorSystem) {{
-                               for (int attempt = 0; attempt < 
numberKillingAttempts; attempt++) {
-                                       new Within(timeout) {
-                                               @Override
-                                               protected void run() {
-                                                       try {
-                                                               ActorGateway 
gateway = LeaderRetrievalUtils.retrieveLeaderGateway(
-                                                                       
finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-                                                                       
actorSystem,
-                                                                       
timeout);
-                                                               ActorGateway 
selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
-
-                                                               
gateway.tell(new 
TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), 
selfGateway);
-
-                                                               
expectMsgEquals(Acknowledge.get());
-
-                                                               
gateway.tell(PoisonPill.getInstance());
-                                                       } catch (Exception e) {
-                                                               throw new 
AssertionError("Could not complete test.", e);
-                                                       }
-                                               }
-                                       };
-                               }
-
-                               new Within(timeout) {
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       ActorGateway gateway = 
LeaderRetrievalUtils.retrieveLeaderGateway(
-                                                               
finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-                                                               actorSystem,
-                                                               timeout);
-                                                       ActorGateway 
selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
-
-                                                       gateway.tell(new 
TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), 
selfGateway);
-
-                                                       
expectMsgEquals(Acknowledge.get());
-                                               } catch (Exception e) {
-                                                       throw new 
AssertionError("Could not complete test.", e);
-                                               }
-                                       }
-                               };
-
-                       }};
-               } finally {
-                       if (yarnClusterClient != null) {
-                               log.info("Shutting down the Flink Yarn 
application.");
-                               yarnClusterClient.shutDownCluster();
-                               yarnClusterClient.shutdown();
-                       }
-
-                       if (highAvailabilityServices != null) {
-                               
highAvailabilityServices.closeAndCleanupAllData();
-                       }
-               }
-       }
-}
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
deleted file mode 100644
index 758a09866d0..00000000000
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Random;
-
-/**
- * Test cases for the deployment of Yarn Flink clusters.
- */
-public class YARNITCase extends YarnTestBase {
-
-       @BeforeClass
-       public static void setup() {
-               YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, 
"flink-yarn-tests-ha");
-               startYARNWithConfig(YARN_CONFIGURATION);
-       }
-
-       @Ignore("The cluster cannot be stopped yet.")
-       @Test
-       public void testPerJobMode() throws Exception {
-               Configuration configuration = new Configuration();
-               configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
-               final YarnClient yarnClient = getYarnClient();
-
-               try (final YarnClusterDescriptor yarnClusterDescriptor = new 
YarnClusterDescriptor(
-                       configuration,
-                       getYarnConfiguration(),
-                       System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR),
-                       yarnClient,
-                       true)) {
-
-                       yarnClusterDescriptor.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
-                       
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-
-                       final ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
-                               .setMasterMemoryMB(768)
-                               .setTaskManagerMemoryMB(1024)
-                               .setSlotsPerTaskManager(1)
-                               .setNumberTaskManagers(1)
-                               .createClusterSpecification();
-
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-                       env.setParallelism(2);
-
-                       env.addSource(new InfiniteSource())
-                               .shuffle()
-                               .addSink(new DiscardingSink<Integer>());
-
-                       final JobGraph jobGraph = 
env.getStreamGraph().getJobGraph();
-
-                       File testingJar = YarnTestBase.findFile("..", new 
TestingYarnClusterDescriptor.TestJarFinder("flink-yarn-tests"));
-
-                       jobGraph.addJar(new 
org.apache.flink.core.fs.Path(testingJar.toURI()));
-
-                       ClusterClient<ApplicationId> clusterClient = 
yarnClusterDescriptor.deployJobCluster(
-                               clusterSpecification,
-                               jobGraph,
-                               true);
-
-                       clusterClient.shutdown();
-               }
-       }
-
-       private static class InfiniteSource implements 
ParallelSourceFunction<Integer> {
-
-               private static final long serialVersionUID = 
1642561062000662861L;
-               private volatile boolean running;
-               private final Random random;
-
-               InfiniteSource() {
-                       running = true;
-                       random = new Random();
-               }
-
-               @Override
-               public void run(SourceContext<Integer> ctx) throws Exception {
-                       while (running) {
-                               synchronized (ctx.getCheckpointLock()) {
-                                       ctx.collect(random.nextInt());
-                               }
-
-                               Thread.sleep(5L);
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       running = false;
-               }
-       }
-}
diff --git 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
deleted file mode 100644
index d0084b6cf8c..00000000000
--- 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn
-
-import java.util.concurrent.{Executor, ScheduledExecutorService}
-
-import akka.actor.ActorRef
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.blob.BlobServer
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
-import org.apache.flink.runtime.testingUtils.TestingJobManagerLike
-
-import scala.concurrent.duration.FiniteDuration
-
-/** [[YarnJobManager]] implementation which mixes in the 
[[TestingJobManagerLike]] mixin.
-  *
-  * This actor class is used for testing purposes on Yarn. Here we use an 
explicit class definition
-  * instead of an anonymous class with the respective mixin to obtain a more 
readable logger name.
-  *
-  * @param flinkConfiguration Configuration object for the actor
-  * @param futureExecutor Execution context which is used to execute 
concurrent tasks in the
-  *                         
[[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
-  * @param ioExecutor for blocking io operations
-  * @param instanceManager Instance manager to manage the registered
-  *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
-  * @param scheduler Scheduler to schedule Flink jobs
-  * @param libraryCacheManager Manager to manage uploaded jar files
-  * @param archive Archive for finished Flink jobs
-  * @param restartStrategyFactory Default restart strategy for job restarts
-  * @param timeout Timeout for futures
-  * @param leaderElectionService LeaderElectionService to participate in the 
leader election
-  */
-class TestingYarnJobManager(
-    flinkConfiguration: Configuration,
-    futureExecutor: ScheduledExecutorService,
-    ioExecutor: Executor,
-    instanceManager: InstanceManager,
-    scheduler: Scheduler,
-    blobServer: BlobServer,
-    libraryCacheManager: BlobLibraryCacheManager,
-    archive: ActorRef,
-    restartStrategyFactory: RestartStrategyFactory,
-    timeout: FiniteDuration,
-    leaderElectionService: LeaderElectionService,
-    submittedJobGraphs : SubmittedJobGraphStore,
-    checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    jobRecoveryTimeout: FiniteDuration,
-    jobManagerMetricGroup : JobManagerMetricGroup,
-    optRestAddress: Option[String])
-  extends YarnJobManager(
-    flinkConfiguration,
-    futureExecutor,
-    ioExecutor,
-    instanceManager,
-    scheduler,
-    blobServer,
-    libraryCacheManager,
-    archive,
-    restartStrategyFactory,
-    timeout,
-    leaderElectionService,
-    submittedJobGraphs,
-    checkpointRecoveryFactory,
-    jobRecoveryTimeout,
-    jobManagerMetricGroup,
-    optRestAddress)
-  with TestingJobManagerLike {}
diff --git 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
deleted file mode 100644
index 9dd0b282119..00000000000
--- 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices
-import org.apache.flink.runtime.io.disk.iomanager.IOManager
-import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
-import org.apache.flink.runtime.security.SecurityUtils
-import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager
-import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
-
-/** [[YarnTaskManager]] implementation which mixes in the [[TestingTaskManagerLike]] mixin.
-  *
-  * This actor class is used for testing purposes on Yarn. Here we use an explicit class definition
-  * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
-  *
-  * @param config Configuration object for the actor
-  * @param resourceID The Yarn container id
-  * @param connectionInfo Connection information of this actor
-  * @param memoryManager MemoryManager which is responsible for Flink's managed memory allocation
-  * @param ioManager IOManager responsible for I/O
-  * @param network NetworkEnvironment for this actor
-  * @param taskManagerLocalStateStoresManager Task manager state store manager for this actor
-  * @param numberOfSlots Number of slots for this TaskManager
-  * @param highAvailabilityServices [[HighAvailabilityServices]] to create a leader retrieval
-  *                                service for retrieving the leading JobManager
-  */
-class TestingYarnTaskManager(
-    config: TaskManagerConfiguration,
-    resourceID: ResourceID,
-    connectionInfo: TaskManagerLocation,
-    memoryManager: MemoryManager,
-    ioManager: IOManager,
-    network: NetworkEnvironment,
-    taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager,
-    numberOfSlots: Int,
-    highAvailabilityServices: HighAvailabilityServices,
-    taskManagerMetricGroup : TaskManagerMetricGroup)
-  extends YarnTaskManager(
-    config,
-    resourceID,
-    connectionInfo,
-    memoryManager,
-    ioManager,
-    network,
-    taskManagerLocalStateStoresManager,
-    numberOfSlots,
-    highAvailabilityServices,
-    taskManagerMetricGroup)
-  with TestingTaskManagerLike {
-
-  object YarnTaskManager {
-
-    /** Entry point (main method) to run the TaskManager on YARN.
-      *
-      * @param args The command line arguments.
-      */
-    def main(args: Array[String]): Unit = {
-      val tmRunner = YarnTaskManagerRunnerFactory.create(
-        args, classOf[TestingYarnTaskManager], System.getenv())
-
-      try {
-        SecurityUtils.getInstalledContext.runSecured(tmRunner)
-      } catch {
-        case e: Exception =>
-          throw new RuntimeException(e)
-      }
-    }
-
-  }
-}
-


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to