Repository: hadoop Updated Branches: refs/heads/YARN-2928 db76a3ad0 -> d491ef080
YARN-3367. Replace starting a separate thread for post entity with event loop in TimelineClient (Naganarasimha G R via sjlee) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d491ef08 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d491ef08 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d491ef08 Branch: refs/heads/YARN-2928 Commit: d491ef080096c62964b8327555bf47ceae6e9292 Parents: db76a3a Author: Sangjin Lee <sj...@apache.org> Authored: Tue Feb 9 09:07:37 2016 -0800 Committer: Sangjin Lee <sj...@apache.org> Committed: Tue Feb 9 09:07:37 2016 -0800 ---------------------------------------------------------------------- .../jobhistory/JobHistoryEventHandler.java | 61 +--- .../mapred/JobHistoryFileReplayMapper.java | 8 +- .../hadoop/mapred/TimelineEntityConverter.java | 12 +- hadoop-yarn-project/CHANGES.txt | 3 + .../timelineservice/TimelineEntities.java | 17 +- .../hadoop/yarn/conf/YarnConfiguration.java | 6 + .../distributedshell/ApplicationMaster.java | 78 +---- .../api/async/impl/AMRMClientAsyncImpl.java | 26 +- .../hadoop/yarn/client/api/TimelineClient.java | 8 +- .../client/api/impl/TimelineClientImpl.java | 286 ++++++++++++++--- .../src/main/resources/yarn-default.xml | 7 + .../api/impl/TestTimelineClientV2Impl.java | 304 +++++++++++++++++++ .../nodemanager/NodeStatusUpdaterImpl.java | 4 +- 13 files changed, 623 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index 6e5afb1..1c5446f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -27,10 +27,7 @@ import java.util.Map; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -75,7 +72,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * The job history events get routed to this class. This class writes the Job * history events to the DFS directly into a staging dir and then moved to a @@ -129,10 +125,6 @@ public class JobHistoryEventHandler extends AbstractService private boolean timelineServiceV2Enabled = false; - // For posting entities in new timeline service in a non-blocking way - // TODO YARN-3367 replace with event loop in TimelineClient. 
- private ExecutorService threadPool; - private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB"; private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK"; private static final String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE = @@ -272,10 +264,6 @@ public class JobHistoryEventHandler extends AbstractService YarnConfiguration.timelineServiceV2Enabled(conf); LOG.info("Timeline service is enabled; version: " + YarnConfiguration.getTimelineServiceVersion(conf)); - if (timelineServiceV2Enabled) { - // initialize the thread pool for v.2 timeline service - threadPool = createThreadPool(); - } } else { LOG.info("Timeline service is not enabled"); } @@ -449,35 +437,9 @@ public class JobHistoryEventHandler extends AbstractService if (timelineClient != null) { timelineClient.stop(); } - if (threadPool != null) { - shutdownAndAwaitTermination(); - } LOG.info("Stopped JobHistoryEventHandler. super.stop()"); super.serviceStop(); } - - // TODO remove threadPool after adding non-blocking call in TimelineClient - private ExecutorService createThreadPool() { - return Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") - .build()); - } - - private void shutdownAndAwaitTermination() { - threadPool.shutdown(); - try { - if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { - threadPool.shutdownNow(); - if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { - LOG.error("ThreadPool did not terminate"); - } - } - } catch (InterruptedException ie) { - threadPool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } protected EventWriter createEventWriter(Path historyFilePath) throws IOException { @@ -1072,21 +1034,6 @@ public class JobHistoryEventHandler extends AbstractService } } - private void putEntityWithoutBlocking(final TimelineClient client, - final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity) { - Runnable publishWrapper = new Runnable() { - public 
void run() { - try { - client.putEntities(entity); - } catch (IOException|YarnException e) { - LOG.error("putEntityNonBlocking get failed: " + e); - throw new RuntimeException(e.toString()); - } - } - }; - threadPool.execute(publishWrapper); - } - // create JobEntity from HistoryEvent with adding other info, like: // jobId, timestamp and entityType. private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity @@ -1247,7 +1194,13 @@ public class JobHistoryEventHandler extends AbstractService taskId, setCreatedTime); } } - putEntityWithoutBlocking(timelineClient, tEntity); + try { + timelineClient.putEntitiesAsync(tEntity); + } catch (IOException | YarnException e) { + LOG.error("Failed to process Event " + event.getEventType() + + " for the job : " + jobId, e); + } + } private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java index 802b78f..4fb5308 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java @@ -21,8 +21,8 @@ package org.apache.hadoop.mapred; import java.io.IOException; import java.util.Collection; import java.util.HashMap; +import java.util.List; import 
java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -176,7 +176,7 @@ class JobHistoryFileReplayMapper extends EntityWriter { // create entities from job history and write them long totalTime = 0; - Set<TimelineEntity> entitySet = + List<TimelineEntity> entitySet = converter.createTimelineEntities(jobInfo, jobConf); LOG.info("converted them into timeline entities for job " + jobIdStr); // use the current user for this purpose @@ -215,7 +215,7 @@ class JobHistoryFileReplayMapper extends EntityWriter { } private void writeAllEntities(AppLevelTimelineCollector collector, - Set<TimelineEntity> entitySet, UserGroupInformation ugi) + List<TimelineEntity> entitySet, UserGroupInformation ugi) throws IOException { TimelineEntities entities = new TimelineEntities(); entities.setEntities(entitySet); @@ -223,7 +223,7 @@ class JobHistoryFileReplayMapper extends EntityWriter { } private void writePerEntity(AppLevelTimelineCollector collector, - Set<TimelineEntity> entitySet, UserGroupInformation ugi) + List<TimelineEntity> entitySet, UserGroupInformation ugi) throws IOException { for (TimelineEntity entity : entitySet) { TimelineEntities entities = new TimelineEntities(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java index 880014b..0e2eb72 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java @@ -18,7 +18,9 @@ package org.apache.hadoop.mapred; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -57,16 +59,16 @@ class TimelineEntityConverter { * Note that we also do not add info to the YARN application entity, which * would be needed for aggregation. */ - public Set<TimelineEntity> createTimelineEntities(JobInfo jobInfo, + public List<TimelineEntity> createTimelineEntities(JobInfo jobInfo, Configuration conf) { - Set<TimelineEntity> entities = new HashSet<>(); + List<TimelineEntity> entities = new ArrayList<>(); // create the job entity TimelineEntity job = createJobEntity(jobInfo, conf); entities.add(job); // create the task and task attempt entities - Set<TimelineEntity> tasksAndAttempts = + List<TimelineEntity> tasksAndAttempts = createTaskAndTaskAttemptEntities(jobInfo); entities.addAll(tasksAndAttempts); @@ -125,9 +127,9 @@ class TimelineEntityConverter { } } - private Set<TimelineEntity> createTaskAndTaskAttemptEntities( + private List<TimelineEntity> createTaskAndTaskAttemptEntities( JobInfo jobInfo) { - Set<TimelineEntity> entities = new HashSet<>(); + List<TimelineEntity> entities = new ArrayList<>(); Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks(); LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() + " tasks"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f6bf667..4c77b67 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ 
b/hadoop-yarn-project/CHANGES.txt @@ -167,6 +167,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-4446. Refactor reader API for better extensibility (Varun Saxena via sjlee) + YARN-3367. Replace starting a separate thread for post entity with event + loop in TimelineClient (Naganarasimha G R via sjlee) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java index f08a0ec..63989e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java @@ -17,15 +17,16 @@ */ package org.apache.hadoop.yarn.api.records.timelineservice; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; +import java.util.ArrayList; +import java.util.List; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; -import java.util.HashSet; -import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; /** * This class hosts a set of timeline entities. 
@@ -36,22 +37,22 @@ import java.util.Set; @InterfaceStability.Unstable public class TimelineEntities { - private Set<TimelineEntity> entities = new HashSet<>(); + private List<TimelineEntity> entities = new ArrayList<>(); public TimelineEntities() { } @XmlElement(name = "entities") - public Set<TimelineEntity> getEntities() { + public List<TimelineEntity> getEntities() { return entities; } - public void setEntities(Set<TimelineEntity> timelineEntities) { + public void setEntities(List<TimelineEntity> timelineEntities) { this.entities = timelineEntities; } - public void addEntities(Set<TimelineEntity> timelineEntities) { + public void addEntities(List<TimelineEntity> timelineEntities) { this.entities.addAll(timelineEntities); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 9b43fbd..6ac6fb9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1762,6 +1762,12 @@ public class YarnConfiguration extends Configuration { public static final int DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS = 1000; + public static final String NUMBER_OF_ASYNC_ENTITIES_TO_MERGE = + TIMELINE_SERVICE_PREFIX + + "timeline-client.number-of-async-entities-to-merge"; + + public static final int DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE = 10; + // mark app-history related configs @Private as application history is going // to be integrated into the timeline service 
@Private http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 1c68086..cb5f53b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -39,9 +39,6 @@ import java.util.Set; import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.cli.CommandLine; @@ -105,7 +102,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.LogManager; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * An ApplicationMaster for executing shell commands on a set of launched @@ -219,10 +215,6 @@ public class ApplicationMaster { private boolean timelineServiceV2 = false; - // For posting entities in new timeline service in a non-blocking way - // TODO 
replace with event loop in TimelineClient. - private ExecutorService threadPool; - // App Master configuration // No. of containers to run shell command on @VisibleForTesting @@ -311,10 +303,6 @@ public class ApplicationMaster { } appMaster.run(); result = appMaster.finish(); - - if (appMaster.threadPool != null) { - appMaster.shutdownAndAwaitTermination(); - } } catch (Throwable t) { LOG.fatal("Error running ApplicationMaster", t); LogManager.shutdown(); @@ -329,29 +317,6 @@ public class ApplicationMaster { } } - //TODO remove threadPool after adding non-blocking call in TimelineClient - private ExecutorService createThreadPool() { - return Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") - .build()); - } - - private void shutdownAndAwaitTermination() { - threadPool.shutdown(); - try { - // Wait a while for existing tasks to terminate - if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { - threadPool.shutdownNow(); - if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) - LOG.error("ThreadPool did not terminate"); - } - } catch (InterruptedException ie) { - threadPool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - /** * Dump out contents of $CWD and the environment to stdout for debugging */ @@ -547,11 +512,7 @@ public class ApplicationMaster { .getOptionValue("priority", "0")); if (YarnConfiguration.timelineServiceEnabled(conf)) { - timelineServiceV2 = - YarnConfiguration.timelineServiceV2Enabled(conf); - if (timelineServiceV2) { - threadPool = createThreadPool(); - } + timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf); } else { timelineClient = null; LOG.warn("Timeline service is not enabled"); @@ -701,8 +662,10 @@ public class ApplicationMaster { if (timelineServiceV2) { timelineClient = TimelineClient.createTimelineClient( appAttemptID.getApplicationId()); + LOG.info("Timeline service V2 client is enabled"); } else { timelineClient = 
TimelineClient.createTimelineClient(); + LOG.info("Timeline service V1 client is enabled"); } timelineClient.init(conf); timelineClient.start(); @@ -1304,18 +1267,8 @@ public class ApplicationMaster { shellId); return new Thread(runnableLaunchContainer); } - - private void publishContainerStartEventOnTimelineServiceV2( - final Container container) { - Runnable publishWrapper = new Runnable() { - public void run() { - publishContainerStartEventOnTimelineServiceV2Base(container); - } - }; - threadPool.execute(publishWrapper); - } - private void publishContainerStartEventOnTimelineServiceV2Base( + private void publishContainerStartEventOnTimelineServiceV2( Container container) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); @@ -1349,16 +1302,6 @@ public class ApplicationMaster { private void publishContainerEndEventOnTimelineServiceV2( final ContainerStatus container) { - Runnable publishWrapper = new Runnable() { - public void run() { - publishContainerEndEventOnTimelineServiceV2Base(container); - } - }; - threadPool.execute(publishWrapper); - } - - private void publishContainerEndEventOnTimelineServiceV2Base( - final ContainerStatus container) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); entity.setId(container.getContainerId().toString()); @@ -1389,17 +1332,6 @@ public class ApplicationMaster { } private void publishApplicationAttemptEventOnTimelineServiceV2( - final DSEvent appEvent) { - - Runnable publishWrapper = new Runnable() { - public void run() { - publishApplicationAttemptEventOnTimelineServiceV2Base(appEvent); - } - }; - threadPool.execute(publishWrapper); - } - - private void publishApplicationAttemptEventOnTimelineServiceV2Base( DSEvent appEvent) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new 
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); @@ -1417,7 +1349,7 @@ public class ApplicationMaster { appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() { @Override public TimelinePutResponse run() throws Exception { - timelineClient.putEntities(entity); + timelineClient.putEntitiesAsync(entity); return null; } }); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 212f721..8af0c78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -41,9 +41,9 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; -import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -327,6 +327,19 @@ extends AMRMClientAsync<T> { LOG.info("Interrupted while waiting for queue", ex); 
continue; } + + String collectorAddress = response.getCollectorAddr(); + TimelineClient timelineClient = client.getRegisteredTimeineClient(); + if (timelineClient != null && collectorAddress != null + && !collectorAddress.isEmpty()) { + if (collectorAddr == null + || !collectorAddr.equals(collectorAddress)) { + collectorAddr = collectorAddress; + timelineClient.setTimelineServiceAddress(collectorAddress); + LOG.info("collectorAddress " + collectorAddress); + } + } + List<NodeReport> updatedNodes = response.getUpdatedNodes(); if (!updatedNodes.isEmpty()) { handler.onNodesUpdated(updatedNodes); @@ -354,17 +367,6 @@ extends AMRMClientAsync<T> { if (!allocated.isEmpty()) { handler.onContainersAllocated(allocated); } - - String collectorAddress = response.getCollectorAddr(); - TimelineClient timelineClient = client.getRegisteredTimeineClient(); - if (timelineClient != null && collectorAddress != null - && !collectorAddress.isEmpty()) { - if (collectorAddr == null || - !collectorAddr.equals(collectorAddress)) { - collectorAddr = collectorAddress; - timelineClient.setTimelineServiceAddress(collectorAddress); - } - } progress = handler.getProgress(); } catch (Throwable ex) { handler.onError(ex); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index ade4f9a..24d9f32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -27,12 
+27,12 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -53,7 +53,7 @@ public abstract class TimelineClient extends AbstractService { * construct and initialize a timeline client if the following operations are * supposed to be conducted by that user. */ - private ApplicationId contextAppId; + protected ApplicationId contextAppId; /** * Creates an instance of the timeline v.1.x client. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index a158a56..c8e6481 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -29,6 +29,14 @@ import java.net.URL; import java.net.URLConnection; import java.security.GeneralSecurityException; import java.security.PrivilegedExceptionAction; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; @@ -129,6 +137,8 @@ public class TimelineClientImpl extends TimelineClient { @VisibleForTesting TimelineClientConnectionRetry connectionRetry; + private TimelineEntityDispatcher entityDispatcher; + // Abstract class for an operation that should be retried by timeline client private static abstract class TimelineClientRetryOp { // The operation that should be retried @@ -312,6 +322,7 @@ public class TimelineClientImpl extends TimelineClient { serviceRetryInterval = conf.getLong( YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, 
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); + entityDispatcher = new TimelineEntityDispatcher(conf); } else { if (YarnConfiguration.useHttps(conf)) { setTimelineServiceAddress(conf.get( @@ -332,7 +343,9 @@ public class TimelineClientImpl extends TimelineClient { @Override protected void serviceStart() throws Exception { - if (!timelineServiceV2) { + if (timelineServiceV2) { + entityDispatcher.start(); + } else { timelineWriter = createTimelineWriter(configuration, authUgi, client, constructResURI(getConfig(), timelineServiceAddress, false)); } @@ -354,6 +367,9 @@ public class TimelineClientImpl extends TimelineClient { if (this.timelineWriter != null) { this.timelineWriter.close(); } + if (timelineServiceV2) { + entityDispatcher.stop(); + } super.serviceStop(); } @@ -366,37 +382,21 @@ public class TimelineClientImpl extends TimelineClient { @Override public void putEntities( org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) - throws IOException, YarnException { - putEntities(false, entities); + throws IOException, YarnException { + if (!timelineServiceV2) { + throw new YarnException("v.2 method is invoked on a v.1.x client"); + } + entityDispatcher.dispatchEntities(true, entities); } @Override public void putEntitiesAsync( org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) throws IOException, YarnException { - putEntities(true, entities); - } - - private void putEntities(boolean async, - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... 
entities) - throws IOException, YarnException { if (!timelineServiceV2) { throw new YarnException("v.2 method is invoked on a v.1.x client"); } - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities - entitiesContainer = - new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities(); - for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entities) { - entitiesContainer.addEntity(entity); - } - MultivaluedMap<String, String> params = new MultivaluedMapImpl(); - if (getContextAppId() != null) { - params.add("appid", getContextAppId().toString()); - } - if (async) { - params.add("async", Boolean.TRUE.toString()); - } - putObjects("entities", params, entitiesContainer); + entityDispatcher.dispatchEntities(false, entities); } @Override @@ -407,20 +407,10 @@ public class TimelineClientImpl extends TimelineClient { // Used for new timeline service only @Private - public void putObjects(String path, MultivaluedMap<String, String> params, + protected void putObjects(String path, MultivaluedMap<String, String> params, Object obj) throws IOException, YarnException { - // timelineServiceAddress could haven't be initialized yet - // or stale (only for new timeline service) - int retries = pollTimelineServiceAddress(this.maxServiceRetries); - if (timelineServiceAddress == null) { - String errMessage = "TimelineClient has reached to max retry times : " - + this.maxServiceRetries - + ", but failed to fetch timeline service address. Please verify" - + " Timeline Auxillary Service is configured in all the NMs"; - LOG.error(errMessage); - throw new YarnException(errMessage); - } + int retries = verifyRestEndPointAvailable(); // timelineServiceAddress could be stale, add retry logic here. 
boolean needRetry = true; @@ -438,6 +428,21 @@ public class TimelineClientImpl extends TimelineClient { } } + private int verifyRestEndPointAvailable() throws YarnException { + // timelineServiceAddress could haven't be initialized yet + // or stale (only for new timeline service) + int retries = pollTimelineServiceAddress(this.maxServiceRetries); + if (timelineServiceAddress == null) { + String errMessage = "TimelineClient has reached to max retry times : " + + this.maxServiceRetries + + ", but failed to fetch timeline service address. Please verify" + + " Timeline Auxillary Service is configured in all the NMs"; + LOG.error(errMessage); + throw new YarnException(errMessage); + } + return retries; + } + /** * Check if reaching to maximum of retries. * @param retries @@ -643,7 +648,7 @@ public class TimelineClientImpl extends TimelineClient { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - timelineServiceAddress = getTimelineServiceAddress(); + // timelineServiceAddress = getTimelineServiceAddress(); retries--; } return retries; @@ -862,4 +867,213 @@ public class TimelineClientImpl extends TimelineClient { public void setTimelineWriter(TimelineWriter writer) { this.timelineWriter = writer; } + + private final class EntitiesHolder extends FutureTask<Void> { + private final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities; + private final boolean isSync; + + EntitiesHolder( + final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities, + final boolean isSync) { + super(new Callable<Void>() { + // publishEntities() + public Void call() throws Exception { + MultivaluedMap<String, String> params = new MultivaluedMapImpl(); + params.add("appid", contextAppId.toString()); + params.add("async", Boolean.toString(!isSync)); + putObjects("entities", params, entities); + return null; + } + }); + this.entities = entities; + this.isSync = isSync; + } + + public boolean isSync() { + return isSync; + } + 
+ public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities getEntities() { + return entities; + } + } + + /** + * This class is responsible for collecting the timeline entities and + * publishing them in async. + */ + private class TimelineEntityDispatcher { + /** + * Time period for which the timelineclient will wait for draining after + * stop + */ + private static final long DRAIN_TIME_PERIOD = 2000L; + + private int numberOfAsyncsToMerge; + private final BlockingQueue<EntitiesHolder> timelineEntityQueue; + private ExecutorService executor; + + TimelineEntityDispatcher(Configuration conf) { + timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>(); + numberOfAsyncsToMerge = + conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, + YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE); + } + + Runnable createRunnable() { + return new Runnable() { + @Override + public void run() { + try { + EntitiesHolder entitiesHolder; + while (!Thread.currentThread().isInterrupted()) { + // Merge all the async calls and make one push, but if its sync + // call push immediately + try { + entitiesHolder = timelineEntityQueue.take(); + } catch (InterruptedException ie) { + LOG.info("Timeline dispatcher thread was interrupted "); + Thread.currentThread().interrupt(); + return; + } + if (entitiesHolder != null) { + publishWithoutBlockingOnQueue(entitiesHolder); + } + } + } finally { + if (!timelineEntityQueue.isEmpty()) { + LOG.info("Yet to publish " + timelineEntityQueue.size() + + " timelineEntities, draining them now. "); + } + // Try to drain the remaining entities to be published @ the max for + // 2 seconds + long timeTillweDrain = + System.currentTimeMillis() + DRAIN_TIME_PERIOD; + while (!timelineEntityQueue.isEmpty()) { + publishWithoutBlockingOnQueue(timelineEntityQueue.poll()); + if (System.currentTimeMillis() > timeTillweDrain) { + // time elapsed stop publishing further.... 
+ if (!timelineEntityQueue.isEmpty()) { + LOG.warn("Time to drain elapsed! Remaining " + + timelineEntityQueue.size() + "timelineEntities will not" + + " be published"); + // if some entities were not drained then we need interrupt + // the threads which had put sync EntityHolders to the queue. + EntitiesHolder nextEntityInTheQueue = null; + while ((nextEntityInTheQueue = + timelineEntityQueue.poll()) != null) { + nextEntityInTheQueue.cancel(true); + } + } + break; + } + } + } + } + + /** + * Publishes the given EntitiesHolder and return immediately if sync + * call, else tries to fetch the EntitiesHolder from the queue in non + * blocking fashion and collate the Entities if possible before + * publishing through REST. + * + * @param entitiesHolder + */ + private void publishWithoutBlockingOnQueue( + EntitiesHolder entitiesHolder) { + if (entitiesHolder.isSync()) { + entitiesHolder.run(); + return; + } + int count = 1; + while (true) { + // loop till we find a sync put Entities or there is nothing + // to take + EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll(); + if (nextEntityInTheQueue == null) { + // Nothing in the queue just publish and get back to the + // blocked wait state + entitiesHolder.run(); + break; + } else if (nextEntityInTheQueue.isSync()) { + // flush all the prev async entities first + entitiesHolder.run(); + // and then flush the sync entity + nextEntityInTheQueue.run(); + break; + } else { + // append all async entities together and then flush + entitiesHolder.getEntities().addEntities( + nextEntityInTheQueue.getEntities().getEntities()); + count++; + if (count == numberOfAsyncsToMerge) { + // Flush the entities if the number of the async + // putEntites merged reaches the desired limit. To avoid + // collecting multiple entities and delaying for a long + // time. 
+ entitiesHolder.run(); + break; + } + } + } + } + }; + } + + public void dispatchEntities(boolean sync, + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[] entitiesTobePublished) + throws YarnException { + if (executor.isShutdown()) { + throw new YarnException("Timeline client is in the process of stopping," + + " not accepting any more TimelineEntities"); + } + + // wrap all TimelineEntity into TimelineEntities object + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities entities = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities(); + for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entitiesTobePublished) { + entities.addEntity(entity); + } + + // created a holder and place it in queue + EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync); + try { + timelineEntityQueue.put(entitiesHolder); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new YarnException( + "Failed while adding entity to the queue for publishing", e); + } + + if (sync) { + // In sync call we need to wait till its published and if any error then + // throw it back + try { + entitiesHolder.get(); + } catch (ExecutionException e) { + throw new YarnException("Failed while publishing entity", + e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new YarnException("Interrupted while publishing entity", e); + } + } + } + + public void start() { + executor = Executors.newSingleThreadExecutor(); + executor.execute(createRunnable()); + } + + public void stop() { + LOG.info("Stopping TimelineClient."); + executor.shutdownNow(); + try { + executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + e.printStackTrace(); + } + } + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index b521599..2cbc836 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2081,6 +2081,13 @@ <value>1000</value> </property> + <property> + <description>The timeline v2 client tries to merge up to this many async + entities (if available) before calling the ATS v2 REST API to submit them. + </description> + <name>yarn.timeline-service.timeline-client.number-of-async-entities-to-merge</name> + <value>10</value> + </property> <!-- Shared Cache Configuration --> <property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java new file mode 100644 index 0000000..7803f94 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import javax.ws.rs.core.MultivaluedMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestTimelineClientV2Impl { + private static final Log LOG = + LogFactory.getLog(TestTimelineClientV2Impl.class); + private TestV2TimelineClient client; + private static long TIME_TO_SLEEP = 150; + + @Before + public void setup() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); + conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3); + client = createTimelineClient(conf); + } + + private TestV2TimelineClient 
createTimelineClient(YarnConfiguration conf) { + ApplicationId id = ApplicationId.newInstance(0, 0); + TestV2TimelineClient client = new TestV2TimelineClient(id); + client.init(conf); + client.start(); + return client; + } + + private class TestV2TimelineClient extends TimelineClientImpl { + private boolean sleepBeforeReturn; + private boolean throwException; + + private List<TimelineEntities> publishedEntities; + + public TimelineEntities getPublishedEntities(int putIndex) { + Assert.assertTrue("Not So many entities Published", + putIndex < publishedEntities.size()); + return publishedEntities.get(putIndex); + } + + public void setSleepBeforeReturn(boolean sleepBeforeReturn) { + this.sleepBeforeReturn = sleepBeforeReturn; + } + + public void setThrowException(boolean throwException) { + this.throwException = throwException; + } + + public int getNumOfTimelineEntitiesPublished() { + return publishedEntities.size(); + } + + public TestV2TimelineClient(ApplicationId id) { + super(id); + publishedEntities = new ArrayList<TimelineEntities>(); + } + + protected void putObjects(String path, + MultivaluedMap<String, String> params, Object obj) + throws IOException, YarnException { + if (throwException) { + throw new YarnException("ActualException"); + } + publishedEntities.add((TimelineEntities) obj); + if (sleepBeforeReturn) { + try { + Thread.sleep(TIME_TO_SLEEP); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + + @Test + public void testPostEntities() throws Exception { + try { + client.putEntities(generateEntity("1")); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + } + + @Test + public void testASyncCallMerge() throws Exception { + client.setSleepBeforeReturn(true); + try { + client.putEntitiesAsync(generateEntity("1")); + Thread.sleep(TIME_TO_SLEEP / 2); + // by the time first put response comes push 2 entities in the queue + client.putEntitiesAsync(generateEntity("2")); + 
client.putEntitiesAsync(generateEntity("3")); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + for (int i = 0; i < 4; i++) { + if (client.getNumOfTimelineEntitiesPublished() == 2) { + break; + } + Thread.sleep(TIME_TO_SLEEP); + } + Assert.assertEquals("two merged TimelineEntities needs to be published", 2, + client.getNumOfTimelineEntitiesPublished()); + TimelineEntities secondPublishedEntities = client.getPublishedEntities(1); + Assert.assertEquals( + "Merged TimelineEntities Object needs to 2 TimelineEntity Object", 2, + secondPublishedEntities.getEntities().size()); + Assert.assertEquals("Order of Async Events Needs to be FIFO", "2", + secondPublishedEntities.getEntities().get(0).getId()); + Assert.assertEquals("Order of Async Events Needs to be FIFO", "3", + secondPublishedEntities.getEntities().get(1).getId()); + } + + @Test + public void testSyncCall() throws Exception { + try { + // sync entity should not be be merged with Async + client.putEntities(generateEntity("1")); + client.putEntitiesAsync(generateEntity("2")); + client.putEntitiesAsync(generateEntity("3")); + // except for the sync call above 2 should be merged + client.putEntities(generateEntity("4")); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + for (int i = 0; i < 4; i++) { + if (client.getNumOfTimelineEntitiesPublished() == 3) { + break; + } + Thread.sleep(TIME_TO_SLEEP); + } + printReceivedEntities(); + Assert.assertEquals("TimelineEntities not published as desired", 3, + client.getNumOfTimelineEntitiesPublished()); + TimelineEntities firstPublishedEntities = client.getPublishedEntities(0); + Assert.assertEquals("sync entities should not be merged with async", 1, + firstPublishedEntities.getEntities().size()); + + // test before pushing the sync entities asyncs are merged and pushed + TimelineEntities secondPublishedEntities = client.getPublishedEntities(1); + Assert.assertEquals( + "async entities should be merged before 
publishing sync", 2, + secondPublishedEntities.getEntities().size()); + Assert.assertEquals("Order of Async Events Needs to be FIFO", "2", + secondPublishedEntities.getEntities().get(0).getId()); + Assert.assertEquals("Order of Async Events Needs to be FIFO", "3", + secondPublishedEntities.getEntities().get(1).getId()); + + // test the last entity published is sync put + TimelineEntities thirdPublishedEntities = client.getPublishedEntities(2); + Assert.assertEquals("sync entities had to be published at the last", 1, + thirdPublishedEntities.getEntities().size()); + Assert.assertEquals("Expected last sync Event is not proper", "4", + thirdPublishedEntities.getEntities().get(0).getId()); + } + + @Test + public void testExceptionCalls() throws Exception { + client.setThrowException(true); + try { + client.putEntitiesAsync(generateEntity("1")); + } catch (YarnException e) { + Assert.fail("Async calls are not expected to throw exception"); + } + + try { + client.putEntities(generateEntity("2")); + Assert.fail("Sync calls are expected to throw exception"); + } catch (YarnException e) { + Assert.assertEquals("Same exception needs to be thrown", + "ActualException", e.getCause().getMessage()); + } + } + + @Test + public void testConfigurableNumberOfMerges() throws Exception { + client.setSleepBeforeReturn(true); + try { + // At max 3 entities need to be merged + client.putEntitiesAsync(generateEntity("1")); + client.putEntitiesAsync(generateEntity("2")); + client.putEntitiesAsync(generateEntity("3")); + client.putEntitiesAsync(generateEntity("4")); + client.putEntities(generateEntity("5")); + client.putEntitiesAsync(generateEntity("6")); + client.putEntitiesAsync(generateEntity("7")); + client.putEntitiesAsync(generateEntity("8")); + client.putEntitiesAsync(generateEntity("9")); + client.putEntitiesAsync(generateEntity("10")); + } catch (YarnException e) { + Assert.fail("No exception expected"); + } + // not having the same logic here as it doesn't depend on how many times 
+ // events are published. + Thread.sleep(2 * TIME_TO_SLEEP); + printReceivedEntities(); + for (TimelineEntities publishedEntities : client.publishedEntities) { + Assert.assertTrue( + "Number of entities should not be greater than 3 for each publish," + + " but was " + publishedEntities.getEntities().size(), + publishedEntities.getEntities().size() <= 3); + } + } + + @Test + public void testAfterStop() throws Exception { + client.setSleepBeforeReturn(true); + try { + // At max 3 entities need to be merged + client.putEntities(generateEntity("1")); + for (int i = 2; i < 20; i++) { + client.putEntitiesAsync(generateEntity("" + i)); + } + client.stop(); + try { + client.putEntitiesAsync(generateEntity("50")); + Assert.fail("Exception expected"); + } catch (YarnException e) { + // expected + } + } catch (YarnException e) { + Assert.fail("No exception expected"); + } + // not having the same logic here as it doesn't depend on how many times + // events are published. + for (int i = 0; i < 5; i++) { + TimelineEntities publishedEntities = + client.publishedEntities.get(client.publishedEntities.size() - 1); + TimelineEntity timelineEntity = publishedEntities.getEntities() + .get(publishedEntities.getEntities().size() - 1); + if (!timelineEntity.getId().equals("19")) { + Thread.sleep(2 * TIME_TO_SLEEP); + } + } + printReceivedEntities(); + TimelineEntities publishedEntities = + client.publishedEntities.get(client.publishedEntities.size() - 1); + TimelineEntity timelineEntity = publishedEntities.getEntities() + .get(publishedEntities.getEntities().size() - 1); + Assert.assertEquals("", "19", timelineEntity.getId()); + } + + private void printReceivedEntities() { + for (int i = 0; i < client.getNumOfTimelineEntitiesPublished(); i++) { + TimelineEntities publishedEntities = client.getPublishedEntities(i); + StringBuilder entitiesPerPublish = new StringBuilder(); + ; + for (TimelineEntity entity : publishedEntities.getEntities()) { + entitiesPerPublish.append(entity.getId()); + 
entitiesPerPublish.append(","); + } + LOG.info("Entities Published @ index " + i + " : " + + entitiesPerPublish.toString()); + } + } + + private static TimelineEntity generateEntity(String id) { + TimelineEntity entity = new TimelineEntity(); + entity.setId(id); + entity.setType("testEntity"); + entity.setCreatedTime(System.currentTimeMillis()); + return entity; + } + + @After + public void tearDown() { + if (client != null) { + client.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d491ef08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 544a6f5..20ca7f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -876,7 +876,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements Map<ApplicationId, String> knownCollectorsMap = response.getAppCollectorsMap(); if (knownCollectorsMap == null) { - LOG.warn("the collectors map is null"); + if (LOG.isDebugEnabled()) { + LOG.debug("No collectors to update RM"); + } } else { Set<Map.Entry<ApplicationId, String>> rmKnownCollectors = knownCollectorsMap.entrySet();