YARN-5109. timestamps are stored unencoded causing parse errors (Varun Saxena via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e2229377 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e2229377 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e2229377 Branch: refs/heads/YARN-2928 Commit: e2229377b0a4bcc54cff1dd4adf4e5b5c0a27bc1 Parents: 10b26bb Author: Sangjin Lee <sj...@apache.org> Authored: Thu May 26 21:39:16 2016 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Thu May 26 21:39:16 2016 -0700 ---------------------------------------------------------------------- .../storage/TestHBaseTimelineStorage.java | 145 ++++++--- .../flow/TestHBaseStorageFlowActivity.java | 8 +- .../reader/filter/TimelineFilterUtils.java | 20 +- .../storage/HBaseTimelineWriterImpl.java | 67 +++-- .../application/ApplicationColumnPrefix.java | 65 ++-- .../storage/application/ApplicationRowKey.java | 50 +--- .../application/ApplicationRowKeyConverter.java | 130 ++++++++ .../storage/apptoflow/AppToFlowRowKey.java | 20 +- .../apptoflow/AppToFlowRowKeyConverter.java | 96 ++++++ .../storage/common/AppIdKeyConverter.java | 101 +++++++ .../storage/common/ColumnHelper.java | 175 +++++------ .../storage/common/ColumnPrefix.java | 43 +-- .../storage/common/EventColumnName.java | 48 +++ .../common/EventColumnNameConverter.java | 105 +++++++ .../storage/common/KeyConverter.java | 41 +++ .../storage/common/LongKeyConverter.java | 68 +++++ .../storage/common/Separator.java | 198 ++++++++++++- .../storage/common/StringKeyConverter.java | 59 ++++ .../storage/common/TimelineStorageUtils.java | 199 ++----------- .../storage/entity/EntityColumnPrefix.java | 48 +-- .../storage/entity/EntityRowKey.java | 67 +---- .../storage/entity/EntityRowKeyConverter.java | 143 +++++++++ .../storage/flow/FlowActivityColumnPrefix.java | 38 +-- .../storage/flow/FlowActivityRowKey.java | 41 +-- .../flow/FlowActivityRowKeyConverter.java | 115 ++++++++ .../storage/flow/FlowRunColumnPrefix.java | 82 ++---- 
.../storage/flow/FlowRunRowKey.java | 41 +-- .../storage/flow/FlowRunRowKeyConverter.java | 120 ++++++++ .../storage/flow/FlowScanner.java | 9 +- .../reader/FlowActivityEntityReader.java | 12 +- .../storage/reader/TimelineEntityReader.java | 4 +- .../storage/common/TestKeyConverters.java | 293 +++++++++++++++++++ .../storage/common/TestSeparator.java | 82 +++++- .../common/TestTimelineStorageUtils.java | 56 ---- 34 files changed, 1988 insertions(+), 801 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java index 68135a0..bcf2d2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ 
-42,7 +43,6 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -50,25 +50,28 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter; import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; @@ -482,7 +485,6 @@ public class TestHBaseTimelineStorage { } } - @Test public void testWriteNullApplicationToHBase() throws Exception { TimelineEntities te = new TimelineEntities(); @@ -494,7 +496,7 @@ public class TestHBaseTimelineStorage { // add the info map in Timeline Entity Map<String, Object> infoMap = new HashMap<String, Object>(); - infoMap.put("infoMapKey1", "infoMapValue1"); + infoMap.put("in fo M apK ey1", "infoMapValue1"); infoMap.put("infoMapKey2", 10); entity.addInfo(infoMap); @@ -517,6 +519,7 @@ public class TestHBaseTimelineStorage { // retrieve the row Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes(cluster)); + 
scan.setStopRow(Bytes.toBytes(cluster + "1")); Connection conn = ConnectionFactory.createConnection(c1); ResultScanner resultScanner = new ApplicationTable() .getResultScanner(c1, conn, scan); @@ -626,7 +629,7 @@ public class TestHBaseTimelineStorage { hbi.start(); String cluster = "cluster_test_write_app"; String user = "user1"; - String flow = "some_flow_name"; + String flow = "s!ome_f\tlow _n am!e"; String flowVersion = "AB7822C10F1111"; long runid = 1002345678919L; hbi.write(cluster, user, flow, flowVersion, runid, appId, te); @@ -670,7 +673,8 @@ public class TestHBaseTimelineStorage { assertEquals(cTime, cTime1); Map<String, Object> infoColumns = - ApplicationColumnPrefix.INFO.readResults(result); + ApplicationColumnPrefix.INFO.readResults(result, + StringKeyConverter.getInstance()); assertEquals(infoMap, infoColumns); // Remember isRelatedTo is of type Map<String, Set<String>> @@ -706,11 +710,13 @@ public class TestHBaseTimelineStorage { // Configuration Map<String, Object> configColumns = - ApplicationColumnPrefix.CONFIG.readResults(result); + ApplicationColumnPrefix.CONFIG.readResults(result, + StringKeyConverter.getInstance()); assertEquals(conf, configColumns); NavigableMap<String, NavigableMap<Long, Number>> metricsResult = - ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result); + ApplicationColumnPrefix.METRIC.readResultsWithTimestamps( + result, StringKeyConverter.getInstance()); NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId()); matchMetrics(metricValues, metricMap); @@ -868,7 +874,8 @@ public class TestHBaseTimelineStorage { assertEquals(cTime1, cTime); Map<String, Object> infoColumns = - EntityColumnPrefix.INFO.readResults(result); + EntityColumnPrefix.INFO.readResults(result, + StringKeyConverter.getInstance()); assertEquals(infoMap, infoColumns); // Remember isRelatedTo is of type Map<String, Set<String>> @@ -906,11 +913,12 @@ public class TestHBaseTimelineStorage { // Configuration Map<String, Object> configColumns 
= - EntityColumnPrefix.CONFIG.readResults(result); + EntityColumnPrefix.CONFIG.readResults(result, StringKeyConverter.getInstance()); assertEquals(conf, configColumns); NavigableMap<String, NavigableMap<Long, Number>> metricsResult = - EntityColumnPrefix.METRIC.readResultsWithTimestamps(result); + EntityColumnPrefix.METRIC.readResultsWithTimestamps( + result, StringKeyConverter.getInstance()); NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId()); matchMetrics(metricValues, metricMap); @@ -963,7 +971,7 @@ public class TestHBaseTimelineStorage { } private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, - String flow, long runid, String appName, TimelineEntity te) { + String flow, Long runid, String appName, TimelineEntity te) { EntityRowKey key = EntityRowKey.parseRowKey(rowKey); @@ -978,7 +986,7 @@ public class TestHBaseTimelineStorage { } private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster, - String user, String flow, long runid, String appName) { + String user, String flow, Long runid, String appName) { ApplicationRowKey key = ApplicationRowKey.parseRowKey(rowKey); @@ -995,7 +1003,7 @@ public class TestHBaseTimelineStorage { TimelineEvent event = new TimelineEvent(); String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE; event.setId(eventId); - long expTs = 1436512802000L; + Long expTs = 1436512802000L; event.setTimestamp(expTs); String expKey = "foo_event"; Object expVal = "test"; @@ -1038,20 +1046,18 @@ public class TestHBaseTimelineStorage { assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid, appName)); - Map<?, Object> eventsResult = - ApplicationColumnPrefix.EVENT. 
- readResultsHavingCompoundColumnQualifiers(result); + Map<EventColumnName, Object> eventsResult = + ApplicationColumnPrefix.EVENT.readResults(result, + EventColumnNameConverter.getInstance()); // there should be only one event assertEquals(1, eventsResult.size()); - for (Map.Entry<?, Object> e : eventsResult.entrySet()) { + for (Map.Entry<EventColumnName, Object> e : eventsResult.entrySet()) { + EventColumnName eventColumnName = e.getKey(); // the qualifier is a compound key // hence match individual values - byte[][] karr = (byte[][])e.getKey(); - assertEquals(3, karr.length); - assertEquals(eventId, Bytes.toString(karr[0])); - assertEquals( - TimelineStorageUtils.invertLong(expTs), Bytes.toLong(karr[1])); - assertEquals(expKey, Bytes.toString(karr[2])); + assertEquals(eventId, eventColumnName.getId()); + assertEquals(expTs, eventColumnName.getTimestamp()); + assertEquals(expKey, eventColumnName.getInfoKey()); Object value = e.getValue(); // there should be only one timestamp and value assertEquals(expVal, value.toString()); @@ -1076,7 +1082,7 @@ public class TestHBaseTimelineStorage { assertEquals(1, events.size()); for (TimelineEvent e : events) { assertEquals(eventId, e.getId()); - assertEquals(expTs, e.getTimestamp()); + assertEquals(expTs, Long.valueOf(e.getTimestamp())); Map<String,Object> info = e.getInfo(); assertEquals(1, info.size()); for (Map.Entry<String, Object> infoEntry : info.entrySet()) { @@ -1095,9 +1101,9 @@ public class TestHBaseTimelineStorage { @Test public void testEventsWithEmptyInfo() throws IOException { TimelineEvent event = new TimelineEvent(); - String eventId = "foo_event_id"; + String eventId = "foo_ev e nt_id"; event.setId(eventId); - long expTs = 1436512802000L; + Long expTs = 1436512802000L; event.setTimestamp(expTs); final TimelineEntity entity = new TimelineEntity(); @@ -1142,21 +1148,19 @@ public class TestHBaseTimelineStorage { assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName, entity)); - Map<?, Object> 
eventsResult = - EntityColumnPrefix.EVENT. - readResultsHavingCompoundColumnQualifiers(result); + Map<EventColumnName, Object> eventsResult = + EntityColumnPrefix.EVENT.readResults(result, + EventColumnNameConverter.getInstance()); // there should be only one event assertEquals(1, eventsResult.size()); - for (Map.Entry<?, Object> e : eventsResult.entrySet()) { + for (Map.Entry<EventColumnName, Object> e : eventsResult.entrySet()) { + EventColumnName eventColumnName = e.getKey(); // the qualifier is a compound key // hence match individual values - byte[][] karr = (byte[][])e.getKey(); - assertEquals(3, karr.length); - assertEquals(eventId, Bytes.toString(karr[0])); - assertEquals(TimelineStorageUtils.invertLong(expTs), - Bytes.toLong(karr[1])); + assertEquals(eventId, eventColumnName.getId()); + assertEquals(expTs,eventColumnName.getTimestamp()); // key must be empty - assertEquals(0, karr[2].length); + assertNull(eventColumnName.getInfoKey()); Object value = e.getValue(); // value should be empty assertEquals("", value.toString()); @@ -1184,7 +1188,7 @@ public class TestHBaseTimelineStorage { assertEquals(1, events.size()); for (TimelineEvent e : events) { assertEquals(eventId, e.getId()); - assertEquals(expTs, e.getTimestamp()); + assertEquals(expTs, Long.valueOf(e.getTimestamp())); Map<String,Object> info = e.getInfo(); assertTrue(info == null || info.isEmpty()); } @@ -1195,6 +1199,67 @@ public class TestHBaseTimelineStorage { } @Test + public void testEventsEscapeTs() throws IOException { + TimelineEvent event = new TimelineEvent(); + String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE; + event.setId(eventId); + long expTs = 1463567041056L; + event.setTimestamp(expTs); + String expKey = "f==o o_e ve\tnt"; + Object expVal = "test"; + event.addInfo(expKey, expVal); + + final TimelineEntity entity = new ApplicationEntity(); + entity.setId(ApplicationId.newInstance(0, 1).toString()); + entity.addEvent(event); + + TimelineEntities entities = new 
TimelineEntities(); + entities.addEntity(entity); + + HBaseTimelineWriterImpl hbi = null; + try { + Configuration c1 = util.getConfiguration(); + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + hbi.start(); + String cluster = "clus!ter_\ttest_ev ents"; + String user = "user2"; + String flow = "other_flow_name"; + String flowVersion = "1111F01C2287BA"; + long runid = 1009876543218L; + String appName = "application_123465899910_2001"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); + hbi.stop(); + + // read the timeline entity using the reader this time + TimelineEntity e1 = reader.getEntity( + new TimelineReaderContext(cluster, user, flow, runid, appName, + entity.getType(), entity.getId()), + new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL))); + assertNotNull(e1); + // check the events + NavigableSet<TimelineEvent> events = e1.getEvents(); + // there should be only one event + assertEquals(1, events.size()); + for (TimelineEvent e : events) { + assertEquals(eventId, e.getId()); + assertEquals(expTs, e.getTimestamp()); + Map<String,Object> info = e.getInfo(); + assertEquals(1, info.size()); + for (Map.Entry<String, Object> infoEntry : info.entrySet()) { + assertEquals(expKey, infoEntry.getKey()); + assertEquals(expVal, infoEntry.getValue()); + } + } + } finally { + if (hbi != null) { + hbi.stop(); + hbi.close(); + } + } + } + + @Test public void testNonIntegralMetricValues() throws IOException { TimelineEntities teApp = new TimelineEntities(); ApplicationEntity entityApp = new ApplicationEntity(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index 6b23b6c..072332d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -170,7 +170,7 @@ public class TestHBaseStorageFlowActivity { assertEquals(cluster, flowActivityRowKey.getClusterId()); assertEquals(user, flowActivityRowKey.getUserId()); assertEquals(flow, flowActivityRowKey.getFlowName()); - long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs); + Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs); assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); assertEquals(1, values.size()); checkFlowActivityRunId(runid, flowVersion, values); @@ -194,7 +194,7 @@ public class TestHBaseStorageFlowActivity { assertEquals(cluster, flowActivity.getCluster()); assertEquals(user, flowActivity.getUser()); assertEquals(flow, flowActivity.getFlowName()); - assertEquals(dayTs, flowActivity.getDate().getTime()); + assertEquals(dayTs, Long.valueOf(flowActivity.getDate().getTime())); Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns(); assertEquals(1, flowRuns.size()); } @@ -294,7 +294,7 @@ public class TestHBaseStorageFlowActivity { assertEquals(cluster, flowActivityRowKey.getClusterId()); assertEquals(user, flowActivityRowKey.getUserId()); 
assertEquals(flow, flowActivityRowKey.getFlowName()); - long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); + Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); assertEquals(1, values.size()); checkFlowActivityRunId(runid, flowVersion, values); @@ -429,7 +429,7 @@ public class TestHBaseStorageFlowActivity { assertEquals(cluster, flowActivityRowKey.getClusterId()); assertEquals(user, flowActivityRowKey.getUserId()); assertEquals(flow, flowActivityRowKey.getFlowName()); - long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); + Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); Map<byte[], byte[]> values = result http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java index 8cae410..036746b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java @@ -31,10 +31,14 @@ import 
org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; - +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; @@ -205,6 +209,17 @@ public final class TimelineFilterUtils { return singleColValFilter; } + private static <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix, + String column) { + if (colPrefix == ApplicationColumnPrefix.EVENT || + colPrefix == EntityColumnPrefix.EVENT) { + return EventColumnNameConverter.getInstance().encode( + new EventColumnName(column, null, null)); + } else { + return StringKeyConverter.getInstance().encode(column); + } + } + /** * Create a filter list of qualifier filters based on passed set of columns. * @@ -219,8 +234,7 @@ public final class TimelineFilterUtils { for (String column : columns) { // For columns which have compound column qualifiers (eg. events), we need // to include the required separator. 
- byte[] compoundColQual = - colPrefix.getCompoundColQualBytes(column, (byte[])null); + byte[] compoundColQual = createColQualifierPrefix(colPrefix, column); list.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryPrefixComparator( colPrefix.getColumnPrefixBytes(compoundColQual)))); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index fe4671f..f8b5a65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -37,7 +36,6 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import 
org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; -import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; @@ -46,7 +44,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; @@ -194,7 +196,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements long activityTimeStamp) throws IOException { byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, activityTimeStamp, userId, flowName); - byte[] qualifier = GenericObjectMapper.write(flowRunId); + byte[] qualifier = LongKeyConverter.getInstance().encode(flowRunId); 
FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier, null, flowVersion, AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); @@ -278,7 +280,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics, Attribute... attributes) throws IOException { for (TimelineMetric metric : metrics) { - String metricColumnQualifier = metric.getId(); + byte[] metricColumnQualifier = + StringKeyConverter.getInstance().encode(metric.getId()); Map<Long, Number> timeseries = metric.getValues(); for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { Long timestamp = timeseriesEntry.getKey(); @@ -316,8 +319,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements // id3?id4?id5 String compoundValue = Separator.VALUES.joinEncoded(connectedEntity.getValue()); - columnPrefix.store(rowKey, table, connectedEntity.getKey(), null, - compoundValue); + columnPrefix.store(rowKey, table, + StringKeyConverter.getInstance().encode(connectedEntity.getKey()), + null, compoundValue); } } @@ -337,7 +341,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements if (info != null) { for (Map.Entry<String, Object> entry : info.entrySet()) { ApplicationColumnPrefix.INFO.store(rowKey, applicationTable, - entry.getKey(), null, entry.getValue()); + StringKeyConverter.getInstance().encode(entry.getKey()), null, + entry.getValue()); } } } else { @@ -349,8 +354,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements Map<String, Object> info = te.getInfo(); if (info != null) { for (Map.Entry<String, Object> entry : info.entrySet()) { - EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(), - null, entry.getValue()); + EntityColumnPrefix.INFO.store(rowKey, entityTable, + StringKeyConverter.getInstance().encode(entry.getKey()), null, + entry.getValue()); } } } @@ -365,11 +371,13 @@ public class 
HBaseTimelineWriterImpl extends AbstractService implements return; } for (Map.Entry<String, String> entry : config.entrySet()) { + byte[] configKey = + StringKeyConverter.getInstance().encode(entry.getKey()); if (isApplication) { ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable, - entry.getKey(), null, entry.getValue()); + configKey, null, entry.getValue()); } else { - EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(), + EntityColumnPrefix.CONFIG.store(rowKey, entityTable, configKey, null, entry.getValue()); } } @@ -383,7 +391,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements boolean isApplication) throws IOException { if (metrics != null) { for (TimelineMetric metric : metrics) { - String metricColumnQualifier = metric.getId(); + byte[] metricColumnQualifier = + StringKeyConverter.getInstance().encode(metric.getId()); Map<Long, Number> timeseries = metric.getValues(); for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { Long timestamp = timeseriesEntry.getKey(); @@ -416,41 +425,31 @@ public class HBaseTimelineWriterImpl extends AbstractService implements "! Using the current timestamp"); eventTimestamp = System.currentTimeMillis(); } - byte[] eventTs = - Bytes.toBytes(TimelineStorageUtils.invertLong(eventTimestamp)); + EventColumnNameConverter converter = + EventColumnNameConverter.getInstance(); Map<String, Object> eventInfo = event.getInfo(); if ((eventInfo == null) || (eventInfo.size() == 0)) { + byte[] columnQualifierBytes = converter.encode( + new EventColumnName(eventId, eventTimestamp, null)); if (isApplication) { - byte[] compoundColumnQualifierBytes = - ApplicationColumnPrefix.EVENT. 
- getCompoundColQualBytes(eventId, eventTs, null); ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, - compoundColumnQualifierBytes, null, - TimelineStorageUtils.EMPTY_BYTES); + columnQualifierBytes, null, Separator.EMPTY_BYTES); } else { - byte[] compoundColumnQualifierBytes = - EntityColumnPrefix.EVENT. - getCompoundColQualBytes(eventId, eventTs, null); EntityColumnPrefix.EVENT.store(rowKey, entityTable, - compoundColumnQualifierBytes, null, - TimelineStorageUtils.EMPTY_BYTES); + columnQualifierBytes, null, Separator.EMPTY_BYTES); } } else { for (Map.Entry<String, Object> info : eventInfo.entrySet()) { - // eventId?infoKey - byte[] infoKey = Bytes.toBytes(info.getKey()); + // eventId=infoKey + byte[] columnQualifierBytes = converter.encode( + new EventColumnName(eventId, eventTimestamp, + info.getKey())); if (isApplication) { - byte[] compoundColumnQualifierBytes = - ApplicationColumnPrefix.EVENT. - getCompoundColQualBytes(eventId, eventTs, infoKey); ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, - compoundColumnQualifierBytes, null, info.getValue()); + columnQualifierBytes, null, info.getValue()); } else { - byte[] compoundColumnQualifierBytes = - EntityColumnPrefix.EVENT. 
- getCompoundColQualBytes(eventId, eventTs, infoKey); EntityColumnPrefix.EVENT.store(rowKey, entityTable, - compoundColumnQualifierBytes, null, info.getValue()); + columnQualifierBytes, null, info.getValue()); } } // for info: eventInfo } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java index 1dfc4db..0febc67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java @@ -27,9 +27,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; 
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; @@ -56,7 +57,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { /** * Lifecycle events for an application. */ - EVENT(ApplicationColumnFamily.INFO, "e", true), + EVENT(ApplicationColumnFamily.INFO, "e"), /** * Config column stores configuration with config key as the column name. @@ -78,7 +79,6 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { */ private final String columnPrefix; private final byte[] columnPrefixBytes; - private final boolean compoundColQual; /** * Private constructor, meant to be used by the enum definition. @@ -88,18 +88,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { */ private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily, String columnPrefix) { - this(columnFamily, columnPrefix, false, GenericConverter.getInstance()); - } - - private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily, - String columnPrefix, boolean compoundColQual) { - this(columnFamily, columnPrefix, compoundColQual, - GenericConverter.getInstance()); - } - - private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily, - String columnPrefix, ValueConverter converter) { - this(columnFamily, columnPrefix, false, converter); + this(columnFamily, columnPrefix, GenericConverter.getInstance()); } /** @@ -111,7 +100,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { * this column prefix. 
*/ private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily, - String columnPrefix, boolean compoundColQual, ValueConverter converter) { + String columnPrefix, ValueConverter converter) { column = new ColumnHelper<ApplicationTable>(columnFamily, converter); this.columnFamily = columnFamily; this.columnPrefix = columnPrefix; @@ -122,7 +111,6 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE.encode(columnPrefix)); } - this.compoundColQual = compoundColQual; } /** @@ -149,15 +137,6 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { return columnFamily.getBytes(); } - @Override - public byte[] getCompoundColQualBytes(String qualifier, - byte[]...components) { - if (!compoundColQual) { - return ColumnHelper.getColumnQualifier(null, qualifier); - } - return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components); - } - /* * (non-Javadoc) * @@ -232,25 +211,12 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { * * @see * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResults(org.apache.hadoop.hbase.client.Result) - */ - public Map<String, Object> readResults(Result result) throws IOException { - return column.readResults(result, columnPrefixBytes); - } - - /** - * @param result from which to read columns - * @return the latest values of columns in the column family. The column - * qualifier is returned as a list of parts, each part a byte[]. This - * is to facilitate returning byte arrays of values that were not - * Strings. If they can be treated as Strings, you should use - * {@link #readResults(Result)} instead. - * @throws IOException if any problem occurs while reading results. 
+ * #readResults(org.apache.hadoop.hbase.client.Result, + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) */ - public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result) - throws IOException { - return column.readResultsHavingCompoundColumnQualifiers(result, - columnPrefixBytes); + public <K> Map<K, Object> readResults(Result result, + KeyConverter<K> keyConverter) throws IOException { + return column.readResults(result, columnPrefixBytes, keyConverter); } /* @@ -258,11 +224,14 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { * * @see * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) + * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result, + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) */ - public <V> NavigableMap<String, NavigableMap<Long, V>> - readResultsWithTimestamps(Result result) throws IOException { - return column.readResultsWithTimestamps(result, columnPrefixBytes); + public <K, V> NavigableMap<K, NavigableMap<Long, V>> + readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter) + throws IOException { + return column.readResultsWithTimestamps(result, columnPrefixBytes, + keyConverter); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java index ad2aa7a..e476b21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java @@ -15,11 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.yarn.server.timelineservice.storage.application; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +package org.apache.hadoop.yarn.server.timelineservice.storage.application; /** * Represents a rowkey for the application table. 
@@ -28,11 +25,11 @@ public class ApplicationRowKey { private final String clusterId; private final String userId; private final String flowName; - private final long flowRunId; + private final Long flowRunId; private final String appId; public ApplicationRowKey(String clusterId, String userId, String flowName, - long flowRunId, String appId) { + Long flowRunId, String appId) { this.clusterId = clusterId; this.userId = userId; this.flowName = flowName; @@ -52,7 +49,7 @@ public class ApplicationRowKey { return flowName; } - public long getFlowRunId() { + public Long getFlowRunId() { return flowRunId; } @@ -71,9 +68,8 @@ public class ApplicationRowKey { */ public static byte[] getRowKeyPrefix(String clusterId, String userId, String flowName) { - byte[] first = Bytes.toBytes( - Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowName)); - return Separator.QUALIFIERS.join(first, new byte[0]); + return ApplicationRowKeyConverter.getInstance().encode( + new ApplicationRowKey(clusterId, userId, flowName, null, null)); } /** @@ -88,10 +84,8 @@ public class ApplicationRowKey { */ public static byte[] getRowKeyPrefix(String clusterId, String userId, String flowName, Long flowRunId) { - byte[] first = Bytes.toBytes( - Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowName)); - byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId)); - return Separator.QUALIFIERS.join(first, second, new byte[0]); + return ApplicationRowKeyConverter.getInstance().encode( + new ApplicationRowKey(clusterId, userId, flowName, flowRunId, null)); } /** @@ -107,14 +101,8 @@ public class ApplicationRowKey { */ public static byte[] getRowKey(String clusterId, String userId, String flowName, Long flowRunId, String appId) { - byte[] first = - Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId, - flowName)); - // Note that flowRunId is a long, so we can't encode them all at the same - // time. 
- byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId)); - byte[] third = TimelineStorageUtils.encodeAppId(appId); - return Separator.QUALIFIERS.join(first, second, third); + return ApplicationRowKeyConverter.getInstance().encode( + new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId)); } /** @@ -124,22 +112,6 @@ public class ApplicationRowKey { * @return An <cite>ApplicationRowKey</cite> object. */ public static ApplicationRowKey parseRowKey(byte[] rowKey) { - byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey); - - if (rowKeyComponents.length < 5) { - throw new IllegalArgumentException("the row key is not valid for " + - "an application"); - } - - String clusterId = - Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0])); - String userId = - Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1])); - String flowName = - Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2])); - long flowRunId = - TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3])); - String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]); - return new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId); + return ApplicationRowKeyConverter.getInstance().decode(rowKey); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java new file mode 100644 index 0000000..3b054a5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; + +/** + * Encodes and decodes row key for application table. + * The row key is of the form : clusterId!userName!flowName!flowRunId!appId. 
+ * flowRunId is a long, appId is encoded and decoded using + * {@link AppIdKeyConverter} and rest are strings. + */ +public final class ApplicationRowKeyConverter implements + KeyConverter<ApplicationRowKey> { + private static final ApplicationRowKeyConverter INSTANCE = + new ApplicationRowKeyConverter(); + + public static ApplicationRowKeyConverter getInstance() { + return INSTANCE; + } + + private ApplicationRowKeyConverter() { + } + + // Application row key is of the form + // clusterId!userName!flowName!flowRunId!appId with each segment separated + // by !. The sizes below indicate sizes of each one of these segments in + // sequence. clusterId, userName and flowName are strings. flowRunId is a long + // hence 8 bytes in size. app id is represented as 12 bytes with cluster + // timestamp part of appid being 8 bytes(long) and seq id being 4 bytes(int). + // Strings are variable in size (i.e. end whenever separator is encountered). + // This is used while decoding and helps in determining where to split. + private static final int[] SEGMENT_SIZES = { + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize() }; + + /* + * (non-Javadoc) + * + * Encodes ApplicationRowKey object into a byte array with each + * component/field in ApplicationRowKey separated by Separator#QUALIFIERS. + * This leads to an application table row key of the form + * clusterId!userName!flowName!flowRunId!appId + * If flowRunId in passed ApplicationRowKey object is null (and the fields + * preceding it i.e. clusterId, userId and flowName are not null), this + * returns a row key prefix of the form clusterId!userName!flowName! and if + * appId in ApplicationRowKey is null (other 4 components are not null), this + * returns a row key prefix of the form clusterId!userName!flowName!flowRunId! + * flowRunId is inverted while encoding as it helps maintain a descending + * order for row keys in application table. 
+ * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter + * #encode(java.lang.Object) + */ + @Override + public byte[] encode(ApplicationRowKey rowKey) { + byte[] cluster = Separator.encode(rowKey.getClusterId(), + Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); + byte[] user = Separator.encode(rowKey.getUserId(), + Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); + byte[] flow = Separator.encode(rowKey.getFlowName(), + Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); + byte[] first = Separator.QUALIFIERS.join(cluster, user, flow); + // Note that flowRunId is a long, so we can't encode them all at the same + // time. + if (rowKey.getFlowRunId() == null) { + return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES); + } + byte[] second = Bytes.toBytes( + TimelineStorageUtils.invertLong(rowKey.getFlowRunId())); + if (rowKey.getAppId() == null || rowKey.getAppId().isEmpty()) { + return Separator.QUALIFIERS.join(first, second, Separator.EMPTY_BYTES); + } + byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId()); + return Separator.QUALIFIERS.join(first, second, third); + } + + /* + * (non-Javadoc) + * + * Decodes an application row key of the form + * clusterId!userName!flowName!flowRunId!appId represented in byte format and + * converts it into an ApplicationRowKey object. flowRunId is inverted while + * decoding as it was inverted while encoding. 
+ * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter + * #decode(byte[]) + */ + @Override + public ApplicationRowKey decode(byte[] rowKey) { + byte[][] rowKeyComponents = + Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES); + if (rowKeyComponents.length != 5) { + throw new IllegalArgumentException("the row key is not valid for " + + "an application"); + } + String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + String userId = Separator.decode(Bytes.toString(rowKeyComponents[1]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + Long flowRunId = + TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3])); + String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]); + return new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java index 3085bb1..6a38e32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java @@ -17,10 +17,6 @@ */ package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; - /** * Represents a rowkey for the app_flow table. */ @@ -50,9 +46,8 @@ public class AppToFlowRowKey { * @return byte array with the row key */ public static byte[] getRowKey(String clusterId, String appId) { - byte[] first = Bytes.toBytes(clusterId); - byte[] second = TimelineStorageUtils.encodeAppId(appId); - return Separator.QUALIFIERS.join(first, second); + return AppToFlowRowKeyConverter.getInstance().encode( + new AppToFlowRowKey(clusterId, appId)); } /** @@ -62,15 +57,6 @@ public class AppToFlowRowKey { * @return an <cite>AppToFlowRowKey</cite> object. 
*/ public static AppToFlowRowKey parseRowKey(byte[] rowKey) { - byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey); - - if (rowKeyComponents.length < 2) { - throw new IllegalArgumentException("the row key is not valid for " + - "the app-to-flow table"); - } - - String clusterId = Bytes.toString(rowKeyComponents[0]); - String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[1]); - return new AppToFlowRowKey(clusterId, appId); + return AppToFlowRowKeyConverter.getInstance().decode(rowKey); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java new file mode 100644 index 0000000..0f0b879d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + +/** + * Encodes and decodes row key for app_flow table. + * The row key is of the form : clusterId!appId. + * clusterId is a string and appId is encoded/decoded using + * {@link AppIdKeyConverter}. + */ +public final class AppToFlowRowKeyConverter + implements KeyConverter<AppToFlowRowKey> { + private static final AppToFlowRowKeyConverter INSTANCE = + new AppToFlowRowKeyConverter(); + + public static AppToFlowRowKeyConverter getInstance() { + return INSTANCE; + } + + private AppToFlowRowKeyConverter() { + } + + // App to flow row key is of the form clusterId!appId with the 2 segments + // separated by !. The sizes below indicate sizes of both of these segments + // in sequence. clusterId is a string. appId is represented as 12 bytes with + // cluster Timestamp part of appid being 8 bytes(long) and seq id being 4 + // bytes(int). + // Strings are variable in size (i.e. end whenever separator is encountered). + // This is used while decoding and helps in determining where to split. 
+ private static final int[] SEGMENT_SIZES = { + Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT }; + + /* + * (non-Javadoc) + * + * Encodes AppToFlowRowKey object into a byte array with each component/field + * in AppToFlowRowKey separated by Separator#QUALIFIERS. This leads to an + * app to flow table row key of the form clusterId!appId + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter + * #encode(java.lang.Object) + */ + @Override + public byte[] encode(AppToFlowRowKey rowKey) { + byte[] first = Separator.encode(rowKey.getClusterId(), + Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); + byte[] second = AppIdKeyConverter.getInstance().encode(rowKey.getAppId()); + return Separator.QUALIFIERS.join(first, second); + } + + /* + * (non-Javadoc) + * + * Decodes an app to flow row key of the form clusterId!appId represented in + * byte format and converts it into an AppToFlowRowKey object. + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter + * #decode(byte[]) + */ + @Override + public AppToFlowRowKey decode(byte[] rowKey) { + byte[][] rowKeyComponents = + Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES); + if (rowKeyComponents.length != 2) { + throw new IllegalArgumentException("the row key is not valid for " + + "the app-to-flow table"); + } + String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[1]); + return new AppToFlowRowKey(clusterId, appId); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java new file mode 100644 index 0000000..a173b0f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.ConverterUtils; + +/** + * Encodes and decodes {@link ApplicationId} for row keys. + * App ID is stored in row key as 12 bytes, cluster timestamp section of app id + * (long - 8 bytes) followed by sequence id section of app id (int - 4 bytes). 
+ */ +public final class AppIdKeyConverter implements KeyConverter<String> { + private static final AppIdKeyConverter INSTANCE = new AppIdKeyConverter(); + + public static AppIdKeyConverter getInstance() { + return INSTANCE; + } + + private AppIdKeyConverter() { + } + + /* + * (non-Javadoc) + * + * Converts/encodes a string app Id into a byte representation for (row) keys. + * For conversion, we extract cluster timestamp and sequence id from the + * string app id (calls ConverterUtils#toApplicationId(String) for + * conversion) and then store it in a byte array of length 12 (8 bytes (long) + * for cluster timestamp followed 4 bytes(int) for sequence id). Both cluster + * timestamp and sequence id are inverted so that the most recent cluster + * timestamp and highest sequence id appears first in the table (i.e. + * application id appears in a descending order). + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter + * #encode(java.lang.Object) + */ + @Override + public byte[] encode(String appIdStr) { + ApplicationId appId = ConverterUtils.toApplicationId(appIdStr); + byte[] appIdBytes = new byte[getKeySize()]; + byte[] clusterTs = Bytes.toBytes( + TimelineStorageUtils.invertLong(appId.getClusterTimestamp())); + System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG); + byte[] seqId = Bytes.toBytes(TimelineStorageUtils.invertInt(appId.getId())); + System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT); + return appIdBytes; + } + + /* + * (non-Javadoc) + * + * Converts/decodes a 12 byte representation of app id for (row) keys to an + * app id in string format which can be returned back to client. + * For decoding, 12 bytes are interpreted as 8 bytes of inverted cluster + * timestamp(long) followed by 4 bytes of inverted sequence id(int). Calls + * ApplicationId#toString to generate string representation of app id. 
+ * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter + * #decode(byte[]) + */ + @Override + public String decode(byte[] appIdBytes) { + if (appIdBytes.length != getKeySize()) { + throw new IllegalArgumentException("Invalid app id in byte format"); + } + long clusterTs = TimelineStorageUtils.invertLong( + Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG)); + int seqId = TimelineStorageUtils.invertInt( + Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT)); + return ApplicationId.newInstance(clusterTs, seqId).toString(); + } + + /** + * Returns the size of app id after encoding. + * + * @return size of app id after encoding. + */ + public static int getKeySize() { + return Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java index 759bf27..be55db5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java @@ -166,19 +166,22 @@ public class ColumnHelper<T> { * @param result from which to reads data with timestamps * @param columnPrefixBytes optional 
prefix to limit columns. If null all * columns are returned. + * @param <K> identifies the type of column name(indicated by type of key + * converter). * @param <V> the type of the values. The values will be cast into that type. + * @param keyConverter used to convert column bytes to the appropriate key + * type. * @return the cell values at each respective time in for form * {@literal {idA={timestamp1->value1}, idA={timestamp2->value2}, * idB={timestamp3->value3}, idC={timestamp1->value4}}} * @throws IOException if any problem occurs while reading results. */ @SuppressWarnings("unchecked") - public <V> NavigableMap<String, NavigableMap<Long, V>> - readResultsWithTimestamps(Result result, byte[] columnPrefixBytes) - throws IOException { + public <K, V> NavigableMap<K, NavigableMap<Long, V>> + readResultsWithTimestamps(Result result, byte[] columnPrefixBytes, + KeyConverter<K> keyConverter) throws IOException { - NavigableMap<String, NavigableMap<Long, V>> results = - new TreeMap<String, NavigableMap<Long, V>>(); + NavigableMap<K, NavigableMap<Long, V>> results = new TreeMap<>(); if (result != null) { NavigableMap< @@ -192,13 +195,17 @@ public class ColumnHelper<T> { if (columnCellMap != null) { for (Entry<byte[], NavigableMap<Long, byte[]>> entry : columnCellMap .entrySet()) { - String columnName = null; + K converterColumnKey = null; if (columnPrefixBytes == null) { if (LOG.isDebugEnabled()) { LOG.debug("null prefix was specified; returning all columns"); } - // Decode the spaces we encoded in the column name. 
- columnName = Separator.decode(entry.getKey(), Separator.SPACE); + try { + converterColumnKey = keyConverter.decode(entry.getKey()); + } catch (IllegalArgumentException iae) { + LOG.error("Illegal column found, skipping this column.", iae); + continue; + } } else { // A non-null prefix means columns are actually of the form // prefix!columnNameRemainder @@ -207,13 +214,18 @@ public class ColumnHelper<T> { byte[] actualColumnPrefixBytes = columnNameParts[0]; if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes) && columnNameParts.length == 2) { - // This is the prefix that we want - columnName = Separator.decode(columnNameParts[1]); + try { + // This is the prefix that we want + converterColumnKey = keyConverter.decode(columnNameParts[1]); + } catch (IllegalArgumentException iae) { + LOG.error("Illegal column found, skipping this column.", iae); + continue; + } } } // If this column has the prefix we want - if (columnName != null) { + if (converterColumnKey != null) { NavigableMap<Long, V> cellResults = new TreeMap<Long, V>(); NavigableMap<Long, byte[]> cells = entry.getValue(); @@ -226,7 +238,7 @@ public class ColumnHelper<T> { value); } } - results.put(columnName, cellResults); + results.put(converterColumnKey, cellResults); } } // for entry : columnCellMap } // if columnCellMap != null @@ -235,20 +247,24 @@ public class ColumnHelper<T> { } /** + * @param <K> identifies the type of column name(indicated by type of key + * converter). * @param result from which to read columns * @param columnPrefixBytes optional prefix to limit columns. If null all * columns are returned. - * @return the latest values of columns in the column family. This assumes - * that the column name parts are all Strings by default. If the - * column name parts should be treated natively and not be converted - * back and forth from Strings, you should use - * {@link #readResultsHavingCompoundColumnQualifiers(Result, byte[])} - * instead. 
+ * @param keyConverter used to convert column bytes to the appropriate key + * type. + * @return the latest values of columns in the column family. If the column + * prefix is null, the column qualifier is returned as Strings. For a + * non-null column prefix bytes, the column qualifier is returned as + * a list of parts, each part a byte[]. This is to facilitate + * returning byte arrays of values that were not Strings. * @throws IOException if any problem occurs while reading results. */ - public Map<String, Object> readResults(Result result, - byte[] columnPrefixBytes) throws IOException { - Map<String, Object> results = new HashMap<String, Object>(); + public <K> Map<K, Object> readResults(Result result, + byte[] columnPrefixBytes, KeyConverter<K> keyConverter) + throws IOException { + Map<K, Object> results = new HashMap<K, Object>(); if (result != null) { Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes); @@ -256,84 +272,40 @@ public class ColumnHelper<T> { byte[] columnKey = entry.getKey(); if (columnKey != null && columnKey.length > 0) { - String columnName = null; + K converterColumnKey = null; if (columnPrefixBytes == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("null prefix was specified; returning all columns"); + try { + converterColumnKey = keyConverter.decode(columnKey); + } catch (IllegalArgumentException iae) { + LOG.error("Illegal column found, skipping this column.", iae); + continue; } - // Decode the spaces we encoded in the column name. 
- columnName = Separator.decode(columnKey, Separator.SPACE); } else { // A non-null prefix means columns are actually of the form // prefix!columnNameRemainder - byte[][] columnNameParts = - Separator.QUALIFIERS.split(columnKey, 2); - byte[] actualColumnPrefixBytes = columnNameParts[0]; - if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes) - && columnNameParts.length == 2) { - // This is the prefix that we want - // if the column name is a compound qualifier - // with non string datatypes, the following decode will not - // work correctly since it considers all components to be String - // invoke the readResultsHavingCompoundColumnQualifiers function - columnName = Separator.decode(columnNameParts[1]); + byte[][] columnNameParts = Separator.QUALIFIERS.split(columnKey, 2); + if (columnNameParts.length > 0) { + byte[] actualColumnPrefixBytes = columnNameParts[0]; + // If this is the prefix that we want + if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes) + && columnNameParts.length == 2) { + try { + converterColumnKey = keyConverter.decode(columnNameParts[1]); + } catch (IllegalArgumentException iae) { + LOG.error("Illegal column found, skipping this column.", iae); + continue; + } + } } - } + } // if-else - // If this column has the prefix we want - if (columnName != null) { + // If the columnPrefix is null (we want all columns), or the actual + // prefix matches the given prefix we want this column + if (converterColumnKey != null) { Object value = converter.decodeValue(entry.getValue()); - results.put(columnName, value); - } - } - } // for entry - } - return results; - } - - /** - * @param result from which to read columns - * @param columnPrefixBytes optional prefix to limit columns. If null all - * columns are returned. - * @return the latest values of columns in the column family. If the column - * prefix is null, the column qualifier is returned as Strings. 
For a - * non-null column prefix bytes, the column qualifier is returned as - * a list of parts, each part a byte[]. This is to facilitate - * returning byte arrays of values that were not Strings. - * @throws IOException if any problem occurs while reading results. - */ - public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result, - byte[] columnPrefixBytes) throws IOException { - // handle the case where the column prefix is null - // it is the same as readResults() so simply delegate to that implementation - if (columnPrefixBytes == null) { - return readResults(result, null); - } - - Map<byte[][], Object> results = new HashMap<byte[][], Object>(); - - if (result != null) { - Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes); - for (Entry<byte[], byte[]> entry : columns.entrySet()) { - byte[] columnKey = entry.getKey(); - if (columnKey != null && columnKey.length > 0) { - // A non-null prefix means columns are actually of the form - // prefix!columnNameRemainder - // with a compound column qualifier, we are presuming existence of a - // prefix - byte[][] columnNameParts = Separator.QUALIFIERS.split(columnKey, 2); - if (columnNameParts.length > 0) { - byte[] actualColumnPrefixBytes = columnNameParts[0]; - if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes) - && columnNameParts.length == 2) { - // This is the prefix that we want - byte[][] columnQualifierParts = - Separator.VALUES.split(columnNameParts[1]); - Object value = converter.decodeValue(entry.getValue()); - // we return the columnQualifier in parts since we don't know - // which part is of which data type - results.put(columnQualifierParts, value); - } + // we return the columnQualifier in parts since we don't know + // which part is of which data type. 
+ results.put(converterColumnKey, value); } } } // for entry @@ -353,8 +325,9 @@ public class ColumnHelper<T> { public static byte[] getColumnQualifier(byte[] columnPrefixBytes, String qualifier) { - // We don't want column names to have spaces - byte[] encodedQualifier = Bytes.toBytes(Separator.SPACE.encode(qualifier)); + // We don't want column names to have spaces / tabs. + byte[] encodedQualifier = + Separator.encode(qualifier, Separator.SPACE, Separator.TAB); if (columnPrefixBytes == null) { return encodedQualifier; } @@ -367,22 +340,6 @@ public class ColumnHelper<T> { } /** - * Create a compound column qualifier by combining qualifier and components. - * - * @param qualifier Column QUalifier. - * @param components Other components. - * @return a byte array representing compound column qualifier. - */ - public static byte[] getCompoundColumnQualifierBytes(String qualifier, - byte[]...components) { - byte[] colQualBytes = Bytes.toBytes(Separator.VALUES.encode(qualifier)); - for (int i = 0; i < components.length; i++) { - colQualBytes = Separator.VALUES.join(colQualBytes, components[i]); - } - return colQualBytes; - } - - /** * @param columnPrefixBytes The byte representation for the column prefix. * Should not contain {@link Separator#QUALIFIERS}. * @param qualifier for the remainder of the column. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java index e4b7f16..89aa013 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java @@ -91,37 +91,33 @@ public interface ColumnPrefix<T> { Object readResult(Result result, String qualifier) throws IOException; /** - * @param result from which to read columns + * + * @param <K> identifies the type of key converter. + * @param result from which to read columns. + * @param keyConverter used to convert column bytes to the appropriate key + * type * @return the latest values of columns in the column family with this prefix * (or all of them if the prefix value is null). * @throws IOException if there is any exception encountered while reading - * results. + * results. */ - Map<String, Object> readResults(Result result) throws IOException; + <K> Map<K, Object> readResults(Result result, KeyConverter<K> keyConverter) + throws IOException; /** - * @param result from which to reads data with timestamps + * @param result from which to reads data with timestamps. 
+ * @param <K> identifies the type of key converter. * @param <V> the type of the values. The values will be cast into that type. + * @param keyConverter used to convert column bytes to the appropriate key + * type. * @return the cell values at each respective time in for form * {@literal {idA={timestamp1->value1}, idA={timestamp2->value2}, * idB={timestamp3->value3}, idC={timestamp1->value4}}} * @throws IOException if there is any exception encountered while reading * result. */ - <V> NavigableMap<String, NavigableMap<Long, V>> - readResultsWithTimestamps(Result result) throws IOException; - - /** - * @param result from which to read columns - * @return the latest values of columns in the column family. The column - * qualifier is returned as a list of parts, each part a byte[]. This - * is to facilitate returning byte arrays of values that were not - * Strings. If they can be treated as Strings, you should use - * {@link #readResults(Result)} instead. - * @throws IOException if any problem occurs while reading results. - */ - Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result) - throws IOException; + <K, V> NavigableMap<K, NavigableMap<Long, V>> readResultsWithTimestamps( + Result result, KeyConverter<K> keyConverter) throws IOException; /** * @param qualifierPrefix Column qualifier or prefix of qualifier. @@ -146,15 +142,4 @@ public interface ColumnPrefix<T> { * @return a {@link ValueConverter} implementation. */ ValueConverter getValueConverter(); - - /** - * Get compound column qualifier bytes if the column qualifier is a compound - * qualifier. Returns the qualifier passed as bytes if the column is not a - * compound column qualifier. - * - * @param qualifier Column Qualifier. - * @param components Other components. - * @return byte array representing compound column qualifier. 
- */ - byte[] getCompoundColQualBytes(String qualifier, byte[]...components); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java new file mode 100644 index 0000000..6018f86 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +/** + * Encapsulates information about Event column names for application and entity + * tables. Used while encoding/decoding event column names. + */ +public class EventColumnName { + + private final String id; + private final Long timestamp; + private final String infoKey; + + public EventColumnName(String id, Long timestamp, String infoKey) { + this.id = id; + this.timestamp = timestamp; + this.infoKey = infoKey; + } + + public String getId() { + return id; + } + + public Long getTimestamp() { + return timestamp; + } + + public String getInfoKey() { + return infoKey; + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org