Repository: hadoop Updated Branches: refs/heads/feature-YARN-2928 28fc7b140 -> a434b77ae
YARN-4053. Change the way metric values are stored in HBase Storage (Varun Saxena via sjlee) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a434b77a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a434b77a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a434b77a Branch: refs/heads/feature-YARN-2928 Commit: a434b77aee7953073bed7f629895a9b597f331f0 Parents: 28fc7b1 Author: Sangjin Lee <sj...@apache.org> Authored: Fri Nov 20 10:03:02 2015 -0800 Committer: Sangjin Lee <sj...@apache.org> Committed: Fri Nov 20 10:03:02 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../storage/FlowRunEntityReader.java | 4 +- .../application/ApplicationColumnPrefix.java | 23 +++- .../storage/common/ColumnHelper.java | 27 ++++- .../storage/common/GenericConverter.java | 48 ++++++++ .../storage/common/LongConverter.java | 78 +++++++++++++ .../storage/common/NumericValueConverter.java | 38 +++++++ .../storage/common/TimelineStorageUtils.java | 11 ++ .../storage/common/ValueConverter.java | 45 ++++++++ .../storage/common/package-info.java | 28 +++++ .../storage/entity/EntityColumn.java | 1 - .../storage/entity/EntityColumnPrefix.java | 25 ++++- .../storage/flow/FlowRunColumn.java | 24 +++- .../storage/flow/FlowRunColumnPrefix.java | 21 +++- .../storage/flow/FlowScanner.java | 110 ++++++++++++++----- .../storage/TestHBaseTimelineStorage.java | 81 +++++++++++++- .../storage/flow/TestHBaseStorageFlowRun.java | 15 +-- 17 files changed, 524 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 252d7b3..2b2aae7 100644 --- 
a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -140,6 +140,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-4221. Store user in app to flow table (Varun Saxena via sjlee) + YARN-4053. Change the way metric values are stored in HBase Storage (Varun + Saxena via sjlee) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java index c4b4e91..ebf2d27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java @@ -137,7 +137,7 @@ class FlowRunEntityReader extends TimelineEntityReader { } // read the start time - Number startTime = (Number)FlowRunColumn.MIN_START_TIME.readResult(result); + Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result); if (startTime != null) { flowRun.setStartTime(startTime.longValue()); } @@ -147,7 +147,7 @@ class FlowRunEntityReader extends TimelineEntityReader { } // read the end time if available - Number endTime = (Number)FlowRunColumn.MAX_END_TIME.readResult(result); + Long endTime = 
(Long)FlowRunColumn.MAX_END_TIME.readResult(result); if (endTime != null) { flowRun.setMaxEndTime(endTime.longValue()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java index d7b5773..b06f5c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java @@ -26,8 +26,11 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** @@ -63,7 +66,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { /** * Metrics are stored with the metric name as the column name. */ - METRIC(ApplicationColumnFamily.METRICS, null); + METRIC(ApplicationColumnFamily.METRICS, null, + LongConverter.getInstance()); private final ColumnHelper<ApplicationTable> column; private final ColumnFamily<ApplicationTable> columnFamily; @@ -83,7 +87,20 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { */ private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily, String columnPrefix) { - column = new ColumnHelper<ApplicationTable>(columnFamily); + this(columnFamily, columnPrefix, GenericConverter.getInstance()); + } + + /** + * Private constructor, meant to be used by the enum definition. + * + * @param columnFamily that this column is stored in. + * @param columnPrefix for this column. + * @param converter used to encode/decode values to be stored in HBase for + * this column prefix. 
+ */ + private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily, + String columnPrefix, ValueConverter converter) { + column = new ColumnHelper<ApplicationTable>(columnFamily, converter); this.columnFamily = columnFamily; this.columnPrefix = columnPrefix; if (columnPrefix == null) { @@ -127,7 +144,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, attributes); - } + } /* * (non-Javadoc) http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java index 3a2e088..1e63ce5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** @@ -50,9 +49,20 @@ public class ColumnHelper<T> { */ private final byte[] columnFamilyBytes; + private final ValueConverter converter; + public ColumnHelper(ColumnFamily<T> columnFamily) { + this(columnFamily, GenericConverter.getInstance()); + } + + public ColumnHelper(ColumnFamily<T> columnFamily, ValueConverter converter) { this.columnFamily = columnFamily; columnFamilyBytes = columnFamily.getBytes(); + if (converter == null) { + this.converter = GenericConverter.getInstance(); + } else { + this.converter = converter; + } } /** @@ -83,7 +93,7 @@ public class ColumnHelper<T> { Put p = new Put(rowKey); timestamp = getPutTimestamp(timestamp, attributes); p.addColumn(columnFamilyBytes, columnQualifier, timestamp, - GenericObjectMapper.write(inputValue)); + converter.encodeValue(inputValue)); if ((attributes != null) && (attributes.length > 0)) { for (Attribute attribute : attributes) { p.setAttribute(attribute.getName(), attribute.getValue()); @@ -148,7 +158,7 @@ public class ColumnHelper<T> { // ByteBuffer to avoid copy, but GenericObjectMapper doesn't seem to like // that. 
byte[] value = result.getValue(columnFamilyBytes, columnQualifierBytes); - return GenericObjectMapper.read(value); + return converter.decodeValue(value); } /** @@ -206,7 +216,7 @@ public class ColumnHelper<T> { if (cells != null) { for (Entry<Long, byte[]> cell : cells.entrySet()) { V value = - (V) GenericObjectMapper.read(cell.getValue()); + (V) converter.decodeValue(cell.getValue()); cellResults.put( TimestampGenerator.getTruncatedTimestamp(cell.getKey()), value); @@ -266,7 +276,7 @@ public class ColumnHelper<T> { // If this column has the prefix we want if (columnName != null) { - Object value = GenericObjectMapper.read(entry.getValue()); + Object value = converter.decodeValue(entry.getValue()); results.put(columnName, value); } } @@ -313,7 +323,7 @@ public class ColumnHelper<T> { // This is the prefix that we want byte[][] columnQualifierParts = Separator.VALUES.split(columnNameParts[1]); - Object value = GenericObjectMapper.read(entry.getValue()); + Object value = converter.decodeValue(entry.getValue()); // we return the columnQualifier in parts since we don't know // which part is of which data type results.put(columnQualifierParts, value); @@ -371,6 +381,11 @@ public class ColumnHelper<T> { Separator.QUALIFIERS.join(columnPrefixBytes, Bytes.toBytes(qualifier)); return columnQualifier; } + + public ValueConverter getValueConverter() { + return converter; + } + /** * @param columnPrefixBytes The byte representation for the column prefix. * Should not contain {@link Separator#QUALIFIERS}. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/GenericConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/GenericConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/GenericConverter.java new file mode 100644 index 0000000..c34bfcb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/GenericConverter.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import java.io.IOException; + +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; + +/** + * Uses GenericObjectMapper to encode objects as bytes and decode bytes as + * objects. + */ +public final class GenericConverter implements ValueConverter { + private static final GenericConverter INSTANCE = new GenericConverter(); + + private GenericConverter() { + } + + public static GenericConverter getInstance() { + return INSTANCE; + } + + @Override + public byte[] encodeValue(Object value) throws IOException { + return GenericObjectMapper.write(value); + } + + @Override + public Object decodeValue(byte[] bytes) throws IOException { + return GenericObjectMapper.read(bytes); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java new file mode 100644 index 0000000..cdb8619 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import java.io.IOException; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Encodes a value by interpreting it as a Long and converting it to bytes and + * decodes a set of bytes as a Long. + */ +public final class LongConverter implements NumericValueConverter { + private static final LongConverter INSTANCE = new LongConverter(); + + private LongConverter() { + } + + public static LongConverter getInstance() { + return INSTANCE; + } + + @Override + public byte[] encodeValue(Object value) throws IOException { + if (!TimelineStorageUtils.isIntegralValue(value)) { + throw new IOException("Expected integral value"); + } + return Bytes.toBytes(((Number)value).longValue()); + } + + @Override + public Object decodeValue(byte[] bytes) throws IOException { + if (bytes == null) { + return null; + } + return Bytes.toLong(bytes); + } + + /** + * Compares two numbers as longs. If either number is null, it will be taken + * as 0. + * @param num1 + * @param num2 + * @return -1 if num1 is less than num2, 0 if num1 is equal to num2 and 1 if + * num1 is greater than num2. + */ + @Override + public int compare(Number num1, Number num2) { + return Long.compare((num1 == null) ? 0L : num1.longValue(), + (num2 == null) ? 
0L : num2.longValue()); + } + + @Override + public Number add(Number num1, Number num2, Number...numbers) { + long sum = ((num1 == null) ? 0L : num1.longValue()) + + ((num2 == null) ? 0L : num2.longValue()); + for (Number num : numbers) { + sum = sum + ((num == null) ? 0L : num.longValue()); + } + return sum; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/NumericValueConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/NumericValueConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/NumericValueConverter.java new file mode 100644 index 0000000..70964cd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/NumericValueConverter.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import java.util.Comparator; + +/** + * Extends ValueConverter interface for numeric converters to support numerical + * operations such as comparison, addition, etc. + */ +public interface NumericValueConverter extends ValueConverter, + Comparator<Number> { + /** + * Adds two or more numbers. If any of the numbers is null, it is taken as + * 0. + * @param num1 the first number to add. + * @param num2 the second number to add. + * @param numbers any additional numbers to add. + * @return result after adding up the numbers. + */ + Number add(Number num1, Number num2, Number...numbers); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java index c1aaf19..e30f699 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java @@ -472,4 +472,15 @@ public class TimelineStorageUtils { } return true; } + + /** + * Checks if the passed object is of an integral type (Short/Integer/Long). + * @param obj the object to check. + * @return true if object passed is of type Short or Integer or Long, false + * otherwise. + */ + public static boolean isIntegralValue(Object obj) { + return (obj instanceof Short) || (obj instanceof Integer) || + (obj instanceof Long); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ValueConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ValueConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ValueConverter.java new file mode 100644 index 0000000..2388ba5 --- /dev/null +++
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import java.io.IOException; + +/** + * Converter used to encode/decode value associated with a column prefix or a + * column. + */ +public interface ValueConverter { + + /** + * Encode an object as a byte array depending on the converter implementation. + * @param value + * @return a byte array + * @throws IOException + */ + byte[] encodeValue(Object value) throws IOException; + + /** + * Decode a byte array and convert it into an object depending on the + * converter implementation. + * @param bytes + * @return an object + * @throws IOException + */ + Object decodeValue(byte[] bytes) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java new file mode 100644 index 0000000..0df5b8a --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.common contains + * a set of utility classes used across the backend storage readers and writers.
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java index 8ae19b8..e12b6e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.entity; import java.io.IOException; -import java.util.Map; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index 0d4e5a8..abede9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -26,8 +26,11 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** @@ -63,7 +66,8 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { /** * Metrics are stored with the metric name as the column name. 
*/ - METRIC(EntityColumnFamily.METRICS, null); + METRIC(EntityColumnFamily.METRICS, null, + LongConverter.getInstance()); private final ColumnHelper<EntityTable> column; private final ColumnFamily<EntityTable> columnFamily; @@ -83,7 +87,20 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { */ EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily, String columnPrefix) { - column = new ColumnHelper<EntityTable>(columnFamily); + this(columnFamily, columnPrefix, GenericConverter.getInstance()); + } + + /** + * Private constructor, meant to be used by the enum definition. + * + * @param columnFamily that this column is stored in. + * @param columnPrefix for this column. + * @param converter used to encode/decode values to be stored in HBase for + * this column prefix. + */ + EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily, + String columnPrefix, ValueConverter converter) { + column = new ColumnHelper<EntityTable>(columnFamily, converter); this.columnFamily = columnFamily; this.columnPrefix = columnPrefix; if (columnPrefix == null) { @@ -128,7 +145,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, attributes); - } + } /* * (non-Javadoc) @@ -155,7 +172,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, attributes); - } + } /* * (non-Javadoc) http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java index 5079cc0..148a37f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java @@ -24,9 +24,12 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; /** * Identifies fully qualified columns for the {@link FlowRunTable}. @@ -38,14 +41,14 @@ public enum FlowRunColumn implements Column<FlowRunTable> { * application start times. */ MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time", - AggregationOperation.MIN), + AggregationOperation.MIN, LongConverter.getInstance()), /** * When the flow ended. This is the maximum of currently known application end * times. 
*/ MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time", - AggregationOperation.MAX), + AggregationOperation.MAX, LongConverter.getInstance()), /** * The version of the flow that this flow belongs to. @@ -60,13 +63,20 @@ public enum FlowRunColumn implements Column<FlowRunTable> { private FlowRunColumn(ColumnFamily<FlowRunTable> columnFamily, String columnQualifier, AggregationOperation aggOp) { + this(columnFamily, columnQualifier, aggOp, + GenericConverter.getInstance()); + } + + private FlowRunColumn(ColumnFamily<FlowRunTable> columnFamily, + String columnQualifier, AggregationOperation aggOp, + ValueConverter converter) { this.columnFamily = columnFamily; this.columnQualifier = columnQualifier; this.aggOp = aggOp; // Future-proof by ensuring the right column prefix hygiene. this.columnQualifierBytes = Bytes.toBytes(Separator.SPACE .encode(columnQualifier)); - this.column = new ColumnHelper<FlowRunTable>(columnFamily); + this.column = new ColumnHelper<FlowRunTable>(columnFamily, converter); } /** @@ -80,6 +90,10 @@ public enum FlowRunColumn implements Column<FlowRunTable> { return columnQualifierBytes.clone(); } + public byte[] getColumnFamilyBytes() { + return columnFamily.getBytes(); + } + public AggregationOperation getAggregationOperation() { return aggOp; } @@ -130,6 +144,10 @@ public enum FlowRunColumn implements Column<FlowRunTable> { return null; } + public ValueConverter getValueConverter() { + return column.getValueConverter(); + } + /** * Retrieve an {@link FlowRunColumn} given a name, or null if there is no * match. 
The following holds true: {@code columnFor(a,x) == columnFor(b,y)} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java index b090bba..eb055fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java @@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; /** * Identifies partially qualified columns for the {@link FlowRunTable}. @@ -38,7 +40,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { /** * To store flow run info values. 
*/ - METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM); + METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM, + LongConverter.getInstance()); private final ColumnHelper<FlowRunTable> column; private final ColumnFamily<FlowRunTable> columnFamily; @@ -61,8 +64,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { * for this column. */ private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily, - String columnPrefix, AggregationOperation fra) { - column = new ColumnHelper<FlowRunTable>(columnFamily); + String columnPrefix, AggregationOperation fra, ValueConverter converter) { + column = new ColumnHelper<FlowRunTable>(columnFamily, converter); this.columnFamily = columnFamily; this.columnPrefix = columnPrefix; if (columnPrefix == null) { @@ -86,6 +89,14 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { return columnPrefixBytes.clone(); } + public byte[] getColumnPrefixBytes(String qualifier) { + return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + } + + public byte[] getColumnFamilyBytes() { + return columnFamily.getBytes(); + } + public AggregationOperation getAttribute() { return aggOp; } @@ -205,6 +216,10 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> { return null; } + public ValueConverter getValueConverter() { + return column.getValueConverter(); + } + /** * Retrieve an {@link FlowRunColumnPrefix} given a name, or null if there is * no match. 
The following holds true: http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index a537891..d541df0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -39,8 +39,10 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; -import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; /** * Invoked via the coprocessor when a Get or a Scan is issued for flow run @@ -114,6 +116,45 @@ class FlowScanner implements RegionScanner, Closeable { } /** + * Get value converter associated with a column or a column prefix. 
If nothing + * matches, generic converter is returned. + * @param colQualifierBytes + * @return value converter implementation. + */ + private static ValueConverter getValueConverter(byte[] colQualifierBytes) { + // Iterate over all the column prefixes for flow run table and get the + // appropriate converter for the column qualifier passed if prefix matches. + for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) { + byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes(""); + if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length, + colQualifierBytes, 0, colPrefixBytes.length) == 0) { + return colPrefix.getValueConverter(); + } + } + // Iterate over all the columns for flow run table and get the + // appropriate converter for the column qualifier passed if match occurs. + for (FlowRunColumn column : FlowRunColumn.values()) { + if (Bytes.compareTo( + column.getColumnQualifierBytes(), colQualifierBytes) == 0) { + return column.getValueConverter(); + } + } + // Return generic converter if nothing matches. + return GenericConverter.getInstance(); + } + + /** + * Checks if the converter is a numeric converter or not. For a converter to + * be numeric, it must implement {@link NumericValueConverter} interface. + * @param converter + * @return true, if converter is of type NumericValueConverter, false + * otherwise. + */ + private static boolean isNumericConverter(ValueConverter converter) { + return (converter instanceof NumericValueConverter); + } + + /** * This method loops through the cells in a given row of the * {@link FlowRunTable}. It looks at the tags of each cell to figure out how * to process the contents. 
It then calculates the sum or min or max for each @@ -141,20 +182,32 @@ class FlowScanner implements RegionScanner, Closeable { SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR); Set<String> alreadySeenAggDim = new HashSet<>(); int addedCnt = 0; + ValueConverter converter = null; while (((cell = peekAtNextCell(limit)) != null) && (limit <= 0 || addedCnt < limit)) { byte[] newColumnQualifier = CellUtil.cloneQualifier(cell); if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) { - addedCnt += emitCells(cells, currentColumnCells, currentAggOp); + if (converter != null && isNumericConverter(converter)) { + addedCnt += emitCells(cells, currentColumnCells, currentAggOp, + (NumericValueConverter)converter); + } resetState(currentColumnCells, alreadySeenAggDim); currentColumnQualifier = newColumnQualifier; currentAggOp = getCurrentAggOp(cell); + converter = getValueConverter(newColumnQualifier); } - collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim); + // No operation needs to be performed on non numeric converters. 
+ if (!isNumericConverter(converter)) { + nextCell(limit); + continue; + } + collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim, + (NumericValueConverter)converter); nextCell(limit); } if (!currentColumnCells.isEmpty()) { - emitCells(cells, currentColumnCells, currentAggOp); + emitCells(cells, currentColumnCells, currentAggOp, + (NumericValueConverter)converter); } return hasMore(); } @@ -183,7 +236,8 @@ class FlowScanner implements RegionScanner, Closeable { private void collectCells(SortedSet<Cell> currentColumnCells, AggregationOperation currentAggOp, Cell cell, - Set<String> alreadySeenAggDim) throws IOException { + Set<String> alreadySeenAggDim, NumericValueConverter converter) + throws IOException { if (currentAggOp == null) { // not a min/max/metric cell, so just return it as is currentColumnCells.add(cell); @@ -197,7 +251,8 @@ class FlowScanner implements RegionScanner, Closeable { currentColumnCells.add(cell); } else { Cell currentMinCell = currentColumnCells.first(); - Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp); + Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp, + converter); if (!currentMinCell.equals(newMinCell)) { currentColumnCells.remove(currentMinCell); currentColumnCells.add(newMinCell); @@ -209,7 +264,8 @@ class FlowScanner implements RegionScanner, Closeable { currentColumnCells.add(cell); } else { Cell currentMaxCell = currentColumnCells.first(); - Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp); + Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp, + converter); if (!currentMaxCell.equals(newMaxCell)) { currentColumnCells.remove(currentMaxCell); currentColumnCells.add(newMaxCell); @@ -245,7 +301,8 @@ class FlowScanner implements RegionScanner, Closeable { * parameter. 
*/ private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells, - AggregationOperation currentAggOp) throws IOException { + AggregationOperation currentAggOp, NumericValueConverter converter) + throws IOException { if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) { return 0; } @@ -261,7 +318,7 @@ class FlowScanner implements RegionScanner, Closeable { return currentColumnCells.size(); case SUM: case SUM_FINAL: - Cell sumCell = processSummation(currentColumnCells); + Cell sumCell = processSummation(currentColumnCells, converter); cells.add(sumCell); return 1; default: @@ -276,24 +333,24 @@ class FlowScanner implements RegionScanner, Closeable { * sum of a metric for a flow run is the summation at the point of the last * metric update in that flow till that time. */ - private Cell processSummation(SortedSet<Cell> currentColumnCells) - throws IOException { + private Cell processSummation(SortedSet<Cell> currentColumnCells, + NumericValueConverter converter) throws IOException { Number sum = 0; Number currentValue = 0; long ts = 0L; - long mostCurrentTimestamp = 0l; + long mostCurrentTimestamp = 0L; Cell mostRecentCell = null; for (Cell cell : currentColumnCells) { - currentValue = (Number) GenericObjectMapper.read(CellUtil - .cloneValue(cell)); + currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell)); ts = cell.getTimestamp(); if (mostCurrentTimestamp < ts) { mostCurrentTimestamp = ts; mostRecentCell = cell; } - sum = sum.longValue() + currentValue.longValue(); + sum = converter.add(sum, currentValue); } - Cell sumCell = createNewCell(mostRecentCell, sum); + byte[] sumBytes = converter.encodeValue(sum); + Cell sumCell = createNewCell(mostRecentCell, sumBytes); return sumCell; } @@ -308,18 +365,20 @@ class FlowScanner implements RegionScanner, Closeable { * @throws IOException */ private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell, - AggregationOperation currentAggOp) throws IOException { + 
AggregationOperation currentAggOp, NumericValueConverter converter) + throws IOException { if (previouslyChosenCell == null) { return currentCell; } try { - long previouslyChosenCellValue = ((Number) GenericObjectMapper - .read(CellUtil.cloneValue(previouslyChosenCell))).longValue(); - long currentCellValue = ((Number) GenericObjectMapper.read(CellUtil - .cloneValue(currentCell))).longValue(); + Number previouslyChosenCellValue = (Number)converter.decodeValue( + CellUtil.cloneValue(previouslyChosenCell)); + Number currentCellValue = (Number) converter.decodeValue(CellUtil + .cloneValue(currentCell)); switch (currentAggOp) { case MIN: - if (currentCellValue < previouslyChosenCellValue) { + if (converter.compare( + currentCellValue, previouslyChosenCellValue) < 0) { // new value is minimum, hence return this cell return currentCell; } else { @@ -327,7 +386,8 @@ class FlowScanner implements RegionScanner, Closeable { return previouslyChosenCell; } case MAX: - if (currentCellValue > previouslyChosenCellValue) { + if (converter.compare( + currentCellValue, previouslyChosenCellValue) > 0) { // new value is max, hence return this cell return currentCell; } else { @@ -343,8 +403,8 @@ class FlowScanner implements RegionScanner, Closeable { } } - private Cell createNewCell(Cell origCell, Number number) throws IOException { - byte[] newValue = GenericObjectMapper.write(number); + private Cell createNewCell(Cell origCell, byte[] newValue) + throws IOException { return CellUtil.createCell(CellUtil.cloneRow(origCell), CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java index 701615e..30ead40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -90,6 +91,15 @@ public class TestHBaseTimelineStorage { TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); } + private static void matchMetrics(Map<Long, Number> m1, Map<Long, Number> m2) { + assertEquals(m1.size(), m2.size()); + for (Map.Entry<Long, Number> entry : m2.entrySet()) { + Number val = m1.get(entry.getKey()); + assertNotNull(val); + assertEquals(val.longValue(), entry.getValue().longValue()); + } + } + @Test public void testWriteApplicationToHBase() throws Exception { TimelineEntities te = new TimelineEntities(); @@ -243,7 +253,7 @@ public class TestHBaseTimelineStorage { ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result); NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId()); - assertEquals(metricValues, metricMap); + matchMetrics(metricValues, metricMap); // read the 
timeline entity using the reader this time TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId, @@ -273,7 +283,7 @@ public class TestHBaseTimelineStorage { assertEquals(metrics, metrics2); for (TimelineMetric metric2 : metrics2) { Map<Long, Number> metricValues2 = metric2.getValues(); - assertEquals(metricValues, metricValues2); + matchMetrics(metricValues, metricValues2); } } finally { if (hbi != null) { @@ -451,7 +461,7 @@ public class TestHBaseTimelineStorage { EntityColumnPrefix.METRIC.readResultsWithTimestamps(result); NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId()); - assertEquals(metricValues, metricMap); + matchMetrics(metricValues, metricMap); } } assertEquals(1, rowCount); @@ -488,7 +498,7 @@ public class TestHBaseTimelineStorage { assertEquals(metrics, metrics2); for (TimelineMetric metric2 : metrics2) { Map<Long, Number> metricValues2 = metric2.getValues(); - assertEquals(metricValues, metricValues2); + matchMetrics(metricValues, metricValues2); } } finally { if (hbi != null) { @@ -743,6 +753,69 @@ public class TestHBaseTimelineStorage { } } + @Test + public void testNonIntegralMetricValues() throws IOException { + TimelineEntities teApp = new TimelineEntities(); + ApplicationEntity entityApp = new ApplicationEntity(); + String appId = "application_1000178881110_2002"; + entityApp.setId(appId); + entityApp.setCreatedTime(1425016501000L); + entityApp.setModifiedTime(1425026901000L); + // add metrics with floating point values + Set<TimelineMetric> metricsApp = new HashSet<>(); + TimelineMetric mApp = new TimelineMetric(); + mApp.setId("MAP_SLOT_MILLIS"); + Map<Long, Number> metricAppValues = new HashMap<Long, Number>(); + long ts = System.currentTimeMillis(); + metricAppValues.put(ts - 20, 10.5); + metricAppValues.put(ts - 10, 20.5); + mApp.setType(Type.TIME_SERIES); + mApp.setValues(metricAppValues); + metricsApp.add(mApp); + entityApp.addMetrics(metricsApp); + teApp.addEntity(entityApp); + + TimelineEntities 
teEntity = new TimelineEntities(); + TimelineEntity entity = new TimelineEntity(); + entity.setId("hello"); + entity.setType("world"); + entity.setCreatedTime(1425016501000L); + entity.setModifiedTime(1425026901000L); + // add metrics with floating point values + Set<TimelineMetric> metricsEntity = new HashSet<>(); + TimelineMetric mEntity = new TimelineMetric(); + mEntity.setId("MAP_SLOT_MILLIS"); + mEntity.addValue(ts - 20, 10.5); + metricsEntity.add(mEntity); + entity.addMetrics(metricsEntity); + teEntity.addEntity(entity); + + HBaseTimelineWriterImpl hbi = null; + try { + Configuration c1 = util.getConfiguration(); + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + hbi.start(); + // Writing application entity. + try { + hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teApp); + Assert.fail("Expected an exception as metric values are non integral"); + } catch (IOException e) {} + + // Writing generic entity. + try { + hbi.write("c1", "u1", "f1", "v1", 1002345678919L, appId, teEntity); + Assert.fail("Expected an exception as metric values are non integral"); + } catch (IOException e) {} + hbi.stop(); + } finally { + if (hbi != null) { + hbi.stop(); + hbi.close(); + } + } + } + @AfterClass public static void tearDownAfterClass() throws Exception { util.shutdownMiniCluster(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a434b77a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index b0f83b7..4fb8f0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -164,10 +165,10 @@ public class TestHBaseStorageFlowRun { .getBytes()); assertEquals(2, r1.size()); - long starttime = (Long) GenericObjectMapper.read(values - .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes())); + long starttime = Bytes.toLong(values.get( + FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes())); assertEquals(minStartTs, starttime); - assertEquals(endTs, GenericObjectMapper.read(values + assertEquals(endTs, Bytes.toLong(values .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()))); // use the timeline reader to verify data @@ -253,10 +254,10 @@ public class TestHBaseStorageFlowRun { } switch (id) { case metric1: - assertEquals(141, value); + assertEquals(141L, value); break; case metric2: - assertEquals(57, value); + assertEquals(57L, value); break; default: fail("unrecognized metric: " + id); @@ -292,14 +293,14 @@ public class TestHBaseStorageFlowRun { byte[] q = 
ColumnHelper.getColumnQualifier( FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1); assertTrue(values.containsKey(q)); - assertEquals(141, GenericObjectMapper.read(values.get(q))); + assertEquals(141L, Bytes.toLong(values.get(q))); // check metric2 assertEquals(2, values.size()); q = ColumnHelper.getColumnQualifier( FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2); assertTrue(values.containsKey(q)); - assertEquals(57, GenericObjectMapper.read(values.get(q))); + assertEquals(57L, Bytes.toLong(values.get(q))); } assertEquals(1, rowCount); }