http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java new file mode 100644 index 0000000..b533624 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -0,0 +1,723 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Invoked via the coprocessor when a Get or a Scan is issued for flow run + * table. Looks through the list of cells per row, checks their tags and does + * operation on those cells as per the cell tags. Transforms reads of the stored + * metrics into calculated sums for each column Also, finds the min and max for + * start and end times in a flow run. + */ +class FlowScanner implements RegionScanner, Closeable { + + private static final Logger LOG = + LoggerFactory.getLogger(FlowScanner.class); + + /** + * use a special application id to represent the flow id this is needed since + * TimestampGenerator parses the app id to generate a cell timestamp. + */ + private static final String FLOW_APP_ID = "application_00000000000_0000"; + + private final Region region; + private final InternalScanner flowRunScanner; + private final int batchSize; + private final long appFinalValueRetentionThreshold; + private RegionScanner regionScanner; + private boolean hasMore; + private byte[] currentRow; + private List<Cell> availableCells = new ArrayList<>(); + private int currentIndex; + private FlowScannerOperation action = FlowScannerOperation.READ; + + FlowScanner(RegionCoprocessorEnvironment env, InternalScanner internalScanner, + FlowScannerOperation action) { + this(env, null, internalScanner, action); + } + + FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan, + InternalScanner internalScanner, FlowScannerOperation action) { + this.batchSize = incomingScan == null ? -1 : incomingScan.getBatch(); + // TODO initialize other scan attributes like Scan#maxResultSize + this.flowRunScanner = internalScanner; + if (internalScanner instanceof RegionScanner) { + this.regionScanner = (RegionScanner) internalScanner; + } + this.action = action; + if (env == null) { + this.appFinalValueRetentionThreshold = + YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD; + this.region = null; + } else { + this.region = env.getRegion(); + Configuration hbaseConf = env.getConfiguration(); + this.appFinalValueRetentionThreshold = hbaseConf.getLong( + YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD, + YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD); + } + if (LOG.isDebugEnabled()) { + LOG.debug(" batch size=" + batchSize); + } + } + + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo() + */ + @Override + public HRegionInfo getRegionInfo() { + return new HRegionInfo(region.getRegionInfo()); + } + + @Override + public boolean nextRaw(List<Cell> cells) throws IOException { + return nextRaw(cells, ScannerContext.newBuilder().build()); + } + + @Override + public boolean nextRaw(List<Cell> cells, ScannerContext scannerContext) + throws IOException { + return nextInternal(cells, scannerContext); + } + + @Override + public boolean next(List<Cell> cells) throws IOException { + return next(cells, ScannerContext.newBuilder().build()); + } + + @Override + public boolean next(List<Cell> cells, ScannerContext scannerContext) + throws IOException { + return nextInternal(cells, scannerContext); + } + + /** + * Get value converter associated with a column or a column prefix. If nothing + * matches, generic converter is returned. + * @param colQualifierBytes + * @return value converter implementation. + */ + private static ValueConverter getValueConverter(byte[] colQualifierBytes) { + // Iterate over all the column prefixes for flow run table and get the + // appropriate converter for the column qualifier passed if prefix matches. + for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) { + byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes(""); + if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length, + colQualifierBytes, 0, colPrefixBytes.length) == 0) { + return colPrefix.getValueConverter(); + } + } + // Iterate over all the columns for flow run table and get the + // appropriate converter for the column qualifier passed if match occurs. + for (FlowRunColumn column : FlowRunColumn.values()) { + if (Bytes.compareTo( + column.getColumnQualifierBytes(), colQualifierBytes) == 0) { + return column.getValueConverter(); + } + } + // Return generic converter if nothing matches. + return GenericConverter.getInstance(); + } + + /** + * This method loops through the cells in a given row of the + * {@link FlowRunTable}. It looks at the tags of each cell to figure out how + * to process the contents. It then calculates the sum or min or max for each + * column or returns the cell as is. + * + * @param cells + * @param scannerContext + * @return true if next row is available for the scanner, false otherwise + * @throws IOException + */ + private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext) + throws IOException { + Cell cell = null; + startNext(); + // Loop through all the cells in this row + // For min/max/metrics we do need to scan the entire set of cells to get the + // right one + // But with flush/compaction, the number of cells being scanned will go down + // cells are grouped per column qualifier then sorted by cell timestamp + // (latest to oldest) per column qualifier + // So all cells in one qualifier come one after the other before we see the + // next column qualifier + ByteArrayComparator comp = new ByteArrayComparator(); + byte[] previousColumnQualifier = Separator.EMPTY_BYTES; + AggregationOperation currentAggOp = null; + SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR); + Set<String> alreadySeenAggDim = new HashSet<>(); + int addedCnt = 0; + long currentTimestamp = System.currentTimeMillis(); + ValueConverter converter = null; + int limit = batchSize; + + while (limit <= 0 || addedCnt < limit) { + cell = peekAtNextCell(scannerContext); + if (cell == null) { + break; + } + byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell); + if (previousColumnQualifier == null) { + // first time in loop + previousColumnQualifier = currentColumnQualifier; + } + + converter = getValueConverter(currentColumnQualifier); + if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) { + addedCnt += emitCells(cells, currentColumnCells, currentAggOp, + converter, currentTimestamp); + resetState(currentColumnCells, alreadySeenAggDim); + previousColumnQualifier = currentColumnQualifier; + currentAggOp = getCurrentAggOp(cell); + converter = getValueConverter(currentColumnQualifier); + } + collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim, + converter, scannerContext); + nextCell(scannerContext); + } + if ((!currentColumnCells.isEmpty()) && ((limit <= 0 || addedCnt < limit))) { + addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter, + currentTimestamp); + if (LOG.isDebugEnabled()) { + if (addedCnt > 0) { + LOG.debug("emitted cells. " + addedCnt + " for " + this.action + + " rowKey=" + + FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0)))); + } else { + LOG.debug("emitted no cells for " + this.action); + } + } + } + return hasMore(); + } + + private AggregationOperation getCurrentAggOp(Cell cell) { + List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell); + // We assume that all the operations for a particular column are the same + return HBaseTimelineServerUtils.getAggregationOperationFromTagsList(tags); + } + + /** + * resets the parameters to an initialized state for next loop iteration. + */ + private void resetState(SortedSet<Cell> currentColumnCells, + Set<String> alreadySeenAggDim) { + currentColumnCells.clear(); + alreadySeenAggDim.clear(); + } + + private void collectCells(SortedSet<Cell> currentColumnCells, + AggregationOperation currentAggOp, Cell cell, + Set<String> alreadySeenAggDim, ValueConverter converter, + ScannerContext scannerContext) throws IOException { + + if (currentAggOp == null) { + // not a min/max/metric cell, so just return it as is + currentColumnCells.add(cell); + return; + } + + switch (currentAggOp) { + case GLOBAL_MIN: + if (currentColumnCells.size() == 0) { + currentColumnCells.add(cell); + } else { + Cell currentMinCell = currentColumnCells.first(); + Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp, + (NumericValueConverter) converter); + if (!currentMinCell.equals(newMinCell)) { + currentColumnCells.remove(currentMinCell); + currentColumnCells.add(newMinCell); + } + } + break; + case GLOBAL_MAX: + if (currentColumnCells.size() == 0) { + currentColumnCells.add(cell); + } else { + Cell currentMaxCell = currentColumnCells.first(); + Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp, + (NumericValueConverter) converter); + if (!currentMaxCell.equals(newMaxCell)) { + currentColumnCells.remove(currentMaxCell); + currentColumnCells.add(newMaxCell); + } + } + break; + case SUM: + case SUM_FINAL: + if (LOG.isTraceEnabled()) { + LOG.trace("In collect cells " + + " FlowSannerOperation=" + + this.action + + " currentAggOp=" + + currentAggOp + + " cell qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(cell)) + + " cell value= " + + converter.decodeValue(CellUtil.cloneValue(cell)) + + " timestamp=" + cell.getTimestamp()); + } + + // only if this app has not been seen yet, add to current column cells + List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell); + String aggDim = HBaseTimelineServerUtils + .getAggregationCompactionDimension(tags); + if (!alreadySeenAggDim.contains(aggDim)) { + // if this agg dimension has already been seen, + // since they show up in sorted order + // we drop the rest which are older + // in other words, this cell is older than previously seen cells + // for that agg dim + // but when this agg dim is not seen, + // consider this cell in our working set + currentColumnCells.add(cell); + alreadySeenAggDim.add(aggDim); + } + break; + default: + break; + } // end of switch case + } + + /* + * Processes the cells in input param currentColumnCells and populates + * List<Cell> cells as the output based on the input AggregationOperation + * parameter. + */ + private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells, + AggregationOperation currentAggOp, ValueConverter converter, + long currentTimestamp) throws IOException { + if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) { + return 0; + } + if (currentAggOp == null) { + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + } + if (LOG.isTraceEnabled()) { + LOG.trace("In emitCells " + this.action + " currentColumnCells size= " + + currentColumnCells.size() + " currentAggOp" + currentAggOp); + } + + switch (currentAggOp) { + case GLOBAL_MIN: + case GLOBAL_MAX: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + case SUM: + case SUM_FINAL: + switch (action) { + case FLUSH: + case MINOR_COMPACTION: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + case READ: + Cell sumCell = processSummation(currentColumnCells, + (NumericValueConverter) converter); + cells.add(sumCell); + return 1; + case MAJOR_COMPACTION: + List<Cell> finalCells = processSummationMajorCompaction( + currentColumnCells, (NumericValueConverter) converter, + currentTimestamp); + cells.addAll(finalCells); + return finalCells.size(); + default: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + } + default: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + } + } + + /* + * Returns a cell whose value is the sum of all cell values in the input set. + * The new cell created has the timestamp of the most recent metric cell. The + * sum of a metric for a flow run is the summation at the point of the last + * metric update in that flow till that time. + */ + private Cell processSummation(SortedSet<Cell> currentColumnCells, + NumericValueConverter converter) throws IOException { + Number sum = 0; + Number currentValue = 0; + long ts = 0L; + long mostCurrentTimestamp = 0L; + Cell mostRecentCell = null; + for (Cell cell : currentColumnCells) { + currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell)); + ts = cell.getTimestamp(); + if (mostCurrentTimestamp < ts) { + mostCurrentTimestamp = ts; + mostRecentCell = cell; + } + sum = converter.add(sum, currentValue); + } + byte[] sumBytes = converter.encodeValue(sum); + Cell sumCell = + HBaseTimelineServerUtils.createNewCell(mostRecentCell, sumBytes); + return sumCell; + } + + + /** + * Returns a list of cells that contains + * + * A) the latest cells for applications that haven't finished yet + * B) summation + * for the flow, based on applications that have completed and are older than + * a certain time + * + * The new cell created has the timestamp of the most recent metric cell. The + * sum of a metric for a flow run is the summation at the point of the last + * metric update in that flow till that time. + */ + @VisibleForTesting + List<Cell> processSummationMajorCompaction( + SortedSet<Cell> currentColumnCells, NumericValueConverter converter, + long currentTimestamp) + throws IOException { + Number sum = 0; + Number currentValue = 0; + long ts = 0L; + boolean summationDone = false; + List<Cell> finalCells = new ArrayList<Cell>(); + if (currentColumnCells == null) { + return finalCells; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("In processSummationMajorCompaction," + + " will drop cells older than " + currentTimestamp + + " CurrentColumnCells size=" + currentColumnCells.size()); + } + + for (Cell cell : currentColumnCells) { + AggregationOperation cellAggOp = getCurrentAggOp(cell); + // if this is the existing flow sum cell + List<Tag> tags = HBaseTimelineServerUtils.convertCellAsTagList(cell); + String appId = HBaseTimelineServerUtils + .getAggregationCompactionDimension(tags); + if (appId == FLOW_APP_ID) { + sum = converter.add(sum, currentValue); + summationDone = true; + if (LOG.isTraceEnabled()) { + LOG.trace("reading flow app id sum=" + sum); + } + } else { + currentValue = (Number) converter.decodeValue(CellUtil + .cloneValue(cell)); + // read the timestamp truncated by the generator + ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp()); + if ((cellAggOp == AggregationOperation.SUM_FINAL) + && ((ts + this.appFinalValueRetentionThreshold) + < currentTimestamp)) { + sum = converter.add(sum, currentValue); + summationDone = true; + if (LOG.isTraceEnabled()) { + LOG.trace("MAJOR COMPACTION loop sum= " + sum + + " discarding now: " + " qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value=" + + converter.decodeValue(CellUtil.cloneValue(cell)) + + " timestamp=" + cell.getTimestamp() + " " + this.action); + } + } else { + // not a final value but it's the latest cell for this app + // so include this cell in the list of cells to write back + finalCells.add(cell); + } + } + } + if (summationDone) { + Cell anyCell = currentColumnCells.first(); + List<Tag> tags = new ArrayList<Tag>(); + Tag t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), + Bytes.toBytes(FLOW_APP_ID)); + tags.add(t); + t = HBaseTimelineServerUtils.createTag( + AggregationCompactionDimension.APPLICATION_ID.getTagType(), + Bytes.toBytes(FLOW_APP_ID)); + tags.add(t); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); + Cell sumCell = HBaseTimelineServerUtils.createNewCell( + CellUtil.cloneRow(anyCell), + CellUtil.cloneFamily(anyCell), + CellUtil.cloneQualifier(anyCell), + TimestampGenerator.getSupplementedTimestamp( + System.currentTimeMillis(), FLOW_APP_ID), + converter.encodeValue(sum), tagByteArray); + finalCells.add(sumCell); + if (LOG.isTraceEnabled()) { + LOG.trace("MAJOR COMPACTION final sum= " + sum + " for " + + Bytes.toString(CellUtil.cloneQualifier(sumCell)) + + " " + this.action); + } + LOG.info("After major compaction for qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(sumCell)) + + " with currentColumnCells.size=" + + currentColumnCells.size() + + " returning finalCells.size=" + finalCells.size() + + " with sum=" + sum.longValue() + + " with cell timestamp " + sumCell.getTimestamp()); + } else { + String qualifier = ""; + LOG.info("After major compaction for qualifier=" + qualifier + + " with currentColumnCells.size=" + + currentColumnCells.size() + + " returning finalCells.size=" + finalCells.size() + + " with zero sum=" + + sum.longValue()); + } + return finalCells; + } + + /** + * Determines which cell is to be returned based on the values in each cell + * and the comparison operation MIN or MAX. + * + * @param previouslyChosenCell + * @param currentCell + * @param currentAggOp + * @return the cell which is the min (or max) cell + * @throws IOException + */ + private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell, + AggregationOperation currentAggOp, NumericValueConverter converter) + throws IOException { + if (previouslyChosenCell == null) { + return currentCell; + } + try { + Number previouslyChosenCellValue = (Number)converter.decodeValue( + CellUtil.cloneValue(previouslyChosenCell)); + Number currentCellValue = (Number) converter.decodeValue(CellUtil + .cloneValue(currentCell)); + switch (currentAggOp) { + case GLOBAL_MIN: + if (converter.compare( + currentCellValue, previouslyChosenCellValue) < 0) { + // new value is minimum, hence return this cell + return currentCell; + } else { + // previously chosen value is miniumum, hence return previous min cell + return previouslyChosenCell; + } + case GLOBAL_MAX: + if (converter.compare( + currentCellValue, previouslyChosenCellValue) > 0) { + // new value is max, hence return this cell + return currentCell; + } else { + // previously chosen value is max, hence return previous max cell + return previouslyChosenCell; + } + default: + return currentCell; + } + } catch (IllegalArgumentException iae) { + LOG.error("caught iae during conversion to long ", iae); + return currentCell; + } + } + + @Override + public void close() throws IOException { + if (flowRunScanner != null) { + flowRunScanner.close(); + } else { + LOG.warn("scanner close called but scanner is null"); + } + } + + /** + * Called to signal the start of the next() call by the scanner. + */ + public void startNext() { + currentRow = null; + } + + /** + * Returns whether or not the underlying scanner has more rows. + */ + public boolean hasMore() { + return currentIndex < availableCells.size() ? true : hasMore; + } + + /** + * Returns the next available cell for the current row and advances the + * pointer to the next cell. This method can be called multiple times in a row + * to advance through all the available cells. + * + * @param scannerContext + * context information for the batch of cells under consideration + * @return the next available cell or null if no more cells are available for + * the current row + * @throws IOException + */ + public Cell nextCell(ScannerContext scannerContext) throws IOException { + Cell cell = peekAtNextCell(scannerContext); + if (cell != null) { + currentIndex++; + } + return cell; + } + + /** + * Returns the next available cell for the current row, without advancing the + * pointer. Calling this method multiple times in a row will continue to + * return the same cell. + * + * @param scannerContext + * context information for the batch of cells under consideration + * @return the next available cell or null if no more cells are available for + * the current row + * @throws IOException if any problem is encountered while grabbing the next + * cell. + */ + public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException { + if (currentIndex >= availableCells.size()) { + // done with current batch + availableCells.clear(); + currentIndex = 0; + hasMore = flowRunScanner.next(availableCells, scannerContext); + } + Cell cell = null; + if (currentIndex < availableCells.size()) { + cell = availableCells.get(currentIndex); + if (currentRow == null) { + currentRow = CellUtil.cloneRow(cell); + } else if (!CellUtil.matchingRow(cell, currentRow)) { + // moved on to the next row + // don't use the current cell + // also signal no more cells for this row + return null; + } + } + return cell; + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize() + */ + @Override + public long getMaxResultSize() { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.isFilterDone() called when the flow " + + "scanner's scanner is not a RegionScanner"); + } + return regionScanner.getMaxResultSize(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint() + */ + @Override + public long getMvccReadPoint() { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.isFilterDone() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.getMvccReadPoint(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone() + */ + @Override + public boolean isFilterDone() throws IOException { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.isFilterDone() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.isFilterDone(); + + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[]) + */ + @Override + public boolean reseek(byte[] bytes) throws IOException { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.reseek() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.reseek(bytes); + } + + @Override + public int getBatch() { + return batchSize; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java new file mode 100644 index 0000000..73c666f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + + +/** + * Identifies the scanner operation on the {@link FlowRunTable}. + */ +public enum FlowScannerOperation { + + /** + * If the scanner is opened for reading + * during preGet or preScan. + */ + READ, + + /** + * If the scanner is opened during preFlush. + */ + FLUSH, + + /** + * If the scanner is opened during minor Compaction. + */ + MINOR_COMPACTION, + + /** + * If the scanner is opened during major Compaction. + */ + MAJOR_COMPACTION +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java new file mode 100644 index 0000000..04963f3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.flow + * contains classes related to implementation for flow related tables, viz. flow + * run table and flow activity table. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java new file mode 100644 index 0000000..e78db2a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-2/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage contains + * classes which define and implement reading and writing to backend storage. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/pom.xml index d06907d..3ca443f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/pom.xml @@ -24,138 +24,36 @@ <groupId>org.apache.hadoop</groupId> <version>3.2.0-SNAPSHOT</version> </parent> - <modelVersion>4.0.0</modelVersion> + <artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId> - <name>Apache Hadoop YARN TimelineService HBase Server</name> <version>3.2.0-SNAPSHOT</version> - - <properties> - <!-- Needed for generating FindBugs warnings using parent pom --> - <yarn.basedir>${project.parent.parent.parent.basedir}</yarn.basedir> - </properties> - - <dependencies> - <dependency> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-timelineservice-hbase-common</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-common</artifactId> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-util</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-client</artifactId> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-server</artifactId> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs-client</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-util</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-sslengine</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <configuration> - <descriptor>src/assembly/coprocessor.xml</descriptor> - <attach>true</attach> - </configuration> - <executions> - <execution> - <id>create-coprocessor-jar</id> - <phase>prepare-package</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> + <name>Apache Hadoop YARN TimelineService HBase Servers</name> + <packaging>pom</packaging> + + <profiles> + <profile> + <id>hbase1</id> + <activation> + <property> + <name>!hbase.profile</name> + </property> + </activation> + <modules> + <module>hadoop-yarn-server-timelineservice-hbase-server-1</module> + </modules> + </profile> + <profile> + <id>hbase2</id> + <activation> + <property> + <name>hbase.profile</name> + <value>2.0</value> + </property> + </activation> + <modules> + <module>hadoop-yarn-server-timelineservice-hbase-server-2</module> + </modules> + </profile> + </profiles> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/assembly/coprocessor.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/assembly/coprocessor.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/assembly/coprocessor.xml deleted file mode 100644 index 01ff0dd..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/assembly/coprocessor.xml +++ /dev/null @@ -1,37 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.01 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd"> - <id>coprocessor</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <dependencySets> - <dependencySet> - <outputDirectory>/</outputDirectory> - <useProjectArtifact>true</useProjectArtifact> - <unpack>true</unpack> - <scope>runtime</scope> - <includes> - <include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-common</include> - <include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server</include> - </includes> - </dependencySet> - </dependencySets> -</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java deleted file mode 100644 index 5c07670..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.common; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * A utility class used by hbase-server module. - */ -public final class HBaseTimelineServerUtils { - private HBaseTimelineServerUtils() { - } - - /** - * Creates a {@link Tag} from the input attribute. - * - * @param attribute Attribute from which tag has to be fetched. - * @return a HBase Tag. - */ - public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) { - // attribute could be either an Aggregation Operation or - // an Aggregation Dimension - // Get the Tag type from either - AggregationOperation aggOp = AggregationOperation - .getAggregationOperation(attribute.getKey()); - if (aggOp != null) { - Tag t = new Tag(aggOp.getTagType(), attribute.getValue()); - return t; - } - - AggregationCompactionDimension aggCompactDim = - AggregationCompactionDimension.getAggregationCompactionDimension( - attribute.getKey()); - if (aggCompactDim != null) { - Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue()); - return t; - } - return null; - } - - /** - * creates a new cell based on the input cell but with the new value. - * - * @param origCell Original cell - * @param newValue new cell value - * @return cell - * @throws IOException while creating new cell. - */ - public static Cell createNewCell(Cell origCell, byte[] newValue) - throws IOException { - return CellUtil.createCell(CellUtil.cloneRow(origCell), - CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), - origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); - } - - /** - * creates a cell with the given inputs. - * - * @param row row of the cell to be created - * @param family column family name of the new cell - * @param qualifier qualifier for the new cell - * @param ts timestamp of the new cell - * @param newValue value of the new cell - * @param tags tags in the new cell - * @return cell - * @throws IOException while creating the cell. - */ - public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier, - long ts, byte[] newValue, byte[] tags) throws IOException { - return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put, - newValue, tags); - } - - /** - * returns app id from the list of tags. - * - * @param tags cell tags to be looked into - * @return App Id as the AggregationCompactionDimension - */ - public static String getAggregationCompactionDimension(List<Tag> tags) { - String appId = null; - for (Tag t : tags) { - if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t - .getType()) { - appId = Bytes.toString(t.getValue()); - return appId; - } - } - return appId; - } - - /** - * Returns the first seen aggregation operation as seen in the list of input - * tags or null otherwise. - * - * @param tags list of HBase tags. - * @return AggregationOperation - */ - public static AggregationOperation getAggregationOperationFromTagsList( - List<Tag> tags) { - for (AggregationOperation aggOp : AggregationOperation.values()) { - for (Tag tag : tags) { - if (tag.getType() == aggOp.getTagType()) { - return aggOp; - } - } - } - return null; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java deleted file mode 100644 index 0df5b8a..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Package org.apache.hadoop.yarn.server.timelineservice.storage.common contains - * a set of utility classes used across backend storage reader and writer. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -package org.apache.hadoop.yarn.server.timelineservice.storage.common; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java deleted file mode 100644 index 41a371b..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ /dev/null @@ -1,277 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.regionserver.ScanType; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Coprocessor for flow run table. - */ -public class FlowRunCoprocessor extends BaseRegionObserver { - - private static final Logger LOG = - LoggerFactory.getLogger(FlowRunCoprocessor.class); - - private Region region; - /** - * generate a timestamp that is unique per row in a region this is per region. - */ - private final TimestampGenerator timestampGenerator = - new TimestampGenerator(); - - @Override - public void start(CoprocessorEnvironment e) throws IOException { - if (e instanceof RegionCoprocessorEnvironment) { - RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; - this.region = env.getRegion(); - } - } - - /* - * (non-Javadoc) - * - * This method adds the tags onto the cells in the Put. It is presumed that - * all the cells in one Put have the same set of Tags. The existing cell - * timestamp is overwritten for non-metric cells and each such cell gets a new - * unique timestamp generated by {@link TimestampGenerator} - * - * @see - * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache - * .hadoop.hbase.coprocessor.ObserverContext, - * org.apache.hadoop.hbase.client.Put, - * org.apache.hadoop.hbase.regionserver.wal.WALEdit, - * org.apache.hadoop.hbase.client.Durability) - */ - @Override - public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, - WALEdit edit, Durability durability) throws IOException { - Map<String, byte[]> attributes = put.getAttributesMap(); - // Assumption is that all the cells in a put are the same operation. - List<Tag> tags = new ArrayList<>(); - if ((attributes != null) && (attributes.size() > 0)) { - for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) { - Tag t = HBaseTimelineServerUtils.getTagFromAttribute(attribute); - if (t != null) { - tags.add(t); - } - } - byte[] tagByteArray = Tag.fromList(tags); - NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>( - Bytes.BYTES_COMPARATOR); - for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap() - .entrySet()) { - List<Cell> newCells = new ArrayList<>(entry.getValue().size()); - for (Cell cell : entry.getValue()) { - // for each cell in the put add the tags - // Assumption is that all the cells in - // one put are the same operation - // also, get a unique cell timestamp for non-metric cells - // this way we don't inadvertently overwrite cell versions - long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags); - newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell), - CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), - cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell), - tagByteArray)); - } - newFamilyMap.put(entry.getKey(), newCells); - } // for each entry - // Update the family map for the Put - put.setFamilyCellMap(newFamilyMap); - } - } - - /** - * Determines if the current cell's timestamp is to be used or a new unique - * cell timestamp is to be used. The reason this is done is to inadvertently - * overwrite cells when writes come in very fast. But for metric cells, the - * cell timestamp signifies the metric timestamp. Hence we don't want to - * overwrite it. - * - * @param timestamp - * @param tags - * @return cell timestamp - */ - private long getCellTimestamp(long timestamp, List<Tag> tags) { - // if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default) - // then use the generator - if (timestamp == HConstants.LATEST_TIMESTAMP) { - return timestampGenerator.getUniqueTimestamp(); - } else { - return timestamp; - } - } - - /* - * (non-Javadoc) - * - * Creates a {@link FlowScanner} Scan so that it can correctly process the - * contents of {@link FlowRunTable}. - * - * @see - * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache - * .hadoop.hbase.coprocessor.ObserverContext, - * org.apache.hadoop.hbase.client.Get, java.util.List) - */ - @Override - public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, - Get get, List<Cell> results) throws IOException { - Scan scan = new Scan(get); - scan.setMaxVersions(); - RegionScanner scanner = null; - try { - scanner = new FlowScanner(e.getEnvironment(), scan, - region.getScanner(scan), FlowScannerOperation.READ); - scanner.next(results); - e.bypass(); - } finally { - if (scanner != null) { - scanner.close(); - } - } - } - - /* - * (non-Javadoc) - * - * Ensures that max versions are set for the Scan so that metrics can be - * correctly aggregated and min/max can be correctly determined. - * - * @see - * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org - * .apache.hadoop.hbase.coprocessor.ObserverContext, - * org.apache.hadoop.hbase.client.Scan, - * org.apache.hadoop.hbase.regionserver.RegionScanner) - */ - @Override - public RegionScanner preScannerOpen( - ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, - RegionScanner scanner) throws IOException { - // set max versions for scan to see all - // versions to aggregate for metrics - scan.setMaxVersions(); - return scanner; - } - - /* - * (non-Javadoc) - * - * Creates a {@link FlowScanner} Scan so that it can correctly process the - * contents of {@link FlowRunTable}. - * - * @see - * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen( - * org.apache.hadoop.hbase.coprocessor.ObserverContext, - * org.apache.hadoop.hbase.client.Scan, - * org.apache.hadoop.hbase.regionserver.RegionScanner) - */ - @Override - public RegionScanner postScannerOpen( - ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, - RegionScanner scanner) throws IOException { - return new FlowScanner(e.getEnvironment(), scan, - scanner, FlowScannerOperation.READ); - } - - @Override - public InternalScanner preFlush( - ObserverContext<RegionCoprocessorEnvironment> c, Store store, - InternalScanner scanner) throws IOException { - if (LOG.isDebugEnabled()) { - if (store != null) { - LOG.debug("preFlush store = " + store.getColumnFamilyName() - + " flushableSize=" + store.getFlushableSize() - + " flushedCellsCount=" + store.getFlushedCellsCount() - + " compactedCellsCount=" + store.getCompactedCellsCount() - + " majorCompactedCellsCount=" - + store.getMajorCompactedCellsCount() + " memstoreFlushSize=" - + store.getMemstoreFlushSize() + " memstoreSize=" - + store.getMemStoreSize() + " size=" + store.getSize() - + " storeFilesCount=" + store.getStorefilesCount()); - } - } - return new FlowScanner(c.getEnvironment(), scanner, - FlowScannerOperation.FLUSH); - } - - @Override - public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, - Store store, StoreFile resultFile) { - if (LOG.isDebugEnabled()) { - if (store != null) { - LOG.debug("postFlush store = " + store.getColumnFamilyName() - + " flushableSize=" + store.getFlushableSize() - + " flushedCellsCount=" + store.getFlushedCellsCount() - + " compactedCellsCount=" + store.getCompactedCellsCount() - + " majorCompactedCellsCount=" - + store.getMajorCompactedCellsCount() + " memstoreFlushSize=" - + store.getMemstoreFlushSize() + " memstoreSize=" - + store.getMemStoreSize() + " size=" + store.getSize() - + " storeFilesCount=" + store.getStorefilesCount()); - } - } - } - - @Override - public InternalScanner preCompact( - ObserverContext<RegionCoprocessorEnvironment> e, Store store, - InternalScanner scanner, ScanType scanType, CompactionRequest request) - throws IOException { - - FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION; - if (request != null) { - requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION - : FlowScannerOperation.MINOR_COMPACTION); - LOG.info("Compactionrequest= " + request.toString() + " " - + requestOp.toString() + " RegionName=" + e.getEnvironment() - .getRegion().getRegionInfo().getRegionNameAsString()); - } - return new FlowScanner(e.getEnvironment(), scanner, requestOp); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org