Repository: hadoop Updated Branches: refs/heads/YARN-2928 10a4f8ae6 -> db76a3ad0
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java index d991b42..f9eb5b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java @@ -24,7 +24,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; /** * Represents the flow run table column families. 
*/ -public enum FlowActivityColumnFamily implements ColumnFamily<FlowActivityTable> { +public enum FlowActivityColumnFamily + implements ColumnFamily<FlowActivityTable> { /** * Info column family houses known columns, specifically ones included in http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java index 21ddcc2..a5933da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java @@ -31,12 +31,13 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStor import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; /** - * Identifies partially qualified columns for the {@link FlowActivityTable} + * Identifies partially qualified columns for the {@link FlowActivityTable}. */ -public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable> { +public enum FlowActivityColumnPrefix + implements ColumnPrefix<FlowActivityTable> { /** - * To store run ids of the flows + * To store run ids of the flows. 
*/ RUN_ID(FlowActivityColumnFamily.INFO, "r", null); @@ -162,8 +163,8 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable> * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) */ - public <T> NavigableMap<String, NavigableMap<Long, T>> readResultsWithTimestamps( - Result result) throws IOException { + public <T> NavigableMap<String, NavigableMap<Long, T>> + readResultsWithTimestamps(Result result) throws IOException { return column.readResultsWithTimestamps(result, columnPrefixBytes); } @@ -179,8 +180,8 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable> public static final FlowActivityColumnPrefix columnFor(String columnPrefix) { // Match column based on value, assume column family matches. - for (FlowActivityColumnPrefix flowActivityColPrefix : FlowActivityColumnPrefix - .values()) { + for (FlowActivityColumnPrefix flowActivityColPrefix : + FlowActivityColumnPrefix.values()) { // Find a match based only on name. if (flowActivityColPrefix.getColumnPrefix().equals(columnPrefix)) { return flowActivityColPrefix; @@ -209,8 +210,8 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable> // TODO: needs unit test to confirm and need to update javadoc to explain // null prefix case. - for (FlowActivityColumnPrefix flowActivityColumnPrefix : FlowActivityColumnPrefix - .values()) { + for (FlowActivityColumnPrefix flowActivityColumnPrefix : + FlowActivityColumnPrefix.values()) { // Find a match based column family and on name. 
if (flowActivityColumnPrefix.columnFamily.equals(columnFamily) && (((columnPrefix == null) && (flowActivityColumnPrefix http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java index a9598ef..80b3287 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java @@ -57,9 +57,9 @@ public class FlowActivityRowKey { /** * Constructs a row key prefix for the flow activity table as follows: - * {@code clusterId!} + * {@code clusterId!}. * - * @param clusterId + * @param clusterId Cluster Id. * @return byte array with the row key prefix */ public static byte[] getRowKeyPrefix(String clusterId) { @@ -68,10 +68,10 @@ public class FlowActivityRowKey { /** * Constructs a row key prefix for the flow activity table as follows: - * {@code clusterId!dayTimestamp!} + * {@code clusterId!dayTimestamp!}. * - * @param clusterId - * @param dayTs + * @param clusterId Cluster Id. + * @param dayTs Start of the day timestamp. 
* @return byte array with the row key prefix */ public static byte[] getRowKeyPrefix(String clusterId, long dayTs) { @@ -82,12 +82,13 @@ public class FlowActivityRowKey { /** * Constructs a row key for the flow activity table as follows: - * {@code clusterId!dayTimestamp!user!flowName} + * {@code clusterId!dayTimestamp!user!flowName}. + * Will insert into current day's record in the table. Uses current time to + * store top of the day timestamp. * - * Will insert into current day's record in the table - * @param clusterId - * @param userId - * @param flowName + * @param clusterId Cluster Id. + * @param userId User Id. + * @param flowName Flow Name. * @return byte array with the row key prefix */ public static byte[] getRowKey(String clusterId, String userId, @@ -99,12 +100,12 @@ public class FlowActivityRowKey { /** * Constructs a row key for the flow activity table as follows: - * {@code clusterId!dayTimestamp!user!flowName} + * {@code clusterId!dayTimestamp!user!flowName}. * - * @param clusterId - * @param dayTs - * @param userId - * @param flowName + * @param clusterId Cluster Id. + * @param dayTs Top of the day timestamp. + * @param userId User Id. + * @param flowName Flow Name. * @return byte array for the row key */ public static byte[] getRowKey(String clusterId, long dayTs, String userId, @@ -118,6 +119,9 @@ public class FlowActivityRowKey { /** * Given the raw row key as bytes, returns the row key as an object. + * + * @param rowKey Byte representation of row key. + * @return A <cite>FlowActivityRowKey</cite> object. 
*/ public static FlowActivityRowKey parseRowKey(byte[] rowKey) { byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey); http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java index 315281f..8a0430c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java @@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; * * Example flow activity table record: * - * </pre> + * <pre> * |-------------------------------------------| * | Row key | Column Family | * | | info | @@ -52,19 +52,20 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; * </pre> */ public class FlowActivityTable extends BaseTable<FlowActivityTable> { - /** flow activity table prefix */ + /** flow activity table prefix. */ private static final String PREFIX = YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowactivity"; - /** config param name that specifies the flowactivity table name */ + /** config param name that specifies the flowactivity table name. 
*/ public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; - /** default value for flowactivity table name */ - public static final String DEFAULT_TABLE_NAME = "timelineservice.flowactivity"; + /** default value for flowactivity table name. */ + public static final String DEFAULT_TABLE_NAME = + "timelineservice.flowactivity"; private static final Log LOG = LogFactory.getLog(FlowActivityTable.class); - /** default max number of versions */ + /** default max number of versions. */ public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE; public FlowActivityTable() { @@ -91,16 +92,16 @@ public class FlowActivityTable extends BaseTable<FlowActivityTable> { + " already exists."); } - HTableDescriptor FlowActivityTableDescp = new HTableDescriptor(table); + HTableDescriptor flowActivityTableDescp = new HTableDescriptor(table); HColumnDescriptor infoCF = new HColumnDescriptor(FlowActivityColumnFamily.INFO.getBytes()); infoCF.setBloomFilterType(BloomType.ROWCOL); - FlowActivityTableDescp.addFamily(infoCF); + flowActivityTableDescp.addFamily(infoCF); infoCF.setMinVersions(1); infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS); // TODO: figure the split policy before running in production - admin.createTable(FlowActivityTableDescp); + admin.createTable(flowActivityTableDescp); LOG.info("Status of table creation for " + table.getNameAsString() + "=" + admin.tableExists(table)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java index e3bb52d..3d7c40e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java @@ -194,8 +194,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) */ - public <T> NavigableMap<String, NavigableMap<Long, T>> readResultsWithTimestamps( - Result result) throws IOException { + public <T> NavigableMap<String, NavigableMap<Long, T>> + readResultsWithTimestamps(Result result) throws IOException { return column.readResultsWithTimestamps(result, columnPrefixBytes); } @@ -248,8 +248,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) { // Find a match based column family and on name. 
if (frcp.columnFamily.equals(columnFamily) - && (((columnPrefix == null) && (frcp.getColumnPrefix() == null)) || (frcp - .getColumnPrefix().equals(columnPrefix)))) { + && (((columnPrefix == null) && (frcp.getColumnPrefix() == null)) || + (frcp.getColumnPrefix().equals(columnPrefix)))) { return frcp; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java index 1984157..9698f06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -46,6 +46,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; +/** + * Coprocessor for flow run table. 
+ */ public class FlowRunCoprocessor extends BaseRegionObserver { @SuppressWarnings("unused") @@ -53,9 +56,10 @@ public class FlowRunCoprocessor extends BaseRegionObserver { private HRegion region; /** - * generate a timestamp that is unique per row in a region this is per region + * generate a timestamp that is unique per row in a region this is per region. */ - private final TimestampGenerator timestampGenerator = new TimestampGenerator(); + private final TimestampGenerator timestampGenerator = + new TimestampGenerator(); @Override public void start(CoprocessorEnvironment e) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java index 2cd9625..0585dc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java @@ -56,11 +56,11 @@ public class FlowRunRowKey { /** * Constructs a row key prefix for the flow run table as follows: { - * clusterId!userI!flowName!} + * clusterId!userI!flowName!}. * - * @param clusterId - * @param userId - * @param flowName + * @param clusterId Cluster Id. + * @param userId User Id. 
+ * @param flowName Flow Name. * @return byte array with the row key prefix */ public static byte[] getRowKeyPrefix(String clusterId, String userId, @@ -71,12 +71,12 @@ public class FlowRunRowKey { /** * Constructs a row key for the entity table as follows: { - * clusterId!userI!flowName!Inverted Flow Run Id} + * clusterId!userI!flowName!Inverted Flow Run Id}. * - * @param clusterId - * @param userId - * @param flowName - * @param flowRunId + * @param clusterId Cluster Id. + * @param userId User Id. + * @param flowName Flow Name. + * @param flowRunId Run Id for the flow name. * @return byte array with the row key */ public static byte[] getRowKey(String clusterId, String userId, @@ -91,6 +91,9 @@ public class FlowRunRowKey { /** * Given the raw row key as bytes, returns the row key as an object. + * + * @param rowKey Byte representation of row key. + * @return A <cite>FlowRunRowKey</cite> object. */ public static FlowRunRowKey parseRowKey(byte[] rowKey) { byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey); http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java index 2682fea..547bef0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java @@ -84,19 +84,19 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; * </pre> */ public class FlowRunTable extends BaseTable<FlowRunTable> { - /** entity prefix */ + /** entity prefix. */ private static final String PREFIX = YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowrun"; - /** config param name that specifies the flowrun table name */ + /** config param name that specifies the flowrun table name. */ public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; - /** default value for flowrun table name */ + /** default value for flowrun table name. */ public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun"; private static final Log LOG = LogFactory.getLog(FlowRunTable.class); - /** default max number of versions */ + /** default max number of versions. 
*/ public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE; public FlowRunTable() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index d541df0..6fefd15 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -90,8 +90,8 @@ class FlowScanner implements RegionScanner, Closeable { } @Override - public boolean nextRaw(List<Cell> cells, int limit) throws IOException { - return nextInternal(cells, limit); + public boolean nextRaw(List<Cell> cells, int cellLimit) throws IOException { + return nextInternal(cells, cellLimit); } @Override @@ -100,8 +100,8 @@ class FlowScanner implements RegionScanner, Closeable { } @Override - public boolean next(List<Cell> cells, int limit) throws IOException { - return nextInternal(cells, limit); + public boolean next(List<Cell> cells, int cellLimit) throws IOException { + return nextInternal(cells, cellLimit); } private String getAggregationCompactionDimension(List<Tag> tags) { @@ -161,11 +161,12 @@ class FlowScanner implements RegionScanner, Closeable { * column or returns the cell as is. 
* * @param cells - * @param limit + * @param cellLimit * @return true if next row is available for the scanner, false otherwise * @throws IOException */ - private boolean nextInternal(List<Cell> cells, int limit) throws IOException { + private boolean nextInternal(List<Cell> cells, int cellLimit) + throws IOException { Cell cell = null; startNext(); // Loop through all the cells in this row @@ -183,8 +184,8 @@ class FlowScanner implements RegionScanner, Closeable { Set<String> alreadySeenAggDim = new HashSet<>(); int addedCnt = 0; ValueConverter converter = null; - while (((cell = peekAtNextCell(limit)) != null) - && (limit <= 0 || addedCnt < limit)) { + while (((cell = peekAtNextCell(cellLimit)) != null) + && (cellLimit <= 0 || addedCnt < cellLimit)) { byte[] newColumnQualifier = CellUtil.cloneQualifier(cell); if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) { if (converter != null && isNumericConverter(converter)) { @@ -198,12 +199,12 @@ class FlowScanner implements RegionScanner, Closeable { } // No operation needs to be performed on non numeric converters. if (!isNumericConverter(converter)) { - nextCell(limit); + nextCell(cellLimit); continue; } collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim, (NumericValueConverter)converter); - nextCell(limit); + nextCell(cellLimit); } if (!currentColumnCells.isEmpty()) { emitCells(cells, currentColumnCells, currentAggOp, @@ -220,7 +221,7 @@ class FlowScanner implements RegionScanner, Closeable { } /** - * resets the parameters to an intialized state for next loop iteration + * resets the parameters to an intialized state for next loop iteration. 
* * @param cell * @param currentAggOp @@ -278,14 +279,12 @@ class FlowScanner implements RegionScanner, Closeable { List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); String aggDim = getAggregationCompactionDimension(tags); - if (alreadySeenAggDim.contains(aggDim)) { - // if this agg dimension has already been seen, - // since they show up in sorted order - // we drop the rest which are older - // in other words, this cell is older than previously seen cells - // for that agg dim - } else { - // not seen this agg dim, hence consider this cell in our working set + + // If this agg dimension has already been seen, since they show up in + // sorted order, we drop the rest which are older. In other words, this + // cell is older than previously seen cells for that agg dim. + if (!alreadySeenAggDim.contains(aggDim)) { + // Not seen this agg dim, hence consider this cell in our working set currentColumnCells.add(cell); alreadySeenAggDim.add(aggDim); } @@ -424,6 +423,8 @@ class FlowScanner implements RegionScanner, Closeable { /** * Returns whether or not the underlying scanner has more rows. + * + * @return true, if there are more cells to return, false otherwise. */ public boolean hasMore() { return currentIndex < availableCells.size() ? true : hasMore; @@ -434,15 +435,16 @@ class FlowScanner implements RegionScanner, Closeable { * pointer to the next cell. This method can be called multiple times in a row * to advance through all the available cells. * - * @param limit + * @param cellLimit * the limit of number of cells to return if the next batch must be * fetched by the wrapped scanner * @return the next available cell or null if no more cells are available for * the current row - * @throws IOException + * @throws IOException if any problem is encountered while grabbing the next + * cell. 
*/ - public Cell nextCell(int limit) throws IOException { - Cell cell = peekAtNextCell(limit); + public Cell nextCell(int cellLimit) throws IOException { + Cell cell = peekAtNextCell(cellLimit); if (cell != null) { currentIndex++; } @@ -454,19 +456,20 @@ class FlowScanner implements RegionScanner, Closeable { * pointer. Calling this method multiple times in a row will continue to * return the same cell. * - * @param limit + * @param cellLimit * the limit of number of cells to return if the next batch must be * fetched by the wrapped scanner * @return the next available cell or null if no more cells are available for * the current row - * @throws IOException + * @throws IOException if any problem is encountered while grabbing the next + * cell. */ - public Cell peekAtNextCell(int limit) throws IOException { + public Cell peekAtNextCell(int cellLimit) throws IOException { if (currentIndex >= availableCells.size()) { // done with current batch availableCells.clear(); currentIndex = 0; - hasMore = flowRunScanner.next(availableCells, limit); + hasMore = flowRunScanner.next(availableCells, cellLimit); } Cell cell = null; if (currentIndex < availableCells.size()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java new file mode 100644 index 0000000..04963f3 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.flow + * contains classes related to implementation for flow related tables, viz. flow + * run table and flow activity table. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java index f652ffd..e78db2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java @@ -16,6 +16,10 @@ * limitations under the License. */ +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage contains + * classes which define and implement reading and writing to backend storage. 
+ */ @InterfaceAudience.Private @InterfaceStability.Unstable package org.apache.hadoop.yarn.server.timelineservice.storage; http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java index 387f7d7..0de09e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -98,7 +98,7 @@ class ApplicationEntityReader extends GenericEntityReader { TimelineEntityFilters filters = getFilters(); if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.EVENTS) && !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (singleEntityRead || filters.getEventFilters() == null)) { + (isSingleEntityRead() || filters.getEventFilters() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( @@ -107,7 +107,7 @@ class ApplicationEntityReader extends GenericEntityReader { // info not required. 
if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.INFO) && !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (singleEntityRead || filters.getInfoFilters() == null)) { + (isSingleEntityRead() || filters.getInfoFilters() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( @@ -116,7 +116,7 @@ class ApplicationEntityReader extends GenericEntityReader { // is releated to not required. if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.IS_RELATED_TO) && !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (singleEntityRead || filters.getIsRelatedTo() == null)) { + (isSingleEntityRead() || filters.getIsRelatedTo() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( @@ -125,7 +125,7 @@ class ApplicationEntityReader extends GenericEntityReader { // relates to not required. if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.RELATES_TO) && !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (singleEntityRead || filters.getRelatesTo() == null)) { + (isSingleEntityRead() || filters.getRelatesTo() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( @@ -133,7 +133,7 @@ class ApplicationEntityReader extends GenericEntityReader { } list.addFilter(infoColFamilyList); if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS) || - (!singleEntityRead && filters.getConfigFilters() != null)) || + (!isSingleEntityRead() && filters.getConfigFilters() != null)) || (dataToRetrieve.getConfsToRetrieve() != null && !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty())) { FilterList filterCfg = @@ -148,7 +148,7 @@ class ApplicationEntityReader extends GenericEntityReader { list.addFilter(filterCfg); } if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) || - (!singleEntityRead && filters.getMetricFilters() != null)) || + 
(!isSingleEntityRead() && filters.getMetricFilters() != null)) || (dataToRetrieve.getMetricsToRetrieve() != null && !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) { FilterList filterMetrics = @@ -177,7 +177,7 @@ class ApplicationEntityReader extends GenericEntityReader { if (filterList != null && !filterList.getFilters().isEmpty()) { get.setFilter(filterList); } - return table.getResult(hbaseConf, conn, get); + return getTable().getResult(hbaseConf, conn, get); } @Override @@ -186,7 +186,7 @@ class ApplicationEntityReader extends GenericEntityReader { "clusterId shouldn't be null"); Preconditions.checkNotNull(getContext().getEntityType(), "entityType shouldn't be null"); - if (singleEntityRead) { + if (isSingleEntityRead()) { Preconditions.checkNotNull(getContext().getAppId(), "appId shouldn't be null"); } else { @@ -201,14 +201,14 @@ class ApplicationEntityReader extends GenericEntityReader { protected void augmentParams(Configuration hbaseConf, Connection conn) throws IOException { TimelineReaderContext context = getContext(); - if (singleEntityRead) { + if (isSingleEntityRead()) { if (context.getFlowName() == null || context.getFlowRunId() == null || context.getUserId() == null) { FlowContext flowContext = lookupFlowContext( context.getClusterId(), context.getAppId(), hbaseConf, conn); - context.setFlowName(flowContext.flowName); - context.setFlowRunId(flowContext.flowRunId); - context.setUserId(flowContext.userId); + context.setFlowName(flowContext.getFlowName()); + context.setFlowRunId(flowContext.getFlowRunId()); + context.setUserId(flowContext.getUserId()); } } getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); @@ -234,7 +234,7 @@ class ApplicationEntityReader extends GenericEntityReader { newList.addFilter(filterList); } scan.setFilter(newList); - return table.getResultScanner(hbaseConf, conn, scan); + return getTable().getResultScanner(hbaseConf, conn, scan); } @Override @@ -252,7 +252,7 @@ class ApplicationEntityReader 
extends GenericEntityReader { Number createdTime = (Number)ApplicationColumn.CREATED_TIME.readResult(result); entity.setCreatedTime(createdTime.longValue()); - if (!singleEntityRead && + if (!isSingleEntityRead() && (entity.getCreatedTime() < filters.getCreatedTimeBegin() || entity.getCreatedTime() > filters.getCreatedTimeEnd())) { return null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index 96350da..0d2bdd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -111,7 +111,7 @@ class FlowActivityEntityReader extends TimelineEntityReader { // the scanner may still return more than the limit; therefore we need to // read the right number as we iterate scan.setFilter(new PageFilter(getFilters().getLimit())); - return table.getResultScanner(hbaseConf, conn, scan); + return getTable().getResultScanner(hbaseConf, conn, scan); } @Override 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java index 2d1c41c..743315c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -82,7 +82,7 @@ class FlowRunEntityReader extends TimelineEntityReader { "userId shouldn't be null"); Preconditions.checkNotNull(getContext().getFlowName(), "flowName shouldn't be null"); - if (singleEntityRead) { + if (isSingleEntityRead()) { Preconditions.checkNotNull(getContext().getFlowRunId(), "flowRunId shouldn't be null"); } @@ -103,7 +103,7 @@ class FlowRunEntityReader extends TimelineEntityReader { new BinaryComparator(FlowRunColumnFamily.INFO.getBytes())); TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); // Metrics not required. 
- if (!singleEntityRead && + if (!isSingleEntityRead() && !dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) && !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL)) { FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); @@ -137,7 +137,7 @@ class FlowRunEntityReader extends TimelineEntityReader { if (filterList != null && !filterList.getFilters().isEmpty()) { get.setFilter(filterList); } - return table.getResult(hbaseConf, conn, get); + return getTable().getResult(hbaseConf, conn, get); } @Override @@ -154,7 +154,7 @@ class FlowRunEntityReader extends TimelineEntityReader { newList.addFilter(filterList); } scan.setFilter(newList); - return table.getResultScanner(hbaseConf, conn, scan); + return getTable().getResultScanner(hbaseConf, conn, scan); } @Override @@ -163,7 +163,7 @@ class FlowRunEntityReader extends TimelineEntityReader { FlowRunEntity flowRun = new FlowRunEntity(); flowRun.setUser(context.getUserId()); flowRun.setName(context.getFlowName()); - if (singleEntityRead) { + if (isSingleEntityRead()) { flowRun.setRunId(context.getFlowRunId()); } else { FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow()); @@ -175,7 +175,7 @@ class FlowRunEntityReader extends TimelineEntityReader { if (startTime != null) { flowRun.setStartTime(startTime.longValue()); } - if (!singleEntityRead && + if (!isSingleEntityRead() && (flowRun.getStartTime() < getFilters().getCreatedTimeBegin() || flowRun.getStartTime() > getFilters().getCreatedTimeEnd())) { return null; @@ -194,7 +194,7 @@ class FlowRunEntityReader extends TimelineEntityReader { } // read metrics - if (singleEntityRead || + if (isSingleEntityRead() || getDataToRetrieve().getFieldsToRetrieve().contains(Field.METRICS)) { readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index 3bc2f3f..d8f73d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -47,8 +47,8 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilter import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; import 
org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; @@ -118,7 +118,7 @@ class GenericEntityReader extends TimelineEntityReader { // Events not required. if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.EVENTS) && !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (singleEntityRead || filters.getEventFilters() == null)) { + (isSingleEntityRead() || filters.getEventFilters() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( @@ -127,7 +127,7 @@ class GenericEntityReader extends TimelineEntityReader { // info not required. if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.INFO) && !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (singleEntityRead || filters.getInfoFilters() == null)) { + (isSingleEntityRead() || filters.getInfoFilters() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( @@ -136,7 +136,7 @@ class GenericEntityReader extends TimelineEntityReader { // is related to not required. if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.IS_RELATED_TO) && !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (singleEntityRead || filters.getIsRelatedTo() == null)) { + (isSingleEntityRead() || filters.getIsRelatedTo() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( @@ -145,7 +145,7 @@ class GenericEntityReader extends TimelineEntityReader { // relates to not required. 
if (!dataToRetrieve.getFieldsToRetrieve().contains(Field.RELATES_TO) && !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) && - (singleEntityRead || filters.getRelatesTo() == null)) { + (isSingleEntityRead() || filters.getRelatesTo() == null)) { infoColFamilyList.addFilter( new QualifierFilter(CompareOp.NOT_EQUAL, new BinaryPrefixComparator( @@ -153,7 +153,7 @@ class GenericEntityReader extends TimelineEntityReader { } list.addFilter(infoColFamilyList); if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS) || - (!singleEntityRead && filters.getConfigFilters() != null)) || + (!isSingleEntityRead() && filters.getConfigFilters() != null)) || (dataToRetrieve.getConfsToRetrieve() != null && !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty())) { FilterList filterCfg = @@ -167,7 +167,7 @@ class GenericEntityReader extends TimelineEntityReader { list.addFilter(filterCfg); } if ((dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS) || - (!singleEntityRead && filters.getMetricFilters() != null)) || + (!isSingleEntityRead() && filters.getMetricFilters() != null)) || (dataToRetrieve.getMetricsToRetrieve() != null && !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty())) { FilterList filterMetrics = @@ -201,14 +201,23 @@ class GenericEntityReader extends TimelineEntityReader { } protected static class FlowContext { - protected final String userId; - protected final String flowName; - protected final Long flowRunId; + private final String userId; + private final String flowName; + private final Long flowRunId; public FlowContext(String user, String flowName, Long flowRunId) { this.userId = user; this.flowName = flowName; this.flowRunId = flowRunId; } + protected String getUserId() { + return userId; + } + protected String getFlowName() { + return flowName; + } + protected Long getFlowRunId() { + return flowRunId; + } } @Override @@ -219,7 +228,7 @@ class GenericEntityReader extends TimelineEntityReader { "appId shouldn't be 
null"); Preconditions.checkNotNull(getContext().getEntityType(), "entityType shouldn't be null"); - if (singleEntityRead) { + if (isSingleEntityRead()) { Preconditions.checkNotNull(getContext().getEntityId(), "entityId shouldn't be null"); } @@ -254,7 +263,7 @@ class GenericEntityReader extends TimelineEntityReader { if (filterList != null && !filterList.getFilters().isEmpty()) { get.setFilter(filterList); } - return table.getResult(hbaseConf, conn, get); + return getTable().getResult(hbaseConf, conn, get); } @Override @@ -271,7 +280,7 @@ class GenericEntityReader extends TimelineEntityReader { if (filterList != null && !filterList.getFilters().isEmpty()) { scan.setFilter(filterList); } - return table.getResultScanner(hbaseConf, conn, scan); + return getTable().getResultScanner(hbaseConf, conn, scan); } @Override @@ -289,7 +298,7 @@ class GenericEntityReader extends TimelineEntityReader { // fetch created time Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result); entity.setCreatedTime(createdTime.longValue()); - if (!singleEntityRead && + if (!isSingleEntityRead() && (entity.getCreatedTime() < filters.getCreatedTimeBegin() || entity.getCreatedTime() > filters.getCreatedTimeEnd())) { return null; @@ -401,6 +410,14 @@ class GenericEntityReader extends TimelineEntityReader { /** * Helper method for reading relationship. + * + * @param <T> Describes the type of column prefix. + * @param entity entity to fill. + * @param result result from HBase. + * @param prefix column prefix. + * @param isRelatedTo if true, means relationship is to be added to + * isRelatedTo, otherwise it is added to relatesTo. + * @throws IOException if any problem is encountered while reading result. */ protected <T> void readRelationship( TimelineEntity entity, Result result, ColumnPrefix<T> prefix, @@ -421,6 +438,13 @@ class GenericEntityReader extends TimelineEntityReader { /** * Helper method for reading key-value pairs for either info or config. 
+ * + * @param <T> Describes the type of column prefix. + * @param entity entity to fill. + * @param result result from HBase. + * @param prefix column prefix. + * @param isConfig if true, means we are reading configs, otherwise info. + * @throws IOException if any problem is encountered while reading result. */ protected <T> void readKeyValuePairs( TimelineEntity entity, Result result, ColumnPrefix<T> prefix, @@ -441,6 +465,12 @@ class GenericEntityReader extends TimelineEntityReader { * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted * if there is no info associated with the event. * + * @param entity entity to fill. + * @param result HBase Result. + * @param isApplication if true, event read is for application table, + * otherwise it is being read for entity table. + * @throws IOException if any problem is encountered while reading result. + * * See {@link EntityTable} and {@link ApplicationTable} for a more detailed * schema description. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java index 454c179..281e901 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix public abstract class TimelineEntityReader { private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class); - protected final boolean singleEntityRead; + private final boolean singleEntityRead; private TimelineReaderContext context; private TimelineDataToRetrieve dataToRetrieve; // used only for multiple entity read mode @@ -56,7 +56,7 @@ public abstract class TimelineEntityReader { /** * Main table the entity reader uses. */ - protected BaseTable<?> table; + private BaseTable<?> table; /** * Specifies whether keys for this table are sorted in a manner where entities @@ -68,6 +68,13 @@ public abstract class TimelineEntityReader { /** * Instantiates a reader for multiple-entity reads. + * + * @param ctxt Reader context which defines the scope in which query has to be + * made. + * @param entityFilters Filters which limit the entities returned. + * @param toRetrieve Data to retrieve for each entity. + * @param sortedKeys Specifies whether keys for this table are sorted or not. + * If sorted, entities can be retrieved by created time. */ protected TimelineEntityReader(TimelineReaderContext ctxt, TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve, @@ -78,11 +85,15 @@ public abstract class TimelineEntityReader { this.dataToRetrieve = toRetrieve; this.filters = entityFilters; - this.table = getTable(); + this.setTable(getTable()); } /** * Instantiates a reader for single-entity reads. + * + * @param ctxt Reader context which defines the scope in which query has to be + * made. + * @param toRetrieve Data to retrieve for each entity. 
*/ protected TimelineEntityReader(TimelineReaderContext ctxt, TimelineDataToRetrieve toRetrieve) { @@ -90,13 +101,14 @@ public abstract class TimelineEntityReader { this.context = ctxt; this.dataToRetrieve = toRetrieve; - this.table = getTable(); + this.setTable(getTable()); } /** * Creates a {@link FilterList} based on fields, confs and metrics to * retrieve. This filter list will be set in Scan/Get objects to trim down * results fetched from HBase back-end storage. + * * @return a {@link FilterList} object. */ protected abstract FilterList constructFilterListBasedOnFields(); @@ -115,6 +127,12 @@ public abstract class TimelineEntityReader { /** * Reads and deserializes a single timeline entity from the HBase storage. + * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @return A <cite>TimelineEntity</cite> object. + * @throws IOException if there is any exception encountered while reading + * entity. */ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) throws IOException { @@ -136,6 +154,11 @@ public abstract class TimelineEntityReader { * Reads and deserializes a set of timeline entities from the HBase storage. * It goes through all the results available, and returns the number of * entries as specified in the limit in the entity's natural sort order. + * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @return a set of <cite>TimelineEntity</cite> objects. + * @throws IOException if any exception is encountered while reading entities. */ public Set<TimelineEntity> readEntities(Configuration hbaseConf, Connection conn) throws IOException { @@ -170,8 +193,12 @@ public abstract class TimelineEntityReader { /** * Returns the main table to be used by the entity reader. + * + * @return A reference to the table. */ - protected abstract BaseTable<?> getTable(); + protected BaseTable<?> getTable() { + return table; + } /** * Validates the required parameters to read the entities. 
@@ -180,6 +207,10 @@ public abstract class TimelineEntityReader { /** * Sets certain parameters to defaults if the values are not provided. + * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @throws IOException if any exception is encountered while setting params. */ protected abstract void augmentParams(Configuration hbaseConf, Connection conn) throws IOException; @@ -187,23 +218,35 @@ public abstract class TimelineEntityReader { /** * Fetches a {@link Result} instance for a single-entity read. * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @param filterList filter list which will be applied to HBase Get. * @return the {@link Result} instance or null if no such record is found. + * @throws IOException if any exception is encountered while getting result. */ protected abstract Result getResult(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException; /** * Fetches a {@link ResultScanner} for a multi-entity read. + * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @param filterList filter list which will be applied to HBase Scan. + * @return the {@link ResultScanner} instance. + * @throws IOException if any exception is encountered while getting results. */ protected abstract ResultScanner getResults(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException; /** - * Given a {@link Result} instance, deserializes and creates a - * {@link TimelineEntity}. + * Parses the result retrieved from HBase backend and converts it into a + * {@link TimelineEntity} object. * - * @return the {@link TimelineEntity} instance, or null if the {@link Result} - * is null or empty. + * @param result Single row result of a Get/Scan. + * @return the <cite>TimelineEntity</cite> instance or null if the entity is + * filtered. + * @throws IOException if any exception is encountered while parsing entity. 
*/ protected abstract TimelineEntity parseEntity(Result result) throws IOException; @@ -212,6 +255,11 @@ public abstract class TimelineEntityReader { * Helper method for reading and deserializing {@link TimelineMetric} objects * using the specified column prefix. The timeline metrics then are added to * the given timeline entity. + * + * @param entity {@link TimelineEntity} object. + * @param result {@link Result} object retrieved from backend. + * @param columnPrefix Metric column prefix + * @throws IOException if any exception is encountered while reading metrics. */ protected void readMetrics(TimelineEntity entity, Result result, ColumnPrefix<?> columnPrefix) throws IOException { @@ -229,4 +277,18 @@ public abstract class TimelineEntityReader { entity.addMetric(metric); } } + + /** + * Checks whether the reader has been created to fetch single entity or + * multiple entities. + * + * @return true, if query is for single entity, false otherwise. + */ + public boolean isSingleEntityRead() { + return singleEntityRead; + } + + protected void setTable(BaseTable<?> baseTable) { + this.table = baseTable; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java index f2bdacd..b2a9476 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java @@ -25,10 +25,19 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContex /** * Factory methods for instantiating a timeline entity reader. */ -public class TimelineEntityReaderFactory { +public final class TimelineEntityReaderFactory { + private TimelineEntityReaderFactory() { + } + /** * Creates a timeline entity reader instance for reading a single entity with * the specified input. + * + * @param context Reader context which defines the scope in which query has to + * be made. + * @param dataToRetrieve Data to retrieve for each entity. + * @return An implementation of <cite>TimelineEntityReader</cite> object + * depending on entity type. */ public static TimelineEntityReader createSingleEntityReader( TimelineReaderContext context, TimelineDataToRetrieve dataToRetrieve) { @@ -51,6 +60,13 @@ public class TimelineEntityReaderFactory { /** * Creates a timeline entity reader instance for reading set of entities with * the specified input and predicates. + * + * @param context Reader context which defines the scope in which query has to + * be made. + * @param filters Filters which limit the entities returned. + * @param dataToRetrieve Data to retrieve for each entity. + * @return An implementation of <cite>TimelineEntityReader</cite> object + * depending on entity type. 
*/ public static TimelineEntityReader createMultipleEntitiesReader( TimelineReaderContext context, TimelineEntityFilters filters, http://git-wip-us.apache.org/repos/asf/hadoop/blob/db76a3ad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java index 0b3fa38..9814d6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java @@ -15,6 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.reader + * contains classes used to read entities from backend based on query type. + */ @InterfaceAudience.Private @InterfaceStability.Unstable package org.apache.hadoop.yarn.server.timelineservice.storage.reader;