http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
new file mode 100644
index 0000000..af8df99
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+
+/**
+ * The flow activity table has a single column family, info. It stores the
+ * daily activity record for flows, which makes it useful as a quick lookup of
+ * which flows were running on a given day.
+ *
+ * Example flow activity table record:
+ *
+ * <pre>
+ * |-------------------------------------------|
+ * |  Row key   | Column Family                |
+ * |            | info                         |
+ * |-------------------------------------------|
+ * | clusterId! | r!runid1:version1            |
+ * | inv Top of |                              |
+ * | Day!       | r!runid2:version7            |
+ * | userName!  |                              |
+ * | flowId     |                              |
+ * |-------------------------------------------|
+ * </pre>
+ */
+public class FlowActivityTable extends BaseTable<FlowActivityTable> {
+  /** flow activity table prefix. */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowactivity";
+
+  /** config param name that specifies the flowactivity table name. */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** default value for flowactivity table name. */
+  public static final String DEFAULT_TABLE_NAME = "timelineservice.flowactivity";
+
+  private static final Log LOG = LogFactory.getLog(FlowActivityTable.class);
+
+  /** default max number of versions. */
+  public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+  public FlowActivityTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /**
+   * Creates the flow activity table with the
+   * {@link FlowActivityColumnFamily#INFO} column family. The family uses a
+   * ROWCOL bloom filter and keeps at least one and at most
+   * {@link #DEFAULT_METRICS_MAX_VERSIONS} versions. An existing table is
+   * never disabled or deleted.
+   *
+   * @param admin HBase admin used to check for and create the table
+   * @param hbaseConf configuration used to resolve the table name
+   * @throws IOException if the table already exists, or if one of the HBase
+   *           admin calls fails
+   */
+  @Override
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    // Fully configure the column family before handing it to the table
+    // descriptor. The result then does not depend on whether addFamily keeps
+    // a reference to the descriptor or a copy of it.
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(FlowActivityColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    infoCF.setMinVersions(1);
+    infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+
+    HTableDescriptor flowActivityTableDescp = new HTableDescriptor(table);
+    flowActivityTableDescp.addFamily(infoCF);
+
+    // TODO: figure the split policy before running in production
+    admin.createTable(flowActivityTableDescp);
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
new file mode 100644
index 0000000..ad30add
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies fully qualified columns for the {@link FlowRunTable}. Each
+ * constant carries three things:
+ * <ul>
+ * <li>its column family;</li>
+ * <li>its qualifier, which is encoded with {@link Separator#SPACE} before it
+ * is used;</li>
+ * <li>the aggregation operation, if any, attached to values stored in it.</li>
+ * </ul>
+ */
+public enum FlowRunColumn implements Column<FlowRunTable> {
+
+  /**
+   * When the flow was started. This is the minimum of currently known
+   * application start times.
+   */
+  MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
+      AggregationOperation.MIN),
+
+  /**
+   * When the flow ended. This is the maximum of currently known application
+   * end times.
+   */
+  MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
+      AggregationOperation.MAX),
+
+  /**
+   * The version of the flow that this flow belongs to. No aggregation
+   * operation is attached to this column.
+   */
+  FLOW_VERSION(FlowRunColumnFamily.INFO, "flow_version", null);
+
+  /** Column family this column is stored in. */
+  private final ColumnFamily<FlowRunTable> columnFamily;
+  /** Qualifier exactly as declared on the constant. */
+  private final String columnQualifier;
+  /** Encoded qualifier; this form is used for every read and write. */
+  private final byte[] columnQualifierBytes;
+  /** Aggregation attached on every write, or null for plain columns. */
+  private final AggregationOperation aggOp;
+  /** Helper that performs the reads and writes for this column. */
+  private final ColumnHelper<FlowRunTable> column;
+
+  private FlowRunColumn(ColumnFamily<FlowRunTable> columnFamily,
+      String columnQualifier, AggregationOperation aggOp) {
+    // Future-proof by ensuring the right column prefix hygiene.
+    String encodedQualifier = Separator.SPACE.encode(columnQualifier);
+
+    this.column = new ColumnHelper<FlowRunTable>(columnFamily);
+    this.columnFamily = columnFamily;
+    this.columnQualifier = columnQualifier;
+    this.columnQualifierBytes = Bytes.toBytes(encodedQualifier);
+    this.aggOp = aggOp;
+  }
+
+  /**
+   * @return the qualifier exactly as declared on the constant (not encoded)
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+
+  /**
+   * @return a fresh copy of the encoded column qualifier, so callers cannot
+   *         modify the constant's internal state
+   */
+  public byte[] getColumnQualifierBytes() {
+    return columnQualifierBytes.clone();
+  }
+
+  /**
+   * @return the aggregation operation attached to this column, or null when
+   *         the column is not aggregated
+   */
+  public AggregationOperation getAggregationOperation() {
+    return aggOp;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Merges the caller's attributes with this column's aggregation operation
+   * and writes the value under this column's encoded qualifier.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.Column#store
+   * (byte[], org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowRunTable> tableMutator, Long timestamp,
+      Object inputValue, Attribute... attributes) throws IOException {
+    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
+        inputValue, TimelineWriterUtils.combineAttributes(attributes, aggOp));
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Reads the value stored under this column's encoded qualifier from the
+   * given result; decoding is delegated to ColumnHelper.
+   */
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+
+  /**
+   * Looks up a {@link FlowRunColumn} by its (unencoded) qualifier alone. The
+   * column family is not compared.
+   *
+   * @param columnQualifier
+   *          Name of the column to retrieve; null never matches
+   * @return the constant whose qualifier equals {@code columnQualifier}, or
+   *         null when there is none
+   */
+  public static final FlowRunColumn columnFor(String columnQualifier) {
+    for (FlowRunColumn candidate : values()) {
+      if (candidate.getColumnQualifier().equals(columnQualifier)) {
+        return candidate;
+      }
+    }
+    // Nothing declared under that qualifier.
+    return null;
+  }
+
+  /**
+   * Looks up a {@link FlowRunColumn} by column family and (unencoded)
+   * qualifier. Both must match.
+   *
+   * @param columnFamily
+   *          The columnFamily for which to retrieve the column.
+   * @param name
+   *          Name of the column to retrieve; null never matches
+   * @return the matching constant, or null if no constant has both this
+   *         family and this qualifier
+   */
+  public static final FlowRunColumn columnFor(FlowRunColumnFamily columnFamily,
+      String name) {
+    for (FlowRunColumn candidate : values()) {
+      if (!candidate.columnFamily.equals(columnFamily)) {
+        // Wrong family: the qualifier does not need to be checked.
+        continue;
+      }
+      if (candidate.getColumnQualifier().equals(name)) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java
new file mode 100644
index 0000000..8faf5f8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or
more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Column families of the {@link FlowRunTable}.
+ */
+public enum FlowRunColumnFamily implements ColumnFamily<FlowRunTable> {
+
+  /**
+   * Info column family houses known columns, specifically ones included in
+   * columnfamily filters.
+   */
+  INFO("i");
+
+  /** Encoded family name. Callers only ever receive copies of it. */
+  private final byte[] bytes;
+
+  /**
+   * @param value
+   *          name of the column family. Must be lower case and contain no
+   *          spaces.
+   */
+  private FlowRunColumnFamily(String value) {
+    // The family name is encoded with Separator.SPACE before being stored.
+    String encodedName = Separator.SPACE.encode(value);
+    bytes = Bytes.toBytes(encodedName);
+  }
+
+  /**
+   * @return a fresh copy of the encoded column family name
+   */
+  public byte[] getBytes() {
+    return bytes.clone();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
new file mode 100644
index 0000000..d55f510
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies partially qualified columns for the {@link FlowRunTable}.
+ */
+public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
+
+  /**
+   * To store flow run info values.
+   */
+  METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM);
+
+  private final ColumnHelper<FlowRunTable> column;
+  private final ColumnFamily<FlowRunTable> columnFamily;
+
+  /**
+   * Can be null for those cases where the provided column qualifier is the
+   * entire column name.
+   */
+  private final String columnPrefix;
+  /** Encoded form of {@link #columnPrefix}; null exactly when it is null. */
+  private final byte[] columnPrefixBytes;
+
+  private final AggregationOperation aggOp;
+
+  /**
+   * Private constructor, meant to be used by the enum definition.
+   *
+   * @param columnFamily
+   *          that this column is stored in.
+   * @param columnPrefix
+   *          for this column; may be null.
+   * @param fra
+   *          aggregation operation that is combined with the caller's
+   *          attributes on every write through this prefix.
+   */
+  private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
+      String columnPrefix, AggregationOperation fra) {
+    column = new ColumnHelper<FlowRunTable>(columnFamily);
+    this.columnFamily = columnFamily;
+    this.columnPrefix = columnPrefix;
+    if (columnPrefix == null) {
+      this.columnPrefixBytes = null;
+    } else {
+      // Future-proof by ensuring the right column prefix hygiene.
+      this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
+          .encode(columnPrefix));
+    }
+    this.aggOp = fra;
+  }
+
+  /**
+   * @return the column name value; may be null
+   */
+  public String getColumnPrefix() {
+    return columnPrefix;
+  }
+
+  /**
+   * @return a copy of the encoded column prefix, or null when this constant
+   *         was declared with a null prefix
+   */
+  public byte[] getColumnPrefixBytes() {
+    // Bytes.copy returns null for null input, so a null prefix does not NPE.
+    return Bytes.copy(columnPrefixBytes);
+  }
+
+  /**
+   * @return the aggregation operation attached to writes through this prefix
+   */
+  public AggregationOperation getAttribute() {
+    return aggOp;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowRunTable> tableMutator, String qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    storeQualified(rowKey, tableMutator, columnQualifier, timestamp,
+        inputValue, attributes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, byte[], java.lang.Long, java.lang.Object)
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowRunTable> tableMutator, byte[] qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    storeQualified(rowKey, tableMutator, columnQualifier, timestamp,
+        inputValue, attributes);
+  }
+
+  /**
+   * Shared tail of both store overloads. It combines the caller's attributes
+   * with this prefix's aggregation operation, then writes the value under
+   * the already fully qualified column name.
+   */
+  private void storeQualified(byte[] rowKey,
+      TypedBufferedMutator<FlowRunTable> tableMutator, byte[] columnQualifier,
+      Long timestamp, Object inputValue, Attribute[] attributes)
+      throws IOException {
+    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+        attributes, this.aggOp);
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        combinedAttributes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+   */
+  public Object readResult(Result result, String qualifier) throws IOException {
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    return column.readResult(result, columnQualifier);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResults(org.apache.hadoop.hbase.client.Result)
+   */
+  public Map<String, Object> readResults(Result result) throws IOException {
+    return column.readResults(result, columnPrefixBytes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+   */
+  public <T> NavigableMap<String, NavigableMap<Long, T>>
+      readResultsWithTimestamps(Result result) throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes);
+  }
+
+  /**
+   * Retrieve a {@link FlowRunColumnPrefix} given a name, or null if there is
+   * no match. Prefixes are compared null-safely: a null {@code columnPrefix}
+   * matches only a constant declared with a null prefix.
+   *
+   * @param columnPrefix
+   *          Name of the column to retrieve; may be null
+   * @return the corresponding {@link FlowRunColumnPrefix} or null
+   */
+  public static final FlowRunColumnPrefix columnFor(String columnPrefix) {
+
+    // Match column based on value, assume column family matches.
+    for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) {
+      // Find a match based only on name. Objects.equals is used because a
+      // constant's prefix may be null.
+      if (Objects.equals(frcp.getColumnPrefix(), columnPrefix)) {
+        return frcp;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve a {@link FlowRunColumnPrefix} given a column family and a name,
+   * or null if there is no match. The column family must be equal, and the
+   * prefixes are compared null-safely: a null {@code columnPrefix} matches
+   * only a constant in that family declared with a null prefix.
+   *
+   * @param columnFamily
+   *          The columnFamily for which to retrieve the column.
+   * @param columnPrefix
+   *          Name of the column to retrieve; may be null
+   * @return the corresponding {@link FlowRunColumnPrefix} or null if both
+   *         arguments don't match.
+   */
+  public static final FlowRunColumnPrefix columnFor(
+      FlowRunColumnFamily columnFamily, String columnPrefix) {
+
+    // TODO: needs unit test to confirm the null prefix case.
+
+    for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) {
+      // Find a match based column family and on name. Objects.equals never
+      // dereferences a null prefix and treats two nulls as equal.
+      if (frcp.columnFamily.equals(columnFamily)
+          && Objects.equals(frcp.getColumnPrefix(), columnPrefix)) {
+        return frcp;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
new file mode 100644
index 0000000..f743e5e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+
+/**
+ * Region observer for the {@link FlowRunTable}. It hooks three operations:
+ * <ul>
+ * <li>Puts: the Put's attributes are converted into tags that are attached
+ * to every cell, and cells without an explicit timestamp are given a
+ * generated, unique one.</li>
+ * <li>Gets: answered through a {@link FlowScanner} that reads all
+ * versions.</li>
+ * <li>Scans: switched to read all versions and wrapped in a
+ * {@link FlowScanner}.</li>
+ * </ul>
+ */
+public class FlowRunCoprocessor extends BaseRegionObserver {
+
+  @SuppressWarnings("unused")
+  private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
+
+  /** Region this coprocessor is attached to; set in {@link #start}. */
+  private HRegion region;
+  /**
+   * Generates timestamps that are unique per row in a region. The field
+   * belongs to the coprocessor instance, so there is one generator per
+   * region.
+   */
+  private final TimestampGenerator timestampGenerator = new TimestampGenerator();
+
+  /**
+   * Caches the region when this coprocessor is loaded in a region
+   * environment.
+   *
+   * NOTE(review): for any other environment type, {@code region} stays null.
+   * preGetOp and postScannerOpen would then pass/dereference a null region.
+   */
+  @Override
+  public void start(CoprocessorEnvironment e) throws IOException {
+    if (e instanceof RegionCoprocessorEnvironment) {
+      RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+      this.region = env.getRegion();
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * This method adds tags onto the cells in the Put. It presumes that all
+   * the cells in one Put share the same set of tags.
+   * <ul>
+   * <li>Every Put attribute is converted into a Tag, and the same serialized
+   * tag bytes are attached to every cell.</li>
+   * <li>Cells are rebuilt as new cells of type KeyValue.Type.Put.</li>
+   * <li>A cell whose timestamp was not set (HConstants.LATEST_TIMESTAMP)
+   * gets a new unique timestamp from {@link TimestampGenerator}; explicitly
+   * set timestamps are kept.</li>
+   * </ul>
+   * A Put with no attributes is left completely untouched.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache
+   * .hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Put,
+   * org.apache.hadoop.hbase.regionserver.wal.WALEdit,
+   * org.apache.hadoop.hbase.client.Durability)
+   */
+  @Override
+  public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
+      WALEdit edit, Durability durability) throws IOException {
+    Map<String, byte[]> attributes = put.getAttributesMap();
+
+    // Assumption is that all the cells in a put are the same operation.
+    List<Tag> tags = new ArrayList<>();
+    if ((attributes != null) && (attributes.size() > 0)) {
+      for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
+        Tag t = TimelineWriterUtils.getTagFromAttribute(attribute);
+        tags.add(t);
+      }
+      // Serialize the tags once; the same bytes are reused for every cell.
+      byte[] tagByteArray = Tag.fromList(tags);
+      NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
+          Bytes.BYTES_COMPARATOR);
+      for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
+          .entrySet()) {
+        List<Cell> newCells = new ArrayList<>(entry.getValue().size());
+        for (Cell cell : entry.getValue()) {
+          // for each cell in the put add the tags
+          // Assumption is that all the cells in
+          // one put are the same operation
+          // also, replace an unset (LATEST_TIMESTAMP) cell timestamp with a
+          // unique one so we don't inadvertently overwrite cell versions
+          long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags);
+          newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell),
+              CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
+              cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell),
+              tagByteArray));
+        }
+        newFamilyMap.put(entry.getKey(), newCells);
+      } // for each entry
+      // Update the family map for the Put
+      put.setFamilyCellMap(newFamilyMap);
+    }
+  }
+
+  /**
+   * Determines whether the current cell's timestamp is kept or replaced by a
+   * new unique cell timestamp.
+   *
+   * A new timestamp is generated only when the cell timestamp was not set;
+   * HBase defaults an unset timestamp to HConstants.LATEST_TIMESTAMP. This
+   * prevents fast successive writes from inadvertently overwriting each
+   * other's cells. Any explicitly set timestamp is returned unchanged. Per
+   * the original design, this is what preserves the metric timestamp on
+   * metric cells.
+   *
+   * NOTE(review): {@code tags} is currently unused. A metric cell keeps its
+   * timestamp only if the writer set one explicitly; confirm this holds for
+   * all metric writers.
+   *
+   * @param timestamp the timestamp currently on the cell
+   * @param tags tags attached to the cell (currently unused)
+   * @return cell timestamp to write
+   */
+  private long getCellTimestamp(long timestamp, List<Tag> tags) {
+    // if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default)
+    // then use the generator
+    if (timestamp == HConstants.LATEST_TIMESTAMP) {
+      return timestampGenerator.getUniqueTimestamp();
+    } else {
+      return timestamp;
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Creates a {@link FlowScanner} Scan so that it can correctly process the
+   * contents of {@link FlowRunTable}. The steps are:
+   * <ol>
+   * <li>Convert the Get into a Scan that reads all versions.</li>
+   * <li>Wrap a region scanner for it in a FlowScanner.</li>
+   * <li>Add the next row from the FlowScanner to {@code results}.</li>
+   * <li>Call e.bypass() so HBase skips its default Get handling.</li>
+   * </ol>
+   * The FlowScanner is always closed.
+   *
+   * NOTE(review): the scanner from region.getScanner(scan) is closed only
+   * through the FlowScanner. If the FlowScanner constructor threw, it would
+   * not be closed.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache
+   * .hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Get, java.util.List)
+   */
+  @Override
+  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
+      Get get, List<Cell> results) throws IOException {
+    Scan scan = new Scan(get);
+    // read all versions so FlowScanner sees every version to aggregate
+    scan.setMaxVersions();
+    RegionScanner scanner = null;
+    try {
+      scanner = new FlowScanner(region, scan.getBatch(),
+          region.getScanner(scan));
+      scanner.next(results);
+      e.bypass();
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Ensures that max versions are set for the Scan so that metrics can be
+   * correctly aggregated and min/max can be correctly determined. The
+   * incoming scanner {@code s} is returned unchanged.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org
+   * .apache.hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Scan,
+   * org.apache.hadoop.hbase.regionserver.RegionScanner)
+   */
+  @Override
+  public RegionScanner preScannerOpen(
+      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
+      RegionScanner s) throws IOException {
+    // set max versions for scan to see all
+    // versions to aggregate for metrics
+    scan.setMaxVersions();
+    return s;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Wraps the region scanner HBase opened in a {@link FlowScanner}, passing
+   * the scan's batch size, so that it can correctly process the contents of
+   * {@link FlowRunTable}.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen(
+   * org.apache.hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Scan,
+   * org.apache.hadoop.hbase.regionserver.RegionScanner)
+   */
+  @Override
+  public RegionScanner postScannerOpen(
+      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
+      RegionScanner scanner) throws IOException {
+    return new FlowScanner(region, scan.getBatch(), scanner);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
new file mode 100644
index 0000000..e133241
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+
+/**
+ * Represents a rowkey for the flow run table.
+ */
+public class FlowRunRowKey {
+  // TODO: more methods are needed for this class like parse row key
+
+  /**
+   * Constructs a row key for the flow run table as follows:
+   * {@code clusterId!userId!flowId!<inverted flowRunId>}.
+   *
+   * The three string components are joined and encoded with
+   * {@link Separator#QUALIFIERS}. The flow run id is inverted with
+   * {@link TimelineWriterUtils#invert} and appended in binary form.
+   *
+   * @param clusterId cluster identifier
+   * @param userId user identifier
+   * @param flowId flow identifier
+   * @param flowRunId flow run identifier; it is inverted before being
+   *          appended, and a {@code null} value is not handled here
+   * @return byte array with the row key
+   */
+  public static byte[] getRowKey(String clusterId, String userId,
+      String flowId, Long flowRunId) {
+    byte[] first = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId,
+        userId, flowId));
+    // flowRunId is a long, so it cannot be encoded together with the string
+    // components above; it is converted to bytes separately and then joined.
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+    return Separator.QUALIFIERS.join(first, second);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
new file mode 100644
index 0000000..b1b93c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+
+/**
+ * The flow run table has a single column family, info. It stores per flow run
+ * information aggregated across applications.
+ *
+ * Metrics are also stored in the info column family.
+ *
+ * Example flow run table record:
+ *
+ * <pre>
+ * flow_run table
+ * |-------------------------------------------|
+ * |  Row key   | Column Family                |
+ * |            | info                         |
+ * |-------------------------------------------|
+ * | clusterId! | flow_version:version7        |
+ * | userName!  |                              |
+ * | flowId!    | running_apps:1               |
+ * | flowRunId  |                              |
+ * |            | min_start_time:1392995080000 |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            | min_start_time:1392995081012 |
+ * |            | #0:appId2                    |
+ * |            |                              |
+ * |            | min_start_time:1392993083210 |
+ * |            | #0:appId3                    |
+ * |            |                              |
+ * |            |                              |
+ * |            | max_end_time:1392993084018   |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            |                              |
+ * |            | m!mapInputRecords:127        |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            | m!mapInputRecords:31         |
+ * |            | #2:appId2                    |
+ * |            |                              |
+ * |            | m!mapInputRecords:37         |
+ * |            | #1:appId3                    |
+ * |            |                              |
+ * |            |                              |
+ * |            | m!mapOutputRecords:181       |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            | m!mapOutputRecords:37        |
+ * |            | #1:appId3                    |
+ * |            |                              |
+ * |            |                              |
+ * |-------------------------------------------|
+ * </pre>
+ */
+public class FlowRunTable extends BaseTable<FlowRunTable> {
+  /** config prefix for the flow run table */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowrun";
+
+  /** config param name that specifies the flowrun table name */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** default value for flowrun table name */
+  public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun";
+
+  private static final Log LOG = LogFactory.getLog(FlowRunTable.class);
+
+  /** default max number of versions */
+  public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+  public FlowRunTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Creates the flow run table with its info column family (ROWCOL bloom
+   * filter, min 1 / max DEFAULT_METRICS_MAX_VERSIONS versions) and registers
+   * FlowRunCoprocessor on it. Throws an IOException, without touching the
+   * existing table, if the table already exists.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable
+   * #createTable(org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor flowRunTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(FlowRunColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    // keep all versions so that metrics can be aggregated and min/max
+    // values determined at read time
+    infoCF.setMinVersions(1);
+    infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+    // add the family only once it is fully configured, so the settings above
+    // do not depend on the descriptor keeping a live reference to infoCF
+    flowRunTableDescp.addFamily(infoCF);
+
+    // TODO: figure the split policy
+    flowRunTableDescp.addCoprocessor(FlowRunCoprocessor.class
+        .getCanonicalName());
+    admin.createTable(flowRunTableDescp);
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
new file mode 100644
index 0000000..a1948aa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Invoked via the coprocessor when a Get or a Scan is issued for the flow run
+ * table. Looks through the list of cells per row, checks their tags and
+ * operates on those cells as per the cell tags: reads of the stored metrics
+ * are transformed into calculated sums for each column, and the min and max
+ * are found for start and end times in a flow run. Cells whose column has no
+ * aggregation operation are returned as is.
+ */
+class FlowScanner implements RegionScanner, Closeable {
+
+  private static final Log LOG = LogFactory.getLog(FlowScanner.class);
+
+  private final HRegion region;
+  /** The wrapped scanner that produces the raw cells. */
+  private final InternalScanner flowRunScanner;
+  /** flowRunScanner viewed as a RegionScanner, or null if it is not one. */
+  private final RegionScanner regionScanner;
+  /** Limit used by the next()/nextRaw() overloads that take no limit. */
+  private final int limit;
+  /** Result of the last next() call on the wrapped scanner. */
+  private boolean hasMore;
+  /** Row being assembled by the current next() call; null at its start. */
+  private byte[] currentRow;
+  /** Current batch of cells fetched from the wrapped scanner. */
+  private final List<Cell> availableCells = new ArrayList<>();
+  /** Index of the next unconsumed cell in availableCells. */
+  private int currentIndex;
+
+  FlowScanner(HRegion region, int limit, InternalScanner internalScanner) {
+    this.region = region;
+    this.limit = limit;
+    this.flowRunScanner = internalScanner;
+    if (internalScanner instanceof RegionScanner) {
+      this.regionScanner = (RegionScanner) internalScanner;
+    } else {
+      this.regionScanner = null;
+    }
+    // TODO: note if it's compaction/flush
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
+   */
+  @Override
+  public HRegionInfo getRegionInfo() {
+    return region.getRegionInfo();
+  }
+
+  @Override
+  public boolean nextRaw(List<Cell> cells) throws IOException {
+    return nextRaw(cells, limit);
+  }
+
+  @Override
+  public boolean nextRaw(List<Cell> cells, int limit) throws IOException {
+    return nextInternal(cells, limit);
+  }
+
+  @Override
+  public boolean next(List<Cell> cells) throws IOException {
+    return next(cells, limit);
+  }
+
+  @Override
+  public boolean next(List<Cell> cells, int limit) throws IOException {
+    return nextInternal(cells, limit);
+  }
+
+  /**
+   * Returns the value of the APPLICATION_ID aggregation compaction dimension
+   * tag, or null if the tags contain no such tag. If several are present, the
+   * last one wins.
+   */
+  private String getAggregationCompactionDimension(List<Tag> tags) {
+    String appId = null;
+    for (Tag t : tags) {
+      if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
+          .getType()) {
+        appId = Bytes.toString(t.getValue());
+      }
+    }
+    return appId;
+  }
+
+  /**
+   * This method loops through the cells in a given row of the
+   * {@link FlowRunTable}. It looks at the tags of each cell to figure out how
+   * to process the contents. It then calculates the sum or min or max for each
+   * column or returns the cell as is.
+   *
+   * @param cells output list the processed cells of the row are appended to
+   * @param limit soft limit on the number of emitted cells (checked between
+   *          cells; a value <= 0 means no limit); also the batch size used
+   *          when fetching from the wrapped scanner
+   * @return true if next row is available for the scanner, false otherwise
+   * @throws IOException if the wrapped scanner fails or a cell value cannot
+   *           be read
+   */
+  private boolean nextInternal(List<Cell> cells, int limit) throws IOException {
+    Cell cell = null;
+    startNext();
+    // Loop through all the cells in this row
+    // For min/max/metrics we do need to scan the entire set of cells to get the
+    // right one
+    // But with flush/compaction, the number of cells being scanned will go down
+    // cells are grouped per column qualifier then sorted by cell timestamp
+    // (latest to oldest) per column qualifier
+    // So all cells in one qualifier come one after the other before we see the
+    // next column qualifier
+    ByteArrayComparator comp = new ByteArrayComparator();
+    byte[] currentColumnQualifier = TimelineWriterUtils.EMPTY_BYTES;
+    AggregationOperation currentAggOp = null;
+    SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
+    Set<String> alreadySeenAggDim = new HashSet<>();
+    int addedCnt = 0;
+    while (((cell = peekAtNextCell(limit)) != null)
+        && (limit <= 0 || addedCnt < limit)) {
+      byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
+      if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
+        // moved on to a new column: flush the previous one
+        addedCnt += emitCells(cells, currentColumnCells, currentAggOp);
+        resetState(currentColumnCells, alreadySeenAggDim);
+        currentColumnQualifier = newColumnQualifier;
+        currentAggOp = getCurrentAggOp(cell);
+      }
+      collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim);
+      // collectCells() never advances the scanner; consume exactly one cell
+      nextCell(limit);
+    }
+    if (!currentColumnCells.isEmpty()) {
+      emitCells(cells, currentColumnCells, currentAggOp);
+    }
+    return hasMore();
+  }
+
+  private AggregationOperation getCurrentAggOp(Cell cell) {
+    List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+        cell.getTagsLength());
+    // We assume that all the operations for a particular column are the same
+    return TimelineWriterUtils.getAggregationOperationFromTagsList(tags);
+  }
+
+  /**
+   * Resets the per-column working state to an initialized state for the next
+   * column.
+   *
+   * @param currentColumnCells cells collected for the previous column
+   * @param alreadySeenAggDim aggregation dimensions seen in the previous column
+   */
+  private void resetState(SortedSet<Cell> currentColumnCells,
+      Set<String> alreadySeenAggDim) {
+    currentColumnCells.clear();
+    alreadySeenAggDim.clear();
+  }
+
+  /**
+   * Adds {@code cell} to the working set of the current column according to
+   * the column's aggregation operation. This method only inspects the cell;
+   * it never advances the scanner (the caller advances once per cell).
+   *
+   * @param currentColumnCells working set of the current column
+   * @param currentAggOp aggregation operation of the column, or null if none
+   * @param cell the cell to collect
+   * @param alreadySeenAggDim aggregation dimensions (application ids) already
+   *          collected for a SUM / SUM_FINAL column
+   * @throws IOException if a MIN/MAX cell value cannot be read
+   */
+  private void collectCells(SortedSet<Cell> currentColumnCells,
+      AggregationOperation currentAggOp, Cell cell,
+      Set<String> alreadySeenAggDim) throws IOException {
+    if (currentAggOp == null) {
+      // not a min/max/metric cell, so just return it as is
+      currentColumnCells.add(cell);
+      return;
+    }
+
+    switch (currentAggOp) {
+    case MIN:
+      if (currentColumnCells.size() == 0) {
+        currentColumnCells.add(cell);
+      } else {
+        Cell currentMinCell = currentColumnCells.first();
+        Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp);
+        if (!currentMinCell.equals(newMinCell)) {
+          currentColumnCells.remove(currentMinCell);
+          currentColumnCells.add(newMinCell);
+        }
+      }
+      break;
+    case MAX:
+      if (currentColumnCells.size() == 0) {
+        currentColumnCells.add(cell);
+      } else {
+        Cell currentMaxCell = currentColumnCells.first();
+        Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp);
+        if (!currentMaxCell.equals(newMaxCell)) {
+          currentColumnCells.remove(currentMaxCell);
+          currentColumnCells.add(newMaxCell);
+        }
+      }
+      break;
+    case SUM:
+    case SUM_FINAL:
+      // only if this app has not been seen yet, add to current column cells
+      List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+          cell.getTagsLength());
+      String aggDim = getAggregationCompactionDimension(tags);
+      // Cells of a column arrive latest first, so a cell whose agg dimension
+      // was already seen is older than the one already collected: drop it.
+      if (alreadySeenAggDim.add(aggDim)) {
+        currentColumnCells.add(cell);
+      }
+      break;
+    default:
+      break;
+    } // end of switch case
+  }
+
+  /*
+   * Processes the cells in input param currentColumnCells and populates
+   * List<Cell> cells as the output based on the input AggregationOperation
+   * parameter. Returns the number of cells added to the output.
+   */
+  private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
+      AggregationOperation currentAggOp) throws IOException {
+    if ((currentColumnCells == null) || currentColumnCells.isEmpty()) {
+      return 0;
+    }
+    if (currentAggOp == null) {
+      cells.addAll(currentColumnCells);
+      return currentColumnCells.size();
+    }
+
+    switch (currentAggOp) {
+    case MIN:
+    case MAX:
+      cells.addAll(currentColumnCells);
+      return currentColumnCells.size();
+    case SUM:
+    case SUM_FINAL:
+      Cell sumCell = processSummation(currentColumnCells);
+      cells.add(sumCell);
+      return 1;
+    default:
+      cells.addAll(currentColumnCells);
+      return currentColumnCells.size();
+    }
+  }
+
+  /*
+   * Returns a cell whose value is the sum of all cell values in the input set.
+   * The new cell created has the timestamp of the most recent metric cell. The
+   * sum of a metric for a flow run is the summation at the point of the last
+   * metric update in that flow till that time. The input set must not be
+   * empty.
+   */
+  private Cell processSummation(SortedSet<Cell> currentColumnCells)
+      throws IOException {
+    long sum = 0L;
+    Cell mostRecentCell = null;
+    for (Cell cell : currentColumnCells) {
+      Number currentValue = (Number) GenericObjectMapper.read(CellUtil
+          .cloneValue(cell));
+      // start from the first cell so a result cell always exists, even when
+      // no timestamp is positive; ties keep the earlier cell
+      if (mostRecentCell == null
+          || cell.getTimestamp() > mostRecentCell.getTimestamp()) {
+        mostRecentCell = cell;
+      }
+      sum += currentValue.longValue();
+    }
+    return createNewCell(mostRecentCell, sum);
+  }
+
+  /**
+   * Determines which cell is to be returned based on the values in each cell
+   * and the comparison operation MIN or MAX.
+   *
+   * @param previouslyChosenCell the cell chosen so far, may be null
+   * @param currentCell the candidate cell
+   * @param currentAggOp MIN or MAX; any other value returns currentCell
+   * @return the cell which is the min (or max) cell
+   * @throws IOException if a cell value cannot be read
+   */
+  private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
+      AggregationOperation currentAggOp) throws IOException {
+    if (previouslyChosenCell == null) {
+      return currentCell;
+    }
+    try {
+      long previouslyChosenCellValue = ((Number) GenericObjectMapper
+          .read(CellUtil.cloneValue(previouslyChosenCell))).longValue();
+      long currentCellValue = ((Number) GenericObjectMapper.read(CellUtil
+          .cloneValue(currentCell))).longValue();
+      switch (currentAggOp) {
+      case MIN:
+        if (currentCellValue < previouslyChosenCellValue) {
+          // new value is minimum, hence return this cell
+          return currentCell;
+        } else {
+          // previously chosen value is minimum, hence return previous min cell
+          return previouslyChosenCell;
+        }
+      case MAX:
+        if (currentCellValue > previouslyChosenCellValue) {
+          // new value is max, hence return this cell
+          return currentCell;
+        } else {
+          // previously chosen value is max, hence return previous max cell
+          return previouslyChosenCell;
+        }
+      default:
+        return currentCell;
+      }
+    } catch (IllegalArgumentException iae) {
+      LOG.error("caught iae during conversion to long ", iae);
+      return currentCell;
+    }
+  }
+
+  private Cell createNewCell(Cell origCell, Number number) throws IOException {
+    byte[] newValue = GenericObjectMapper.write(number);
+    return CellUtil.createCell(CellUtil.cloneRow(origCell),
+        CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
+        origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
+  }
+
+  @Override
+  public void close() throws IOException {
+    flowRunScanner.close();
+  }
+
+  /**
+   * Called to signal the start of the next() call by the scanner.
+   */
+  public void startNext() {
+    currentRow = null;
+  }
+
+  /**
+   * Returns whether or not the underlying scanner has more rows.
+   */
+  public boolean hasMore() {
+    return currentIndex < availableCells.size() || hasMore;
+  }
+
+  /**
+   * Returns the next available cell for the current row and advances the
+   * pointer to the next cell. This method can be called multiple times in a row
+   * to advance through all the available cells.
+   *
+   * @param limit
+   *          the limit of number of cells to return if the next batch must be
+   *          fetched by the wrapped scanner
+   * @return the next available cell or null if no more cells are available for
+   *         the current row
+   * @throws IOException
+   */
+  public Cell nextCell(int limit) throws IOException {
+    Cell cell = peekAtNextCell(limit);
+    if (cell != null) {
+      currentIndex++;
+    }
+    return cell;
+  }
+
+  /**
+   * Returns the next available cell for the current row, without advancing the
+   * pointer. Calling this method multiple times in a row will continue to
+   * return the same cell.
+   *
+   * @param limit
+   *          the limit of number of cells to return if the next batch must be
+   *          fetched by the wrapped scanner
+   * @return the next available cell or null if no more cells are available for
+   *         the current row
+   * @throws IOException
+   */
+  public Cell peekAtNextCell(int limit) throws IOException {
+    if (currentIndex >= availableCells.size()) {
+      // done with current batch
+      availableCells.clear();
+      currentIndex = 0;
+      hasMore = flowRunScanner.next(availableCells, limit);
+    }
+    Cell cell = null;
+    if (currentIndex < availableCells.size()) {
+      cell = availableCells.get(currentIndex);
+      if (currentRow == null) {
+        currentRow = CellUtil.cloneRow(cell);
+      } else if (!CellUtil.matchingRow(cell, currentRow)) {
+        // moved on to the next row
+        // don't use the current cell
+        // also signal no more cells for this row
+        return null;
+      }
+    }
+    return cell;
+  }
+
+  /**
+   * Returns the wrapped scanner as a RegionScanner.
+   *
+   * @param methodName name of the delegated RegionScanner method, used in the
+   *          error message
+   * @return the wrapped RegionScanner
+   * @throws IllegalStateException if the wrapped scanner is not a
+   *           RegionScanner
+   */
+  private RegionScanner checkedRegionScanner(String methodName) {
+    if (regionScanner == null) {
+      throw new IllegalStateException("RegionScanner." + methodName
+          + "() called when the flow scanner's internal scanner is not a"
+          + " RegionScanner");
+    }
+    return regionScanner;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
+   */
+  @Override
+  public long getMaxResultSize() {
+    return checkedRegionScanner("getMaxResultSize").getMaxResultSize();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
+   */
+  @Override
+  public long getMvccReadPoint() {
+    return checkedRegionScanner("getMvccReadPoint").getMvccReadPoint();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
+   */
+  @Override
+  public boolean isFilterDone() throws IOException {
+    return checkedRegionScanner("isFilterDone").isFilterDone();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
+   */
+  @Override
+  public boolean reseek(byte[] bytes) throws IOException {
+    return checkedRegionScanner("reseek").reseek(bytes);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 2875e01..3962341 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import
org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; @@ -88,20 +87,15 @@ public class TestHBaseTimelineStorage { } private static void createSchema() throws IOException { - new EntityTable() - .createTable(util.getHBaseAdmin(), util.getConfiguration()); - new AppToFlowTable() - .createTable(util.getHBaseAdmin(), util.getConfiguration()); - new ApplicationTable() - .createTable(util.getHBaseAdmin(), util.getConfiguration()); + TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); } @Test public void testWriteApplicationToHBase() throws Exception { TimelineEntities te = new TimelineEntities(); ApplicationEntity entity = new ApplicationEntity(); - String id = "hello"; - entity.setId(id); + String appId = "application_1000178881110_2002"; + entity.setId(appId); long cTime = 1425016501000L; long mTime = 1425026901000L; entity.setCreatedTime(cTime); @@ -173,12 +167,12 @@ public class TestHBaseTimelineStorage { String flow = "some_flow_name"; String flowVersion = "AB7822C10F1111"; long runid = 1002345678919L; - hbi.write(cluster, user, flow, flowVersion, runid, id, te); + hbi.write(cluster, user, flow, flowVersion, runid, appId, te); hbi.stop(); // retrieve the row byte[] rowKey = - ApplicationRowKey.getRowKey(cluster, user, flow, runid, id); + ApplicationRowKey.getRowKey(cluster, user, flow, runid, appId); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); Connection conn = ConnectionFactory.createConnection(c1); @@ -190,11 +184,11 @@ public class TestHBaseTimelineStorage { // check the row key byte[] row1 = result.getRow(); assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid, - id)); + appId)); // check info column family String id1 = 
ApplicationColumn.ID.readResult(result).toString(); - assertEquals(id, id1); + assertEquals(appId, id1); Number val = (Number) ApplicationColumn.CREATED_TIME.readResult(result); @@ -252,17 +246,17 @@ public class TestHBaseTimelineStorage { assertEquals(metricValues, metricMap); // read the timeline entity using the reader this time - TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id, + TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId, entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL)); Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid, - id, entity.getType(), null, null, null, null, null, null, null, + appId, entity.getType(), null, null, null, null, null, null, null, null, null, null, null, EnumSet.of(TimelineReader.Field.ALL)); assertNotNull(e1); assertEquals(1, es1.size()); // verify attributes - assertEquals(id, e1.getId()); + assertEquals(appId, e1.getId()); assertEquals(TimelineEntityType.YARN_APPLICATION.toString(), e1.getType()); assertEquals(cTime, e1.getCreatedTime()); @@ -576,7 +570,7 @@ public class TestHBaseTimelineStorage { String flow = "other_flow_name"; String flowVersion = "1111F01C2287BA"; long runid = 1009876543218L; - String appName = "some app name"; + String appName = "application_123465899910_1001"; hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); hbi.stop(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e1f30320/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java new file mode 100644 index 0000000..f8331fa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+
+/**
+ * Generates the data/entities for the FlowRun and FlowActivity tables.
+ * Every factory method builds a fresh YARN_APPLICATION TimelineEntity.
+ * Metric timestamps are computed relative to System.currentTimeMillis() at
+ * call time, so they differ between calls.
+ */
+class TestFlowDataGenerator {
+
+  // metric ids used by the generated entities
+  private final static String metric1 = "MAP_SLOT_MILLIS";
+  private final static String metric2 = "HDFS_BYTES_READ";
+
+
+  /**
+   * Builds application entity "flowRunMetrics_test" (created at
+   * 1425016501000) with two TIME_SERIES metrics. metric1 has values 2 and 40,
+   * and metric2 has values 31 and 57. Each metric's values are recorded 100 s
+   * and 80 s before the time that metric was populated.
+   */
+  static TimelineEntity getEntityMetricsApp1() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 100000, 2);
+    metricValues.put(ts - 80000, 40);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId(metric2);
+    metricValues = new HashMap<Long, Number>();
+    // re-read the clock: m2's timestamps may differ slightly from m1's
+    ts = System.currentTimeMillis();
+    metricValues.put(ts - 100000, 31);
+    metricValues.put(ts - 80000, 57);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues);
+    metrics.add(m2);
+
+    entity.addMetrics(metrics);
+    return entity;
+  }
+
+  /**
+   * Builds a second application entity with the same id and created time as
+   * {@link #getEntityMetricsApp1()}. It carries only metric1, with Long values
+   * 5 and 101 recorded 100 s and 80 s before the call.
+   */
+  static TimelineEntity getEntityMetricsApp2() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 100000, 5L);
+    metricValues.put(ts - 80000, 101L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+    return entity;
+  }
+
+  /**
+   * Builds application entity "flowRunHello" with created time
+   * 20000000000000 and modified time 1425026901000. It has a six-point
+   * metric1 time series (values up to 60000000000L) and two events, each
+   * carrying info foo_event=test: a CREATED event at 1436512802000 and a
+   * FINISHED event at 1436512801000.
+   * NOTE(review): the FINISHED timestamp precedes the CREATED one; confirm
+   * this is intended.
+   */
+  static TimelineEntity getEntity1() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunHello";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 20000000000000L;
+    Long mTime = 1425026901000L;
+    entity.setCreatedTime(cTime);
+    entity.setModifiedTime(mTime);
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 120000, 100000000);
+    metricValues.put(ts - 100000, 200000000);
+    metricValues.put(ts - 80000, 300000000);
+    metricValues.put(ts - 60000, 400000000);
+    metricValues.put(ts - 40000, 50000000000L);
+    metricValues.put(ts - 20000, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    Long expTs = 1436512802000L;
+    event.setTimestamp(expTs);
+    String expKey = "foo_event";
+    Object expVal = "test";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+
+    event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    event.setTimestamp(1436512801000L);
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+
+    return entity;
+  }
+
+  /**
+   * Builds an application entity created at 30000000000000, later than
+   * {@link #getEntity1()}. It has a single CREATED event at 1439379885000 with
+   * info foo_event_greater=test_app_greater.
+   */
+  static TimelineEntity getEntityGreaterStartTime() {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setCreatedTime(30000000000000L);
+    entity.setId("flowRunHello with greater start time");
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setType(type);
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    // despite its name, endTs is the timestamp of the CREATED event
+    long endTs = 1439379885000L;
+    event.setTimestamp(endTs);
+    String expKey = "foo_event_greater";
+    String expVal = "test_app_greater";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  /**
+   * Builds an application entity with a single FINISHED event at the given
+   * timestamp. No created time is set.
+   *
+   * @param endTs timestamp of the FINISHED event
+   */
+  static TimelineEntity getEntityMaxEndTime(long endTs) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setId("flowRunHello Max End time");
+    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    event.setTimestamp(endTs);
+    String expKey = "foo_even_max_ finished";
+    String expVal = "test_app_max_finished";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  /**
+   * Builds an application entity created at 10000000000000, earlier than the
+   * other generated entities. It has a CREATED event stamped with the current
+   * time and no event info.
+   */
+  static TimelineEntity getEntityMinStartTime() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunHelloMInStartTime";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 10000000000000L;
+    entity.setCreatedTime(cTime);
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event.setTimestamp(System.currentTimeMillis());
+    entity.addEvent(event);
+    return entity;
+  }
+
+
+  static TimelineEntity getFlowApp1() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowActivity_test";
+    String type =
TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + Long cTime = 1425016501000L; + entity.setCreatedTime(cTime); + + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + Long expTs = 1436512802000L; + event.setTimestamp(expTs); + String expKey = "foo_event"; + Object expVal = "test"; + event.addInfo(expKey, expVal); + entity.addEvent(event); + + return entity; + } + +}