http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
new file mode 100644
index 0000000..cb402a3
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.configuration.GlobalConfiguration;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+
+import org.apache.log4j.Level;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.apache.flink.yarn.UtilsTest.addTestAppender;
+import static org.apache.flink.yarn.UtilsTest.checkForLogString;
+
+
+/**
+ * This test starts a MiniYARNCluster with a FIFO scheduler.
+ * There are no queues for that scheduler.
+ */
+public class YARNSessionFIFOITCase extends YarnTestBase {
+       private static final Logger LOG = 
LoggerFactory.getLogger(YARNSessionFIFOITCase.class);
+
+       /**
+        * Configures the shared YARN configuration for the FIFO scheduler (which has no
+        * queues) and starts the MiniYARNCluster with that configuration.
+        */
+       @BeforeClass
+       public static void setup() {
+               // memory limits of the mini cluster
+               yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+               yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
+               // replace the scheduler implementation with the FIFO scheduler
+               yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
+               // the cluster name is also used to locate the log directory below target/
+               yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo");
+               startYARNWithConfig(yarnConfiguration);
+       }
+
+       /**
+        * Scans the log files of the test cluster after every test and fails if one of
+        * them contains a prohibited string that is not white-listed.
+        */
+       @After
+       public void checkForProhibitedLogContents() {
+               final String[] forbidden = PROHIBITED_STRINGS;
+               final String[] allowed = WHITELISTED_STRINGS;
+               ensureNoProhibitedStringInLogFiles(forbidden, allowed);
+       }
+
+       /**
+        * Test regular operation, including command line parameter parsing.
+        *
+        * Starts a detached YARN session with a custom name, waits until two containers
+        * are running and then kills the application through a separate YARN client.
+        */
+       @Test(timeout=60000) // timeout after a minute.
+       public void testDetachedMode() {
+               LOG.info("Starting testDetachedMode()");
+               addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
+               Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+                                               "-t", flinkLibFolder.getAbsolutePath(),
+                                               "-n", "1",
+                                               "-jm", "768",
+                                               "-tm", "1024",
+                                               "--name", "MyCustomName", // test setting a custom name
+                                               "--detached"},
+                               "Flink JobManager is now running on", RunTypes.YARN_SESSION);
+
+               checkForLogString("The Flink YARN client has been started in detached mode");
+
+               Assert.assertFalse("The runner should detach.", runner.isAlive());
+
+               LOG.info("Waiting until two containers are running");
+               // wait until two containers are running
+               // (presumably the JobManager container plus the one requested TaskManager)
+               while(getRunningContainers() < 2) {
+                       sleep(500);
+               }
+               LOG.info("Two containers are running. Killing the application");
+
+               // kill application "externally".
+               YarnClient yc = YarnClient.createYarnClient();
+               try {
+                       yc.init(yarnConfiguration);
+                       yc.start();
+                       List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
+                       Assert.assertEquals(1, apps.size()); // Only one running
+                       ApplicationReport app = apps.get(0);
+
+                       Assert.assertEquals("MyCustomName", app.getName());
+                       ApplicationId id = app.getApplicationId();
+                       yc.killApplication(id);
+
+                       while(yc.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) {
+                               sleep(500);
+                       }
+               } catch(AssertionError e) {
+                       // a failed assertion above already carries a meaningful message, keep it
+                       throw e;
+               } catch(Throwable t) {
+                       LOG.warn("Killing failed", t);
+                       Assert.fail("Killing the application failed: " + t);
+               } finally {
+                       // release the client in every case, it is not used after this point
+                       yc.stop();
+               }
+
+               LOG.info("Finished testDetachedMode()");
+       }
+
+       /**
+        * Test querying the YARN cluster.
+        *
+        * The expected summary contains 666*2 = 1332 cores, i.e. the 666 virtual cores
+        * configured per NodeManager times the two NodeManagers of the test cluster.
+        */
+       @Test
+       public void testQueryCluster() {
+               LOG.info("Starting testQueryCluster()");
+               final String[] queryArgs = new String[] {"-q"};
+               // we have 666*2 cores.
+               final String expectedSummary = "Summary: totalMemory 8192 totalCores 1332";
+               runWithArgs(queryArgs, expectedSummary, null, RunTypes.YARN_SESSION, 0);
+               LOG.info("Finished testQueryCluster()");
+       }
+
+       /**
+        * Test deployment to a non-existing queue (user-reported error).
+        * The FIFO scheduler has no queues, so the deployment is possible and the
+        * queue name is not checked.
+        */
+       @Test
+       public void testNonexistingQueue() {
+               LOG.info("Starting testNonexistingQueue()");
+               final String[] sessionArgs = new String[]{
+                               "-j", flinkUberjar.getAbsolutePath(),
+                               "-t", flinkLibFolder.getAbsolutePath(),
+                               "-n", "1",
+                               "-jm", "768",
+                               "-tm", "1024",
+                               "-qu", "doesntExist"};
+               final String expectedOutput = "Number of connected TaskManagers changed to 1. Slots available: 1";
+               runWithArgs(sessionArgs, expectedOutput, null, RunTypes.YARN_SESSION, 0);
+               LOG.info("Finished testNonexistingQueue()");
+       }
+
+       /**
+        * The test cluster has the following resources:
+        * - 2 Nodes with 4096 MB each.
+        * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
+        *
+        * Requested by this test:
+        * - 1 JobManager with 256 MB (raised to 512 MB because of the minimum allocation)
+        * - 5 TaskManagers with 1585 MB each
+        *
+        * From the user's point of view the request sums up to 8181 MB, which fits.
+        * From the system's point of view it sums up to 8437 MB, which does not fit
+        * because of the minimum allocation.
+        */
+       @Ignore("The test is too resource consuming (8.5 GB of memory)")
+       @Test
+       public void testResourceComputation() {
+               addTestAppender(FlinkYarnClient.class, Level.WARN);
+               LOG.info("Starting testResourceComputation()");
+               final String[] sessionArgs = new String[]{
+                               "-j", flinkUberjar.getAbsolutePath(),
+                               "-t", flinkLibFolder.getAbsolutePath(),
+                               "-n", "5",
+                               "-jm", "256",
+                               "-tm", "1585"};
+               runWithArgs(sessionArgs, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
+               LOG.info("Finished testResourceComputation()");
+               checkForLogString("This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available.");
+       }
+
+       /**
+        * The test cluster has the following resources:
+        * - 2 Nodes with 4096 MB each.
+        * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
+        *
+        * Requested by this test:
+        * - 1 JobManager with 256 MB (raised to 512 MB because of the minimum allocation)
+        * - 2 TaskManagers with 3840 MB each
+        *
+        * The user sees a total request of 7936 MB and the system one of 8192 MB; both fit.
+        * HOWEVER: one machine would need 3840 + 512 = 4352 MB, which doesn't fit.
+        *
+        * --> check that the system properly rejects allocating this session.
+        */
+       @Ignore("The test is too resource consuming (8 GB of memory)")
+       @Test
+       public void testfullAlloc() {
+               addTestAppender(FlinkYarnClient.class, Level.WARN);
+               LOG.info("Starting testfullAlloc()");
+               final String[] sessionArgs = new String[]{
+                               "-j", flinkUberjar.getAbsolutePath(),
+                               "-t", flinkLibFolder.getAbsolutePath(),
+                               "-n", "2",
+                               "-jm", "256",
+                               "-tm", "3840"};
+               runWithArgs(sessionArgs, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
+               LOG.info("Finished testfullAlloc()");
+               final String expectedWarning =
+                               "There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" +
+                               "After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]";
+               checkForLogString(expectedWarning);
+       }
+
+       /**
+        * Test the YARN Java API: deploys a cluster with one TaskManager through the
+        * AbstractFlinkYarnClient, polls until the expected cluster status is reported
+        * and shuts the cluster down again. The cluster is shut down even if the test
+        * fails, so that no application is left on the shared mini cluster.
+        */
+       @Test
+       public void testJavaAPI() {
+               final int WAIT_TIME = 15;
+               LOG.info("Starting testJavaAPI()");
+
+               AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient();
+               Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
+               flinkYarnClient.setTaskManagerCount(1);
+               flinkYarnClient.setJobManagerMemory(768);
+               flinkYarnClient.setTaskManagerMemory(1024);
+               flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+               flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+               String confDirPath = System.getenv("FLINK_CONF_DIR");
+               // fail with a clear message instead of building the path "null/flink-conf.yaml"
+               Assert.assertNotNull("The environment variable FLINK_CONF_DIR is not set", confDirPath);
+               flinkYarnClient.setConfigurationDirectory(confDirPath);
+               flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
+               flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
+
+               AbstractFlinkYarnCluster yarnCluster = null;
+               boolean testPassed = false;
+               try {
+                       // deploy
+                       try {
+                               yarnCluster = flinkYarnClient.deploy();
+                               yarnCluster.connectToCluster();
+                       } catch (Exception e) {
+                               LOG.warn("Failing test", e);
+                               Assert.fail("Error while deploying YARN cluster: "+e.getMessage());
+                       }
+                       GetClusterStatusResponse expectedStatus = new GetClusterStatusResponse(1, 1);
+                       // the loop is left through the break or through the failure below,
+                       // the upper bound is never reached
+                       for(int second = 0; second < WAIT_TIME * 2; second++) {
+                               try {
+                                       Thread.sleep(1000);
+                               } catch (InterruptedException e) {
+                                       LOG.warn("Interrupted", e);
+                               }
+                               GetClusterStatusResponse status = yarnCluster.getClusterStatus();
+                               if(status != null && status.equals(expectedStatus)) {
+                                       LOG.info("Cluster reached status " + status);
+                                       break; // all good, cluster started
+                               }
+                               if(second > WAIT_TIME) {
+                                       // we waited for 15 seconds. cluster didn't come up correctly
+                                       Assert.fail("The custer didn't start after " + WAIT_TIME + " seconds");
+                               }
+                       }
+
+                       // use the cluster
+                       Assert.assertNotNull(yarnCluster.getJobManagerAddress());
+                       Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
+
+                       LOG.info("Shutting down cluster. All tests passed");
+                       testPassed = true;
+               } finally {
+                       // shutdown cluster
+                       if(yarnCluster != null) {
+                               try {
+                                       yarnCluster.shutdown(false);
+                               } catch (RuntimeException e) {
+                                       if(testPassed) {
+                                               throw e;
+                                       }
+                                       // do not hide the failure which brought us here
+                                       LOG.warn("Shutting down the cluster failed", e);
+                               }
+                       }
+               }
+               LOG.info("Finished testJavaAPI()");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
new file mode 100644
index 0000000..fc1e5bc
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -0,0 +1,619 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.concurrent.ConcurrentMap;
+
+
+/**
+ * This base class allows to use the MiniYARNCluster.
+ * The cluster is re-used for all tests.
+ *
+ * This class is located in a different package which is built after 
flink-dist. This way,
+ * we can use the YARN uberjar of flink to start a Flink YARN session.
+ *
+ * The test is not thread-safe. Parallel execution of tests is not possible!
+ */
+public abstract class YarnTestBase extends TestLogger {
+       private static final Logger LOG = 
LoggerFactory.getLogger(YarnTestBase.class);
+
+       protected final static PrintStream originalStdout = System.out;
+       protected final static PrintStream originalStderr = System.err;
+
+       protected static String TEST_CLUSTER_NAME_KEY = 
"flink-yarn-minicluster-name";
+
+       protected final static int NUM_NODEMANAGERS = 2;
+
+       /** The tests are scanning for these strings in the final output. */
+       protected final static String[] PROHIBITED_STRINGS = {
+                       "Exception", // we don't want any exceptions to happen
+                       "Started SelectChannelConnector@0.0.0.0:8081" // Jetty 
should start on a random port in YARN mode.
+       };
+
+       /** These strings are white-listed, overriding the prohibited strings */
+       protected final static String[] WHITELISTED_STRINGS = {
+                       "akka.remote.RemoteTransportExceptionNoStackTrace",
+                       // workaround for annoying InterruptedException logging:
+                   // https://issues.apache.org/jira/browse/YARN-1022
+                       "java.lang.InterruptedException"
+       };
+
+       // Temp directory which is deleted after the unit test.
+       @ClassRule
+       public static TemporaryFolder tmp = new TemporaryFolder();
+
+       protected static MiniYARNCluster yarnCluster = null;
+
+       /**
+        * Uberjar (fat jar) file of Flink
+        */
+       protected static File flinkUberjar;
+
+       protected static final Configuration yarnConfiguration;
+
+       /**
+        * lib/ folder of the flink distribution.
+        */
+       protected static File flinkLibFolder;
+
+       // Creates the YARN configuration shared by all tests and applies the settings
+       // which are common to every test cluster.
+       static {
+               yarnConfiguration = new YarnConfiguration();
+
+               // behaviour of the mini cluster
+               yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+               yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
+               yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
+               yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
+
+               // scheduler limits
+               yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+               // 4096 is the available memory anyways
+               yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096);
+               yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4);
+               // memory is overwritten in the MiniYARNCluster,
+               // so we have to change the number of cores for testing.
+               yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666);
+
+               // application handling
+               yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+               yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
+               // 20 seconds expiry (to ensure we properly heartbeat with YARN).
+               yarnConfiguration.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000);
+       }
+
+
+
+       /**
+        * Pauses for half a second after each test, because the YARN cluster is
+        * re-used for all tests.
+        */
+       @After
+       public void sleep() {
+               final long pauseMillis = 500;
+               try {
+                       Thread.sleep(pauseMillis);
+               } catch (InterruptedException interrupted) {
+                       Assert.fail("Should not happen");
+               }
+       }
+
+       private YarnClient yarnClient = null;
+
+       /**
+        * Verifies before each test that every application known to the YARN cluster is
+        * in one of the states FINISHED, KILLED or FAILED and fails the test otherwise.
+        *
+        * @throws IOException declared because querying the applications may throw it
+        * @throws YarnException declared because querying the applications may throw it
+        */
+       @Before
+       public void checkClusterEmpty() throws IOException, YarnException {
+               // the client is created lazily on first use
+               // NOTE(review): nothing in the code visible here stops this client again - confirm it is cleaned up elsewhere
+               if(yarnClient == null) {
+                       yarnClient = YarnClient.createYarnClient();
+                       yarnClient.init(yarnConfiguration);
+                       yarnClient.start();
+               }
+
+               List<ApplicationReport> apps = yarnClient.getApplications();
+               for(ApplicationReport app : apps) {
+                       YarnApplicationState state = app.getYarnApplicationState();
+                       boolean terminated = state == YarnApplicationState.FINISHED
+                                       || state == YarnApplicationState.KILLED
+                                       || state == YarnApplicationState.FAILED;
+                       if(!terminated) {
+                               Assert.fail("There is at least one application on the cluster that is not finished. " +
+                                               "App " + app.getApplicationId() + " is in state " + state);
+                       }
+               }
+       }
+
+       /**
+        * Searches the directory tree below the given path depth-first and returns the
+        * first non-directory entry which is accepted by the filter. Directories are
+        * only descended into, they are never offered to the filter.
+        *
+        * @param startAt path of the directory where the search starts
+        * @param fnf filter which decides whether an entry is the wanted one
+        * @return the first accepted file, or null if no entry is accepted or the
+        *         start path cannot be listed
+        */
+       public static File findFile(String startAt, FilenameFilter fnf) {
+               final String[] entries = new File(startAt).list();
+               if(entries == null) {
+                       return null;
+               }
+               for(String entry : entries) {
+                       final File candidate = new File(startAt + File.separator + entry);
+                       if(!candidate.isDirectory()) {
+                               if(fnf.accept(candidate.getParentFile(), candidate.getName())) {
+                                       return candidate;
+                               }
+                               continue;
+                       }
+                       final File hit = findFile(candidate.getAbsolutePath(), fnf);
+                       if(hit != null) {
+                               return hit;
+                       }
+               }
+               return null;
+       }
+
+       /**
+        * Filter to find root dir of the flink-yarn dist: accepts files whose name starts
+        * with "flink-dist" and ends with ".jar" and whose directory path contains "/lib".
+        */
+       public static class RootDirFilenameFilter implements FilenameFilter {
+               @Override
+               public boolean accept(File dir, String name) {
+                       if(!name.startsWith("flink-dist") || !name.endsWith(".jar")) {
+                               return false;
+                       }
+                       // normalize the platform separator, otherwise "/lib" never matches
+                       // on systems which do not use '/' to separate path elements
+                       String dirPath = dir.toString().replace(File.separatorChar, '/');
+                       return dirPath.contains("/lib");
+               }
+       }
+
+       /**
+        * Filter which accepts a file if its name contains all of the given strings and,
+        * when an exclusion is set, the path of its directory does not contain the
+        * excluded string.
+        */
+       public static class ContainsName implements FilenameFilter {
+               private String[] names;
+               private String excludeInPath = null;
+
+               /**
+                * @param names which have to be included in the filename.
+                */
+               public ContainsName(String[] names) {
+                       this(names, null);
+               }
+
+               /**
+                * @param names which have to be included in the filename.
+                * @param excludeInPath string which must not occur in the directory path,
+                *                      null to accept every directory
+                */
+               public ContainsName(String[] names, String excludeInPath) {
+                       this.names = names;
+                       this.excludeInPath = excludeInPath;
+               }
+
+               @Override
+               public boolean accept(File dir, String name) {
+                       // every required string has to occur in the file name
+                       for(String required : names) {
+                               if(!name.contains(required)) {
+                                       return false;
+                               }
+                       }
+                       if(excludeInPath == null) {
+                               return true;
+                       }
+                       return !dir.toString().contains(excludeInPath);
+               }
+       }
+
+       /**
+        * Writes the given configuration as XML into a file named "yarn-site.xml",
+        * located in a new folder of the temporary test directory.
+        *
+        * @param yarnConf configuration to write
+        * @return the written file
+        * @throws IOException if the folder or the file cannot be created or written
+        */
+       public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException {
+               tmp.create();
+               File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml");
+
+               FileWriter writer = new FileWriter(yarnSiteXML);
+               try {
+                       yarnConf.writeXml(writer);
+                       writer.flush();
+               } finally {
+                       // release the file handle even if writing the configuration fails
+                       writer.close();
+               }
+               return yarnSiteXML;
+       }
+
+       /**
+        * This method checks the files below "target/" + the configured test cluster name
+        * (e.g. the written TaskManager and JobManager log files) for prohibited strings.
+        * A line which contains a prohibited string is tolerated if it also contains one
+        * of the white-listed strings. The first offending file is printed to the log
+        * and the test is failed.
+        *
+        * WARN: Please make sure the tool doesn't find old logfiles from previous test runs.
+        * So always run "mvn clean" before running the tests here.
+        *
+        * @param prohibited strings which must not occur in any line
+        * @param whitelisted strings which make a line acceptable although it contains a prohibited string
+        */
+       public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited, final String[] whitelisted) {
+               File cwd = new File("target/" + yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
+               Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to exist", cwd.exists());
+               Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to be a directory", cwd.isDirectory());
+
+               // the filter does the actual scanning: it accepts the first file with an offending line
+               File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
+                       @Override
+                       public boolean accept(File dir, String name) {
+                               // scan each file for prohibited strings.
+                               File f = new File(dir.getAbsolutePath()+ "/" + name);
+                               Scanner scanner = null;
+                               try {
+                                       scanner = new Scanner(f);
+                                       while (scanner.hasNextLine()) {
+                                               final String lineFromFile = scanner.nextLine();
+                                               for (String aProhibited : prohibited) {
+                                                       if (lineFromFile.contains(aProhibited)) {
+
+                                                               boolean whitelistedFound = false;
+                                                               for (String white : whitelisted) {
+                                                                       if (lineFromFile.contains(white)) {
+                                                                               whitelistedFound = true;
+                                                                               break;
+                                                                       }
+                                                               }
+
+                                                               if (!whitelistedFound) {
+                                                                       // logging in FATAL to see the actual message in TRAVIS tests.
+                                                                       Marker fatal = MarkerFactory.getMarker("FATAL");
+                                                                       LOG.error(fatal, "Prohibited String '{}' in line '{}'", aProhibited, lineFromFile);
+                                                                       return true;
+                                                               }
+                                                       }
+                                               }
+
+                                       }
+                               } catch (FileNotFoundException e) {
+                                       LOG.warn("Unable to locate file: "+e.getMessage()+" file: "+f.getAbsolutePath());
+                               } finally {
+                                       // every scanned file would otherwise keep its file handle open
+                                       if (scanner != null) {
+                                               scanner.close();
+                                       }
+                               }
+
+                               return false;
+                       }
+               });
+               if(foundFile != null) {
+                       Scanner scanner =  null;
+                       try {
+                               scanner = new Scanner(foundFile);
+                       } catch (FileNotFoundException e) {
+                               Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+foundFile.getAbsolutePath());
+                       }
+                       try {
+                               LOG.warn("Found a file with a prohibited string. Printing contents:");
+                               while (scanner.hasNextLine()) {
+                                       LOG.warn("LINE: "+scanner.nextLine());
+                               }
+                       } finally {
+                               scanner.close();
+                       }
+                       Assert.fail("Found a file "+foundFile+" with a prohibited string: "+Arrays.toString(prohibited));
+               }
+       }
+
+       /**
+        * Sleeps for the given time. An interruption ends the sleep early and is only logged.
+        *
+        * @param time time to sleep, in milliseconds
+        */
+       public static void sleep(int time) {
+               final long millis = time;
+               try {
+                       Thread.sleep(millis);
+               } catch (InterruptedException interrupted) {
+                       LOG.warn("Interruped",interrupted);
+               }
+       }
+
+       /**
+        * Sums up the number of containers held in the contexts of all NodeManagers
+        * of the mini cluster.
+        *
+        * @return total number of containers over all NodeManagers
+        */
+       public static int getRunningContainers() {
+               int total = 0;
+               int nodeManagerIndex = 0;
+               while(nodeManagerIndex < NUM_NODEMANAGERS) {
+                       total += yarnCluster.getNodeManager(nodeManagerIndex).getNMContext().getContainers().size();
+                       nodeManagerIndex++;
+               }
+               return total;
+       }
+
+       public static void startYARNWithConfig(Configuration conf) {
+               // set the home directory to a tmp directory. Flink on YARN is 
using the home dir to distribute the file
+               File homeDir = null;
+               try {
+                       homeDir = tmp.newFolder();
+               } catch (IOException e) {
+                       e.printStackTrace();
+                       Assert.fail(e.getMessage());
+               }
+               System.setProperty("user.home", homeDir.getAbsolutePath());
+               String uberjarStartLoc = "..";
+               LOG.info("Trying to locate uberjar in {}", new 
File(uberjarStartLoc));
+               flinkUberjar = findFile(uberjarStartLoc, new 
RootDirFilenameFilter());
+               Assert.assertNotNull("Flink uberjar not found", flinkUberjar);
+               String flinkDistRootDir = 
flinkUberjar.getParentFile().getParent();
+               flinkLibFolder = flinkUberjar.getParentFile(); // the uberjar 
is located in lib/
+               Assert.assertNotNull("Flink flinkLibFolder not found", 
flinkLibFolder);
+               Assert.assertTrue("lib folder not found", 
flinkLibFolder.exists());
+               Assert.assertTrue("lib folder not found", 
flinkLibFolder.isDirectory());
+
+               if (!flinkUberjar.exists()) {
+                       Assert.fail("Unable to locate yarn-uberjar.jar");
+               }
+
+               try {
+                       LOG.info("Starting up MiniYARNCluster");
+                       if (yarnCluster == null) {
+                               yarnCluster = new 
MiniYARNCluster(conf.get(YarnTestBase.TEST_CLUSTER_NAME_KEY), NUM_NODEMANAGERS, 
1, 1);
+
+                               yarnCluster.init(conf);
+                               yarnCluster.start();
+                       }
+
+                       Map<String, String> map = new HashMap<String, 
String>(System.getenv());
+
+                       File flinkConfDirPath = findFile(flinkDistRootDir, new 
ContainsName(new String[]{"flink-conf.yaml"}));
+                       Assert.assertNotNull(flinkConfDirPath);
+
+                       map.put("FLINK_CONF_DIR", flinkConfDirPath.getParent());
+
+                       File yarnConfFile = writeYarnSiteConfigXML(conf);
+                       map.put("YARN_CONF_DIR", 
yarnConfFile.getParentFile().getAbsolutePath());
+                       map.put("IN_TESTS", "yes we are in tests"); // see 
FlinkYarnClient() for more infos
+                       TestBaseUtils.setEnv(map);
+
+                       Assert.assertTrue(yarnCluster.getServiceState() == 
Service.STATE.STARTED);
+
+                       // wait for the nodeManagers to connect
+                       while(!yarnCluster.waitForNodeManagersToConnect(500)) {
+                               LOG.info("Waiting for Nodemanagers to connect");
+                       }
+               } catch (Exception ex) {
+                       ex.printStackTrace();
+                       LOG.error("setup failure", ex);
+                       Assert.fail();
+               }
+       }
+
+       /**
+        * Default @BeforeClass implementation: starts YARN with {@code yarnConfiguration}.
+        * Overwrite this for passing a different configuration.
+        */
+       @BeforeClass
+       public static void setup() {
+               startYARNWithConfig(yarnConfiguration);
+       }
+
+       // -------------------------- Runner -------------------------- //
+
+       // Buffers that System.out / System.err are redirected into while a Runner is started
+       protected static ByteArrayOutputStream outContent;
+       protected static ByteArrayOutputStream errContent;
+       // Selects which command line front end a Runner invokes
+       enum RunTypes {
+               YARN_SESSION, CLI_FRONTEND
+       }
+
+       /**
+        * Starts a {@link Runner} and returns once the "startedAfterString" has been seen in the
+        * redirected stdout or stderr.
+        *
+        * <p>stdout and stderr are redirected into {@code outContent} / {@code errContent}. On the
+        * success path they stay redirected when this method returns; on every failure path
+        * {@link #sendOutput()} restores them first.
+        *
+        * <p>The test fails if the runner thread dies before the string shows up, or if the string
+        * does not show up within 60 seconds. In the timeout case the runner is asked to stop so
+        * that it does not outlive the failed test.
+        *
+        * @param args command line arguments for the runner
+        * @param startedAfterString string to wait for in stdout / stderr
+        * @param type the type of the runner
+        * @return the started runner
+        */
+       protected Runner startWithArgs(String[] args, String startedAfterString, RunTypes type) {
+               LOG.info("Running with args {}", Arrays.toString(args));
+
+               outContent = new ByteArrayOutputStream();
+               errContent = new ByteArrayOutputStream();
+               System.setOut(new PrintStream(outContent));
+               System.setErr(new PrintStream(errContent));
+
+
+               final int START_TIMEOUT_SECONDS = 60;
+
+               Runner runner = new Runner(args, type);
+               runner.setName("Frontend (CLI/YARN Client) runner thread (startWithArgs()).");
+               runner.start();
+
+               for (int second = 0; second < START_TIMEOUT_SECONDS; second++) {
+                       sleep(1000);
+                       // check output for correct TaskManager startup.
+                       if (outContent.toString().contains(startedAfterString)
+                                       || errContent.toString().contains(startedAfterString)) {
+                               LOG.info("Found expected output in redirected streams");
+                               return runner;
+                       }
+                       // check if thread died
+                       if (!runner.isAlive()) {
+                               sendOutput();
+                               Assert.fail("Runner thread died before the test was finished. Return value = "
+                                               + runner.getReturnValue());
+                       }
+               }
+
+               sendOutput();
+               // the runner is still alive at this point: request it to stop before failing the test
+               runner.sendStop();
+               Assert.fail("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " +
+                               "expected string did not show up");
+               return null;
+       }
+
+       protected void runWithArgs(String[] args, String terminateAfterString, 
String[] failOnStrings, RunTypes type, int returnCode) {
+               runWithArgs(args,terminateAfterString, failOnStrings, type, 
returnCode, false);
+       }
+       /**
+        * The test has been passed once the "terminateAfterString" has been seen.
+        *
+        * <p>stdout and stderr are redirected while the runner is active and are restored through
+        * {@link #sendOutput()} before this method returns or fails.
+        *
+        * @param args Command line arguments for the runner
+        * @param terminateAfterString the runner is searching the stdout and stderr for this string.
+        *                             as soon as it appears, the test has passed
+        * @param failOnStrings The runner is searching stdout and stderr for the strings specified
+        *                      here. If one appears, the test has failed
+        * @param type Set the type of the runner
+        * @param returnCode Expected return code from the runner.
+        * @param checkLogForTerminateString  If true, the runner checks also the log4j logger for
+        *                                    the terminate string
+        */
+       protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode, boolean checkLogForTerminateString) {
+               LOG.info("Running with args {}", Arrays.toString(args));
+
+               outContent = new ByteArrayOutputStream();
+               errContent = new ByteArrayOutputStream();
+               System.setOut(new PrintStream(outContent));
+               System.setErr(new PrintStream(errContent));
+
+
+               // we wait for at most three minutes
+               final int START_TIMEOUT_SECONDS = 180;
+               final long deadline = System.currentTimeMillis() + (START_TIMEOUT_SECONDS * 1000L);
+
+               Runner runner = new Runner(args, type);
+               runner.start();
+
+               boolean expectedStringSeen = false;
+               boolean testPassedFromLog4j = false;
+               do {
+                       sleep(1000);
+                       String outContentString = outContent.toString();
+                       String errContentString = errContent.toString();
+                       if (failOnStrings != null) {
+                               for (String failOnString : failOnStrings) {
+                                       if (outContentString.contains(failOnString)
+                                                       || errContentString.contains(failOnString)) {
+                                               LOG.warn("Failing test. Output contained illegal string '" + failOnString + "'");
+                                               sendOutput();
+                                               // stopping runner.
+                                               runner.sendStop();
+                                               Assert.fail("Output contained illegal string '" + failOnString + "'");
+                                       }
+                               }
+                       }
+                       // check output for the expected terminateAfterString.
+                       if (checkLogForTerminateString) {
+                               LoggingEvent matchedEvent = UtilsTest.getEventContainingString(terminateAfterString);
+                               if (matchedEvent != null) {
+                                       testPassedFromLog4j = true;
+                                       LOG.info("Found expected output in logging event {}", matchedEvent);
+                               }
+
+                       }
+
+                       if (outContentString.contains(terminateAfterString)
+                                       || errContentString.contains(terminateAfterString) || testPassedFromLog4j) {
+                               expectedStringSeen = true;
+                               LOG.info("Found expected output in redirected streams");
+                               // send "stop" command to command line interface
+                               LOG.info("RunWithArgs: request runner to stop");
+                               runner.sendStop();
+                               // wait for the thread to stop
+                               try {
+                                       runner.join(30000);
+                               }
+                               catch (InterruptedException e) {
+                                       LOG.warn("Interrupted while stopping runner", e);
+                               }
+                               LOG.warn("RunWithArgs runner stopped.");
+                       }
+                       else {
+                               // check if thread died
+                               if (!runner.isAlive()) {
+                                       if (runner.getReturnValue() != 0) {
+                                               // restore stdout/stderr and publish the captured output before failing,
+                                               // otherwise the streams stay redirected and the output is lost
+                                               sendOutput();
+                                               Assert.fail("Runner thread died before the test was finished. Return value = "
+                                                               + runner.getReturnValue());
+                                       } else {
+                                               LOG.info("Runner stopped earlier than expected with return value = 0");
+                                       }
+                                       // leave loop: the runner died, so we can not expect new strings to show up.
+                                       break;
+                               }
+                       }
+               }
+               while (!expectedStringSeen && System.currentTimeMillis() < deadline);
+
+               sendOutput();
+               Assert.assertTrue("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " +
+                               "expected string did not show up", expectedStringSeen);
+
+               // check for the expected return code
+               Assert.assertEquals("Expected return value", returnCode, runner.getReturnValue());
+               LOG.info("Test was successful");
+       }
+
+       protected static void sendOutput() {
+               System.setOut(originalStdout);
+               System.setErr(originalStderr);
+
+               LOG.info("Sending stdout content through logger: \n\n{}\n\n", 
outContent.toString());
+               LOG.info("Sending stderr content through logger: \n\n{}\n\n", 
errContent.toString());
+       }
+
+       /**
+        * Thread that executes one of the command line front ends selected by {@link RunTypes}
+        * with the given arguments and records its return value.
+        *
+        * <p>{@code returnValue} and {@code yCli} are written by this thread and read by the test
+        * thread while this thread may still be running ({@link #sendStop()},
+        * {@link #getReturnValue()}); both are therefore volatile.
+        */
+       public static class Runner extends Thread {
+               private final String[] args;
+               private final RunTypes type;
+               private volatile int returnValue;
+               private volatile FlinkYarnSessionCli yCli;
+
+               /**
+                * @param args command line arguments handed to the front end
+                * @param type front end to run
+                */
+               public Runner(String[] args, RunTypes type) {
+                       this.args = args;
+                       this.type = type;
+               }
+
+               /**
+                * @return the value returned by the front end, or 0 as long as it has not returned
+                */
+               public int getReturnValue() {
+                       return returnValue;
+               }
+
+               @Override
+               public void run() {
+                       switch(type) {
+                               case YARN_SESSION:
+                                       yCli = new FlinkYarnSessionCli("", "");
+                                       returnValue = yCli.run(args);
+                                       break;
+                               case CLI_FRONTEND:
+                                       try {
+                                               CliFrontend cli = new CliFrontend();
+                                               returnValue = cli.parseParameters(args);
+                                       } catch (Exception e) {
+                                               throw new RuntimeException(e);
+                                       }
+                                       break;
+                               default:
+                                       throw new RuntimeException("Unknown type " + type);
+                       }
+
+                       // raised inside the runner thread, i.e. it terminates this thread only
+                       if(returnValue != 0) {
+                               Assert.fail("The YARN session returned with non-null value="+returnValue);
+                       }
+               }
+
+               /**
+                * Stops the YARN session CLI if one has been created; does nothing otherwise
+                * (e.g. for {@code CLI_FRONTEND} runs).
+                */
+               public void sendStop() {
+                       // read the volatile field once so the null check and the call see the same instance
+                       final FlinkYarnSessionCli cli = yCli;
+                       if(cli != null) {
+                               cli.stop();
+                       }
+               }
+       }
+
+       // -------------------------- Tear down -------------------------- //
+
+       @AfterClass
+       public static void copyOnTravis() {
+               // When we are on travis, we copy the tmp files of JUnit 
(containing the MiniYARNCluster log files)
+               // to <flinkRoot>/target/flink-yarn-tests-*.
+               // The files from there are picked up by the 
./tools/travis_watchdog.sh script
+               // to upload them to Amazon S3.
+               if(isOnTravis()) {
+                       File target = new File("../target" + 
yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
+                       if(!target.mkdirs()) {
+                               LOG.warn("Error creating dirs to {}", target);
+                       }
+                       File src = tmp.getRoot();
+                       LOG.info("copying the final files from {} to {}", 
src.getAbsolutePath(), target.getAbsolutePath());
+                       try {
+                               FileUtils.copyDirectoryToDirectory(src, target);
+                       } catch (IOException e) {
+                               LOG.warn("Error copying the final files from {} 
to {}: msg: {}", src.getAbsolutePath(), target.getAbsolutePath(), 
e.getMessage(), e);
+                       }
+               }
+       }
+
+       /**
+        * Tells whether the environment variable TRAVIS is set to the value "true".
+        *
+        * @return true if TRAVIS equals "true", false if it is unset or has any other value
+        */
+       public static boolean isOnTravis() {
+               final String travisFlag = System.getenv("TRAVIS");
+               return "true".equals(travisFlag);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties 
b/flink-yarn-tests/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..ebe0d37
--- /dev/null
+++ b/flink-yarn-tests/src/test/resources/log4j-test.properties
@@ -0,0 +1,35 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, console
+
+# Log all infos to the console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
+
+# log what's going on between the tests
+log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO
+log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO
+log4j.logger.org.apache.flink.yarn.YarnHighAvailability=INFO
+log4j.logger.org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch=INFO
+log4j.logger.org.apache.flink.runtime.leaderelection=INFO
+log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
new file mode 100644
index 0000000..2f93785
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import java.util.concurrent.ExecutorService
+
+import akka.actor.ActorRef
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.checkpoint.{SavepointStore, 
CheckpointRecoveryFactory}
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.testingUtils.TestingJobManagerLike
+
+import scala.concurrent.duration.FiniteDuration
+
+/** [[YarnJobManager]] implementation which mixes in the [[TestingJobManagerLike]] mixin.
+  *
+  * This actor class is used for testing purposes on Yarn. Here we use an explicit class
+  * definition instead of an anonymous class with the respective mixin to obtain a more
+  * readable logger name.
+  *
+  * All constructor arguments are passed unchanged to [[YarnJobManager]].
+  *
+  * @param flinkConfiguration Configuration object for the actor
+  * @param executorService Execution context which is used to execute concurrent tasks in the
+  *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param instanceManager Instance manager to manage the registered
+  *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
+  * @param scheduler Scheduler to schedule Flink jobs
+  * @param libraryCacheManager Manager to manage uploaded jar files
+  * @param archive Archive for finished Flink jobs
+  * @param restartStrategyFactory Default restart strategy for job restarts
+  * @param timeout Timeout for futures
+  * @param leaderElectionService LeaderElectionService to participate in the leader election
+  * @param submittedJobGraphs [[SubmittedJobGraphStore]] handed to the [[YarnJobManager]]
+  * @param checkpointRecoveryFactory [[CheckpointRecoveryFactory]] handed to the
+  *                                  [[YarnJobManager]]
+  * @param savepointStore [[SavepointStore]] handed to the [[YarnJobManager]]
+  * @param jobRecoveryTimeout Duration handed to the [[YarnJobManager]]; presumably the
+  *                           timeout for job recovery (confirm against [[YarnJobManager]])
+  */
+class TestingYarnJobManager(
+    flinkConfiguration: Configuration,
+    executorService: ExecutorService,
+    instanceManager: InstanceManager,
+    scheduler: Scheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    restartStrategyFactory: RestartStrategyFactory,
+    timeout: FiniteDuration,
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphs : SubmittedJobGraphStore,
+    checkpointRecoveryFactory : CheckpointRecoveryFactory,
+    savepointStore: SavepointStore,
+    jobRecoveryTimeout: FiniteDuration)
+  extends YarnJobManager(
+    flinkConfiguration,
+    executorService,
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    archive,
+    restartStrategyFactory,
+    timeout,
+    leaderElectionService,
+    submittedJobGraphs,
+    checkpointRecoveryFactory,
+    savepointStore,
+    jobRecoveryTimeout)
+  with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
new file mode 100644
index 0000000..73ab7eb
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.instance.InstanceConnectionInfo
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.runtime.taskmanager.TaskManagerConfiguration
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
+
+/** [[YarnTaskManager]] implementation which mixes in the [[TestingTaskManagerLike]] mixin.
+  *
+  * This actor class is used for testing purposes on Yarn. Here we use an explicit class
+  * definition instead of an anonymous class with the respective mixin to obtain a more
+  * readable logger name.
+  *
+  * All constructor arguments are passed unchanged to [[YarnTaskManager]].
+  *
+  * @param config Configuration object for the actor
+  * @param resourceID The Yarn container id
+  * @param connectionInfo Connection information of this actor
+  * @param memoryManager MemoryManager which is responsible for Flink's managed memory
+  *                      allocation
+  * @param ioManager IOManager responsible for I/O
+  * @param network NetworkEnvironment for this actor
+  * @param numberOfSlots Number of slots for this TaskManager
+  * @param leaderRetrievalService [[LeaderRetrievalService]] to retrieve the current leading
+  *                              JobManager
+  */
+class TestingYarnTaskManager(
+    config: TaskManagerConfiguration,
+    resourceID: ResourceID,
+    connectionInfo: InstanceConnectionInfo,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    network: NetworkEnvironment,
+    numberOfSlots: Int,
+    leaderRetrievalService: LeaderRetrievalService)
+  extends YarnTaskManager(
+    config,
+    resourceID,
+    connectionInfo,
+    memoryManager,
+    ioManager,
+    network,
+    numberOfSlots,
+    leaderRetrievalService)
+  with TestingTaskManagerLike {
+
+  // NOTE(review): this object is declared inside the class body, so it is a member of each
+  // instance rather than a top-level object - confirm that this nesting is intended.
+  object YarnTaskManager {
+
+    /** Entry point (main method) to run the TaskManager on YARN.
+      * @param args The command line arguments.
+      */
+    def main(args: Array[String]): Unit = {
+      YarnTaskManagerRunner.runYarnTaskManager(args, 
classOf[TestingYarnTaskManager])
+    }
+
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 63869ee..12db578 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -60,6 +60,7 @@ under the License.
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-test-utils_2.10</artifactId>
                        <version>${project.version}</version>
+                       <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
 

Reply via email to