This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3b6af3d348ee854e51a55e58371d0592fbee1cb8 Author: wangyang0918 <danrtsey...@alibaba-inc.com> AuthorDate: Sat Apr 3 01:03:42 2021 +0800 [hotfix][yarn-tests] Check the applicationId in verifyStringsInNamedLogFiles --- .../org/apache/flink/yarn/YARNSessionFIFOITCase.java | 5 ++++- .../flink/yarn/YARNSessionFIFOSecuredITCase.java | 16 +++++++++------- .../test/java/org/apache/flink/yarn/YarnTestBase.java | 18 ++++++++++++------ 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index e82396c..8ab421f 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -92,7 +92,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { } /** Test regular operation, including command line parameter parsing. */ - void runDetachedModeTest(Map<String, String> securityProperties) throws Exception { + ApplicationId runDetachedModeTest(Map<String, String> securityProperties) throws Exception { LOG.info("Starting testDetachedMode()"); File exampleJarLocation = getTestJarPath("StreamingWordCount.jar"); @@ -169,10 +169,12 @@ public class YARNSessionFIFOITCase extends YarnTestBase { + ")"); LOG.info("Waiting until the job reaches FINISHED state"); + final ApplicationId applicationId = getOnlyApplicationReport().getApplicationId(); CommonTestUtils.waitUntilCondition( () -> verifyStringsInNamedLogFiles( new String[] {"switched from state RUNNING to FINISHED"}, + applicationId, "jobmanager.log"), Deadline.fromNow(timeout), testConditionIntervalInMillis, @@ -242,6 +244,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { } LOG.info("Finished testDetachedMode()"); + return applicationId; } /** diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java index b3e188a..eae6d42 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java @@ -30,6 +30,7 @@ import org.apache.flink.yarn.util.TestHadoopModuleFactory; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -150,8 +151,8 @@ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase { SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key(), SecureTestEnvironment.getHadoopServicePrincipal()); } - runDetachedModeTest(securityProperties); - verifyResultContainsKerberosKeytab(); + final ApplicationId applicationId = runDetachedModeTest(securityProperties); + verifyResultContainsKerberosKeytab(applicationId); }); } @@ -171,17 +172,18 @@ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase { SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key(), SecureTestEnvironment.getHadoopServicePrincipal()); } - runDetachedModeTest(securityProperties); - verifyResultContainsKerberosKeytab(); + final ApplicationId applicationId = runDetachedModeTest(securityProperties); + verifyResultContainsKerberosKeytab(applicationId); }); } - private static void verifyResultContainsKerberosKeytab() throws Exception { + private static void verifyResultContainsKerberosKeytab(ApplicationId applicationId) + throws Exception { final String[] mustHave = {"Login successful for user", "using keytab file"}; final boolean jobManagerRunsWithKerberos = - verifyStringsInNamedLogFiles(mustHave, "jobmanager.log"); + verifyStringsInNamedLogFiles(mustHave, applicationId, "jobmanager.log"); final boolean taskManagerRunsWithKerberos = - verifyStringsInNamedLogFiles(mustHave, "taskmanager.log"); + verifyStringsInNamedLogFiles(mustHave, applicationId, "taskmanager.log"); Assert.assertThat( "The JobManager and the TaskManager should both run with Kerberos.", diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 0fd7a62..d214f9c 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -95,6 +95,7 @@ import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.apache.flink.util.Preconditions.checkState; import static org.junit.Assert.assertEquals; @@ -599,14 +600,14 @@ public abstract class YarnTestBase extends TestLogger { } public static boolean verifyStringsInNamedLogFiles( - final String[] mustHave, final String fileName) { - List<String> mustHaveList = Arrays.asList(mustHave); - File cwd = new File("target/" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY)); + final String[] mustHave, final ApplicationId applicationId, final String fileName) { + final List<String> mustHaveList = Arrays.asList(mustHave); + final File cwd = new File("target", YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY)); if (!cwd.exists() || !cwd.isDirectory()) { return false; } - File foundFile = + final File foundFile = TestUtils.findFile( cwd.getAbsolutePath(), new FilenameFilter() { @@ -615,10 +616,15 @@ public abstract class YarnTestBase extends TestLogger { if (fileName != null && !name.equals(fileName)) { return false; } - File f = new File(dir.getAbsolutePath() + "/" + name); + final File f = new File(dir.getAbsolutePath(), name); + // Only check the specified application logs + if (StreamSupport.stream(f.toPath().spliterator(), false) + .noneMatch(p -> p.endsWith(applicationId.toString()))) { + return false; + } LOG.info("Searching in {}", f.getAbsolutePath()); try (Scanner scanner = new Scanner(f)) { - Set<String> foundSet = new HashSet<>(mustHave.length); + final Set<String> foundSet = new HashSet<>(mustHave.length); while (scanner.hasNextLine()) { final String lineFromFile = scanner.nextLine(); for (String str : mustHave) {