http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
new file mode 100644
index 0000000..32ef1c3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Encodes and decodes event column names for application and entity tables.
+ * The event column name is of the form: eventId=timestamp=infoKey.
+ * If no info is associated with the event, the event column name is of the
+ * form: eventId=timestamp=
+ * The event timestamp is a long and the rest are strings.
+ * Column prefixes are not part of the event column name passed for encoding.
+ * They are added later, if required, in the associated ColumnPrefix
+ * implementations.
+ */
+public final class EventColumnNameConverter
+    implements KeyConverter<EventColumnName> {
+  private static final EventColumnNameConverter INSTANCE =
+      new EventColumnNameConverter();
+
+  public static EventColumnNameConverter getInstance() {
+    return INSTANCE;
+  }
+
+  private EventColumnNameConverter() {
+  }
+
+  // eventId=timestamp=infoKey are of types String, Long and String.
+  // Strings are variable in size (i.e. end whenever the separator is
+  // encountered). This is used while decoding and helps in determining where
+  // to split.
+  private static final int[] SEGMENT_SIZES = {
+      Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE };
+
+  /*
+   * (non-Javadoc)
+   *
+   * Encodes EventColumnName into a byte array with each component/field in
+   * EventColumnName separated by Separator#VALUES. This leads to an event
+   * column name of the form eventId=timestamp=infoKey.
+   * If the timestamp in the passed EventColumnName object is null (and eventId
+   * is not null), this returns a column prefix of the form eventId= and if the
+   * infoKey in EventColumnName is null (the other 2 components are not null),
+   * this returns a column name of the form eventId=timestamp=
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(EventColumnName key) {
+    byte[] first = Separator.encode(key.getId(), Separator.SPACE, Separator.TAB,
+        Separator.VALUES);
+    if (key.getTimestamp() == null) {
+      return Separator.VALUES.join(first, Separator.EMPTY_BYTES);
+    }
+    byte[] second = Bytes.toBytes(
+        TimelineStorageUtils.invertLong(key.getTimestamp()));
+    if (key.getInfoKey() == null) {
+      return Separator.VALUES.join(first, second, Separator.EMPTY_BYTES);
+    }
+    return Separator.VALUES.join(first, second, Separator.encode(
+        key.getInfoKey(), Separator.SPACE, Separator.TAB, Separator.VALUES));
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Decodes an event column name of the form eventId=timestamp= or
+   * eventId=timestamp=infoKey represented in byte format and converts it into
+   * an EventColumnName object.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public EventColumnName decode(byte[] bytes) {
+    byte[][] components = Separator.VALUES.split(bytes, SEGMENT_SIZES);
+    if (components.length != 3) {
+      throw new IllegalArgumentException("the column name is not valid");
+    }
+    String id = Separator.decode(Bytes.toString(components[0]),
+        Separator.VALUES, Separator.TAB, Separator.SPACE);
+    Long ts = TimelineStorageUtils.invertLong(Bytes.toLong(components[1]));
+    String infoKey = components[2].length == 0 ? null :
+        Separator.decode(Bytes.toString(components[2]),
+        Separator.VALUES, Separator.TAB, Separator.SPACE);
+    return new EventColumnName(id, ts, infoKey);
+  }
+}
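To make the qualifier layout above concrete, here is a minimal usage sketch of a round trip through this converter (the event id, timestamp and info key values are illustrative; EventColumnName is the value class carrying the three components, constructed exactly as decode() does above):

    // Encode an event column name of the form eventId=timestamp=infoKey.
    // The timestamp is inverted on write so that recent events sort first.
    EventColumnNameConverter converter = EventColumnNameConverter.getInstance();
    byte[] qualifier = converter.encode(
        new EventColumnName("APP_CREATED", 1462800000000L, "diagnostics"));
    // decode() splits on Separator.VALUES using SEGMENT_SIZES, re-inverts the
    // timestamp and restores the original three components.
    EventColumnName decoded = converter.decode(qualifier);
    // decoded.getId()        -> "APP_CREATED"
    // decoded.getTimestamp() -> 1462800000000L
    // decoded.getInfoKey()   -> "diagnostics"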
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java new file mode 100644 index 0000000..4229e81 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverter.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +/** + * Interface which has to be implemented for encoding and decoding row keys and + * columns. + */ +public interface KeyConverter<T> { + /** + * Encodes a key as a byte array. + * + * @param key key to be encoded. + * @return a byte array. + */ + byte[] encode(T key); + + /** + * Decodes a byte array and returns a key of type T. + * + * @param bytes byte representation + * @return an object(key) of type T which has been constructed after decoding + * the bytes. + */ + T decode(byte[] bytes); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java new file mode 100644 index 0000000..3954145 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+/**
+ * Encodes and decodes column names / row keys which are longs.
+ */
+public final class LongKeyConverter implements KeyConverter<Long> {
+  private static final LongKeyConverter INSTANCE = new LongKeyConverter();
+
+  public static LongKeyConverter getInstance() {
+    return INSTANCE;
+  }
+
+  private LongKeyConverter() {
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(Long key) {
+    try {
+      // IOException will not be thrown here as we are explicitly passing
+      // Long.
+      return LongConverter.getInstance().encodeValue(key);
+    } catch (IOException e) {
+      return null;
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public Long decode(byte[] bytes) {
+    try {
+      return (Long) LongConverter.getInstance().decodeValue(bytes);
+    } catch (IOException e) {
+      return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
index a81c717..8a178db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
@@ -45,7 +45,13 @@ public enum Separator {
   * getting a + for a space, which may already occur in strings, so we don't
   * want that.
   */
-  SPACE(" ", "%2$");
+  SPACE(" ", "%2$"),
+
+  /**
+   * Separator in values; often used to avoid having tab characters in
+   * qualifiers and names.
+   */
+  TAB("\t", "%3$");

  /**
   * The string value of this separator.
@@ -67,7 +73,22 @@ public enum Separator {
   */
  private final String quotedValue;

-  private static final byte[] EMPTY_BYTES = new byte[0];
+  /**
+   * Indicator for the variable size of an individual segment in a split. The
+   * segment ends wherever the separator is encountered.
+   * Typically used for strings.
+   * Also used to indicate that there is no fixed number of splits which need
+   * to be returned.
If the split limit is specified as this,
+   * all possible splits are returned.
+   */
+  public static final int VARIABLE_SIZE = 0;
+
+
+  /** empty string. */
+  public static final String EMPTY_STRING = "";
+
+  /** empty bytes. */
+  public static final byte[] EMPTY_BYTES = new byte[0];

  /**
   * @param value of the separator to use. Cannot be null or empty string.
@@ -222,7 +243,6 @@ public enum Separator {
        System.arraycopy(this.bytes, 0, buf, offset, this.value.length());
        offset += this.value.length();
      }
-    }
    return buf;
  }
@@ -307,7 +327,25 @@ public enum Separator {
   * @return source split by this separator.
   */
  public byte[][] split(byte[] source, int limit) {
-    return TimelineStorageUtils.split(source, this.bytes, limit);
+    return split(source, this.bytes, limit);
+  }
+
+  /**
+   * Splits the source array into multiple array segments using this separator.
+   * The sizes indicate the sizes of the relative components/segments.
+   * In case one of the segments contains this separator before the specified
+   * size is reached, the separator will be considered part of that segment
+   * and we will continue until the size is reached.
+   * Variable length strings cannot contain this separator and are indicated
+   * with a size of {@value #VARIABLE_SIZE}. Such strings are encoded for this
+   * separator and decoded after the results from split are returned.
+   *
+   * @param source byte array to be split.
+   * @param sizes sizes of relative components/segments.
+   * @return source split by this separator as per the sizes specified.
+   */
+  public byte[][] split(byte[] source, int[] sizes) {
+    return split(source, this.bytes, sizes);
  }

  /**
@@ -315,10 +353,158 @@ public enum Separator {
   * as many times as splits are found. This will naturally produce copied byte
   * arrays for each of the split segments.
   *
-   * @param source to be split
+   * @param source byte array to be split
   * @return source split by this separator.
   */
  public byte[][] split(byte[] source) {
-    return TimelineStorageUtils.split(source, this.bytes);
+    return split(source, this.bytes);
+  }
+
+  /**
+   * Returns a list of ranges identifying [start, end) -- closed, open --
+   * positions within the source byte array that would be split using the
+   * separator byte array.
+   * The sizes indicate the sizes of the relative components/segments.
+   * In case one of the segments contains this separator before the specified
+   * size is reached, the separator will be considered part of that segment
+   * and we will continue until the size is reached.
+   * Variable length strings cannot contain this separator and are indicated
+   * with a size of {@value #VARIABLE_SIZE}. Such strings are encoded for this
+   * separator and decoded after the results from split are returned.
+   *
+   * @param source the source data
+   * @param separator the separator pattern to look for
+   * @param sizes indicate the sizes of the relative components/segments.
+   * @return a list of ranges.
+   */
+  private static List<Range> splitRanges(byte[] source, byte[] separator,
+      int[] sizes) {
+    List<Range> segments = new ArrayList<Range>();
+    if (source == null || separator == null) {
+      return segments;
+    }
+    // VARIABLE_SIZE here indicates that there is no limit to the number of
+    // segments to return.
+    int limit = VARIABLE_SIZE;
+    if (sizes != null && sizes.length > 0) {
+      limit = sizes.length;
+    }
+    int start = 0;
+    int currentSegment = 0;
+    itersource: for (int i = 0; i < source.length; i++) {
+      for (int j = 0; j < separator.length; j++) {
+        if (source[i + j] != separator[j]) {
+          continue itersource;
+        }
+      }
+      // all separator elements matched
+      if (limit > VARIABLE_SIZE) {
+        if (segments.size() >= (limit - 1)) {
+          // everything else goes in one final segment
+          break;
+        }
+        if (sizes != null) {
+          int currentSegExpectedSize = sizes[currentSegment];
+          if (currentSegExpectedSize > VARIABLE_SIZE) {
+            int currentSegSize = i - start;
+            if (currentSegSize < currentSegExpectedSize) {
+              // Segment not yet complete. More bytes to parse.
+              continue itersource;
+            } else if (currentSegSize > currentSegExpectedSize) {
+              // Segment is not as per size.
+              throw new IllegalArgumentException(
+                  "Segments not separated as per expected sizes");
+            }
+          }
+        }
+      }
+      segments.add(new Range(start, i));
+      start = i + separator.length;
+      // i will be incremented again in outer for loop
+      i += separator.length - 1;
+      currentSegment++;
+    }
+    // add in remaining to a final range
+    if (start <= source.length) {
+      if (sizes != null) {
+        // Check if the final segment is as per the size specified.
+        if (sizes[currentSegment] > VARIABLE_SIZE &&
+            source.length - start > sizes[currentSegment]) {
+          // Segment is not as per size.
+          throw new IllegalArgumentException(
+              "Segments not separated as per expected sizes");
+        }
+      }
+      segments.add(new Range(start, source.length));
+    }
+    return segments;
+  }
+
+  /**
+   * Splits the source array into the segments identified by the ranges
+   * previously calculated from the limit/sizes specified for the separator.
+   *
+   * @param source byte array to be split.
+   * @param segments specifies the range for each segment.
+   * @return a byte[][] split as per the segment ranges.
+   */
+  private static byte[][] split(byte[] source, List<Range> segments) {
+    byte[][] splits = new byte[segments.size()][];
+    for (int i = 0; i < segments.size(); i++) {
+      Range r = segments.get(i);
+      byte[] tmp = new byte[r.length()];
+      if (tmp.length > 0) {
+        System.arraycopy(source, r.start(), tmp, 0, r.length());
+      }
+      splits[i] = tmp;
+    }
+    return splits;
+  }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator based on the sizes. This will naturally produce copied byte
+   * arrays for each of the split segments.
+   *
+   * @param source source array.
+   * @param separator separator represented as a byte array.
+   * @param sizes sizes of relative components/segments.
+   * @return byte[][] after splitting the source.
+   */
+  private static byte[][] split(byte[] source, byte[] separator, int[] sizes) {
+    List<Range> segments = splitRanges(source, separator, sizes);
+    return split(source, segments);
+  }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator. This will naturally produce copied byte arrays for each of the
+   * split segments.
+   *
+   * @param source Source array.
+   * @param separator Separator represented as a byte array.
+   * @return byte[][] after splitting the source.
+   */
+  private static byte[][] split(byte[] source, byte[] separator) {
+    return split(source, separator, (int[]) null);
+  }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator, up to a maximum of count items. This will naturally produce
+   * copied byte arrays for each of the split segments.
+   *
+   * @param source Source array.
+   * @param separator Separator represented as a byte array.
+   * @param limit a non-positive value indicates no limit on the number of
+   * segments.
+   * @return byte[][] after splitting the input source.
+   */
+  private static byte[][] split(byte[] source, byte[] separator, int limit) {
+    int[] sizes = null;
+    if (limit > VARIABLE_SIZE) {
+      sizes = new int[limit];
+    }
+    List<Range> segments = splitRanges(source, separator, sizes);
+    return split(source, segments);
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
new file mode 100644
index 0000000..b0f6d55
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Encodes and decodes column names / row keys which are merely strings.
+ * Column prefixes are not part of the column name passed for encoding. They
+ * are added later, if required, in the associated ColumnPrefix implementations.
+ */ +public final class StringKeyConverter implements KeyConverter<String> { + private static final StringKeyConverter INSTANCE = new StringKeyConverter(); + + public static StringKeyConverter getInstance() { + return INSTANCE; + } + + private StringKeyConverter() { + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter + * #encode(java.lang.Object) + */ + @Override + public byte[] encode(String key) { + return Separator.encode(key, Separator.SPACE, Separator.TAB); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter + * #decode(byte[]) + */ + @Override + public String decode(byte[] bytes) { + return Separator.decode(bytes, Separator.TAB, Separator.SPACE); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java index 18f975a..d52a5d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common; import java.io.IOException; -import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -40,7 +39,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; @@ -48,18 +46,17 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter.TimelineFilterType; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter; import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; -import org.apache.hadoop.yarn.util.ConverterUtils; /** * A bunch of utility functions used across TimelineReader and TimelineWriter. @@ -72,109 +69,10 @@ public final class TimelineStorageUtils { private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class); - /** empty bytes. */ - public static final byte[] EMPTY_BYTES = new byte[0]; - - /** indicator for no limits for splitting. */ - public static final int NO_LIMIT_SPLIT = -1; - /** milliseconds in one day. */ public static final long MILLIS_ONE_DAY = 86400000L; /** - * Splits the source array into multiple array segments using the given - * separator, up to a maximum of count items. This will naturally produce - * copied byte arrays for each of the split segments. To identify the split - * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}. - * - * @param source Source array. - * @param separator Separator represented as a byte array. - * @return byte[][] after splitting the source - */ - public static byte[][] split(byte[] source, byte[] separator) { - return split(source, separator, NO_LIMIT_SPLIT); - } - - /** - * Splits the source array into multiple array segments using the given - * separator, up to a maximum of count items. This will naturally produce - * copied byte arrays for each of the split segments. To identify the split - * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}. - * - * @param source Source array. - * @param separator Separator represented as a byte array. - * @param limit a non-positive value indicates no limit on number of segments. - * @return byte[][] after splitting the input source. - */ - public static byte[][] split(byte[] source, byte[] separator, int limit) { - List<Range> segments = splitRanges(source, separator, limit); - - byte[][] splits = new byte[segments.size()][]; - for (int i = 0; i < segments.size(); i++) { - Range r = segments.get(i); - byte[] tmp = new byte[r.length()]; - if (tmp.length > 0) { - System.arraycopy(source, r.start(), tmp, 0, r.length()); - } - splits[i] = tmp; - } - return splits; - } - - /** - * Returns a list of ranges identifying [start, end) -- closed, open -- - * positions within the source byte array that would be split using the - * separator byte array. - * - * @param source Source array. - * @param separator Separator represented as a byte array. - * @return a list of ranges. - */ - public static List<Range> splitRanges(byte[] source, byte[] separator) { - return splitRanges(source, separator, NO_LIMIT_SPLIT); - } - - /** - * Returns a list of ranges identifying [start, end) -- closed, open -- - * positions within the source byte array that would be split using the - * separator byte array. - * - * @param source the source data - * @param separator the separator pattern to look for - * @param limit the maximum number of splits to identify in the source - * @return a list of ranges. 
- */ - public static List<Range> splitRanges(byte[] source, byte[] separator, - int limit) { - List<Range> segments = new ArrayList<Range>(); - if ((source == null) || (separator == null)) { - return segments; - } - int start = 0; - itersource: for (int i = 0; i < source.length; i++) { - for (int j = 0; j < separator.length; j++) { - if (source[i + j] != separator[j]) { - continue itersource; - } - } - // all separator elements matched - if (limit > 0 && segments.size() >= (limit - 1)) { - // everything else goes in one final segment - break; - } - segments.add(new Range(start, i)); - start = i + separator.length; - // i will be incremented again in outer for loop - i += separator.length - 1; - } - // add in remaining to a final range - if (start <= source.length) { - segments.add(new Range(start, source.length)); - } - return segments; - } - - /** * Converts a timestamp into it's inverse timestamp to be used in (row) keys * where we want to have the most recent timestamp in the top of the table * (scans start at the most recent timestamp first). @@ -200,53 +98,6 @@ public final class TimelineStorageUtils { return Integer.MAX_VALUE - key; } - - /** - * Converts/encodes a string app Id into a byte representation for (row) keys. - * For conversion, we extract cluster timestamp and sequence id from the - * string app id (calls {@link ConverterUtils#toApplicationId(String)} for - * conversion) and then store it in a byte array of length 12 (8 bytes (long) - * for cluster timestamp followed 4 bytes(int) for sequence id). Both cluster - * timestamp and sequence id are inverted so that the most recent cluster - * timestamp and highest sequence id appears first in the table (i.e. - * application id appears in a descending order). - * - * @param appIdStr application id in string format i.e. - * application_{cluster timestamp}_{sequence id with min 4 digits} - * - * @return encoded byte representation of app id. - */ - public static byte[] encodeAppId(String appIdStr) { - ApplicationId appId = ConverterUtils.toApplicationId(appIdStr); - byte[] appIdBytes = new byte[Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT]; - byte[] clusterTs = Bytes.toBytes(invertLong(appId.getClusterTimestamp())); - System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG); - byte[] seqId = Bytes.toBytes(invertInt(appId.getId())); - System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT); - return appIdBytes; - } - - /** - * Converts/decodes a 12 byte representation of app id for (row) keys to an - * app id in string format which can be returned back to client. - * For decoding, 12 bytes are interpreted as 8 bytes of inverted cluster - * timestamp(long) followed by 4 bytes of inverted sequence id(int). Calls - * {@link ApplicationId#toString} to generate string representation of app id. - * - * @param appIdBytes application id in byte representation. - * - * @return decoded app id in string format. - */ - public static String decodeAppId(byte[] appIdBytes) { - if (appIdBytes.length != (Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT)) { - throw new IllegalArgumentException("Invalid app id in byte format"); - } - long clusterTs = invertLong(Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG)); - int seqId = - invertInt(Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT)); - return ApplicationId.newInstance(clusterTs, seqId).toString(); - } - /** * returns the timestamp of that day's start (which is midnight 00:00:00 AM) * for a given input timestamp. 
@@ -810,7 +661,8 @@ public final class TimelineStorageUtils { TimelineEntity entity, Result result, ColumnPrefix<T> prefix, boolean isRelatedTo) throws IOException { // isRelatedTo and relatesTo are of type Map<String, Set<String>> - Map<String, Object> columns = prefix.readResults(result); + Map<String, Object> columns = + prefix.readResults(result, StringKeyConverter.getInstance()); for (Map.Entry<String, Object> column : columns.entrySet()) { for (String id : Separator.VALUES.splitEncoded( column.getValue().toString())) { @@ -837,7 +689,8 @@ public final class TimelineStorageUtils { TimelineEntity entity, Result result, ColumnPrefix<T> prefix, boolean isConfig) throws IOException { // info and configuration are of type Map<String, Object or String> - Map<String, Object> columns = prefix.readResults(result); + Map<String, Object> columns = + prefix.readResults(result, StringKeyConverter.getInstance()); if (isConfig) { for (Map.Entry<String, Object> column : columns.entrySet()) { entity.addConfig(column.getKey(), column.getValue().toString()); @@ -861,30 +714,24 @@ public final class TimelineStorageUtils { public static <T> void readEvents(TimelineEntity entity, Result result, ColumnPrefix<T> prefix) throws IOException { Map<String, TimelineEvent> eventsMap = new HashMap<>(); - Map<?, Object> eventsResult = - prefix.readResultsHavingCompoundColumnQualifiers(result); - for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) { - byte[][] karr = (byte[][])eventResult.getKey(); - // the column name is of the form "eventId=timestamp=infoKey" - if (karr.length == 3) { - String id = Bytes.toString(karr[0]); - long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1])); - String key = Separator.VALUES.joinEncoded(id, Long.toString(ts)); - TimelineEvent event = eventsMap.get(key); - if (event == null) { - event = new TimelineEvent(); - event.setId(id); - event.setTimestamp(ts); - eventsMap.put(key, event); - } - // handle empty info - String infoKey = karr[2].length == 0 ? 
null : Bytes.toString(karr[2]); - if (infoKey != null) { - event.addInfo(infoKey, eventResult.getValue()); - } - } else { - LOG.warn("incorrectly formatted column name: it will be discarded"); - continue; + Map<EventColumnName, Object> eventsResult = + prefix.readResults(result, EventColumnNameConverter.getInstance()); + for (Map.Entry<EventColumnName, Object> + eventResult : eventsResult.entrySet()) { + EventColumnName eventColumnName = eventResult.getKey(); + String key = eventColumnName.getId() + + Long.toString(eventColumnName.getTimestamp()); + // Retrieve previously seen event to add to it + TimelineEvent event = eventsMap.get(key); + if (event == null) { + // First time we're seeing this event, add it to the eventsMap + event = new TimelineEvent(); + event.setId(eventColumnName.getId()); + event.setTimestamp(eventColumnName.getTimestamp()); + eventsMap.put(key, event); + } + if (eventColumnName.getInfoKey() != null) { + event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue()); } } Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index de2b29d..02a4bb3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -27,9 +27,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; @@ -78,7 +79,6 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { */ private final String columnPrefix; private final byte[] columnPrefixBytes; - private final boolean compoundColQual; /** * Private constructor, meant to be used by the enum definition. 
@@ -122,7 +122,6 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE.encode(columnPrefix)); } - this.compoundColQual = compondColQual; } /** @@ -154,14 +153,6 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { return column.getValueConverter(); } - public byte[] getCompoundColQualBytes(String qualifier, - byte[]...components) { - if (!compoundColQual) { - return ColumnHelper.getColumnQualifier(null, qualifier); - } - return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components); - } - /* * (non-Javadoc) * @@ -233,26 +224,12 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { * * @see * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResults(org.apache.hadoop.hbase.client.Result) + * #readResults(org.apache.hadoop.hbase.client.Result, + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) */ - public Map<String, Object> readResults(Result result) throws IOException { - return column.readResults(result, columnPrefixBytes); - } - - /** - * @param result from which to read columns - * @return the latest values of columns in the column family. The column - * qualifier is returned as a list of parts, each part a byte[]. This - * is to facilitate returning byte arrays of values that were not - * Strings. If they can be treated as Strings, you should use - * {@link #readResults(Result)} instead. - * @throws IOException if there is any exception encountered while reading - * result. - */ - public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result) - throws IOException { - return column.readResultsHavingCompoundColumnQualifiers(result, - columnPrefixBytes); + public <K> Map<K, Object> readResults(Result result, + KeyConverter<K> keyConverter) throws IOException { + return column.readResults(result, columnPrefixBytes, keyConverter); } /* @@ -260,11 +237,14 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> { * * @see * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) + * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result, + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) */ - public <V> NavigableMap<String, NavigableMap<Long, V>> - readResultsWithTimestamps(Result result) throws IOException { - return column.readResultsWithTimestamps(result, columnPrefixBytes); + public <K, V> NavigableMap<K, NavigableMap<Long, V>> + readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter) + throws IOException { + return column.readResultsWithTimestamps(result, columnPrefixBytes, + keyConverter); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java index 04c633c..6d08390 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java @@ -17,10 +17,6 @@ */ package org.apache.hadoop.yarn.server.timelineservice.storage.entity; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; - /** * Represents a rowkey for the entity table. */ @@ -28,13 +24,13 @@ public class EntityRowKey { private final String clusterId; private final String userId; private final String flowName; - private final long flowRunId; + private final Long flowRunId; private final String appId; private final String entityType; private final String entityId; public EntityRowKey(String clusterId, String userId, String flowName, - long flowRunId, String appId, String entityType, String entityId) { + Long flowRunId, String appId, String entityType, String entityId) { this.clusterId = clusterId; this.userId = userId; this.flowName = flowName; @@ -56,7 +52,7 @@ public class EntityRowKey { return flowName; } - public long getFlowRunId() { + public Long getFlowRunId() { return flowRunId; } @@ -85,14 +81,8 @@ public class EntityRowKey { */ public static byte[] getRowKeyPrefix(String clusterId, String userId, String flowName, Long flowRunId, String appId) { - byte[] first = - Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId, - flowName)); - // Note that flowRunId is a long, so we can't encode them all at the same - // time. - byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId)); - byte[] third = TimelineStorageUtils.encodeAppId(appId); - return Separator.QUALIFIERS.join(first, second, third, new byte[0]); + return EntityRowKeyConverter.getInstance().encode(new EntityRowKey( + clusterId, userId, flowName, flowRunId, appId, null, null)); } /** @@ -111,16 +101,8 @@ public class EntityRowKey { */ public static byte[] getRowKeyPrefix(String clusterId, String userId, String flowName, Long flowRunId, String appId, String entityType) { - byte[] first = - Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId, - flowName)); - // Note that flowRunId is a long, so we can't encode them all at the same - // time. - byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId)); - byte[] third = TimelineStorageUtils.encodeAppId(appId); - byte[] fourth = - Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, "")); - return Separator.QUALIFIERS.join(first, second, third, fourth); + return EntityRowKeyConverter.getInstance().encode(new EntityRowKey( + clusterId, userId, flowName, flowRunId, appId, entityType, null)); } /** @@ -140,16 +122,8 @@ public class EntityRowKey { public static byte[] getRowKey(String clusterId, String userId, String flowName, Long flowRunId, String appId, String entityType, String entityId) { - byte[] first = - Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId, - flowName)); - // Note that flowRunId is a long, so we can't encode them all at the same - // time. 
- byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId)); - byte[] third = TimelineStorageUtils.encodeAppId(appId); - byte[] fourth = - Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, entityId)); - return Separator.QUALIFIERS.join(first, second, third, fourth); + return EntityRowKeyConverter.getInstance().encode(new EntityRowKey( + clusterId, userId, flowName, flowRunId, appId, entityType, entityId)); } /** @@ -159,27 +133,6 @@ public class EntityRowKey { * @return An <cite>EntityRowKey</cite> object. */ public static EntityRowKey parseRowKey(byte[] rowKey) { - byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey); - - if (rowKeyComponents.length < 7) { - throw new IllegalArgumentException("the row key is not valid for " + - "an entity"); - } - - String userId = - Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0])); - String clusterId = - Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1])); - String flowName = - Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2])); - long flowRunId = - TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3])); - String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]); - String entityType = - Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[5])); - String entityId = - Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[6])); - return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, - entityType, entityId); + return EntityRowKeyConverter.getInstance().decode(rowKey); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java new file mode 100644 index 0000000..43c0569 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+
+/**
+ * Encodes and decodes the row key for the entity table.
+ * The row key is of the form:
+ * userName!clusterId!flowName!flowRunId!appId!entityType!entityId.
+ * flowRunId is a long, appId is encoded/decoded using
+ * {@link AppIdKeyConverter} and the rest are strings.
+ */
+public final class EntityRowKeyConverter implements KeyConverter<EntityRowKey> {
+  private static final EntityRowKeyConverter INSTANCE =
+      new EntityRowKeyConverter();
+
+  public static EntityRowKeyConverter getInstance() {
+    return INSTANCE;
+  }
+
+  private EntityRowKeyConverter() {
+  }
+
+  // Entity row key is of the form
+  // userName!clusterId!flowName!flowRunId!appId!entityType!entityId with each
+  // segment separated by !. The sizes below indicate the sizes of each of
+  // these segments in sequence. clusterId, userName, flowName, entityType and
+  // entityId are strings. flowRunId is a long, hence 8 bytes in size. The app
+  // id is represented as 12 bytes, with the cluster timestamp part of the app
+  // id being 8 bytes (long) and the seq id being 4 bytes (int).
+  // Strings are variable in size (i.e. end whenever the separator is
+  // encountered). This is used while decoding and helps in determining where
+  // to split.
+  private static final int[] SEGMENT_SIZES = {
+      Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+      Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize(),
+      Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE };
+
+  /*
+   * (non-Javadoc)
+   *
+   * Encodes an EntityRowKey object into a byte array with each component/field
+   * in EntityRowKey separated by Separator#QUALIFIERS. This leads to an entity
+   * table row key of the form
+   * userName!clusterId!flowName!flowRunId!appId!entityType!entityId
+   * If the entityType in the passed EntityRowKey object is null (and the
+   * fields preceding it, i.e. clusterId, userId, flowName, flowRunId and
+   * appId, are not null), this returns a row key prefix of the form
+   * userName!clusterId!flowName!flowRunId!appId! and if the entityId in
+   * EntityRowKey is null (the other 6 components are not null), this returns
+   * a row key prefix of the form
+   * userName!clusterId!flowName!flowRunId!appId!entityType!
+   * flowRunId is inverted while encoding as it helps maintain a descending
+   * order for row keys in the entity table.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(EntityRowKey rowKey) {
+    byte[] user = Separator.encode(rowKey.getUserId(),
+        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+    byte[] cluster = Separator.encode(rowKey.getClusterId(),
+        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+    byte[] flow = Separator.encode(rowKey.getFlowName(),
+        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+    byte[] first = Separator.QUALIFIERS.join(user, cluster, flow);
+    // Note that flowRunId is a long, so we can't encode them all at the same
+    // time.
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(
+        rowKey.getFlowRunId()));
+    byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
+    if (rowKey.getEntityType() == null) {
+      return Separator.QUALIFIERS.join(
+          first, second, third, Separator.EMPTY_BYTES);
+    }
+    byte[] entityType = Separator.encode(rowKey.getEntityType(),
+        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+    byte[] entityId = rowKey.getEntityId() == null ? Separator.EMPTY_BYTES :
+        Separator.encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
+        Separator.QUALIFIERS);
+    byte[] fourth = Separator.QUALIFIERS.join(entityType, entityId);
+    return Separator.QUALIFIERS.join(first, second, third, fourth);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Decodes an entity row key of the form
+   * userName!clusterId!flowName!flowRunId!appId!entityType!entityId
+   * represented in byte format and converts it into an EntityRowKey object.
+   * flowRunId is inverted while decoding as it was inverted while encoding.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public EntityRowKey decode(byte[] rowKey) {
+    byte[][] rowKeyComponents =
+        Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+    if (rowKeyComponents.length != 7) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "an entity");
+    }
+    String userId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    Long flowRunId =
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
+    String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]);
+    String entityType = Separator.decode(Bytes.toString(rowKeyComponents[5]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    String entityId = Separator.decode(Bytes.toString(rowKeyComponents[6]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    return new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
+        entityType, entityId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
index 188c2fe..71c3d90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import
org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; @@ -51,7 +52,6 @@ public enum FlowActivityColumnPrefix */ private final String columnPrefix; private final byte[] columnPrefixBytes; - private final boolean compoundColQual; private final AggregationOperation aggOp; @@ -83,7 +83,6 @@ public enum FlowActivityColumnPrefix .encode(columnPrefix)); } this.aggOp = aggOp; - this.compoundColQual = compoundColQual; } /** @@ -169,10 +168,12 @@ public enum FlowActivityColumnPrefix * * @see * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResults(org.apache.hadoop.hbase.client.Result) + * #readResults(org.apache.hadoop.hbase.client.Result, + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) */ - public Map<String, Object> readResults(Result result) throws IOException { - return column.readResults(result, columnPrefixBytes); + public <K> Map<K, Object> readResults(Result result, + KeyConverter<K> keyConverter) throws IOException { + return column.readResults(result, columnPrefixBytes, keyConverter); } /* @@ -180,11 +181,14 @@ public enum FlowActivityColumnPrefix * * @see * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) + * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result, + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) */ - public <T> NavigableMap<String, NavigableMap<Long, T>> - readResultsWithTimestamps(Result result) throws IOException { - return column.readResultsWithTimestamps(result, columnPrefixBytes); + public <K, V> NavigableMap<K, NavigableMap<Long, V>> + readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter) + throws IOException { + return column.readResultsWithTimestamps(result, columnPrefixBytes, + keyConverter); } /** @@ -270,20 +274,4 @@ public enum FlowActivityColumnPrefix column.store(rowKey, tableMutator, columnQualifier, null, inputValue, combinedAttributes); } - - @Override - public byte[] getCompoundColQualBytes(String qualifier, - byte[]...components) { - if (!compoundColQual) { - return ColumnHelper.getColumnQualifier(null, qualifier); - } - return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components); - } - - @Override - public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result) - throws IOException { - // There are no compound column qualifiers for flow activity table. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index 2726ae2..eea38a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -17,8 +17,6 @@
 */
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 /**
@@ -27,11 +25,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStor
 public class FlowActivityRowKey {
 private final String clusterId;
- private final long dayTs;
+ private final Long dayTs;
 private final String userId;
 private final String flowName;
- public FlowActivityRowKey(String clusterId, long dayTs, String userId,
+ public FlowActivityRowKey(String clusterId, Long dayTs, String userId,
 String flowName) {
 this.clusterId = clusterId;
 this.dayTs = dayTs;
@@ -43,7 +41,7 @@ public class FlowActivityRowKey {
 return clusterId;
 }
- public long getDayTimestamp() {
+ public Long getDayTimestamp() {
 return dayTs;
 }
@@ -63,7 +61,8 @@ public class FlowActivityRowKey {
 * @return byte array with the row key prefix
 */
 public static byte[] getRowKeyPrefix(String clusterId) {
- return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, ""));
+ return FlowActivityRowKeyConverter.getInstance().encode(
+ new FlowActivityRowKey(clusterId, null, null, null));
 }
 /**
@@ -75,9 +74,8 @@ public class FlowActivityRowKey {
 * @return byte array with the row key prefix
 */
 public static byte[] getRowKeyPrefix(String clusterId, long dayTs) {
- return Separator.QUALIFIERS.join(
- Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
- Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)), new byte[0]);
+ return FlowActivityRowKeyConverter.getInstance().encode(
+ new FlowActivityRowKey(clusterId, dayTs, null, null));
 }
 /**
@@ -94,12 +92,8 @@ public class FlowActivityRowKey {
 String flowName) {
 // convert it to Day's time stamp
 eventTs = TimelineStorageUtils.getTopOfTheDayTimestamp(eventTs);
-
- return Separator.QUALIFIERS.join(
- Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
- Bytes.toBytes(TimelineStorageUtils.invertLong(eventTs)),
- Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
- Bytes.toBytes(Separator.QUALIFIERS.encode(flowName)));
+ return FlowActivityRowKeyConverter.getInstance().encode(
+ new FlowActivityRowKey(clusterId, eventTs, userId, flowName));
 }
 /**
@@ -109,21 +103,6 @@ public class FlowActivityRowKey {
 * @return A <cite>FlowActivityRowKey</cite> object.
 */
 public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
- byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
- if (rowKeyComponents.length < 4) {
- throw new IllegalArgumentException("the row key is not valid for "
- + "a flow activity");
- }
-
- String clusterId = Separator.QUALIFIERS.decode(Bytes
- .toString(rowKeyComponents[0]));
- long dayTs =
- TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1]));
- String userId = Separator.QUALIFIERS.decode(Bytes
- .toString(rowKeyComponents[2]));
- String flowName = Separator.QUALIFIERS.decode(Bytes
- .toString(rowKeyComponents[3]));
- return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
+ return FlowActivityRowKeyConverter.getInstance().decode(rowKey);
 }
 }
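Making dayTs a Long rather than a long is what lets a partially filled FlowActivityRowKey stand in for a scan prefix: trailing null fields mark where the key stops, and both getRowKeyPrefix overloads now funnel through the same converter. A hedged sketch of how such a prefix is typically consumed (the Scan setup and cluster name are illustrative, not part of this patch):

  // Scan all flow activity rows for one cluster and one day; rows come back
  // newest day first because the day timestamp is stored inverted.
  // Scan is org.apache.hadoop.hbase.client.Scan.
  byte[] prefix = FlowActivityRowKey.getRowKeyPrefix("test-cluster", dayTs);
  Scan scan = new Scan().setRowPrefixFilter(prefix);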
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
new file mode 100644
index 0000000..9dc4c98
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+
+/**
+ * Encodes and decodes row key for flow activity table.
+ * The row key is of the form : clusterId!dayTimestamp!user!flowName.
+ * dayTimestamp (top of the day timestamp) is a long and the rest are strings.
+ */
+public final class FlowActivityRowKeyConverter implements
+ KeyConverter<FlowActivityRowKey> {
+ private static final FlowActivityRowKeyConverter INSTANCE =
+ new FlowActivityRowKeyConverter();
+
+ public static FlowActivityRowKeyConverter getInstance() {
+ return INSTANCE;
+ }
+
+ private FlowActivityRowKeyConverter() {
+ }
+
+ // Flow activity row key is of the form clusterId!dayTimestamp!user!flowName
+ // with each segment separated by !. The sizes below indicate sizes of each
+ // one of these segments in sequence. clusterId, user and flowName are
+ // strings. Top of the day timestamp is a long, hence 8 bytes in size.
+ // Strings are variable in size (i.e. end whenever separator is encountered).
+ // This is used while decoding and helps in determining where to split.
+ private static final int[] SEGMENT_SIZES = {
+ Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE,
+ Separator.VARIABLE_SIZE };
+
+ /*
+ * (non-Javadoc)
+ *
+ * Encodes a FlowActivityRowKey object into a byte array with each
+ * component/field in FlowActivityRowKey separated by Separator#QUALIFIERS.
+ * This leads to a flow activity table row key of the form
+ * clusterId!dayTimestamp!user!flowName
+ * If dayTimestamp in the passed FlowActivityRowKey object is null and
+ * clusterId is not null, this returns a row key prefix of the form
+ * clusterId! and if userId in FlowActivityRowKey is null (and the fields
+ * preceding it i.e. clusterId and dayTimestamp are not null), this returns
+ * a row key prefix of the form clusterId!dayTimestamp!
+ * dayTimestamp is inverted while encoding as it helps maintain a descending
+ * order for row keys in the flow activity table.
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+ * #encode(java.lang.Object)
+ */
+
+ @Override
+ public byte[] encode(FlowActivityRowKey rowKey) {
+ if (rowKey.getDayTimestamp() == null) {
+ return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
+ Separator.SPACE, Separator.TAB, Separator.QUALIFIERS),
+ Separator.EMPTY_BYTES);
+ }
+ if (rowKey.getUserId() == null) {
+ return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
+ Separator.SPACE, Separator.TAB, Separator.QUALIFIERS),
+ Bytes.toBytes(TimelineStorageUtils.invertLong(
+ rowKey.getDayTimestamp())), Separator.EMPTY_BYTES);
+ }
+ return Separator.QUALIFIERS.join(
+ Separator.encode(rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
+ Separator.QUALIFIERS),
+ Bytes.toBytes(
+ TimelineStorageUtils.invertLong(rowKey.getDayTimestamp())),
+ Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+ Separator.QUALIFIERS),
+ Separator.encode(rowKey.getFlowName(), Separator.SPACE, Separator.TAB,
+ Separator.QUALIFIERS));
+ }
+
+ @Override
+ public FlowActivityRowKey decode(byte[] rowKey) {
+ byte[][] rowKeyComponents =
+ Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+ if (rowKeyComponents.length != 4) {
+ throw new IllegalArgumentException("the row key is not valid for "
+ + "a flow activity");
+ }
+ String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ Long dayTs =
+ TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1]));
+ String userId = Separator.decode(Bytes.toString(rowKeyComponents[2]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ String flowName = Separator.decode(Bytes.toString(rowKeyComponents[3]),
+ Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+ return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
+ }
+}
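Since encode and decode are exact inverses for fully populated keys, a quick round-trip (illustrative values only) shows how the converter is meant to be used:

  FlowActivityRowKeyConverter conv = FlowActivityRowKeyConverter.getInstance();
  // dayTs would normally come from TimelineStorageUtils.getTopOfTheDayTimestamp.
  long dayTs = 1459468800000L;
  byte[] row = conv.encode(
      new FlowActivityRowKey("test-cluster", dayTs, "alice", "dag-flow"));
  FlowActivityRowKey back = conv.decode(row);
  // back carries the same cluster, day, user and flow name; the day
  // timestamp is stored inverted on disk but restored by decode.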
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index 77f2ab2..0f14c89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -26,10 +26,11 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 /**
@@ -40,8 +41,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 /**
 * To store flow run info values.
 */
- METRIC(FlowRunColumnFamily.INFO, "m", null,
- LongConverter.getInstance());
+ METRIC(FlowRunColumnFamily.INFO, "m", null, LongConverter.getInstance());
 private final ColumnHelper<FlowRunTable> column;
 private final ColumnFamily<FlowRunTable> columnFamily;
@@ -52,17 +52,14 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 */
 private final String columnPrefix;
 private final byte[] columnPrefixBytes;
- private final boolean compoundColQual;
 private final AggregationOperation aggOp;
 /**
 * Private constructor, meant to be used by the enum definition.
 *
- * @param columnFamily
- * that this column is stored in.
- * @param columnPrefix
- * for this column.
+ * @param columnFamily that this column is stored in.
+ * @param columnPrefix for this column.
 */
 private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
 String columnPrefix, AggregationOperation fra, ValueConverter converter) {
@@ -79,11 +76,10 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 this.columnPrefixBytes = null;
 } else {
 // Future-proof by ensuring the right column prefix hygiene.
- this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
- .encode(columnPrefix));
+ this.columnPrefixBytes =
+ Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
 }
 this.aggOp = fra;
- this.compoundColQual = compoundColQual;
 }
 /**
@@ -99,14 +95,14 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 @Override
 public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
- return ColumnHelper.getColumnQualifier(
- this.columnPrefixBytes, qualifierPrefix);
+ return ColumnHelper.getColumnQualifier(this.columnPrefixBytes,
+ qualifierPrefix);
 }
 @Override
 public byte[] getColumnPrefixBytes(String qualifierPrefix) {
- return ColumnHelper.getColumnQualifier(
- this.columnPrefixBytes, qualifierPrefix);
+ return ColumnHelper.getColumnQualifier(this.columnPrefixBytes,
+ qualifierPrefix);
 }
 @Override
@@ -139,8 +135,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 }
 byte[] columnQualifier = getColumnPrefixBytes(qualifier);
- Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
- attributes, this.aggOp);
+ Attribute[] combinedAttributes =
+ TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
 column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
 combinedAttributes);
 }
@@ -166,8 +162,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 }
 byte[] columnQualifier = getColumnPrefixBytes(qualifier);
- Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
- attributes, this.aggOp);
+ Attribute[] combinedAttributes =
+ TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
 column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
 combinedAttributes);
 }
@@ -180,8 +176,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
 */
 public Object readResult(Result result, String qualifier) throws IOException {
- byte[] columnQualifier = ColumnHelper.getColumnQualifier(
- this.columnPrefixBytes, qualifier);
+ byte[] columnQualifier =
+ ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
 return column.readResult(result, columnQualifier);
 }
@@ -190,10 +186,12 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 *
 * @see
 * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #readResults(org.apache.hadoop.hbase.client.Result)
+ * #readResults(org.apache.hadoop.hbase.client.Result,
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
 */
- public Map<String, Object> readResults(Result result) throws IOException {
- return column.readResults(result, columnPrefixBytes);
+ public <K> Map<K, Object> readResults(Result result,
+ KeyConverter<K> keyConverter) throws IOException {
+ return column.readResults(result, columnPrefixBytes, keyConverter);
 }
 /*
@@ -201,11 +199,14 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 *
 * @see
 * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+ * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
 */
- public <T> NavigableMap<String, NavigableMap<Long, T>>
- readResultsWithTimestamps(Result result) throws IOException {
- return column.readResultsWithTimestamps(result, columnPrefixBytes);
+ public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+ readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
+ throws IOException {
+ return column.readResultsWithTimestamps(result, columnPrefixBytes,
+ keyConverter);
 }
 /**
@@ -213,8 +214,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 * no match. The following holds true: {@code columnFor(x) == columnFor(y)} if
 * and only if {@code x.equals(y)} or {@code (x == y == null)}
 *
- * @param columnPrefix
- * Name of the column to retrieve
+ * @param columnPrefix Name of the column to retrieve
 * @return the corresponding {@link FlowRunColumnPrefix} or null
 */
 public static final FlowRunColumnPrefix columnFor(String columnPrefix) {
@@ -242,10 +242,8 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 * {@code columnFor(a,x) == columnFor(b,y)} if and only if
 * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
 *
- * @param columnFamily
- * The columnFamily for which to retrieve the column.
- * @param columnPrefix
- * Name of the column to retrieve
+ * @param columnFamily The columnFamily for which to retrieve the column.
+ * @param columnPrefix Name of the column to retrieve
 * @return the corresponding {@link FlowRunColumnPrefix} or null if both
 * arguments don't match.
 */
@@ -267,20 +265,4 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 // Default to null
 return null;
 }
-
- @Override
- public byte[] getCompoundColQualBytes(String qualifier,
- byte[]...components) {
- if (!compoundColQual) {
- return ColumnHelper.getColumnQualifier(null, qualifier);
- }
- return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
- }
-
- @Override
- public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
- throws IOException {
- // There are no compound column qualifiers for flow run table.
- return null;
- }
 }
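With the converter threaded through, readResultsWithTimestamps can return maps keyed by whatever type the caller's converter decodes. A hedged sketch of reading the METRIC time series, reusing the illustrative string-keyed converter from the earlier note (not something this patch adds):

  // stringKeys is the illustrative KeyConverter<String> sketched above.
  // Each metric name maps to a timestamp-ordered series of values.
  NavigableMap<String, NavigableMap<Long, Number>> metricSeries =
      FlowRunColumnPrefix.METRIC.readResultsWithTimestamps(result, stringKeys);
  for (Map.Entry<String, NavigableMap<Long, Number>> metric
      : metricSeries.entrySet()) {
    // firstEntry() holds the most recent cell for that qualifier.
    Number latest = metric.getValue().firstEntry().getValue();
  }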
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index eac8f05..925242b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -17,10 +17,6 @@
 */
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
 /**
 * Represents a rowkey for the flow run table.
 */
@@ -28,10 +24,10 @@ public class FlowRunRowKey {
 private final String clusterId;
 private final String userId;
 private final String flowName;
- private final long flowRunId;
+ private final Long flowRunId;
 public FlowRunRowKey(String clusterId, String userId, String flowName,
- long flowRunId) {
+ Long flowRunId) {
 this.clusterId = clusterId;
 this.userId = userId;
 this.flowName = flowName;
@@ -50,7 +46,7 @@ public class FlowRunRowKey {
 return flowName;
 }
- public long getFlowRunId() {
+ public Long getFlowRunId() {
 return flowRunId;
 }
@@ -65,13 +61,13 @@
 */
 public static byte[] getRowKeyPrefix(String clusterId, String userId,
 String flowName) {
- return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId,
- flowName, ""));
+ return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
+ clusterId, userId, flowName, null));
 }
 /**
 * Constructs a row key for the flow run table as follows: {
- * clusterId!userI!flowName!Inverted Flow Run Id}.
+ * clusterId!userId!flowName!Inverted Flow Run Id}.
 *
 * @param clusterId Cluster Id.
 * @param userId User Id.
@@ -81,12 +77,8 @@ public class FlowRunRowKey {
 */
 public static byte[] getRowKey(String clusterId, String userId,
 String flowName, Long flowRunId) {
- byte[] first = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId,
- userId, flowName));
- // Note that flowRunId is a long, so we can't encode them all at the same
- // time.
- byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
- return Separator.QUALIFIERS.join(first, second);
+ return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
+ clusterId, userId, flowName, flowRunId));
 }
 /**
@@ -96,22 +88,7 @@ public class FlowRunRowKey {
 * @return A <cite>FlowRunRowKey</cite> object.
 */
 public static FlowRunRowKey parseRowKey(byte[] rowKey) {
- byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
- if (rowKeyComponents.length < 4) {
- throw new IllegalArgumentException("the row key is not valid for " +
- "a flow run");
- }
-
- String clusterId =
- Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
- String userId =
- Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
- String flowName =
- Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
- long flowRunId =
- TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
- return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
+ return FlowRunRowKeyConverter.getInstance().decode(rowKey);
 }
 /**
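Taken together, the null-tolerant Long field and the converter give callers a single entry point for both full keys and scan prefixes. An illustrative sketch (cluster, user and flow names are made up):

  // Prefix covering every run of a flow; runs sort newest-first on disk
  // because the run id is stored inverted.
  byte[] prefix =
      FlowRunRowKey.getRowKeyPrefix("test-cluster", "alice", "dag-flow");

  // Full key for one specific run, and the matching parse back.
  byte[] row =
      FlowRunRowKey.getRowKey("test-cluster", "alice", "dag-flow",
          1002345678919L);
  FlowRunRowKey parsed = FlowRunRowKey.parseRowKey(row);
  // parsed.getFlowRunId().equals(1002345678919L) holds after the round trip.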