http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java new file mode 100644 index 0000000..3094088 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -0,0 +1,831 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests the FlowRun and FlowActivity Tables. + */ +public class TestHBaseStorageFlowRunCompaction { + + private static HBaseTestingUtility util; + + private static final String METRIC_1 = "MAP_SLOT_MILLIS"; + private static final String METRIC_2 = "HDFS_BYTES_READ"; + + private final byte[] aRowKey = Bytes.toBytes("a"); + private final byte[] aFamily = Bytes.toBytes("family"); + private final byte[] aQualifier = Bytes.toBytes("qualifier"); + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + Configuration conf = util.getConfiguration(); + conf.setInt("hfile.format.version", 3); + util.startMiniCluster(); + createSchema(); + } + + private static void createSchema() throws IOException { + TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); + } + + /** Writes non numeric data into flow run table + * reads it back. 
+ * + * @throws Exception + */ + @Test + public void testWriteNonNumericData() throws Exception { + String rowKey = "nonNumericRowKey"; + String column = "nonNumericColumnName"; + String value = "nonNumericValue"; + byte[] rowKeyBytes = Bytes.toBytes(rowKey); + byte[] columnNameBytes = Bytes.toBytes(column); + byte[] valueBytes = Bytes.toBytes(value); + Put p = new Put(rowKeyBytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, + valueBytes); + Configuration hbaseConf = util.getConfiguration(); + TableName table = TableName.valueOf(hbaseConf.get( + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Table flowRunTable = conn.getTable(table); + flowRunTable.put(p); + + Get g = new Get(rowKeyBytes); + Result r = flowRunTable.get(g); + assertNotNull(r); + assertTrue(r.size() >= 1); + Cell actualValue = r.getColumnLatestCell( + FlowRunColumnFamily.INFO.getBytes(), columnNameBytes); + assertNotNull(CellUtil.cloneValue(actualValue)); + assertEquals(Bytes.toString(CellUtil.cloneValue(actualValue)), value); + } + + @Test + public void testWriteScanBatchLimit() throws Exception { + String rowKey = "nonNumericRowKey"; + String column = "nonNumericColumnName"; + String value = "nonNumericValue"; + String column2 = "nonNumericColumnName2"; + String value2 = "nonNumericValue2"; + String column3 = "nonNumericColumnName3"; + String value3 = "nonNumericValue3"; + String column4 = "nonNumericColumnName4"; + String value4 = "nonNumericValue4"; + + byte[] rowKeyBytes = Bytes.toBytes(rowKey); + byte[] columnNameBytes = Bytes.toBytes(column); + byte[] valueBytes = Bytes.toBytes(value); + byte[] columnName2Bytes = Bytes.toBytes(column2); + byte[] value2Bytes = Bytes.toBytes(value2); + byte[] columnName3Bytes = Bytes.toBytes(column3); + byte[] value3Bytes = Bytes.toBytes(value3); + byte[] columnName4Bytes = Bytes.toBytes(column4); + byte[] value4Bytes = Bytes.toBytes(value4); + + Put p = new Put(rowKeyBytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, + valueBytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes, + value2Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes, + value3Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes, + value4Bytes); + + Configuration hbaseConf = util.getConfiguration(); + TableName table = TableName.valueOf(hbaseConf.get( + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Table flowRunTable = conn.getTable(table); + flowRunTable.put(p); + + String rowKey2 = "nonNumericRowKey2"; + byte[] rowKey2Bytes = Bytes.toBytes(rowKey2); + p = new Put(rowKey2Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, + valueBytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes, + value2Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes, + value3Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes, + value4Bytes); + flowRunTable.put(p); + + String rowKey3 = "nonNumericRowKey3"; + byte[] rowKey3Bytes = Bytes.toBytes(rowKey3); + p = new Put(rowKey3Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, + valueBytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName2Bytes, + value2Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName3Bytes, + 
value3Bytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnName4Bytes, + value4Bytes); + flowRunTable.put(p); + + Scan s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + s.setStartRow(rowKeyBytes); + // set number of cells to fetch per scanner next invocation + int batchLimit = 2; + s.setBatch(batchLimit); + ResultScanner scanner = flowRunTable.getScanner(s); + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + assertTrue(result.rawCells().length <= batchLimit); + Map<byte[], byte[]> values = result + .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); + assertTrue(values.size() <= batchLimit); + } + + s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + s.setStartRow(rowKeyBytes); + // set number of cells to fetch per scanner next invocation + batchLimit = 3; + s.setBatch(batchLimit); + scanner = flowRunTable.getScanner(s); + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + assertTrue(result.rawCells().length <= batchLimit); + Map<byte[], byte[]> values = result + .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); + assertTrue(values.size() <= batchLimit); + } + + s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + s.setStartRow(rowKeyBytes); + // set number of cells to fetch per scanner next invocation + batchLimit = 1000; + s.setBatch(batchLimit); + scanner = flowRunTable.getScanner(s); + int rowCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + assertTrue(result.rawCells().length <= batchLimit); + Map<byte[], byte[]> values = result + .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); + assertTrue(values.size() <= batchLimit); + // we expect all back in one next call + assertEquals(4, values.size()); + rowCount++; + } + // should get back 1 row with each invocation + // if scan batch is set sufficiently high + assertEquals(3, rowCount); + + // test with a negative number + // should have same effect as setting it to a high number + s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + s.setStartRow(rowKeyBytes); + // set number of cells to fetch per scanner next invocation + batchLimit = -2992; + s.setBatch(batchLimit); + scanner = flowRunTable.getScanner(s); + rowCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + assertEquals(4, result.rawCells().length); + Map<byte[], byte[]> values = result + .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); + // we expect all back in one next call + assertEquals(4, values.size()); + System.out.println(" values size " + values.size() + " " + batchLimit); + rowCount++; + } + // should get back 1 row with each invocation + // if scan batch is set sufficiently high + assertEquals(3, rowCount); + } + + @Test + public void testWriteFlowRunCompaction() throws Exception { + String cluster = "kompaction_cluster1"; + String user = "kompaction_FlowRun__user1"; + String flow = "kompaction_flowRun_flow_name"; + String flowVersion = "AF1021C19F1351"; + long runid = 1449526652000L; + + int start = 10; + int count = 2000; + int appIdSuffix = 1; + HBaseTimelineWriterImpl hbi = null; + long insertTs = System.currentTimeMillis() - count; + Configuration c1 = util.getConfiguration(); + TimelineEntities te1 = null; + TimelineEntity entityApp1 = null; + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + // now insert count * ( 100 + 100) metrics + // each call to getEntityMetricsApp1 
brings back 100 values + // of metric1 and 100 of metric2 + for (int i = start; i < start + count; i++) { + String appName = "application_10240000000000_" + appIdSuffix; + insertTs++; + te1 = new TimelineEntities(); + entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(insertTs, c1); + te1.addEntity(entityApp1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + + appName = "application_2048000000000_7" + appIdSuffix; + insertTs++; + te1 = new TimelineEntities(); + entityApp1 = TestFlowDataGenerator.getEntityMetricsApp2(insertTs); + te1.addEntity(entityApp1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + } + } finally { + String appName = "application_10240000000000_" + appIdSuffix; + te1 = new TimelineEntities(); + entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1Complete( + insertTs + 1, c1); + te1.addEntity(entityApp1); + if (hbi != null) { + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + hbi.flush(); + hbi.close(); + } + } + + // check in flow run table + HRegionServer server = util.getRSForFirstRegionInTable(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + List<Region> regions = server.getOnlineRegions(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + assertTrue("Didn't find any regions for primary table!", + regions.size() > 0); + // flush and compact all the regions of the primary table + for (Region region : regions) { + region.flush(true); + region.compact(true); + } + + // check flow run for one flow many apps + checkFlowRunTable(cluster, user, flow, runid, c1, 4); + } + + + private void checkFlowRunTable(String cluster, String user, String flow, + long runid, Configuration c1, int valueCount) throws IOException { + Scan s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); + s.setStartRow(startRow); + String clusterStop = cluster + "1"; + byte[] stopRow = + new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); + s.setStopRow(stopRow); + Connection conn = ConnectionFactory.createConnection(c1); + Table table1 = conn.getTable(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + ResultScanner scanner = table1.getScanner(s); + + int rowCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO + .getBytes()); + assertEquals(valueCount, values.size()); + + rowCount++; + // check metric1 + byte[] q = ColumnHelper.getColumnQualifier( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_1); + assertTrue(values.containsKey(q)); + assertEquals(141, Bytes.toLong(values.get(q))); + + // check metric2 + q = ColumnHelper.getColumnQualifier( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_2); + assertTrue(values.containsKey(q)); + assertEquals(57, Bytes.toLong(values.get(q))); + } + assertEquals(1, rowCount); + } + + + private FlowScanner getFlowScannerForTestingCompaction() { + // create a FlowScanner object with the sole purpose of invoking a process + // summation; + CompactionRequest request = new CompactionRequest(); + request.setIsMajor(true, true); + // okay to pass in nulls for the constructor arguments + // because all we want to do is invoke the process summation + FlowScanner fs = new FlowScanner(null, null, + (request.isMajor() ? 
FlowScannerOperation.MAJOR_COMPACTION + : FlowScannerOperation.MINOR_COMPACTION)); + assertNotNull(fs); + return fs; + } + + @Test + public void checkProcessSummationMoreCellsSumFinal2() + throws IOException { + long cellValue1 = 1236L; + long cellValue2 = 28L; + long cellValue3 = 1236L; + long cellValue4 = 1236L; + FlowScanner fs = getFlowScannerForTestingCompaction(); + + // note down the current timestamp + long currentTimestamp = System.currentTimeMillis(); + long cell1Ts = 1200120L; + long cell2Ts = TimestampGenerator.getSupplementedTimestamp( + System.currentTimeMillis(), "application_123746661110_11202"); + long cell3Ts = 1277719L; + long cell4Ts = currentTimestamp - 10; + + SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); + + List<Tag> tags = new ArrayList<>(); + Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + "application_1234588888_91188"); + tags.add(t); + byte[] tagByteArray = Tag.fromList(tags); + // create a cell with a VERY old timestamp and attribute SUM_FINAL + Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + cell1Ts, Bytes.toBytes(cellValue1), tagByteArray); + currentColumnCells.add(c1); + + tags = new ArrayList<>(); + t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + "application_12700000001_29102"); + tags.add(t); + tagByteArray = Tag.fromList(tags); + // create a cell with a recent timestamp and attribute SUM_FINAL + Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + cell2Ts, Bytes.toBytes(cellValue2), tagByteArray); + currentColumnCells.add(c2); + + tags = new ArrayList<>(); + t = new Tag(AggregationOperation.SUM.getTagType(), + "application_191780000000001_8195"); + tags.add(t); + tagByteArray = Tag.fromList(tags); + // create a cell with a VERY old timestamp but has attribute SUM + Cell c3 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + cell3Ts, Bytes.toBytes(cellValue3), tagByteArray); + currentColumnCells.add(c3); + + tags = new ArrayList<>(); + t = new Tag(AggregationOperation.SUM.getTagType(), + "application_191780000000001_98104"); + tags.add(t); + tagByteArray = Tag.fromList(tags); + // create a cell with a VERY old timestamp but has attribute SUM + Cell c4 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + cell4Ts, Bytes.toBytes(cellValue4), tagByteArray); + currentColumnCells.add(c4); + + List<Cell> cells = + fs.processSummationMajorCompaction(currentColumnCells, + new LongConverter(), currentTimestamp); + assertNotNull(cells); + + // we should be getting back 4 cells + // one is the flow sum cell + // two are the cells with SUM attribute + // one cell with SUM_FINAL + assertEquals(4, cells.size()); + + for (int i = 0; i < cells.size(); i++) { + Cell returnedCell = cells.get(0); + assertNotNull(returnedCell); + + long returnTs = returnedCell.getTimestamp(); + long returnValue = Bytes.toLong(CellUtil + .cloneValue(returnedCell)); + if (returnValue == cellValue2) { + assertTrue(returnTs == cell2Ts); + } else if (returnValue == cellValue3) { + assertTrue(returnTs == cell3Ts); + } else if (returnValue == cellValue4) { + assertTrue(returnTs == cell4Ts); + } else if (returnValue == cellValue1) { + assertTrue(returnTs != cell1Ts); + assertTrue(returnTs > cell1Ts); + assertTrue(returnTs >= currentTimestamp); + } else { + // raise a failure since we expect only these two values back + Assert.fail(); + } + } + } + + // tests with many cells + // of type SUM and SUM_FINAL + // all cells of SUM_FINAL will expire + @Test + 
public void checkProcessSummationMoreCellsSumFinalMany() throws IOException { + FlowScanner fs = getFlowScannerForTestingCompaction(); + int count = 200000; + + long cellValueFinal = 1000L; + long cellValueNotFinal = 28L; + + // note down the current timestamp + long currentTimestamp = System.currentTimeMillis(); + long cellTsFinalStart = 10001120L; + long cellTsFinal = cellTsFinalStart; + long cellTsNotFinalStart = currentTimestamp - 5; + long cellTsNotFinal = cellTsNotFinalStart; + + SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); + List<Tag> tags = null; + Tag t = null; + Cell c1 = null; + + // insert SUM_FINAL cells + for (int i = 0; i < count; i++) { + tags = new ArrayList<>(); + t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + "application_123450000" + i + "01_19" + i); + tags.add(t); + byte[] tagByteArray = Tag.fromList(tags); + // create a cell with a VERY old timestamp and attribute SUM_FINAL + c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); + currentColumnCells.add(c1); + cellTsFinal++; + } + + // add SUM cells + for (int i = 0; i < count; i++) { + tags = new ArrayList<>(); + t = new Tag(AggregationOperation.SUM.getTagType(), + "application_1987650000" + i + "83_911" + i); + tags.add(t); + byte[] tagByteArray = Tag.fromList(tags); + // create a cell with attribute SUM + c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); + currentColumnCells.add(c1); + cellTsNotFinal++; + } + + List<Cell> cells = + fs.processSummationMajorCompaction(currentColumnCells, + new LongConverter(), currentTimestamp); + assertNotNull(cells); + + // we should be getting back count + 1 cells + // one is the flow sum cell + // others are the cells with SUM attribute + assertEquals(count + 1, cells.size()); + + for (int i = 0; i < cells.size(); i++) { + Cell returnedCell = cells.get(0); + assertNotNull(returnedCell); + + long returnTs = returnedCell.getTimestamp(); + long returnValue = Bytes.toLong(CellUtil + .cloneValue(returnedCell)); + if (returnValue == (count * cellValueFinal)) { + assertTrue(returnTs > (cellTsFinalStart + count)); + assertTrue(returnTs >= currentTimestamp); + } else if ((returnValue >= cellValueNotFinal) + && (returnValue <= cellValueNotFinal * count)) { + assertTrue(returnTs >= cellTsNotFinalStart); + assertTrue(returnTs <= cellTsNotFinalStart * count); + } else { + // raise a failure since we expect only these values back + Assert.fail(); + } + } + } + + // tests with many cells + // of type SUM and SUM_FINAL + // NOT cells of SUM_FINAL will expire + @Test + public void checkProcessSummationMoreCellsSumFinalVariedTags() + throws IOException { + FlowScanner fs = getFlowScannerForTestingCompaction(); + int countFinal = 20100; + int countNotFinal = 1000; + int countFinalNotExpire = 7009; + + long cellValueFinal = 1000L; + long cellValueNotFinal = 28L; + + // note down the current timestamp + long currentTimestamp = System.currentTimeMillis(); + long cellTsFinalStart = 10001120L; + long cellTsFinal = cellTsFinalStart; + + long cellTsFinalStartNotExpire = + TimestampGenerator.getSupplementedTimestamp( + System.currentTimeMillis(), "application_10266666661166_118821"); + long cellTsFinalNotExpire = cellTsFinalStartNotExpire; + + long cellTsNotFinalStart = currentTimestamp - 5; + long cellTsNotFinal = cellTsNotFinalStart; + + SortedSet<Cell> currentColumnCells = new 
TreeSet<Cell>(KeyValue.COMPARATOR); + List<Tag> tags = null; + Tag t = null; + Cell c1 = null; + + // insert SUM_FINAL cells which will expire + for (int i = 0; i < countFinal; i++) { + tags = new ArrayList<>(); + t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + "application_123450000" + i + "01_19" + i); + tags.add(t); + byte[] tagByteArray = Tag.fromList(tags); + // create a cell with a VERY old timestamp and attribute SUM_FINAL + c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); + currentColumnCells.add(c1); + cellTsFinal++; + } + + // insert SUM_FINAL cells which will NOT expire + for (int i = 0; i < countFinalNotExpire; i++) { + tags = new ArrayList<>(); + t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + "application_123450000" + i + "01_19" + i); + tags.add(t); + byte[] tagByteArray = Tag.fromList(tags); + // create a cell with a VERY old timestamp and attribute SUM_FINAL + c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray); + currentColumnCells.add(c1); + cellTsFinalNotExpire++; + } + + // add SUM cells + for (int i = 0; i < countNotFinal; i++) { + tags = new ArrayList<>(); + t = new Tag(AggregationOperation.SUM.getTagType(), + "application_1987650000" + i + "83_911" + i); + tags.add(t); + byte[] tagByteArray = Tag.fromList(tags); + // create a cell with attribute SUM + c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); + currentColumnCells.add(c1); + cellTsNotFinal++; + } + + List<Cell> cells = + fs.processSummationMajorCompaction(currentColumnCells, + new LongConverter(), currentTimestamp); + assertNotNull(cells); + + // we should be getting back + // countNotFinal + countFinalNotExpire + 1 cells + // one is the flow sum cell + // count = the cells with SUM attribute + // count = the cells with SUM_FINAL attribute but not expired + assertEquals(countFinalNotExpire + countNotFinal + 1, cells.size()); + + for (int i = 0; i < cells.size(); i++) { + Cell returnedCell = cells.get(0); + assertNotNull(returnedCell); + + long returnTs = returnedCell.getTimestamp(); + long returnValue = Bytes.toLong(CellUtil + .cloneValue(returnedCell)); + if (returnValue == (countFinal * cellValueFinal)) { + assertTrue(returnTs > (cellTsFinalStart + countFinal)); + assertTrue(returnTs >= currentTimestamp); + } else if (returnValue == cellValueNotFinal) { + assertTrue(returnTs >= cellTsNotFinalStart); + assertTrue(returnTs <= cellTsNotFinalStart + countNotFinal); + } else if (returnValue == cellValueFinal){ + assertTrue(returnTs >= cellTsFinalStartNotExpire); + assertTrue(returnTs <= cellTsFinalStartNotExpire + countFinalNotExpire); + } else { + // raise a failure since we expect only these values back + Assert.fail(); + } + } + } + + @Test + public void testProcessSummationMoreCellsSumFinal() throws IOException { + FlowScanner fs = getFlowScannerForTestingCompaction(); + // note down the current timestamp + long currentTimestamp = System.currentTimeMillis(); + long cellValue1 = 1236L; + long cellValue2 = 28L; + + List<Tag> tags = new ArrayList<>(); + Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + "application_1234588888_999888"); + tags.add(t); + byte[] tagByteArray = Tag.fromList(tags); + SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); + + // create a cell with a VERY old timestamp and 
attribute SUM_FINAL + Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + 120L, Bytes.toBytes(cellValue1), tagByteArray); + currentColumnCells.add(c1); + + tags = new ArrayList<>(); + t = new Tag(AggregationOperation.SUM.getTagType(), + "application_100000000001_119101"); + tags.add(t); + tagByteArray = Tag.fromList(tags); + + // create a cell with a VERY old timestamp but has attribute SUM + Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + 130L, Bytes.toBytes(cellValue2), tagByteArray); + currentColumnCells.add(c2); + List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells, + new LongConverter(), currentTimestamp); + assertNotNull(cells); + + // we should be getting back two cells + // one is the flow sum cell + // another is the cell with SUM attribute + assertEquals(2, cells.size()); + + Cell returnedCell = cells.get(0); + assertNotNull(returnedCell); + long inputTs1 = c1.getTimestamp(); + long inputTs2 = c2.getTimestamp(); + + long returnTs = returnedCell.getTimestamp(); + long returnValue = Bytes.toLong(CellUtil + .cloneValue(returnedCell)); + // the returned Ts will be far greater than input ts as well as the noted + // current timestamp + if (returnValue == cellValue2) { + assertTrue(returnTs == inputTs2); + } else if (returnValue == cellValue1) { + assertTrue(returnTs >= currentTimestamp); + assertTrue(returnTs != inputTs1); + } else { + // raise a failure since we expect only these two values back + Assert.fail(); + } + } + + @Test + public void testProcessSummationOneCellSumFinal() throws IOException { + FlowScanner fs = getFlowScannerForTestingCompaction(); + + // note down the current timestamp + long currentTimestamp = System.currentTimeMillis(); + List<Tag> tags = new ArrayList<>(); + Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + "application_123458888888_999888"); + tags.add(t); + byte[] tagByteArray = Tag.fromList(tags); + SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); + + // create a cell with a VERY old timestamp + Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + 120L, Bytes.toBytes(1110L), tagByteArray); + currentColumnCells.add(c1); + + List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells, + new LongConverter(), currentTimestamp); + assertNotNull(cells); + // we should not get the same cell back + // but we get back the flow cell + assertEquals(1, cells.size()); + + Cell returnedCell = cells.get(0); + // it's NOT the same cell + assertNotEquals(c1, returnedCell); + long inputTs = c1.getTimestamp(); + long returnTs = returnedCell.getTimestamp(); + // the returned Ts will be far greater than input ts as well as the noted + // current timestamp + assertTrue(returnTs > inputTs); + assertTrue(returnTs >= currentTimestamp); + } + + @Test + public void testProcessSummationOneCell() throws IOException { + FlowScanner fs = getFlowScannerForTestingCompaction(); + + // note down the current timestamp + long currentTimestamp = System.currentTimeMillis(); + + // try for 1 cell with tag SUM + List<Tag> tags = new ArrayList<>(); + Tag t = new Tag(AggregationOperation.SUM.getTagType(), + "application_123458888888_999888"); + tags.add(t); + byte[] tagByteArray = Tag.fromList(tags); + + SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); + + Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + currentTimestamp, Bytes.toBytes(1110L), tagByteArray); + currentColumnCells.add(c1); 
+ List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells, + new LongConverter(), currentTimestamp); + assertNotNull(cells); + // we expect the same cell back + assertEquals(1, cells.size()); + Cell c2 = cells.get(0); + assertEquals(c1, c2); + assertEquals(currentTimestamp, c2.getTimestamp()); + } + + @Test + public void testProcessSummationEmpty() throws IOException { + FlowScanner fs = getFlowScannerForTestingCompaction(); + long currentTimestamp = System.currentTimeMillis(); + + LongConverter longConverter = new LongConverter(); + + SortedSet<Cell> currentColumnCells = null; + List<Cell> cells = + fs.processSummationMajorCompaction(currentColumnCells, longConverter, + currentTimestamp); + assertNotNull(cells); + assertEquals(0, cells.size()); + + currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); + cells = + fs.processSummationMajorCompaction(currentColumnCells, longConverter, + currentTimestamp); + assertNotNull(cells); + assertEquals(0, cells.size()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } +}
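The compaction tests above all exercise one invariant of FlowScanner.processSummationMajorCompaction during a major compaction: cells tagged SUM, and SUM_FINAL cells whose timestamps are too recent to expire, come back unchanged, while every expired SUM_FINAL cell is folded into a single summed cell stamped with a fresh timestamp. The plain-Java sketch below models that invariant without the HBase mini cluster; SimpleCell, collapseFinals, and the cutoff parameter are illustrative names for this sketch only, not part of the timeline service code.

import java.util.ArrayList;
import java.util.List;

/** Minimal model of the summation invariant asserted by the compaction tests. */
public class SummationSketch {

  /** Stand-in for an HBase Cell: a value, its timestamp, and whether it is tagged SUM_FINAL. */
  static final class SimpleCell {
    final long value;
    final long timestamp;
    final boolean sumFinal;
    SimpleCell(long value, long timestamp, boolean sumFinal) {
      this.value = value;
      this.timestamp = timestamp;
      this.sumFinal = sumFinal;
    }
  }

  /**
   * Keeps SUM cells and non-expired SUM_FINAL cells as-is; folds expired
   * SUM_FINAL cells (timestamp older than the cutoff) into one summed cell
   * stamped with the supplied "now" timestamp.
   */
  static List<SimpleCell> collapseFinals(List<SimpleCell> cells, long cutoff, long now) {
    List<SimpleCell> out = new ArrayList<>();
    long finalSum = 0;
    boolean sawExpiredFinal = false;
    for (SimpleCell c : cells) {
      if (c.sumFinal && c.timestamp < cutoff) {
        // expired SUM_FINAL: absorbed into the running sum, not returned individually
        finalSum += c.value;
        sawExpiredFinal = true;
      } else {
        // SUM cells and recent SUM_FINAL cells pass through untouched
        out.add(c);
      }
    }
    if (sawExpiredFinal) {
      // single summed cell with a fresh timestamp replaces all expired SUM_FINAL cells
      out.add(new SimpleCell(finalSum, now, true));
    }
    return out;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    List<SimpleCell> input = new ArrayList<>();
    input.add(new SimpleCell(1236L, 120L, true));   // very old SUM_FINAL -> expires
    input.add(new SimpleCell(28L, 130L, false));    // SUM -> kept as-is
    List<SimpleCell> result = collapseFinals(input, now - 1000, now);
    System.out.println(result.size());              // 2
  }
}

Fed the inputs of testProcessSummationMoreCellsSumFinal (one very old SUM_FINAL cell plus one SUM cell), the sketch returns two cells, one untouched SUM cell and one summed cell with a new timestamp, which is the same shape the test asserts against the real scanner.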
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/resources/log4j.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/resources/log4j.properties new file mode 100644 index 0000000..81a3f6a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/resources/log4j.properties @@ -0,0 +1,19 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# log4j configuration used during build and unit tests + +log4j.rootLogger=info,stdout +log4j.threshold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml new file mode 100644 index 0000000..64a79aa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml @@ -0,0 +1,203 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>hadoop-yarn-server</artifactId> + <groupId>org.apache.hadoop</groupId> + <version>2.9.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice</artifactId> + <version>2.9.0-SNAPSHOT</version> + <name>Apache Hadoop YARN Timeline Service</name> + + <properties> + <!-- Needed for generating FindBugs warnings using parent pom --> + <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-common</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>com.google.inject</groupId> + <artifactId>guice</artifactId> + </dependency> + + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + + <dependency> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </dependency> + + <dependency> + <groupId>javax.xml.bind</groupId> + <artifactId>jaxb-api</artifactId> + </dependency> + + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </dependency> + + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + </dependency> + + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-csv</artifactId> + </dependency> + + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </dependency> + + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.phoenix</groupId> + <artifactId>phoenix-core</artifactId> + </dependency> + + <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + 
<type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + <phase>test-compile</phase> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <configuration> + <additionnalDependencies> + <additionnalDependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + </additionnalDependency> + </additionnalDependencies> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineContext.java new file mode 100644 index 0000000..694b709 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineContext.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice; + +/** + * Encapsulates timeline context information. + */ +public class TimelineContext { + + private String clusterId; + private String userId; + private String flowName; + private Long flowRunId; + private String appId; + + public TimelineContext() { + this(null, null, null, 0L, null); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((appId == null) ? 0 : appId.hashCode()); + result = prime * result + ((clusterId == null) ? 0 : clusterId.hashCode()); + result = prime * result + ((flowName == null) ? 0 : flowName.hashCode()); + result = prime * result + ((flowRunId == null) ? 0 : flowRunId.hashCode()); + result = prime * result + ((userId == null) ? 
0 : userId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + TimelineContext other = (TimelineContext) obj; + if (appId == null) { + if (other.appId != null) { + return false; + } + } else if (!appId.equals(other.appId)) { + return false; + } + if (clusterId == null) { + if (other.clusterId != null) { + return false; + } + } else if (!clusterId.equals(other.clusterId)) { + return false; + } + if (flowName == null) { + if (other.flowName != null) { + return false; + } + } else if (!flowName.equals(other.flowName)) { + return false; + } + if (flowRunId == null) { + if (other.flowRunId != null) { + return false; + } + } else if (!flowRunId.equals(other.flowRunId)) { + return false; + } + if (userId == null) { + if (other.userId != null) { + return false; + } + } else if (!userId.equals(other.userId)) { + return false; + } + return true; + } + + public TimelineContext(String clusterId, String userId, String flowName, + Long flowRunId, String appId) { + this.clusterId = clusterId; + this.userId = userId; + this.flowName = flowName; + this.flowRunId = flowRunId; + this.appId = appId; + } + + public String getClusterId() { + return clusterId; + } + + public void setClusterId(String cluster) { + this.clusterId = cluster; + } + + public String getUserId() { + return userId; + } + + public void setUserId(String user) { + this.userId = user; + } + + public String getFlowName() { + return flowName; + } + + public void setFlowName(String flow) { + this.flowName = flow; + } + + public Long getFlowRunId() { + return flowRunId; + } + + public void setFlowRunId(long runId) { + this.flowRunId = runId; + } + + public String getAppId() { + return appId; + } + + public void setAppId(String app) { + this.appId = app; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java new file mode 100644 index 0000000..d276269 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.base.Preconditions; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Service that handles writes to the timeline service and writes them to the + * backing storage for a given YARN application. + * + * App-related lifecycle management is handled by this service. + */ +@Private +@Unstable +public class AppLevelTimelineCollector extends TimelineCollector { + private static final Log LOG = LogFactory.getLog(TimelineCollector.class); + + private final static int AGGREGATION_EXECUTOR_NUM_THREADS = 1; + private final static int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15; + private static Set<String> entityTypesSkipAggregation + = initializeSkipSet(); + + private final ApplicationId appId; + private final TimelineCollectorContext context; + private ScheduledThreadPoolExecutor appAggregationExecutor; + + public AppLevelTimelineCollector(ApplicationId appId) { + super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString()); + Preconditions.checkNotNull(appId, "AppId shouldn't be null"); + this.appId = appId; + context = new TimelineCollectorContext(); + } + + private static Set<String> initializeSkipSet() { + Set<String> result = new HashSet<>(); + result.add(TimelineEntityType.YARN_APPLICATION.toString()); + result.add(TimelineEntityType.YARN_FLOW_RUN.toString()); + result.add(TimelineEntityType.YARN_FLOW_ACTIVITY.toString()); + return result; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID, + YarnConfiguration.DEFAULT_RM_CLUSTER_ID)); + // Set the default values, which will be updated with an RPC call to get the + // context info from NM. 
+ // Current user usually is not the app user, but keep this field non-null + context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName()); + context.setAppId(appId.toString()); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + // Launch the aggregation thread + appAggregationExecutor = new ScheduledThreadPoolExecutor( + AppLevelTimelineCollector.AGGREGATION_EXECUTOR_NUM_THREADS, + new ThreadFactoryBuilder() + .setNameFormat("TimelineCollector Aggregation thread #%d") + .build()); + appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), + AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, + AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, + TimeUnit.SECONDS); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + appAggregationExecutor.shutdown(); + if (!appAggregationExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + LOG.info("App-level aggregator shutdown timed out, shutdown now. "); + appAggregationExecutor.shutdownNow(); + } + super.serviceStop(); + } + + @Override + public TimelineCollectorContext getTimelineEntityContext() { + return context; + } + + @Override + protected Set<String> getEntityTypesSkipAggregation() { + return entityTypesSkipAggregation; + } + + private class AppLevelAggregator implements Runnable { + + @Override + public void run() { + if (LOG.isDebugEnabled()) { + LOG.debug("App-level real-time aggregating"); + } + if (!isReadyToAggregate()) { + LOG.warn("App-level collector is not ready, skip aggregation. "); + return; + } + try { + TimelineCollectorContext currContext = getTimelineEntityContext(); + Map<String, AggregationStatusTable> aggregationGroups + = getAggregationGroups(); + if (aggregationGroups == null + || aggregationGroups.isEmpty()) { + LOG.debug("App-level collector is empty, skip aggregation. 
"); + return; + } + TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId( + aggregationGroups, currContext.getAppId(), + TimelineEntityType.YARN_APPLICATION.toString()); + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(resultEntity); + getWriter().write(currContext.getClusterId(), currContext.getUserId(), + currContext.getFlowName(), currContext.getFlowVersion(), + currContext.getFlowRunId(), currContext.getAppId(), entities); + } catch (Exception e) { + LOG.error("Error aggregating timeline metrics", e); + } + if (LOG.isDebugEnabled()) { + LOG.debug("App-level real-time aggregation complete"); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java new file mode 100644 index 0000000..0323d7b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.HashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.HttpServer2; +import org.apache.hadoop.http.lib.StaticUserWebFilter; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; +import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Class on the NodeManager side that manages adding and removing collectors and + * their lifecycle. Also instantiates the per-node collector webapp. + */ +@Private +@Unstable +public class NodeTimelineCollectorManager extends TimelineCollectorManager { + private static final Log LOG = + LogFactory.getLog(NodeTimelineCollectorManager.class); + + // REST server for this collector manager. + private HttpServer2 timelineRestServer; + + private String timelineRestServerBindAddress; + + private volatile CollectorNodemanagerProtocol nmCollectorService; + + static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager"; + + @VisibleForTesting + protected NodeTimelineCollectorManager() { + super(NodeTimelineCollectorManager.class.getName()); + } + + @Override + protected void serviceStart() throws Exception { + startWebApp(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (timelineRestServer != null) { + timelineRestServer.stop(); + } + super.serviceStop(); + } + + @Override + protected void doPostPut(ApplicationId appId, TimelineCollector collector) { + try { + // Get context info from NM + updateTimelineCollectorContext(appId, collector); + // Report to NM if a new collector is added. + reportNewCollectorToNM(appId); + } catch (YarnException | IOException e) { + // throw exception here as it cannot be used if failed communicate with NM + LOG.error("Failed to communicate with NM Collector Service for " + appId); + throw new YarnRuntimeException(e); + } + } + + /** + * Launch the REST web server for this collector manager. 
+ */ + private void startWebApp() { + Configuration conf = getConfig(); + String bindAddress = conf.get(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST) + ":0"; + try { + HttpServer2.Builder builder = new HttpServer2.Builder() + .setName("timeline") + .setConf(conf) + .addEndpoint(URI.create( + (YarnConfiguration.useHttps(conf) ? "https://" : "http://") + + bindAddress)); + timelineRestServer = builder.build(); + // TODO: replace this by an authentication filter in future. + HashMap<String, String> options = new HashMap<>(); + String username = conf.get(HADOOP_HTTP_STATIC_USER, + DEFAULT_HADOOP_HTTP_STATIC_USER); + options.put(HADOOP_HTTP_STATIC_USER, username); + HttpServer2.defineFilter(timelineRestServer.getWebAppContext(), + "static_user_filter_timeline", + StaticUserWebFilter.StaticUserFilter.class.getName(), + options, new String[] {"/*"}); + + timelineRestServer.addJerseyResourcePackage( + TimelineCollectorWebService.class.getPackage().getName() + ";" + + GenericExceptionHandler.class.getPackage().getName() + ";" + + YarnJacksonJaxbJsonProvider.class.getPackage().getName(), + "/*"); + timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY, this); + timelineRestServer.start(); + } catch (Exception e) { + String msg = "The per-node collector webapp failed to start."; + LOG.error(msg, e); + throw new YarnRuntimeException(msg, e); + } + //TODO: We need to think of the case of multiple interfaces + this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress( + timelineRestServer.getConnectorAddress(0)); + LOG.info("Instantiated the per-node collector webapp at " + + timelineRestServerBindAddress); + } + + private void reportNewCollectorToNM(ApplicationId appId) + throws YarnException, IOException { + ReportNewCollectorInfoRequest request = + ReportNewCollectorInfoRequest.newInstance(appId, + this.timelineRestServerBindAddress); + LOG.info("Report a new collector for application: " + appId + + " to the NM Collector Service."); + getNMCollectorService().reportNewCollectorInfo(request); + } + + private void updateTimelineCollectorContext( + ApplicationId appId, TimelineCollector collector) + throws YarnException, IOException { + GetTimelineCollectorContextRequest request = + GetTimelineCollectorContextRequest.newInstance(appId); + LOG.info("Get timeline collector context for " + appId); + GetTimelineCollectorContextResponse response = + getNMCollectorService().getTimelineCollectorContext(request); + String userId = response.getUserId(); + if (userId != null && !userId.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the user in the context: " + userId); + } + collector.getTimelineEntityContext().setUserId(userId); + } + String flowName = response.getFlowName(); + if (flowName != null && !flowName.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow name: " + flowName); + } + collector.getTimelineEntityContext().setFlowName(flowName); + } + String flowVersion = response.getFlowVersion(); + if (flowVersion != null && !flowVersion.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow version: " + flowVersion); + } + collector.getTimelineEntityContext().setFlowVersion(flowVersion); + } + long flowRunId = response.getFlowRunId(); + if (flowRunId != 0L) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting the flow run id: " + flowRunId); + } + collector.getTimelineEntityContext().setFlowRunId(flowRunId); + } + } + + @VisibleForTesting + protected CollectorNodemanagerProtocol 
getNMCollectorService() { + if (nmCollectorService == null) { + synchronized (this) { + if (nmCollectorService == null) { + Configuration conf = getConfig(); + InetSocketAddress nmCollectorServiceAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT); + LOG.info("nmCollectorServiceAddress: " + nmCollectorServiceAddress); + final YarnRPC rpc = YarnRPC.create(conf); + + // TODO Security settings. + nmCollectorService = (CollectorNodemanagerProtocol) rpc.getProxy( + CollectorNodemanagerProtocol.class, + nmCollectorServiceAddress, conf); + } + } + } + return nmCollectorService; + } + + @VisibleForTesting + public String getRestServerBindAddress() { + return timelineRestServerBindAddress; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java new file mode 100644 index 0000000..041e7c2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import java.nio.ByteBuffer; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; +import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryService; +import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; +import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; +import org.apache.hadoop.yarn.server.api.ContainerType; + +import com.google.common.annotations.VisibleForTesting; + +/** + * The top-level server for the per-node timeline collector manager. Currently + * it is defined as an auxiliary service to accommodate running within another + * daemon (e.g. node manager). + */ +@Private +@Unstable +public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService { + private static final Log LOG = + LogFactory.getLog(PerNodeTimelineCollectorsAuxService.class); + private static final int SHUTDOWN_HOOK_PRIORITY = 30; + + private final NodeTimelineCollectorManager collectorManager; + private long collectorLingerPeriod; + private ScheduledExecutorService scheduler; + + public PerNodeTimelineCollectorsAuxService() { + this(new NodeTimelineCollectorManager()); + } + + @VisibleForTesting PerNodeTimelineCollectorsAuxService( + NodeTimelineCollectorManager collectorsManager) { + super("timeline_collector"); + this.collectorManager = collectorsManager; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + if (!YarnConfiguration.timelineServiceV2Enabled(conf)) { + throw new YarnException("Timeline service v2 is not enabled"); + } + collectorLingerPeriod = + conf.getLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS, + YarnConfiguration.DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS); + scheduler = Executors.newSingleThreadScheduledExecutor(); + collectorManager.init(conf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + collectorManager.start(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + scheduler.shutdown(); + if (!scheduler.awaitTermination(collectorLingerPeriod, + TimeUnit.MILLISECONDS)) { + LOG.warn( + "Scheduler terminated before removing the application collectors"); + } + collectorManager.stop(); + super.serviceStop(); + } + + // these methods can be used as the basis for future service methods if the + // per-node collector runs separate from the node manager + /** + * Creates and adds an app level collector for the specified application id. + * The collector is also initialized and started. If the service already + * exists, no new service is created. 
+ * + * @param appId Application Id to be added. + * @return whether it was added successfully + */ + public boolean addApplication(ApplicationId appId) { + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(appId); + return (collectorManager.putIfAbsent(appId, collector) + == collector); + } + + /** + * Removes the app level collector for the specified application id. The + * collector is also stopped as a result. If the collector does not exist, no + * change is made. + * + * @param appId Application Id to be removed. + * @return whether it was removed successfully + */ + public boolean removeApplication(ApplicationId appId) { + return collectorManager.remove(appId); + } + + /** + * Creates and adds an app level collector for the specified application id. + * The collector is also initialized and started. If the collector already + * exists, no new collector is created. + */ + @Override + public void initializeContainer(ContainerInitializationContext context) { + // intercept the event of the AM container being created and initialize the + // app level collector service + if (context.getContainerType() == ContainerType.APPLICATION_MASTER) { + ApplicationId appId = context.getContainerId(). + getApplicationAttemptId().getApplicationId(); + addApplication(appId); + } + } + + /** + * Removes the app level collector for the specified application id. The + * collector is also stopped as a result. If the collector does not exist, no + * change is made. + */ + @Override + public void stopContainer(ContainerTerminationContext context) { + // intercept the event of the AM container being stopped and remove the app + // level collector service + if (context.getContainerType() == ContainerType.APPLICATION_MASTER) { + final ApplicationId appId = + context.getContainerId().getApplicationAttemptId().getApplicationId(); + scheduler.schedule(new Runnable() { + public void run() { + removeApplication(appId); + } + }, collectorLingerPeriod, TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + boolean hasApplication(ApplicationId appId) { + return collectorManager.containsTimelineCollector(appId); + } + + @Override + public void initializeApplication(ApplicationInitializationContext context) { + } + + @Override + public void stopApplication(ApplicationTerminationContext context) { + } + + @Override + public ByteBuffer getMetaData() { + // TODO currently it is not used; we can return a more meaningful data when + // we connect it with an AM + return ByteBuffer.allocate(0); + } + + @VisibleForTesting + public static PerNodeTimelineCollectorsAuxService + launchServer(String[] args, NodeTimelineCollectorManager collectorManager, + Configuration conf) { + Thread + .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); + StringUtils.startupShutdownMessage( + PerNodeTimelineCollectorsAuxService.class, args, LOG); + PerNodeTimelineCollectorsAuxService auxService = null; + try { + auxService = collectorManager == null ? 
+ new PerNodeTimelineCollectorsAuxService() : + new PerNodeTimelineCollectorsAuxService(collectorManager); + ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService), + SHUTDOWN_HOOK_PRIORITY); + auxService.init(conf); + auxService.start(); + } catch (Throwable t) { + LOG.fatal("Error starting PerNodeTimelineCollectorServer", t); + ExitUtil.terminate(-1, "Error starting PerNodeTimelineCollectorServer"); + } + return auxService; + } + + private static class ShutdownHook implements Runnable { + private final PerNodeTimelineCollectorsAuxService auxService; + + public ShutdownHook(PerNodeTimelineCollectorsAuxService auxService) { + this.auxService = auxService; + } + + public void run() { + auxService.stop(); + } + } + + public static void main(String[] args) { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + launchServer(args, null, conf); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java new file mode 100644 index 0000000..2fc3033 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; + +/** + * Service that handles writes to the timeline service and writes them to the + * backing storage. + * + * Classes that extend this can add their own lifecycle management or + * customization of request handling. + */ +@Private +@Unstable +public abstract class TimelineCollector extends CompositeService { + + private static final Log LOG = LogFactory.getLog(TimelineCollector.class); + public static final String SEPARATOR = "_"; + + private TimelineWriter writer; + private ConcurrentMap<String, AggregationStatusTable> aggregationGroups + = new ConcurrentHashMap<>(); + private static Set<String> entityTypesSkipAggregation + = new HashSet<>(); + + private volatile boolean readyToAggregate = false; + + public TimelineCollector(String name) { + super(name); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + + protected void setWriter(TimelineWriter w) { + this.writer = w; + } + + protected TimelineWriter getWriter() { + return writer; + } + + protected Map<String, AggregationStatusTable> getAggregationGroups() { + return aggregationGroups; + } + + protected void setReadyToAggregate() { + readyToAggregate = true; + } + + protected boolean isReadyToAggregate() { + return readyToAggregate; + } + + /** + * Method to decide the set of timeline entity types the collector should + * skip on aggregations. Subclasses may want to override this method to + * customize their own behaviors. + * + * @return A set of strings consists of all types the collector should skip. + */ + protected Set<String> getEntityTypesSkipAggregation() { + return entityTypesSkipAggregation; + } + + public abstract TimelineCollectorContext getTimelineEntityContext(); + + + /** + * Handles entity writes. These writes are synchronous and are written to the + * backing storage without buffering/batching. If any entity already exists, + * it results in an update of the entity. + * + * This method should be reserved for selected critical entities and events. + * For normal voluminous writes one should use the async method + * {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation)}. 
+ * + * @param entities entities to post + * @param callerUgi the caller UGI + * @return the response that contains the result of the post. + * @throws IOException if there is any exception encountered while putting + * entities. + */ + public TimelineWriteResponse putEntities(TimelineEntities entities, + UserGroupInformation callerUgi) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE"); + LOG.debug("putEntities(entities=" + entities + ", callerUgi=" + + callerUgi + ")"); + } + TimelineCollectorContext context = getTimelineEntityContext(); + + // Update application metrics for aggregation + updateAggregateStatus(entities, aggregationGroups, + getEntityTypesSkipAggregation()); + + return writer.write(context.getClusterId(), context.getUserId(), + context.getFlowName(), context.getFlowVersion(), context.getFlowRunId(), + context.getAppId(), entities); + } + + /** + * Handles entity writes in an asynchronous manner. The method returns as soon + * as validation is done. No promises are made on how quickly it will be + * written to the backing storage or if it will always be written to the + * backing storage. Multiple writes to the same entities may be batched and + * appropriate values updated and result in fewer writes to the backing + * storage. + * + * @param entities entities to post + * @param callerUgi the caller UGI + */ + public void putEntitiesAsync(TimelineEntities entities, + UserGroupInformation callerUgi) { + // TODO implement + if (LOG.isDebugEnabled()) { + LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" + + callerUgi + ")"); + } + } + + /** + * Aggregate all metrics in given timeline entities with no predefined states. + * + * @param entities Entities to aggregate + * @param resultEntityId Id of the result entity + * @param resultEntityType Type of the result entity + * @param needsGroupIdInResult Marks if we want the aggregation group id in + * each aggregated metrics. + * @return A timeline entity that contains all aggregated TimelineMetric. + */ + public static TimelineEntity aggregateEntities( + TimelineEntities entities, String resultEntityId, + String resultEntityType, boolean needsGroupIdInResult) { + ConcurrentMap<String, AggregationStatusTable> aggregationGroups + = new ConcurrentHashMap<>(); + updateAggregateStatus(entities, aggregationGroups, null); + if (needsGroupIdInResult) { + return aggregate(aggregationGroups, resultEntityId, resultEntityType); + } else { + return aggregateWithoutGroupId( + aggregationGroups, resultEntityId, resultEntityType); + } + } + + /** + * Update the aggregation status table for a timeline collector. 
+ * + * @param entities Entities to update + * @param aggregationGroups Aggregation status table + * @param typesToSkip Entity types that we can safely assume to skip updating + */ + static void updateAggregateStatus( + TimelineEntities entities, + ConcurrentMap<String, AggregationStatusTable> aggregationGroups, + Set<String> typesToSkip) { + for (TimelineEntity e : entities.getEntities()) { + if ((typesToSkip != null && typesToSkip.contains(e.getType())) + || e.getMetrics().isEmpty()) { + continue; + } + AggregationStatusTable aggrTable = aggregationGroups.get(e.getType()); + if (aggrTable == null) { + AggregationStatusTable table = new AggregationStatusTable(); + aggrTable = aggregationGroups.putIfAbsent(e.getType(), + table); + if (aggrTable == null) { + aggrTable = table; + } + } + aggrTable.update(e); + } + } + + /** + * Aggregate internal status and generate timeline entities for the + * aggregation results. + * + * @param aggregationGroups Aggregation status table + * @param resultEntityId Id of the result entity + * @param resultEntityType Type of the result entity + * @return A timeline entity that contains all aggregated TimelineMetric. + */ + static TimelineEntity aggregate( + Map<String, AggregationStatusTable> aggregationGroups, + String resultEntityId, String resultEntityType) { + TimelineEntity result = new TimelineEntity(); + result.setId(resultEntityId); + result.setType(resultEntityType); + for (Map.Entry<String, AggregationStatusTable> entry + : aggregationGroups.entrySet()) { + entry.getValue().aggregateAllTo(result, entry.getKey()); + } + return result; + } + + /** + * Aggregate internal status and generate timeline entities for the + * aggregation results. The result metrics will not have aggregation group + * information. + * + * @param aggregationGroups Aggregation status table + * @param resultEntityId Id of the result entity + * @param resultEntityType Type of the result entity + * @return A timeline entity that contains all aggregated TimelineMetric. + */ + static TimelineEntity aggregateWithoutGroupId( + Map<String, AggregationStatusTable> aggregationGroups, + String resultEntityId, String resultEntityType) { + TimelineEntity result = new TimelineEntity(); + result.setId(resultEntityId); + result.setType(resultEntityType); + for (Map.Entry<String, AggregationStatusTable> entry + : aggregationGroups.entrySet()) { + entry.getValue().aggregateAllTo(result, ""); + } + return result; + } + + // Note: In memory aggregation is performed in an eventually consistent + // fashion. + protected static class AggregationStatusTable { + // On aggregation, for each metric, aggregate all per-entity accumulated + // metrics. We only use the id and type for TimelineMetrics in the key set + // of this table. 
+ private ConcurrentMap<TimelineMetric, Map<String, TimelineMetric>> + aggregateTable; + + public AggregationStatusTable() { + aggregateTable = new ConcurrentHashMap<>(); + } + + public void update(TimelineEntity incoming) { + String entityId = incoming.getId(); + for (TimelineMetric m : incoming.getMetrics()) { + // Skip if the metric does not need aggregation + if (m.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) { + continue; + } + // Update aggregateTable + Map<String, TimelineMetric> aggrRow = aggregateTable.get(m); + if (aggrRow == null) { + Map<String, TimelineMetric> tempRow = new ConcurrentHashMap<>(); + aggrRow = aggregateTable.putIfAbsent(m, tempRow); + if (aggrRow == null) { + aggrRow = tempRow; + } + } + aggrRow.put(entityId, m); + } + } + + public TimelineEntity aggregateTo(TimelineMetric metric, TimelineEntity e, + String aggregationGroupId) { + if (metric.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) { + return e; + } + Map<String, TimelineMetric> aggrRow = aggregateTable.get(metric); + if (aggrRow != null) { + TimelineMetric aggrMetric = new TimelineMetric(); + if (aggregationGroupId.length() > 0) { + aggrMetric.setId(metric.getId() + SEPARATOR + aggregationGroupId); + } else { + aggrMetric.setId(metric.getId()); + } + aggrMetric.setRealtimeAggregationOp(TimelineMetricOperation.NOP); + Map<Object, Object> status = new HashMap<>(); + for (TimelineMetric m : aggrRow.values()) { + TimelineMetric.aggregateTo(m, aggrMetric, status); + // getRealtimeAggregationOp returns an enum so we can directly + // compare with "!=". + if (m.getRealtimeAggregationOp() + != aggrMetric.getRealtimeAggregationOp()) { + aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp()); + } + } + Set<TimelineMetric> metrics = e.getMetrics(); + metrics.remove(aggrMetric); + metrics.add(aggrMetric); + } + return e; + } + + public TimelineEntity aggregateAllTo(TimelineEntity e, + String aggregationGroupId) { + for (TimelineMetric m : aggregateTable.keySet()) { + aggregateTo(m, e, aggregationGroupId); + } + return e; + } + } +}
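
The PerNodeTimelineCollectorsAuxService added by this patch is registered with the NodeManager through the usual auxiliary-service mechanism, under the service name "timeline_collector" that its constructor passes to AuxiliaryService. The following is a minimal, hypothetical wiring sketch and is not part of the commit: the wrapper class name is made up, and the YarnConfiguration keys NM_AUX_SERVICES and NM_AUX_SERVICE_FMT are assumed from the standard YARN configuration API rather than taken from this diff. It mirrors the flags that the patch's main() sets for a standalone launch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;

    public class TimelineCollectorAuxServiceWiring {
      public static void main(String[] args) {
        Configuration conf = new YarnConfiguration();
        // Same flags that PerNodeTimelineCollectorsAuxService#main sets:
        // enable the timeline service and select version 2.
        conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
        conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
        // Register the collector as an NM auxiliary service under the name
        // used in the constructor ("timeline_collector"); the NM then invokes
        // initializeContainer/stopContainer, so AM container start and stop
        // trigger addApplication and the delayed removeApplication.
        conf.set(YarnConfiguration.NM_AUX_SERVICES, "timeline_collector");
        conf.set(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
            "timeline_collector"),
            PerNodeTimelineCollectorsAuxService.class.getName());
        // For a quick standalone test, the launcher from the patch can be
        // reused directly.
        PerNodeTimelineCollectorsAuxService.launchServer(args, null, conf);
      }
    }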
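
To make the in-memory aggregation path in TimelineCollector easier to follow, here is a small, hypothetical sketch that drives the static aggregateEntities helper shown above; it is not part of the commit. The entity ids and the "MEMORY" metric name are invented, and the record-class accessors addValue, addMetric, addEntity and getValues are assumed from the timeline records API, which lies outside this diff. Because the per-line comment in AggregationStatusTable notes that only metric id and type are used as the table key, the two containers' MEMORY metrics collapse into one aggregation row and are summed.

    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
    import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;

    public class AggregationSketch {
      public static void main(String[] args) {
        TimelineEntities entities = new TimelineEntities();
        for (String containerId : new String[] {"container_1", "container_2"}) {
          TimelineEntity entity = new TimelineEntity();
          entity.setId(containerId);
          entity.setType("YARN_CONTAINER");
          TimelineMetric memory = new TimelineMetric();
          memory.setId("MEMORY");
          memory.addValue(System.currentTimeMillis(), 1024L);
          // SUM marks the metric for real-time aggregation; NOP metrics are
          // skipped by AggregationStatusTable#update.
          memory.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
          entity.addMetric(memory);
          entities.addEntity(entity);
        }
        // Entities are grouped by type, so both containers land in one
        // AggregationStatusTable. With needsGroupIdInResult == true the
        // aggregated metric id becomes "MEMORY" + SEPARATOR + "YARN_CONTAINER".
        TimelineEntity appLevel = TimelineCollector.aggregateEntities(
            entities, "app_aggregate", "YARN_APPLICATION", true);
        for (TimelineMetric m : appLevel.getMetrics()) {
          System.out.println(m.getId() + " -> " + m.getValues());
        }
      }
    }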