YARN-4675. Reorganize TimelineClient and TimelineClientImpl into separate classes for ATSv1.x and ATSv2. Contributed by Naganarasimha G R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4fa1afdb Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4fa1afdb Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4fa1afdb Branch: refs/heads/trunk Commit: 4fa1afdb883dab8786d2fb5c72a195dd2e87d711 Parents: 5690b51 Author: Sangjin Lee <sj...@apache.org> Authored: Thu Feb 16 11:41:04 2017 -0800 Committer: Sangjin Lee <sj...@apache.org> Committed: Thu Feb 16 11:41:04 2017 -0800 ---------------------------------------------------------------------- .../jobhistory/JobHistoryEventHandler.java | 57 +- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 14 +- .../v2/app/rm/RMContainerAllocator.java | 4 +- .../jobhistory/TestJobHistoryEventHandler.java | 8 +- .../distributedshell/ApplicationMaster.java | 98 ++- .../hadoop/yarn/client/api/AMRMClient.java | 40 +- .../yarn/client/api/async/AMRMClientAsync.java | 21 +- .../api/async/impl/AMRMClientAsyncImpl.java | 5 +- .../yarn/client/api/impl/YarnClientImpl.java | 15 +- .../hadoop/yarn/client/api/TimelineClient.java | 94 +-- .../yarn/client/api/TimelineV2Client.java | 92 +++ .../client/api/impl/TimelineClientImpl.java | 825 ++----------------- .../yarn/client/api/impl/TimelineConnector.java | 440 ++++++++++ .../client/api/impl/TimelineV2ClientImpl.java | 459 +++++++++++ .../client/api/impl/TestTimelineClient.java | 39 +- .../api/impl/TestTimelineClientV2Impl.java | 4 +- .../timelineservice/NMTimelinePublisher.java | 22 +- .../TestNMTimelinePublisher.java | 10 +- .../TestTimelineServiceClientIntegration.java | 10 +- 19 files changed, 1272 insertions(+), 985 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index 0cc605c..285d36e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -72,13 +72,12 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.ClientHandlerException; @@ -90,8 +89,6 @@ import com.sun.jersey.api.client.ClientHandlerException; */ public class JobHistoryEventHandler extends AbstractService implements EventHandler<JobHistoryEvent> { - private static final JsonNodeFactory FACTORY = - new ObjectMapper().getNodeFactory(); private final AppContext context; private final int startCount; @@ -133,9 +130,10 @@ public class JobHistoryEventHandler extends AbstractService // should job completion be force when the AM shuts down? protected volatile boolean forceJobCompletion = false; + @VisibleForTesting protected TimelineClient timelineClient; - - private boolean timelineServiceV2Enabled = false; + @VisibleForTesting + protected TimelineV2Client timelineV2Client; private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB"; private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK"; @@ -268,12 +266,17 @@ public class JobHistoryEventHandler extends AbstractService MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) { LOG.info("Emitting job history data to the timeline service is enabled"); if (YarnConfiguration.timelineServiceEnabled(conf)) { - - timelineClient = - ((MRAppMaster.RunningAppContext)context).getTimelineClient(); - timelineClient.init(conf); - timelineServiceV2Enabled = - YarnConfiguration.timelineServiceV2Enabled(conf); + boolean timelineServiceV2Enabled = + ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2); + if(timelineServiceV2Enabled) { + timelineV2Client = + ((MRAppMaster.RunningAppContext)context).getTimelineV2Client(); + timelineV2Client.init(conf); + } else { + timelineClient = + ((MRAppMaster.RunningAppContext) context).getTimelineClient(); + timelineClient.init(conf); + } LOG.info("Timeline service is enabled; version: " + YarnConfiguration.getTimelineServiceVersion(conf)); } else { @@ -324,6 +327,8 @@ public class JobHistoryEventHandler extends AbstractService protected void serviceStart() throws Exception { if (timelineClient != null) { timelineClient.start(); + } else if (timelineV2Client != null) { + timelineV2Client.start(); } eventHandlingThread = new Thread(new Runnable() { @Override @@ -448,6 +453,8 @@ public class JobHistoryEventHandler extends AbstractService } if (timelineClient != null) { timelineClient.stop(); + } else if (timelineV2Client != null) { + timelineV2Client.stop(); } LOG.info("Stopped JobHistoryEventHandler. super.stop()"); super.serviceStop(); @@ -605,14 +612,12 @@ public class JobHistoryEventHandler extends AbstractService } processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID()); - if (timelineClient != null) { - if (timelineServiceV2Enabled) { - processEventForNewTimelineService(historyEvent, event.getJobID(), - event.getTimestamp()); - } else { - processEventForTimelineServer(historyEvent, event.getJobID(), - event.getTimestamp()); - } + if (timelineV2Client != null) { + processEventForNewTimelineService(historyEvent, event.getJobID(), + event.getTimestamp()); + } else if (timelineClient != null) { + processEventForTimelineServer(historyEvent, event.getJobID(), + event.getTimestamp()); } if (LOG.isDebugEnabled()) { LOG.debug("In HistoryEventHandler " @@ -1162,8 +1167,8 @@ public class JobHistoryEventHandler extends AbstractService configSize += size; if (configSize > JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES) { if (jobEntityForConfigs.getConfigs().size() > 0) { - timelineClient.putEntities(jobEntityForConfigs); - timelineClient.putEntities(appEntityForConfigs); + timelineV2Client.putEntities(jobEntityForConfigs); + timelineV2Client.putEntities(appEntityForConfigs); jobEntityForConfigs = createJobEntity(jobId); appEntityForConfigs = new ApplicationEntity(); appEntityForConfigs.setId(appId); @@ -1174,8 +1179,8 @@ public class JobHistoryEventHandler extends AbstractService appEntityForConfigs.addConfig(entry.getKey(), entry.getValue()); } if (configSize > 0) { - timelineClient.putEntities(jobEntityForConfigs); - timelineClient.putEntities(appEntityForConfigs); + timelineV2Client.putEntities(jobEntityForConfigs); + timelineV2Client.putEntities(appEntityForConfigs); } } catch (IOException | YarnException e) { LOG.error("Exception while publishing configs on JOB_SUBMITTED Event " + @@ -1295,9 +1300,9 @@ public class JobHistoryEventHandler extends AbstractService } try { if (appEntityWithJobMetrics == null) { - timelineClient.putEntitiesAsync(tEntity); + timelineV2Client.putEntitiesAsync(tEntity); } else { - timelineClient.putEntities(tEntity, appEntityWithJobMetrics); + timelineV2Client.putEntities(tEntity, appEntityWithJobMetrics); } } catch (IOException | YarnException e) { LOG.error("Failed to process Event " + event.getEventType() http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 835c0aa..12df83d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.crypto.KeyGenerator; + import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -141,6 +143,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -154,8 +157,6 @@ import org.apache.hadoop.yarn.util.SystemClock; import com.google.common.annotations.VisibleForTesting; -import javax.crypto.KeyGenerator; - /** * The Map-Reduce Application Master. * The state machine is encapsulated in the implementation of Job interface. @@ -1066,6 +1067,7 @@ public class MRAppMaster extends CompositeService { private final ClusterInfo clusterInfo = new ClusterInfo(); private final ClientToAMTokenSecretManager clientToAMTokenSecretManager; private TimelineClient timelineClient = null; + private TimelineV2Client timelineV2Client = null; private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor; @@ -1081,7 +1083,7 @@ public class MRAppMaster extends CompositeService { if (YarnConfiguration.timelineServiceV2Enabled(conf)) { // create new version TimelineClient - timelineClient = TimelineClient.createTimelineClient( + timelineV2Client = TimelineV2Client.createTimelineClient( appAttemptID.getApplicationId()); } else { timelineClient = TimelineClient.createTimelineClient(); @@ -1177,10 +1179,14 @@ public class MRAppMaster extends CompositeService { return taskAttemptFinishingMonitor; } - // Get Timeline Collector's address (get sync from RM) public TimelineClient getTimelineClient() { return timelineClient; } + + // Get Timeline Collector's address (get sync from RM) + public TimelineV2Client getTimelineV2Client() { + return timelineV2Client; + } } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 31bc380..1f88a2c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -882,8 +882,8 @@ public class RMContainerAllocator extends RMContainerRequestor MRAppMaster.RunningAppContext appContext = (MRAppMaster.RunningAppContext)this.getContext(); if (collectorAddr != null && !collectorAddr.isEmpty() - && appContext.getTimelineClient() != null) { - appContext.getTimelineClient().setTimelineServiceAddress( + && appContext.getTimelineV2Client() != null) { + appContext.getTimelineV2Client().setTimelineServiceAddress( response.getCollectorAddr()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java index 0b33d6b..6c5e604 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java @@ -29,8 +29,8 @@ import static org.mockito.Mockito.when; import java.io.File; import java.io.FileOutputStream; -import java.io.InputStream; import java.io.IOException; +import java.io.InputStream; import java.util.HashMap; import org.apache.commons.logging.Log; @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.MiniYARNCluster; @@ -829,6 +830,9 @@ public class TestJobHistoryEventHandler { if (mockContext instanceof RunningAppContext) { when(((RunningAppContext)mockContext).getTimelineClient()). thenReturn(TimelineClient.createTimelineClient()); + when(((RunningAppContext) mockContext).getTimelineV2Client()) + .thenReturn(TimelineV2Client + .createTimelineClient(ApplicationId.newInstance(0, 1))); } return mockContext; } @@ -937,6 +941,8 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler { protected void serviceStart() { if (timelineClient != null) { timelineClient.start(); + } else if (timelineV2Client != null) { + timelineV2Client.start(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 5a06ef6..4daebb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -97,6 +97,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.NMClientAsync; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; @@ -219,7 +220,9 @@ public class ApplicationMaster { // Tracking url to which app master publishes info for clients to monitor private String appMasterTrackingUrl = ""; - private boolean timelineServiceV2 = false; + private boolean timelineServiceV2Enabled = false; + + private boolean timelineServiceV1Enabled = false; // App Master configuration // No. of containers to run shell command on @@ -293,6 +296,10 @@ public class ApplicationMaster { // Timeline Client @VisibleForTesting TimelineClient timelineClient; + + // Timeline v2 Client + private TimelineV2Client timelineV2Client; + static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS"; static final String APPID_TIMELINE_FILTER_NAME = "appId"; static final String USER_TIMELINE_FILTER_NAME = "user"; @@ -556,9 +563,12 @@ public class ApplicationMaster { "container_retry_interval", "0")); if (YarnConfiguration.timelineServiceEnabled(conf)) { - timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf); + timelineServiceV2Enabled = + ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2); + timelineServiceV1Enabled = !timelineServiceV2Enabled; } else { timelineClient = null; + timelineV2Client = null; LOG.warn("Timeline service is not enabled"); } @@ -621,18 +631,17 @@ public class ApplicationMaster { nmClientAsync.start(); startTimelineClient(conf); - if (timelineServiceV2) { + if (timelineServiceV2Enabled) { // need to bind timelineClient - amRMClient.registerTimelineClient(timelineClient); + amRMClient.registerTimelineV2Client(timelineV2Client); } - if(timelineClient != null) { - if (timelineServiceV2) { - publishApplicationAttemptEventOnTimelineServiceV2( - DSEvent.DS_APP_ATTEMPT_START); - } else { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); - } + + if (timelineServiceV2Enabled) { + publishApplicationAttemptEventOnTimelineServiceV2( + DSEvent.DS_APP_ATTEMPT_START); + } else if (timelineServiceV1Enabled) { + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); } // Setup local RPC Server to accept status requests directly from clients @@ -704,18 +713,21 @@ public class ApplicationMaster { public Void run() throws Exception { if (YarnConfiguration.timelineServiceEnabled(conf)) { // Creating the Timeline Client - if (timelineServiceV2) { - timelineClient = TimelineClient.createTimelineClient( + if (timelineServiceV2Enabled) { + timelineV2Client = TimelineV2Client.createTimelineClient( appAttemptID.getApplicationId()); + timelineV2Client.init(conf); + timelineV2Client.start(); LOG.info("Timeline service V2 client is enabled"); } else { timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + timelineClient.start(); LOG.info("Timeline service V1 client is enabled"); } - timelineClient.init(conf); - timelineClient.start(); } else { timelineClient = null; + timelineV2Client = null; LOG.warn("Timeline service is not enabled"); } return null; @@ -741,14 +753,12 @@ public class ApplicationMaster { } catch (InterruptedException ex) {} } - if (timelineClient != null) { - if (timelineServiceV2) { - publishApplicationAttemptEventOnTimelineServiceV2( - DSEvent.DS_APP_ATTEMPT_END); - } else { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); - } + if (timelineServiceV2Enabled) { + publishApplicationAttemptEventOnTimelineServiceV2( + DSEvent.DS_APP_ATTEMPT_END); + } else if (timelineServiceV1Enabled) { + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); } // Join all launched threads @@ -797,8 +807,10 @@ public class ApplicationMaster { amRMClient.stop(); // Stop Timeline Client - if(timelineClient != null) { + if(timelineServiceV1Enabled) { timelineClient.stop(); + } else if (timelineServiceV2Enabled) { + timelineV2Client.stop(); } return success; @@ -853,16 +865,14 @@ public class ApplicationMaster { LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId()); } - if(timelineClient != null) { - if (timelineServiceV2) { - publishContainerEndEventOnTimelineServiceV2(containerStatus); - } else { - publishContainerEndEvent( - timelineClient, containerStatus, domainId, appSubmitterUgi); - } + if (timelineServiceV2Enabled) { + publishContainerEndEventOnTimelineServiceV2(containerStatus); + } else if (timelineServiceV1Enabled) { + publishContainerEndEvent(timelineClient, containerStatus, domainId, + appSubmitterUgi); } } - + // ask for more containers if any failed int askCount = numTotalContainers - numRequestedContainers.get(); numRequestedContainers.addAndGet(askCount); @@ -983,15 +993,13 @@ public class ApplicationMaster { applicationMaster.nmClientAsync.getContainerStatusAsync( containerId, container.getNodeId()); } - if(applicationMaster.timelineClient != null) { - if (applicationMaster.timelineServiceV2) { - applicationMaster.publishContainerStartEventOnTimelineServiceV2( - container); - } else { - applicationMaster.publishContainerStartEvent( - applicationMaster.timelineClient, container, - applicationMaster.domainId, applicationMaster.appSubmitterUgi); - } + if (applicationMaster.timelineServiceV2Enabled) { + applicationMaster + .publishContainerStartEventOnTimelineServiceV2(container); + } else if (applicationMaster.timelineServiceV1Enabled) { + applicationMaster.publishContainerStartEvent( + applicationMaster.timelineClient, container, + applicationMaster.domainId, applicationMaster.appSubmitterUgi); } } @@ -1371,7 +1379,7 @@ public class ApplicationMaster { appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() { @Override public TimelinePutResponse run() throws Exception { - timelineClient.putEntities(entity); + timelineV2Client.putEntities(entity); return null; } }); @@ -1404,7 +1412,7 @@ public class ApplicationMaster { appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() { @Override public TimelinePutResponse run() throws Exception { - timelineClient.putEntities(entity); + timelineV2Client.putEntities(entity); return null; } }); @@ -1438,7 +1446,7 @@ public class ApplicationMaster { appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() { @Override public TimelinePutResponse run() throws Exception { - timelineClient.putEntitiesAsync(entity); + timelineV2Client.putEntitiesAsync(entity); return null; } }); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 15d0065..69f3777 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; @@ -41,12 +42,13 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; -import org.apache.hadoop.yarn.util.resource.Resources; @InterfaceAudience.Public @InterfaceStability.Stable @@ -54,7 +56,8 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends AbstractService { private static final Log LOG = LogFactory.getLog(AMRMClient.class); - private TimelineClient timelineClient; + private TimelineV2Client timelineV2Client; + private boolean timelineServiceV2Enabled; /** * Create a new instance of AMRMClient. @@ -79,6 +82,12 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends nmTokenCache = NMTokenCache.getSingleton(); } + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + timelineServiceV2Enabled = YarnConfiguration.timelineServiceV2Enabled(conf); + } + /** * Object to represent a single container request for resources. Scheduler * documentation should be consulted for the specifics of how the parameters @@ -682,19 +691,30 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends } /** - * Register TimelineClient to AMRMClient. - * @param client the timeline client to register + * Register TimelineV2Client to AMRMClient. Writer's address for the timeline + * V2 client will be updated dynamically if registered. + * + * @param client the timeline v2 client to register + * @throws YarnException when this method is invoked even when ATS V2 is not + * configured. */ - public void registerTimelineClient(TimelineClient client) { - this.timelineClient = client; + public void registerTimelineV2Client(TimelineV2Client client) + throws YarnException { + if (timelineServiceV2Enabled) { + timelineV2Client = client; + } else { + LOG.error("Trying to register timeline v2 client when not configured."); + throw new YarnException( + "register timeline v2 client when not configured."); + } } /** - * Get registered timeline client. - * @return the registered timeline client + * Get registered timeline v2 client. + * @return the registered timeline v2 client */ - public TimelineClient getRegisteredTimelineClient() { - return this.timelineClient; + public TimelineV2Client getRegisteredTimelineV2Client() { + return this.timelineV2Client; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index 4cb27cd..1ecfe1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.client.api.async; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; import java.io.IOException; import java.util.Collection; import java.util.List; @@ -29,8 +27,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.Container; @@ -46,13 +44,15 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; -import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; /** * <code>AMRMClientAsync</code> handles communication with the ResourceManager @@ -346,17 +346,20 @@ extends AbstractService { /** * Register TimelineClient to AMRMClient. * @param timelineClient + * @throws YarnException when this method is invoked even when ATS V2 is not + * configured. */ - public void registerTimelineClient(TimelineClient timelineClient) { - client.registerTimelineClient(timelineClient); + public void registerTimelineV2Client(TimelineV2Client timelineClient) + throws YarnException { + client.registerTimelineV2Client(timelineClient); } /** * Get registered timeline client. * @return the registered timeline client */ - public TimelineClient getRegisteredTimelineClient() { - return client.getRegisteredTimelineClient(); + public TimelineV2Client getRegisteredTimelineV2Client() { + return client.getRegisteredTimelineV2Client(); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 9e2c0e5..6711da2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; -import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; @@ -326,7 +326,8 @@ extends AMRMClientAsync<T> { AllocateResponse response = (AllocateResponse) object; String collectorAddress = response.getCollectorAddr(); - TimelineClient timelineClient = client.getRegisteredTimelineClient(); + TimelineV2Client timelineClient = + client.getRegisteredTimelineV2Client(); if (timelineClient != null && collectorAddress != null && !collectorAddress.isEmpty()) { if (collectorAddr == null http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index e406862..4a27fee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -141,7 +141,7 @@ public class YarnClientImpl extends YarnClient { Text timelineService; @VisibleForTesting String timelineDTRenewer; - protected boolean timelineServiceEnabled; + private boolean timelineV1ServiceEnabled; protected boolean timelineServiceBestEffort; private static final String ROOT = "root"; @@ -167,9 +167,14 @@ public class YarnClientImpl extends YarnClient { YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); } + float timelineServiceVersion = + conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION); if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { - timelineServiceEnabled = true; + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) + && ((Float.compare(timelineServiceVersion, 1.0f) == 0) + || (Float.compare(timelineServiceVersion, 1.5f) == 0))) { + timelineV1ServiceEnabled = true; timelineDTRenewer = getTimelineDelegationTokenRenewer(conf); timelineService = TimelineUtils.buildTimelineTokenService(conf); } @@ -178,7 +183,7 @@ public class YarnClientImpl extends YarnClient { // TimelineServer which means we are able to get history information // for applications/applicationAttempts/containers by using ahsClient // when the TimelineServer is running. - if (timelineServiceEnabled || conf.getBoolean( + if (timelineV1ServiceEnabled || conf.getBoolean( YarnConfiguration.APPLICATION_HISTORY_ENABLED, YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { historyServiceEnabled = true; @@ -257,7 +262,7 @@ public class YarnClientImpl extends YarnClient { // Automatically add the timeline DT into the CLC // Only when the security and the timeline service are both enabled - if (isSecurityEnabled() && timelineServiceEnabled) { + if (isSecurityEnabled() && timelineV1ServiceEnabled) { addTimelineDelegationToken(appContext.getAMContainerSpec()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index cc76718..4835239 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -21,14 +21,12 @@ package org.apache.hadoop.yarn.client.api; import java.io.Flushable; import java.io.IOException; -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; @@ -39,24 +37,22 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; /** * A client library that can be used to post some information in terms of a - * number of conceptual entities. + * number of conceptual entities. This client library needs to be used along + * with Timeline V.1.x server versions. + * Refer {@link TimelineV2Client} for ATS V2 interface. */ @Public @Evolving -public abstract class TimelineClient extends AbstractService implements +public abstract class TimelineClient extends CompositeService implements Flushable { /** - * Create a timeline client. The current UGI when the user initialize the - * client will be used to do the put and the delegation token operations. The - * current user may use {@link UserGroupInformation#doAs} another user to - * construct and initialize a timeline client if the following operations are - * supposed to be conducted by that user. - */ - private ApplicationId contextAppId; - - /** * Creates an instance of the timeline v.1.x client. + * The current UGI when the user initialize the client will be used to do the + * put and the delegation token operations. The current user may use + * {@link UserGroupInformation#doAs} another user to construct and initialize + * a timeline client if the following operations are supposed to be conducted + * by that user. * * @return the created timeline client instance */ @@ -66,23 +62,8 @@ public abstract class TimelineClient extends AbstractService implements return client; } - /** - * Creates an instance of the timeline v.2 client. - * - * @param appId the application id with which the timeline client is - * associated - * @return the created timeline client instance - */ - @Public - public static TimelineClient createTimelineClient(ApplicationId appId) { - TimelineClient client = new TimelineClientImpl(appId); - return client; - } - - @Private - protected TimelineClient(String name, ApplicationId appId) { + protected TimelineClient(String name) { super(name); - setContextAppId(appId); } /** @@ -207,57 +188,4 @@ public abstract class TimelineClient extends AbstractService implements public abstract void cancelDelegationToken( Token<TimelineDelegationTokenIdentifier> timelineDT) throws IOException, YarnException; - - /** - * <p> - * Send the information of a number of conceptual entities to the timeline - * service v.2 collector. It is a blocking API. The method will not return - * until all the put entities have been persisted. If this method is invoked - * for a non-v.2 timeline client instance, a YarnException is thrown. - * </p> - * - * @param entities the collection of {@link - * org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} - * @throws IOException - * @throws YarnException - */ - @Public - public abstract void putEntities( - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... - entities) throws IOException, YarnException; - - /** - * <p> - * Send the information of a number of conceptual entities to the timeline - * service v.2 collector. It is an asynchronous API. The method will return - * once all the entities are received. If this method is invoked for a - * non-v.2 timeline client instance, a YarnException is thrown. - * </p> - * - * @param entities the collection of {@link - * org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} - * @throws IOException - * @throws YarnException - */ - @Public - public abstract void putEntitiesAsync( - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... - entities) throws IOException, YarnException; - - /** - * <p> - * Update the timeline service address where the request will be sent to. - * </p> - * @param address - * the timeline service address - */ - public abstract void setTimelineServiceAddress(String address); - - protected ApplicationId getContextAppId() { - return contextAppId; - } - - protected void setContextAppId(ApplicationId appId) { - this.contextAppId = appId; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java new file mode 100644 index 0000000..32cf1e9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * A client library that can be used to post some information in terms of a + * number of conceptual entities. This client library needs to be used along + * with time line v.2 server version. + * Refer {@link TimelineClient} for ATS V1 interface. + */ +public abstract class TimelineV2Client extends CompositeService { + /** + * Creates an instance of the timeline v.2 client. + * + * @param appId the application id with which the timeline client is + * associated + * @return the created timeline client instance + */ + @Public + public static TimelineV2Client createTimelineClient(ApplicationId appId) { + TimelineV2Client client = new TimelineV2ClientImpl(appId); + return client; + } + + protected TimelineV2Client(String name) { + super(name); + } + + /** + * <p> + * Send the information of a number of conceptual entities to the timeline + * service v.2 collector. It is a blocking API. The method will not return + * until all the put entities have been persisted. + * </p> + * + * @param entities the collection of {@link TimelineEntity} + * @throws IOException if there are I/O errors + * @throws YarnException if entities are incomplete/invalid + */ + @Public + public abstract void putEntities(TimelineEntity... entities) + throws IOException, YarnException; + + /** + * <p> + * Send the information of a number of conceptual entities to the timeline + * service v.2 collector. It is an asynchronous API. The method will return + * once all the entities are received. + * </p> + * + * @param entities the collection of {@link TimelineEntity} + * @throws IOException if there are I/O errors + * @throws YarnException if entities are incomplete/invalid + */ + @Public + public abstract void putEntitiesAsync(TimelineEntity... entities) + throws IOException, YarnException; + + /** + * <p> + * Update the timeline service address where the request will be sent to. + * </p> + * + * @param address the timeline service address + */ + public abstract void setTimelineServiceAddress(String address); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 4506c48..f49618b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -20,32 +20,10 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.File; import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.ConnectException; -import java.net.HttpURLConnection; import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; import java.net.URI; -import java.net.URL; -import java.net.URLConnection; -import java.security.GeneralSecurityException; import java.security.PrivilegedExceptionAction; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import javax.net.ssl.HostnameVerifier; -import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLSocketFactory; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; - -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -57,16 +35,9 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authentication.client.AuthenticationException; -import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; -import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; -import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; -import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator; -import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -79,19 +50,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientHandlerException; -import com.sun.jersey.api.client.ClientRequest; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.api.client.filter.ClientFilter; -import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; -import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; -import com.sun.jersey.core.util.MultivaluedMapImpl; @Private @Evolving @@ -100,9 +61,6 @@ public class TimelineClientImpl extends TimelineClient { private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class); private static final ObjectMapper MAPPER = new ObjectMapper(); private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/"; - private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/"; - private static final Joiner JOINER = Joiner.on(""); - public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute private static Options opts; private static final String ENTITY_DATA_TYPE = "entity"; @@ -117,179 +75,38 @@ public class TimelineClientImpl extends TimelineClient { opts.addOption("help", false, "Print usage"); } - private Client client; - private ConnectionConfigurator connConfigurator; - private DelegationTokenAuthenticator authenticator; - private DelegationTokenAuthenticatedURL.Token token; - private UserGroupInformation authUgi; - private String doAsUser; - private Configuration configuration; - private float timelineServiceVersion; - private TimelineWriter timelineWriter; - private SSLFactory sslFactory; - - private volatile String timelineServiceAddress; - - // Retry parameters for identifying new timeline service - // TODO consider to merge with connection retry - private int maxServiceRetries; - private long serviceRetryInterval; - private boolean timelineServiceV2 = false; - - @Private @VisibleForTesting - TimelineClientConnectionRetry connectionRetry; - - private TimelineEntityDispatcher entityDispatcher; - - // Abstract class for an operation that should be retried by timeline client - @Private + protected DelegationTokenAuthenticatedURL.Token token; @VisibleForTesting - public static abstract class TimelineClientRetryOp { - // The operation that should be retried - public abstract Object run() throws IOException; - // The method to indicate if we should retry given the incoming exception - public abstract boolean shouldRetryOn(Exception e); - } - - // Class to handle retry - // Outside this class, only visible to tests - @Private + protected UserGroupInformation authUgi; @VisibleForTesting - static class TimelineClientConnectionRetry { - - // maxRetries < 0 means keep trying - @Private - @VisibleForTesting - public int maxRetries; - - @Private - @VisibleForTesting - public long retryInterval; - - // Indicates if retries happened last time. Only tests should read it. - // In unit tests, retryOn() calls should _not_ be concurrent. - private boolean retried = false; + protected String doAsUser; - @Private - @VisibleForTesting - boolean getRetired() { - return retried; - } - - // Constructor with default retry settings - public TimelineClientConnectionRetry(Configuration conf) { - Preconditions.checkArgument(conf.getInt( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES) >= -1, - "%s property value should be greater than or equal to -1", - YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES); - Preconditions - .checkArgument( - conf.getLong( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS) > 0, - "%s property value should be greater than zero", - YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); - maxRetries = conf.getInt( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES); - retryInterval = conf.getLong( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); - } - - public Object retryOn(TimelineClientRetryOp op) - throws RuntimeException, IOException { - int leftRetries = maxRetries; - retried = false; - - // keep trying - while (true) { - try { - // try perform the op, if fail, keep retrying - return op.run(); - } catch (IOException | RuntimeException e) { - // break if there's no retries left - if (leftRetries == 0) { - break; - } - if (op.shouldRetryOn(e)) { - logException(e, leftRetries); - } else { - throw e; - } - } - if (leftRetries > 0) { - leftRetries--; - } - retried = true; - try { - // sleep for the given time interval - Thread.sleep(retryInterval); - } catch (InterruptedException ie) { - LOG.warn("Client retry sleep interrupted! "); - } - } - throw new RuntimeException("Failed to connect to timeline server. " - + "Connection retries limit exceeded. " - + "The posted timeline event may be missing"); - }; - - private void logException(Exception e, int leftRetries) { - if (leftRetries > 0) { - LOG.info("Exception caught by TimelineClientConnectionRetry," - + " will try " + leftRetries + " more time(s).\nMessage: " - + e.getMessage()); - } else { - // note that maxRetries may be -1 at the very beginning - LOG.info("ConnectionException caught by TimelineClientConnectionRetry," - + " will keep retrying.\nMessage: " - + e.getMessage()); - } - } - } + private float timelineServiceVersion; + private TimelineWriter timelineWriter; - private class TimelineJerseyRetryFilter extends ClientFilter { - @Override - public ClientResponse handle(final ClientRequest cr) - throws ClientHandlerException { - // Set up the retry operation - TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() { - @Override - public Object run() { - // Try pass the request, if fail, keep retrying - return getNext().handle(cr); - } + private String timelineServiceAddress; - @Override - public boolean shouldRetryOn(Exception e) { - // Only retry on connection exceptions - return (e instanceof ClientHandlerException) - && (e.getCause() instanceof ConnectException || - e.getCause() instanceof SocketTimeoutException); - } - }; - try { - return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp); - } catch (IOException e) { - throw new ClientHandlerException("Jersey retry failed!\nMessage: " - + e.getMessage()); - } - } - } + @Private + @VisibleForTesting + TimelineConnector connector; public TimelineClientImpl() { - super(TimelineClientImpl.class.getName(), null); - } - - public TimelineClientImpl(ApplicationId applicationId) { - super(TimelineClientImpl.class.getName(), applicationId); - this.timelineServiceV2 = true; + super(TimelineClientImpl.class.getName()); } protected void serviceInit(Configuration conf) throws Exception { - this.configuration = conf; + timelineServiceVersion = + conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION); + LOG.info("Timeline service address: " + getTimelineServiceAddress()); + if (!YarnConfiguration.timelineServiceEnabled(conf) + || !((Float.compare(this.timelineServiceVersion, 1.0f) == 0) + || (Float.compare(this.timelineServiceVersion, 1.5f) == 0))) { + throw new IOException("Timeline V1 client is not properly configured. " + + "Either timeline service is not enabled or version is not set to" + + " 1.x"); + } UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); UserGroupInformation realUgi = ugi.getRealUser(); if (realUgi != null) { @@ -299,62 +116,34 @@ public class TimelineClientImpl extends TimelineClient { authUgi = ugi; doAsUser = null; } - ClientConfig cc = new DefaultClientConfig(); - cc.getClasses().add(YarnJacksonJaxbJsonProvider.class); - connConfigurator = initConnConfigurator(conf); - if (UserGroupInformation.isSecurityEnabled()) { - authenticator = new KerberosDelegationTokenAuthenticator(); - } else { - authenticator = new PseudoDelegationTokenAuthenticator(); - } - authenticator.setConnectionConfigurator(connConfigurator); token = new DelegationTokenAuthenticatedURL.Token(); + connector = createTimelineConnector(); - connectionRetry = new TimelineClientConnectionRetry(conf); - client = new Client(new URLConnectionClientHandler( - new TimelineURLConnectionFactory()), cc); - TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter(); - // TODO need to cleanup filter retry later. - if (!timelineServiceV2) { - client.addFilter(retryFilter); - } - - // old version timeline service need to get address from configuration - // while new version need to auto discovery (with retry). - if (timelineServiceV2) { - maxServiceRetries = conf.getInt( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES); - serviceRetryInterval = conf.getLong( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); - entityDispatcher = new TimelineEntityDispatcher(conf); + if (YarnConfiguration.useHttps(conf)) { + timelineServiceAddress = + conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS); } else { - if (YarnConfiguration.useHttps(conf)) { - setTimelineServiceAddress(conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS)); - } else { - setTimelineServiceAddress(conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS)); - } - timelineServiceVersion = - conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION); - LOG.info("Timeline service address: " + getTimelineServiceAddress()); + timelineServiceAddress = + conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS); } super.serviceInit(conf); } + @VisibleForTesting + protected TimelineConnector createTimelineConnector() { + TimelineConnector newConnector = + new TimelineConnector(true, authUgi, doAsUser, token); + addIfService(newConnector); + return newConnector; + } + @Override protected void serviceStart() throws Exception { - if (timelineServiceV2) { - entityDispatcher.start(); - } else { - timelineWriter = createTimelineWriter(configuration, authUgi, client, - constructResURI(getConfig(), timelineServiceAddress, false)); - } + timelineWriter = createTimelineWriter(getConfig(), authUgi, + connector.getClient(), TimelineConnector.constructResURI(getConfig(), + timelineServiceAddress, RESOURCE_URI_STR_V1)); } protected TimelineWriter createTimelineWriter(Configuration conf, @@ -373,12 +162,6 @@ public class TimelineClientImpl extends TimelineClient { if (this.timelineWriter != null) { this.timelineWriter.close(); } - if (timelineServiceV2) { - entityDispatcher.stop(); - } - if (this.sslFactory != null) { - this.sslFactory.destroy(); - } super.serviceStop(); } @@ -390,132 +173,17 @@ public class TimelineClientImpl extends TimelineClient { } @Override - public TimelinePutResponse putEntities( - TimelineEntity... entities) throws IOException, YarnException { + public TimelinePutResponse putEntities(TimelineEntity... entities) + throws IOException, YarnException { return timelineWriter.putEntities(entities); } @Override - public void putEntities( - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... - entities) throws IOException, YarnException { - if (!timelineServiceV2) { - throw new YarnException("v.2 method is invoked on a v.1.x client"); - } - entityDispatcher.dispatchEntities(true, entities); - } - - @Override - public void putEntitiesAsync( - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... - entities) throws IOException, YarnException { - if (!timelineServiceV2) { - throw new YarnException("v.2 method is invoked on a v.1.x client"); - } - entityDispatcher.dispatchEntities(false, entities); - } - - @Override public void putDomain(TimelineDomain domain) throws IOException, YarnException { timelineWriter.putDomain(domain); } - // Used for new timeline service only - @Private - protected void putObjects(String path, MultivaluedMap<String, String> params, - Object obj) throws IOException, YarnException { - - int retries = verifyRestEndPointAvailable(); - - // timelineServiceAddress could be stale, add retry logic here. - boolean needRetry = true; - while (needRetry) { - try { - URI uri = constructResURI(getConfig(), timelineServiceAddress, true); - putObjects(uri, path, params, obj); - needRetry = false; - } catch (IOException e) { - // handle exception for timelineServiceAddress being updated. - checkRetryWithSleep(retries, e); - retries--; - } - } - } - - private int verifyRestEndPointAvailable() throws YarnException { - // timelineServiceAddress could haven't be initialized yet - // or stale (only for new timeline service) - int retries = pollTimelineServiceAddress(this.maxServiceRetries); - if (timelineServiceAddress == null) { - String errMessage = "TimelineClient has reached to max retry times : " - + this.maxServiceRetries - + ", but failed to fetch timeline service address. Please verify" - + " Timeline Auxiliary Service is configured in all the NMs"; - LOG.error(errMessage); - throw new YarnException(errMessage); - } - return retries; - } - - /** - * Check if reaching to maximum of retries. - * @param retries - * @param e - */ - private void checkRetryWithSleep(int retries, IOException e) - throws YarnException, IOException { - if (retries > 0) { - try { - Thread.sleep(this.serviceRetryInterval); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new YarnException("Interrupted while retrying to connect to ATS"); - } - } else { - StringBuilder msg = - new StringBuilder("TimelineClient has reached to max retry times : "); - msg.append(this.maxServiceRetries); - msg.append(" for service address: "); - msg.append(timelineServiceAddress); - LOG.error(msg.toString()); - throw new IOException(msg.toString(), e); - } - } - - protected void putObjects( - URI base, String path, MultivaluedMap<String, String> params, Object obj) - throws IOException, YarnException { - ClientResponse resp; - try { - resp = client.resource(base).path(path).queryParams(params) - .accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON) - .put(ClientResponse.class, obj); - } catch (RuntimeException re) { - // runtime exception is expected if the client cannot connect the server - String msg = - "Failed to get the response from the timeline server."; - LOG.error(msg, re); - throw new IOException(re); - } - if (resp == null || - resp.getStatusInfo().getStatusCode() != - ClientResponse.Status.OK.getStatusCode()) { - String msg = "Response from the timeline server is " + - ((resp == null) ? "null": - "not successful," + " HTTP error code: " + resp.getStatus() - + ", Server response:\n" + resp.getEntity(String.class)); - LOG.error(msg); - throw new YarnException(msg); - } - } - - @Override - public void setTimelineServiceAddress(String address) { - this.timelineServiceAddress = address; - } - private String getTimelineServiceAddress() { return this.timelineServiceAddress; } @@ -532,17 +200,17 @@ public class TimelineClientImpl extends TimelineClient { public Token<TimelineDelegationTokenIdentifier> run() throws Exception { DelegationTokenAuthenticatedURL authUrl = - new DelegationTokenAuthenticatedURL(authenticator, - connConfigurator); + connector.getDelegationTokenAuthenticatedURL(); // TODO we should add retry logic here if timelineServiceAddress is // not available immediately. return (Token) authUrl.getDelegationToken( - constructResURI(getConfig(), - getTimelineServiceAddress(), false).toURL(), + TimelineConnector.constructResURI(getConfig(), + getTimelineServiceAddress(), RESOURCE_URI_STR_V1).toURL(), token, renewer, doAsUser); } }; - return (Token<TimelineDelegationTokenIdentifier>) operateDelegationToken(getDTAction); + return (Token<TimelineDelegationTokenIdentifier>) connector + .operateDelegationToken(getDTAction); } @SuppressWarnings("unchecked") @@ -568,26 +236,26 @@ public class TimelineClientImpl extends TimelineClient { token.setDelegationToken((Token) timelineDT); } DelegationTokenAuthenticatedURL authUrl = - new DelegationTokenAuthenticatedURL(authenticator, - connConfigurator); + connector.getDelegationTokenAuthenticatedURL(); // If the token service address is not available, fall back to use // the configured service address. - final URI serviceURI = isTokenServiceAddrEmpty ? - constructResURI(getConfig(), getTimelineServiceAddress(), false) + final URI serviceURI = isTokenServiceAddrEmpty + ? TimelineConnector.constructResURI(getConfig(), + getTimelineServiceAddress(), RESOURCE_URI_STR_V1) : new URI(scheme, null, address.getHostName(), - address.getPort(), RESOURCE_URI_STR_V1, null, null); + address.getPort(), RESOURCE_URI_STR_V1, null, null); return authUrl .renewDelegationToken(serviceURI.toURL(), token, doAsUser); } }; - return (Long) operateDelegationToken(renewDTAction); + return (Long) connector.operateDelegationToken(renewDTAction); } @SuppressWarnings("unchecked") @Override public void cancelDelegationToken( final Token<TimelineDelegationTokenIdentifier> timelineDT) - throws IOException, YarnException { + throws IOException, YarnException { final boolean isTokenServiceAddrEmpty = timelineDT.getService().toString().isEmpty(); final String scheme = isTokenServiceAddrEmpty ? null @@ -607,134 +275,29 @@ public class TimelineClientImpl extends TimelineClient { token.setDelegationToken((Token) timelineDT); } DelegationTokenAuthenticatedURL authUrl = - new DelegationTokenAuthenticatedURL(authenticator, - connConfigurator); + connector.getDelegationTokenAuthenticatedURL(); // If the token service address is not available, fall back to use // the configured service address. - final URI serviceURI = isTokenServiceAddrEmpty ? - constructResURI(getConfig(), getTimelineServiceAddress(), false) + final URI serviceURI = isTokenServiceAddrEmpty + ? TimelineConnector.constructResURI(getConfig(), + getTimelineServiceAddress(), RESOURCE_URI_STR_V1) : new URI(scheme, null, address.getHostName(), - address.getPort(), RESOURCE_URI_STR_V1, null, null); + address.getPort(), RESOURCE_URI_STR_V1, null, null); authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser); return null; } }; - operateDelegationToken(cancelDTAction); + connector.operateDelegationToken(cancelDTAction); } @Override public String toString() { return super.toString() + " with timeline server " - + constructResURI(getConfig(), getTimelineServiceAddress(), false) + + TimelineConnector.constructResURI(getConfig(), + getTimelineServiceAddress(), RESOURCE_URI_STR_V1) + " and writer " + timelineWriter; } - private Object operateDelegationToken( - final PrivilegedExceptionAction<?> action) - throws IOException, YarnException { - // Set up the retry operation - TimelineClientRetryOp tokenRetryOp = - createTimelineClientRetryOpForOperateDelegationToken(action); - - return connectionRetry.retryOn(tokenRetryOp); - } - - /** - * Poll TimelineServiceAddress for maximum of retries times if it is null. - * - * @param retries - * @return the left retry times - * @throws IOException - */ - private int pollTimelineServiceAddress(int retries) throws YarnException { - while (timelineServiceAddress == null && retries > 0) { - try { - Thread.sleep(this.serviceRetryInterval); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new YarnException("Interrupted while trying to connect ATS"); - } - retries--; - } - return retries; - } - - private class TimelineURLConnectionFactory - implements HttpURLConnectionFactory { - - @Override - public HttpURLConnection getHttpURLConnection(final URL url) throws IOException { - authUgi.checkTGTAndReloginFromKeytab(); - try { - return new DelegationTokenAuthenticatedURL( - authenticator, connConfigurator).openConnection(url, token, - doAsUser); - } catch (UndeclaredThrowableException e) { - throw new IOException(e.getCause()); - } catch (AuthenticationException ae) { - throw new IOException(ae); - } - } - - } - - private ConnectionConfigurator initConnConfigurator(Configuration conf) { - try { - return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf); - } catch (Exception e) { - LOG.debug("Cannot load customized ssl related configuration. " + - "Fallback to system-generic settings.", e); - return DEFAULT_TIMEOUT_CONN_CONFIGURATOR; - } - } - - private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR = - new ConnectionConfigurator() { - @Override - public HttpURLConnection configure(HttpURLConnection conn) - throws IOException { - setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT); - return conn; - } - }; - - private ConnectionConfigurator initSslConnConfigurator(final int timeout, - Configuration conf) throws IOException, GeneralSecurityException { - final SSLSocketFactory sf; - final HostnameVerifier hv; - - sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf); - sslFactory.init(); - sf = sslFactory.createSSLSocketFactory(); - hv = sslFactory.getHostnameVerifier(); - - return new ConnectionConfigurator() { - @Override - public HttpURLConnection configure(HttpURLConnection conn) - throws IOException { - if (conn instanceof HttpsURLConnection) { - HttpsURLConnection c = (HttpsURLConnection) conn; - c.setSSLSocketFactory(sf); - c.setHostnameVerifier(hv); - } - setTimeouts(conn, timeout); - return conn; - } - }; - } - - private static void setTimeouts(URLConnection connection, int socketTimeout) { - connection.setConnectTimeout(socketTimeout); - connection.setReadTimeout(socketTimeout); - } - - private static URI constructResURI( - Configuration conf, String address, boolean v2) { - return URI.create( - JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://", - address, v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1)); - } - public static void main(String[] argv) throws Exception { CommandLine cliParser = new GnuParser().parse(opts, argv); if (cliParser.hasOption("put")) { @@ -870,266 +433,4 @@ public class TimelineClientImpl extends TimelineClient { public void setTimelineWriter(TimelineWriter writer) { this.timelineWriter = writer; } - - @Private - @VisibleForTesting - public TimelineClientRetryOp - createTimelineClientRetryOpForOperateDelegationToken( - final PrivilegedExceptionAction<?> action) throws IOException { - return new TimelineClientRetryOpForOperateDelegationToken( - this.authUgi, action); - } - - @Private - @VisibleForTesting - public class TimelineClientRetryOpForOperateDelegationToken - extends TimelineClientRetryOp { - - private final UserGroupInformation authUgi; - private final PrivilegedExceptionAction<?> action; - - public TimelineClientRetryOpForOperateDelegationToken( - UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) { - this.authUgi = authUgi; - this.action = action; - } - - @Override - public Object run() throws IOException { - // Try pass the request, if fail, keep retrying - authUgi.checkTGTAndReloginFromKeytab(); - try { - return authUgi.doAs(action); - } catch (UndeclaredThrowableException e) { - throw new IOException(e.getCause()); - } catch (InterruptedException e) { - throw new IOException(e); - } - } - - @Override - public boolean shouldRetryOn(Exception e) { - // retry on connection exceptions - // and SocketTimeoutException - return (e instanceof ConnectException - || e instanceof SocketTimeoutException); - } - } - - private final class EntitiesHolder extends FutureTask<Void> { - private final - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities - entities; - private final boolean isSync; - - EntitiesHolder( - final - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities - entities, - final boolean isSync) { - super(new Callable<Void>() { - // publishEntities() - public Void call() throws Exception { - MultivaluedMap<String, String> params = new MultivaluedMapImpl(); - params.add("appid", getContextAppId().toString()); - params.add("async", Boolean.toString(!isSync)); - putObjects("entities", params, entities); - return null; - } - }); - this.entities = entities; - this.isSync = isSync; - } - - public boolean isSync() { - return isSync; - } - - public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities - getEntities() { - return entities; - } - } - - /** - * This class is responsible for collecting the timeline entities and - * publishing them in async. - */ - private class TimelineEntityDispatcher { - /** - * Time period for which the timelineclient will wait for draining after - * stop. - */ - private static final long DRAIN_TIME_PERIOD = 2000L; - - private int numberOfAsyncsToMerge; - private final BlockingQueue<EntitiesHolder> timelineEntityQueue; - private ExecutorService executor; - - TimelineEntityDispatcher(Configuration conf) { - timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>(); - numberOfAsyncsToMerge = - conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, - YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE); - } - - Runnable createRunnable() { - return new Runnable() { - @Override - public void run() { - try { - EntitiesHolder entitiesHolder; - while (!Thread.currentThread().isInterrupted()) { - // Merge all the async calls and make one push, but if its sync - // call push immediately - try { - entitiesHolder = timelineEntityQueue.take(); - } catch (InterruptedException ie) { - LOG.info("Timeline dispatcher thread was interrupted "); - Thread.currentThread().interrupt(); - return; - } - if (entitiesHolder != null) { - publishWithoutBlockingOnQueue(entitiesHolder); - } - } - } finally { - if (!timelineEntityQueue.isEmpty()) { - LOG.info("Yet to publish " + timelineEntityQueue.size() - + " timelineEntities, draining them now. "); - } - // Try to drain the remaining entities to be published @ the max for - // 2 seconds - long timeTillweDrain = - System.currentTimeMillis() + DRAIN_TIME_PERIOD; - while (!timelineEntityQueue.isEmpty()) { - publishWithoutBlockingOnQueue(timelineEntityQueue.poll()); - if (System.currentTimeMillis() > timeTillweDrain) { - // time elapsed stop publishing further.... - if (!timelineEntityQueue.isEmpty()) { - LOG.warn("Time to drain elapsed! Remaining " - + timelineEntityQueue.size() + "timelineEntities will not" - + " be published"); - // if some entities were not drained then we need interrupt - // the threads which had put sync EntityHolders to the queue. - EntitiesHolder nextEntityInTheQueue = null; - while ((nextEntityInTheQueue = - timelineEntityQueue.poll()) != null) { - nextEntityInTheQueue.cancel(true); - } - } - break; - } - } - } - } - - /** - * Publishes the given EntitiesHolder and return immediately if sync - * call, else tries to fetch the EntitiesHolder from the queue in non - * blocking fashion and collate the Entities if possible before - * publishing through REST. - * - * @param entitiesHolder - */ - private void publishWithoutBlockingOnQueue( - EntitiesHolder entitiesHolder) { - if (entitiesHolder.isSync()) { - entitiesHolder.run(); - return; - } - int count = 1; - while (true) { - // loop till we find a sync put Entities or there is nothing - // to take - EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll(); - if (nextEntityInTheQueue == null) { - // Nothing in the queue just publish and get back to the - // blocked wait state - entitiesHolder.run(); - break; - } else if (nextEntityInTheQueue.isSync()) { - // flush all the prev async entities first - entitiesHolder.run(); - // and then flush the sync entity - nextEntityInTheQueue.run(); - break; - } else { - // append all async entities together and then flush - entitiesHolder.getEntities().addEntities( - nextEntityInTheQueue.getEntities().getEntities()); - count++; - if (count == numberOfAsyncsToMerge) { - // Flush the entities if the number of the async - // putEntites merged reaches the desired limit. To avoid - // collecting multiple entities and delaying for a long - // time. - entitiesHolder.run(); - break; - } - } - } - } - }; - } - - public void dispatchEntities(boolean sync, - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[] - entitiesTobePublished) throws YarnException { - if (executor.isShutdown()) { - throw new YarnException("Timeline client is in the process of stopping," - + " not accepting any more TimelineEntities"); - } - - // wrap all TimelineEntity into TimelineEntities object - org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities - entities = - new org.apache.hadoop.yarn.api.records.timelineservice. - TimelineEntities(); - for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity - entity : entitiesTobePublished) { - entities.addEntity(entity); - } - - // created a holder and place it in queue - EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync); - try { - timelineEntityQueue.put(entitiesHolder); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new YarnException( - "Failed while adding entity to the queue for publishing", e); - } - - if (sync) { - // In sync call we need to wait till its published and if any error then - // throw it back - try { - entitiesHolder.get(); - } catch (ExecutionException e) { - throw new YarnException("Failed while publishing entity", - e.getCause()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new YarnException("Interrupted while publishing entity", e); - } - } - } - - public void start() { - executor = Executors.newSingleThreadExecutor(); - executor.execute(createRunnable()); - } - - public void stop() { - LOG.info("Stopping TimelineClient."); - executor.shutdownNow(); - try { - executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - e.printStackTrace(); - } - } - } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org