Repository: hadoop Updated Branches: refs/heads/YARN-2928 c994596b2 -> 15962ea0c (forced update)
YARN-3461. Consolidate flow name/version/run defaults. (Sangjin Lee via Varun Saxena) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/525b30ea Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/525b30ea Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/525b30ea Branch: refs/heads/YARN-2928 Commit: 525b30ea70866b32931ecd443763d4763623e91a Parents: e31210d Author: Varun Saxena <varunsax...@apache.org> Authored: Thu Apr 7 22:10:11 2016 +0530 Committer: Sangjin Lee <sj...@apache.org> Committed: Fri Jul 8 10:19:00 2016 -0700 ---------------------------------------------------------------------- .../mapred/TestMRTimelineEventHandling.java | 46 +++++++------- .../distributedshell/TestDistributedShell.java | 18 ++++-- .../yarn/util/timeline/TimelineUtils.java | 8 ++- .../resourcemanager/amlauncher/AMLauncher.java | 67 +++++++++++++++----- .../RMTimelineCollectorManager.java | 36 +++++++++-- .../TestSystemMetricsPublisherForV2.java | 20 +++--- .../collector/AppLevelTimelineCollector.java | 11 +--- .../collector/NodeTimelineCollectorManager.java | 12 ++++ .../collector/TimelineCollectorContext.java | 5 +- 9 files changed, 148 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/525b30ea/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java index f7283ae..300b4fb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java @@ -20,15 +20,12 @@ package org.apache.hadoop.mapred; import java.io.File; import java.io.IOException; - import java.util.EnumSet; import java.util.List; -import java.util.Set; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -38,9 +35,9 @@ import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; @@ -48,7 +45,6 @@ import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; - import org.junit.Assert; import org.junit.Test; @@ -205,7 +201,7 @@ public class TestMRTimelineEventHandling { ApplicationReport appReport = apps.get(0); firstAppId = appReport.getApplicationId(); - checkNewTimelineEvent(firstAppId); + checkNewTimelineEvent(firstAppId, appReport); LOG.info("Run 2nd job which should be failed."); job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir); @@ -214,11 +210,10 @@ public class TestMRTimelineEventHandling { apps = yarnClient.getApplications(appStates); Assert.assertEquals(apps.size(), 2); - - ApplicationId secAppId = null; - secAppId = apps.get(0).getApplicationId() == firstAppId ? - apps.get(1).getApplicationId() : apps.get(0).getApplicationId(); - checkNewTimelineEvent(firstAppId); + + appReport = apps.get(0).getApplicationId().equals(firstAppId) ? + apps.get(0) : apps.get(1); + checkNewTimelineEvent(firstAppId, appReport); } finally { if (cluster != null) { @@ -235,7 +230,8 @@ public class TestMRTimelineEventHandling { } } - private void checkNewTimelineEvent(ApplicationId appId) throws IOException { + private void checkNewTimelineEvent(ApplicationId appId, + ApplicationReport appReport) throws IOException { String tmpRoot = FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT + "/entities/"; @@ -243,15 +239,18 @@ public class TestMRTimelineEventHandling { File tmpRootFolder = new File(tmpRoot); Assert.assertTrue(tmpRootFolder.isDirectory()); - String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" + - UserGroupInformation.getCurrentUser().getShortUserName() + - "/" + TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId) + - "/1/1/" + appId.toString(); + String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + + "/" + UserGroupInformation.getCurrentUser().getShortUserName() + + "/" + appReport.getName() + + "/" + TimelineUtils.DEFAULT_FLOW_VERSION + + "/" + appReport.getStartTime() + + "/" + appId.toString(); // for this test, we expect MAPREDUCE_JOB and MAPREDUCE_TASK dirs String outputDirJob = basePath + "/MAPREDUCE_JOB/"; File entityFolder = new File(outputDirJob); - Assert.assertTrue("Job output directory: " + outputDirJob + " is not exist.", + Assert.assertTrue("Job output directory: " + outputDirJob + + " does not exist.", entityFolder.isDirectory()); // check for job event file @@ -260,13 +259,15 @@ public class TestMRTimelineEventHandling { String jobEventFilePath = outputDirJob + jobEventFileName; File jobEventFile = new File(jobEventFilePath); - Assert.assertTrue("jobEventFilePath: " + jobEventFilePath + " is not exist.", + Assert.assertTrue("jobEventFilePath: " + jobEventFilePath + + " does not exist.", jobEventFile.exists()); // check for task event file String outputDirTask = basePath + "/MAPREDUCE_TASK/"; File taskFolder = new File(outputDirTask); - Assert.assertTrue("Task output directory: " + outputDirTask + " is not exist.", + Assert.assertTrue("Task output directory: " + outputDirTask + + " does not exist.", taskFolder.isDirectory()); String taskEventFileName = appId.toString().replaceAll("application", "task") @@ -274,14 +275,15 @@ public class TestMRTimelineEventHandling { String taskEventFilePath = outputDirTask + taskEventFileName; File taskEventFile = new File(taskEventFilePath); - Assert.assertTrue("taskEventFileName: " + taskEventFilePath + " is not exist.", + Assert.assertTrue("taskEventFileName: " + taskEventFilePath + + " does not exist.", taskEventFile.exists()); // check for task attempt event file String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/"; File taskAttemptFolder = new File(outputDirTaskAttempt); Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt + - " is not exist.", taskAttemptFolder.isDirectory()); + " does not exist.", taskAttemptFolder.isDirectory()); String taskAttemptEventFileName = appId.toString().replaceAll( "application", "attempt") + "_m_000000_0" + @@ -291,7 +293,7 @@ public class TestMRTimelineEventHandling { taskAttemptEventFileName; File taskAttemptEventFile = new File(taskAttemptEventFilePath); Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath + - " is not exist.", taskAttemptEventFile.exists()); + " does not exist.", taskAttemptEventFile.exists()); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/525b30ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 95129cb..ba11e60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -372,13 +372,14 @@ public class TestDistributedShell { boolean verified = false; String errorMessage = ""; ApplicationId appId = null; + ApplicationReport appReport = null; while(!verified) { List<ApplicationReport> apps = yarnClient.getApplications(); if (apps.size() == 0 ) { Thread.sleep(10); continue; } - ApplicationReport appReport = apps.get(0); + appReport = apps.get(0); appId = appReport.getApplicationId(); if(appReport.getHost().equals("N/A")) { Thread.sleep(10); @@ -424,7 +425,7 @@ public class TestDistributedShell { if (!isTestingTimelineV2) { checkTimelineV1(haveDomain); } else { - checkTimelineV2(haveDomain, appId, defaultFlow); + checkTimelineV2(haveDomain, appId, defaultFlow, appReport); } } @@ -481,7 +482,7 @@ public class TestDistributedShell { } private void checkTimelineV2(boolean haveDomain, ApplicationId appId, - boolean defaultFlow) throws Exception { + boolean defaultFlow, ApplicationReport appReport) throws Exception { LOG.info("Started checkTimelineV2 "); // For PoC check in /tmp/timeline_service_data YARN-3264 String tmpRoot = @@ -494,10 +495,13 @@ public class TestDistributedShell { String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" + UserGroupInformation.getCurrentUser().getShortUserName() + - (defaultFlow ? "/" + - TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId) + - "/1/1/" : "/test_flow_name/test_flow_version/12345678/") + - appId.toString(); + (defaultFlow ? + "/" + appReport.getName() + "/" + + TimelineUtils.DEFAULT_FLOW_VERSION +"/" + + appReport.getStartTime() +"/" : + "/test_flow_name/test_flow_version/12345678/") + + appId.toString(); + LOG.info("basePath: " + basePath); // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs // Verify DS_APP_ATTEMPT entities posted by the client http://git-wip-us.apache.org/repos/asf/hadoop/blob/525b30ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java index 6faeffa..b618ac1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java @@ -49,6 +49,7 @@ public class TimelineUtils { "TIMELINE_FLOW_VERSION_TAG"; public static final String FLOW_RUN_ID_TAG_PREFIX = "TIMELINE_FLOW_RUN_ID_TAG"; + public final static String DEFAULT_FLOW_VERSION = "1"; private static ObjectMapper mapper; @@ -162,9 +163,12 @@ public class TimelineUtils { return SecurityUtil.buildTokenService(timelineServiceAddr); } - public static String generateDefaultFlowNameBasedOnAppId( + public static String generateDefaultFlowName(String appName, ApplicationId appId) { - return "flow_" + appId.getClusterTimestamp() + "_" + appId.getId(); + return (appName != null && + !appName.equals(YarnConfiguration.DEFAULT_APPLICATION_NAME)) ? + appName : + "flow_" + appId.getClusterTimestamp() + "_" + appId.getId(); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/525b30ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index dd08dcc..4aace2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; @@ -198,6 +199,8 @@ public class AMLauncher implements Runnable { // Finalize the container setupTokens(container, containerID); + // set the flow context optionally for timeline service v.2 + setFlowContext(container); return container; } @@ -229,15 +232,6 @@ public class AMLauncher implements Runnable { .get(applicationId) .getSubmitTime())); - if (YarnConfiguration.timelineServiceV2Enabled(conf)) { - // Set flow context info - for (String tag : - rmContext.getRMApps().get(applicationId).getApplicationTags()) { - setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, tag); - setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag); - setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag); - } - } Credentials credentials = new Credentials(); DataInputByteBuffer dibb = new DataInputByteBuffer(); ByteBuffer tokens = container.getTokens(); @@ -258,17 +252,58 @@ public class AMLauncher implements Runnable { container.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); } - private static void setFlowTags( - Map<String, String> environment, String tagPrefix, String tag) { - if (tag.startsWith(tagPrefix + ":") || - tag.startsWith(tagPrefix.toLowerCase() + ":")) { - String value = tag.substring(tagPrefix.length() + 1); - if (!value.isEmpty()) { - environment.put(tagPrefix, value); + private void setFlowContext(ContainerLaunchContext container) { + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + Map<String, String> environment = container.getEnvironment(); + ApplicationId applicationId = + application.getAppAttemptId().getApplicationId(); + RMApp app = rmContext.getRMApps().get(applicationId); + + // initialize the flow in the environment with default values for those + // that do not specify the flow tags + // flow name: app name (or app id if app name is missing), + // flow version: "1", flow run id: start time + setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, + TimelineUtils.generateDefaultFlowName(app.getName(), applicationId)); + setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, + TimelineUtils.DEFAULT_FLOW_VERSION); + setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, + String.valueOf(app.getStartTime())); + + // Set flow context info: the flow context is received via the application + // tags + for (String tag : app.getApplicationTags()) { + String[] parts = tag.split(":", 2); + if (parts.length != 2 || parts[1].isEmpty()) { + continue; + } + switch (parts[0].toUpperCase()) { + case TimelineUtils.FLOW_NAME_TAG_PREFIX: + setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, + parts[1]); + break; + case TimelineUtils.FLOW_VERSION_TAG_PREFIX: + setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, + parts[1]); + break; + case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX: + setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, + parts[1]); + break; + default: + break; + } } } } + private static void setFlowTags( + Map<String, String> environment, String tagPrefix, String value) { + if (!value.isEmpty()) { + environment.put(tagPrefix, value); + } + } + @VisibleForTesting protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() { Token<AMRMTokenIdentifier> amrmToken = http://git-wip-us.apache.org/repos/asf/hadoop/blob/525b30ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java index ff055a1..a4f1084 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.timelineservice; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -25,6 +27,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; @@ -35,6 +38,9 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils; @InterfaceAudience.Private @InterfaceStability.Unstable public class RMTimelineCollectorManager extends TimelineCollectorManager { + private static final Log LOG = + LogFactory.getLog(RMTimelineCollectorManager.class); + private RMContext rmContext; public RMTimelineCollectorManager(RMContext rmContext) { @@ -51,9 +57,21 @@ public class RMTimelineCollectorManager extends TimelineCollectorManager { "non-existing app " + appId); } String userId = app.getUser(); + TimelineCollectorContext context = collector.getTimelineEntityContext(); if (userId != null && !userId.isEmpty()) { - collector.getTimelineEntityContext().setUserId(userId); + context.setUserId(userId); } + + // initialize the flow in the environment with default values for those + // that do not specify the flow tags + // flow name: app name (or app id if app name is missing), + // flow version: "1", flow run id: start time + context.setFlowName(TimelineUtils.generateDefaultFlowName( + app.getName(), appId)); + context.setFlowVersion(TimelineUtils.DEFAULT_FLOW_VERSION); + context.setFlowRunId(app.getStartTime()); + + // the flow context is received via the application tags for (String tag : app.getApplicationTags()) { String[] parts = tag.split(":", 2); if (parts.length != 2 || parts[1].isEmpty()) { @@ -61,14 +79,22 @@ public class RMTimelineCollectorManager extends TimelineCollectorManager { } switch (parts[0].toUpperCase()) { case TimelineUtils.FLOW_NAME_TAG_PREFIX: - collector.getTimelineEntityContext().setFlowName(parts[1]); + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow name: " + parts[1]); + } + context.setFlowName(parts[1]); break; case TimelineUtils.FLOW_VERSION_TAG_PREFIX: - collector.getTimelineEntityContext().setFlowVersion(parts[1]); + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow version: " + parts[1]); + } + context.setFlowVersion(parts[1]); break; case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX: - collector.getTimelineEntityContext().setFlowRunId( - Long.parseLong(parts[1])); + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow run id: " + parts[1]); + } + context.setFlowRunId(Long.parseLong(parts[1])); break; default: break; http://git-wip-us.apache.org/repos/asf/hadoop/blob/525b30ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java index 1a58cb2..7a018ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java @@ -83,8 +83,6 @@ public class TestSystemMetricsPublisherForV2 { private static TimelineServiceV2Publisher metricsPublisher; private static DrainDispatcher dispatcher = new DrainDispatcher(); - private static final String DEFAULT_FLOW_VERSION = "1"; - private static final long DEFAULT_FLOW_RUN = 1; private static ConcurrentMap<ApplicationId, RMApp> rmAppsMapInContext; @@ -316,16 +314,14 @@ public class TestSystemMetricsPublisherForV2 { private String getTimelineEntityDir(RMApp app) { String outputDirApp = - testRootDir.getAbsolutePath()+"/" - + FileSystemTimelineWriterImpl.ENTITIES_DIR - + "/" - + YarnConfiguration.DEFAULT_RM_CLUSTER_ID - + "/" - + app.getUser() - + "/" - + TimelineUtils.generateDefaultFlowNameBasedOnAppId(app - .getApplicationId()) + "/" + DEFAULT_FLOW_VERSION + "/" - + DEFAULT_FLOW_RUN + "/" + app.getApplicationId(); + testRootDir.getAbsolutePath() + "/" + + FileSystemTimelineWriterImpl.ENTITIES_DIR + "/" + + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" + + app.getUser() + "/" + + app.getName() + "/" + + TimelineUtils.DEFAULT_FLOW_VERSION + "/" + + app.getStartTime() + "/" + + app.getApplicationId(); return outputDirApp; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/525b30ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index 36dd7b0..4fe445a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -18,14 +18,14 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; -import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +import com.google.common.base.Preconditions; /** * Service that handles writes to the timeline service and writes them to the @@ -54,13 +54,6 @@ public class AppLevelTimelineCollector extends TimelineCollector { // context info from NM. // Current user usually is not the app user, but keep this field non-null context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName()); - // Use app ID to generate a default flow name for orphan app - context.setFlowName( - TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId)); - // Set the flow version to string 1 if it's an orphan app - context.setFlowVersion("1"); - // Set the flow run ID to 1 if it's an orphan app - context.setFlowRunId(1L); context.setAppId(appId.toString()); super.serviceInit(conf); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/525b30ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java index 785fb19..75557a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java @@ -164,18 +164,30 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { getNMCollectorService().getTimelineCollectorContext(request); String userId = response.getUserId(); if (userId != null && !userId.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the user in the context: " + userId); + } collector.getTimelineEntityContext().setUserId(userId); } String flowName = response.getFlowName(); if (flowName != null && !flowName.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow name: " + flowName); + } collector.getTimelineEntityContext().setFlowName(flowName); } String flowVersion = response.getFlowVersion(); if (flowVersion != null && !flowVersion.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow version: " + flowVersion); + } collector.getTimelineEntityContext().setFlowVersion(flowVersion); } long flowRunId = response.getFlowRunId(); if (flowRunId != 0L) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow run id: " + flowRunId); + } collector.getTimelineEntityContext().setFlowRunId(flowRunId); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/525b30ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java index 58d68df..981ee2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java @@ -19,12 +19,12 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; import org.apache.hadoop.yarn.server.timelineservice.TimelineContext; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; /** * Encapsulates context information required by collector during a put. */ public class TimelineCollectorContext extends TimelineContext { - private String flowVersion; public TimelineCollectorContext() { @@ -34,7 +34,8 @@ public class TimelineCollectorContext extends TimelineContext { public TimelineCollectorContext(String clusterId, String userId, String flowName, String flowVersion, Long flowRunId, String appId) { super(clusterId, userId, flowName, flowRunId, appId); - this.flowVersion = flowVersion; + this.flowVersion = flowVersion == null ? + TimelineUtils.DEFAULT_FLOW_VERSION : flowVersion; } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org