Repository: hadoop Updated Branches: refs/heads/YARN-2928 3c36922d7 -> be95107aa (forced update)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8b5ab64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java index 5adae71..0f51656 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -138,7 +138,7 @@ public class TimelineCollectorWebService { LOG.error("Application not found"); throw new NotFoundException(); // different exception? } - collector.postEntities(entities, callerUgi); + collector.putEntities(entities, callerUgi); return Response.ok().build(); } catch (Exception e) { LOG.error("Error putting entities", e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8b5ab64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index f5603f6..41b6ac9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -52,7 +52,9 @@ public class FileSystemTimelineWriterImpl extends AbstractService /** default value for storage location on local disk */ public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT - = "/tmp/timeline_service_data/"; + = "/tmp/timeline_service_data"; + + private static final String ENTITIES_DIR = "entities"; /** Default extension for output files */ public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist"; @@ -61,38 +63,25 @@ public class FileSystemTimelineWriterImpl extends AbstractService super((FileSystemTimelineWriterImpl.class.getName())); } - /** - * Stores the entire information in {@link TimelineEntity} to the - * timeline store. Any errors occurring for individual write request objects - * will be reported in the response. - * - * @param data - * a {@link TimelineEntity} object - * @return {@link TimelineWriteResponse} object. - * @throws IOException - */ @Override - public TimelineWriteResponse write(TimelineEntities entities) - throws IOException { + public TimelineWriteResponse write(String clusterId, String userId, + String flowId, String flowRunId, String appId, + TimelineEntities entities) throws IOException { TimelineWriteResponse response = new TimelineWriteResponse(); for (TimelineEntity entity : entities.getEntities()) { - write(entity, response); + write(clusterId, userId, flowId, flowRunId, appId, entity, response); } return response; } - private void write(TimelineEntity entity, + private void write(String clusterId, String userId, + String flowId, String flowRunId, String appId, TimelineEntity entity, TimelineWriteResponse response) throws IOException { PrintWriter out = null; try { - File outputDir = new File(outputRoot + entity.getType()); - String fileName = outputDir + "/" + entity.getId() - + TIMELINE_SERVICE_STORAGE_EXTENSION; - if (!outputDir.exists()) { - if (!outputDir.mkdirs()) { - throw new IOException("Could not create directories for " + fileName); - } - } + String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,flowId, + flowRunId, appId, entity.getType()); + String fileName = dir + entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION; out = new PrintWriter(new BufferedWriter(new FileWriter(fileName, true))); out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity)); out.write("\n"); @@ -112,20 +101,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService } } - /** - * Aggregates the entity information to the timeline store based on which - * track this entity is to be rolled up to The tracks along which aggregations - * are to be done are given by {@link TimelineAggregationTrack} - * - * Any errors occurring for individual write request objects will be reported - * in the response. - * - * @param data - * a {@link TimelineEntity} object - * a {@link TimelineAggregationTrack} enum value - * @return a {@link TimelineWriteResponse} object. - * @throws IOException - */ + @Override public TimelineWriteResponse aggregate(TimelineEntity data, TimelineAggregationTrack track) throws IOException { return null; @@ -141,4 +117,23 @@ public class FileSystemTimelineWriterImpl extends AbstractService outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT, DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT); } + + @Override + public void serviceStart() throws Exception { + mkdirs(outputRoot, ENTITIES_DIR); + } + + private static String mkdirs(String... dirStrs) throws IOException { + StringBuilder path = new StringBuilder(); + for (String dirStr : dirStrs) { + path.append(dirStr).append('/'); + File dir = new File(path.toString()); + if (!dir.exists()) { + if (!dir.mkdirs()) { + throw new IOException("Could not create directories for " + dir); + } + } + } + return path.toString(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8b5ab64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java index 71ad7ab..492e3a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java @@ -39,12 +39,19 @@ public interface TimelineWriter extends Service { * timeline store. Any errors occurring for individual write request objects * will be reported in the response. * + * @param clusterId context cluster ID + * @param userId context user ID + * @param flowId context flow ID + * @param flowRunId context flow run ID + * @param appId context app ID * @param data * a {@link TimelineEntities} object. * @return a {@link TimelineWriteResponse} object. * @throws IOException */ - TimelineWriteResponse write(TimelineEntities data) throws IOException; + TimelineWriteResponse write(String clusterId, String userId, + String flowId, String flowRunId, String appId, + TimelineEntities data) throws IOException; /** * Aggregates the entity information to the timeline store based on which http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8b5ab64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java index 3b20352..1de8d6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -29,16 +30,25 @@ import static org.mockito.Mockito.when; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; +import org.junit.After; import org.junit.Test; +import java.io.IOException; + public class TestPerNodeTimelineCollectorsAuxService { private ApplicationAttemptId appAttemptId; + private PerNodeTimelineCollectorsAuxService auxService; public TestPerNodeTimelineCollectorsAuxService() { ApplicationId appId = @@ -46,10 +56,16 @@ public class TestPerNodeTimelineCollectorsAuxService { appAttemptId = ApplicationAttemptId.newInstance(appId, 1); } + @After + public void tearDown() throws Shell.ExitCodeException { + if (auxService != null) { + auxService.stop(); + } + } + @Test public void testAddApplication() throws Exception { - PerNodeTimelineCollectorsAuxService auxService = - createCollectorAndAddApplication(); + auxService = createCollectorAndAddApplication(); // auxService should have a single app assertTrue(auxService.hasApplication( appAttemptId.getApplicationId().toString())); @@ -58,7 +74,7 @@ public class TestPerNodeTimelineCollectorsAuxService { @Test public void testAddApplicationNonAMContainer() throws Exception { - PerNodeTimelineCollectorsAuxService auxService = createCollector(); + auxService = createCollector(); ContainerId containerId = getContainerId(2L); // not an AM ContainerInitializationContext context = @@ -72,8 +88,7 @@ public class TestPerNodeTimelineCollectorsAuxService { @Test public void testRemoveApplication() throws Exception { - PerNodeTimelineCollectorsAuxService auxService = - createCollectorAndAddApplication(); + auxService = createCollectorAndAddApplication(); // auxService should have a single app String appIdStr = appAttemptId.getApplicationId().toString(); assertTrue(auxService.hasApplication(appIdStr)); @@ -90,8 +105,7 @@ public class TestPerNodeTimelineCollectorsAuxService { @Test public void testRemoveApplicationNonAMContainer() throws Exception { - PerNodeTimelineCollectorsAuxService auxService = - createCollectorAndAddApplication(); + auxService = createCollectorAndAddApplication(); // auxService should have a single app String appIdStr = appAttemptId.getApplicationId().toString(); assertTrue(auxService.hasApplication(appIdStr)); @@ -109,7 +123,6 @@ public class TestPerNodeTimelineCollectorsAuxService { @Test(timeout = 60000) public void testLaunch() throws Exception { ExitUtil.disableSystemExit(); - PerNodeTimelineCollectorsAuxService auxService = null; try { auxService = PerNodeTimelineCollectorsAuxService.launchServer(new String[0], @@ -118,10 +131,6 @@ public class TestPerNodeTimelineCollectorsAuxService { assertEquals(0, e.status); ExitUtil.resetFirstExitException(); fail(); - } finally { - if (auxService != null) { - auxService.stop(); - } } } @@ -141,6 +150,8 @@ public class TestPerNodeTimelineCollectorsAuxService { TimelineCollectorManager collectorManager = createCollectorManager(); PerNodeTimelineCollectorsAuxService auxService = spy(new PerNodeTimelineCollectorsAuxService(collectorManager)); + auxService.init(new YarnConfiguration()); + auxService.start(); return auxService; } @@ -150,6 +161,14 @@ public class TestPerNodeTimelineCollectorsAuxService { doReturn(new Configuration()).when(collectorManager).getConfig(); CollectorNodemanagerProtocol nmCollectorService = mock(CollectorNodemanagerProtocol.class); + GetTimelineCollectorContextResponse response = + GetTimelineCollectorContextResponse.newInstance(null, null, null); + try { + when(nmCollectorService.getTimelineCollectorContext(any( + GetTimelineCollectorContextRequest.class))).thenReturn(response); + } catch (YarnException | IOException e) { + fail(); + } doReturn(nmCollectorService).when(collectorManager).getNMCollectorService(); return collectorManager; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8b5ab64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java index 541665b..38227ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java @@ -20,10 +20,14 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -33,15 +37,34 @@ import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; +import org.junit.After; +import org.junit.Before; import org.junit.Test; public class TestTimelineCollectorManager { + private TimelineCollectorManager collectorManager; + + @Before + public void setup() throws Exception { + collectorManager = createCollectorManager(); + collectorManager.init(new YarnConfiguration()); + collectorManager.start(); + } + + @After + public void tearDown() throws Exception { + if (collectorManager != null) { + collectorManager.stop(); + } + } @Test(timeout=60000) public void testMultithreadedAdd() throws Exception { - final TimelineCollectorManager collectorManager = createCollectorManager(); - final int NUM_APPS = 5; List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>(); for (int i = 0; i < NUM_APPS; i++) { @@ -49,7 +72,7 @@ public class TestTimelineCollectorManager { Callable<Boolean> task = new Callable<Boolean>() { public Boolean call() { AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(appId.toString()); + new AppLevelTimelineCollector(appId); return (collectorManager.putIfAbsent(appId, collector) == collector); } }; @@ -73,8 +96,6 @@ public class TestTimelineCollectorManager { @Test public void testMultithreadedAddAndRemove() throws Exception { - final TimelineCollectorManager collectorManager = createCollectorManager(); - final int NUM_APPS = 5; List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>(); for (int i = 0; i < NUM_APPS; i++) { @@ -82,7 +103,7 @@ public class TestTimelineCollectorManager { Callable<Boolean> task = new Callable<Boolean>() { public Boolean call() { AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(appId.toString()); + new AppLevelTimelineCollector(appId); boolean successPut = (collectorManager.putIfAbsent(appId, collector) == collector); return successPut && collectorManager.remove(appId.toString()); @@ -112,6 +133,14 @@ public class TestTimelineCollectorManager { doReturn(new Configuration()).when(collectorManager).getConfig(); CollectorNodemanagerProtocol nmCollectorService = mock(CollectorNodemanagerProtocol.class); + GetTimelineCollectorContextResponse response = + GetTimelineCollectorContextResponse.newInstance(null, null, null); + try { + when(nmCollectorService.getTimelineCollectorContext(any( + GetTimelineCollectorContextRequest.class))).thenReturn(response); + } catch (YarnException | IOException e) { + fail(); + } doReturn(nmCollectorService).when(collectorManager).getNMCollectorService(); return collectorManager; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8b5ab64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java index 7f919f0..407b5f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java @@ -28,9 +28,9 @@ import java.nio.file.Paths; import java.util.List; import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.Test; @@ -52,13 +52,16 @@ public class TestFileSystemTimelineWriterImpl { entity.setModifiedTime(1425016502000L); te.addEntity(entity); - try (FileSystemTimelineWriterImpl fsi = - new FileSystemTimelineWriterImpl()) { - fsi.serviceInit(new Configuration()); - fsi.write(te); + FileSystemTimelineWriterImpl fsi = null; + try { + fsi = new FileSystemTimelineWriterImpl(); + fsi.init(new YarnConfiguration()); + fsi.start(); + fsi.write("cluster_id", "user_id", "flow_id", "flow_run_id", "app_id", te); - String fileName = fsi.getOutputRoot() + "/" + type + "/" + id - + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + String fileName = fsi.getOutputRoot() + + "/entities/cluster_id/user_id/flow_id/flow_run_id/app_id/" + type + + "/" + id + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; Path path = Paths.get(fileName); File f = new File(fileName); assertTrue(f.exists() && !f.isDirectory()); @@ -73,6 +76,11 @@ public class TestFileSystemTimelineWriterImpl { File outputDir = new File(fsi.getOutputRoot()); FileUtils.deleteDirectory(outputDir); assertTrue(!(f.exists())); + } finally { + if (fsi != null) { + fsi.stop(); + FileUtils.deleteDirectory(new File(fsi.getOutputRoot())); + } } } }