YARN-5928. Move ATSv2 HBase backend code into a new module that YARN servers depend on only at runtime. Contributed by Haibo Chen.
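With the backend split out, the YARN daemons select a TimelineWriter implementation reflectively through configuration instead of compiling against HBase classes directly. The test changes in this diff pin the writer to the filesystem implementation so RM tests do not need the HBase artifact on their classpath; below is a minimal sketch of that pattern, using only classes and configuration keys that appear in this commit:

```java
// Minimal sketch of selecting a TimelineWriter implementation by
// configuration (the mechanism this commit relies on); it mirrors the
// conf.setClass(...) calls added to the RM tests in this diff.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;

public final class TimelineWriterConfig {
  private TimelineWriterConfig() {
  }

  /** Builds a conf that routes ATSv2 writes to the local filesystem. */
  public static Configuration fileSystemWriterConf() {
    Configuration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
    // HBaseTimelineWriterImpl would be wired the same way once the new
    // hadoop-yarn-server-timelineservice-hbase jar is on the runtime
    // classpath; this sketch uses the filesystem writer instead.
    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
        FileSystemTimelineWriterImpl.class, TimelineWriter.class);
    return conf;
  }
}
```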
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b92089c0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b92089c0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b92089c0

Branch: refs/heads/YARN-5355
Commit: b92089c0e8ab1b87b8b5b55b1e3d4367ae5d847a
Parents: 0327a79
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu Jan 19 21:21:48 2017 -0800
Committer: Sangjin Lee <sj...@apache.org>
Committed: Thu Jan 19 21:21:48 2017 -0800

----------------------------------------------------------------------
 hadoop-project/pom.xml | 13 +-
 .../TestRMHATimelineCollectors.java | 6 +
 .../server/resourcemanager/TestRMRestart.java | 4 +
 .../TestResourceTrackerService.java | 4 +
 .../pom.xml | 12 +
 .../pom.xml | 190 +++
 .../reader/filter/TimelineFilterUtils.java | 307 ++++
 .../reader/filter/package-info.java | 28 +
 .../storage/HBaseTimelineReaderImpl.java | 96 +++
 .../storage/HBaseTimelineWriterImpl.java | 547 ++++++++++++++
 .../storage/TimelineSchemaCreator.java | 251 +++++++
 .../storage/application/ApplicationColumn.java | 156 ++++
 .../application/ApplicationColumnFamily.java | 65 ++
 .../application/ApplicationColumnPrefix.java | 288 ++++++++
 .../storage/application/ApplicationRowKey.java | 206 ++++++
 .../application/ApplicationRowKeyPrefix.java | 69 ++
 .../storage/application/ApplicationTable.java | 161 ++++
 .../storage/application/package-info.java | 28 +
 .../apptoflow/AppToFlowColumnFamily.java | 51 ++
 .../apptoflow/AppToFlowColumnPrefix.java | 206 ++++++
 .../storage/apptoflow/AppToFlowRowKey.java | 58 ++
 .../storage/apptoflow/AppToFlowTable.java | 124 ++++
 .../storage/apptoflow/package-info.java | 28 +
 .../storage/common/AppIdKeyConverter.java | 96 +++
 .../storage/common/BaseTable.java | 167 +++++
 .../common/BufferedMutatorDelegator.java | 73 ++
 .../timelineservice/storage/common/Column.java | 80 ++
 .../storage/common/ColumnFamily.java | 34 +
 .../storage/common/ColumnHelper.java | 389 ++++++++++
 .../storage/common/ColumnPrefix.java | 145 ++++
 .../storage/common/EventColumnName.java | 63 ++
 .../common/EventColumnNameConverter.java | 99 +++
 .../storage/common/GenericConverter.java | 48 ++
 .../common/HBaseTimelineStorageUtils.java | 306 ++++++++
 .../storage/common/KeyConverter.java | 41 ++
 .../storage/common/LongConverter.java | 94 +++
 .../storage/common/LongKeyConverter.java | 68 ++
 .../storage/common/NumericValueConverter.java | 39 +
 .../timelineservice/storage/common/Range.java | 62 ++
 .../storage/common/RowKeyPrefix.java | 42 ++
 .../storage/common/Separator.java | 575 +++++++++++++++
 .../storage/common/StringKeyConverter.java | 54 ++
 .../common/TimelineHBaseSchemaConstants.java | 71 ++
 .../storage/common/TimestampGenerator.java | 116 +++
 .../storage/common/TypedBufferedMutator.java | 28 +
 .../storage/common/ValueConverter.java | 47 ++
 .../storage/common/package-info.java | 28 +
 .../storage/entity/EntityColumn.java | 160 ++++
 .../storage/entity/EntityColumnFamily.java | 65 ++
 .../storage/entity/EntityColumnPrefix.java | 300 ++++++++
 .../storage/entity/EntityRowKey.java | 249 +++++++
 .../storage/entity/EntityRowKeyPrefix.java | 77 ++
 .../storage/entity/EntityTable.java | 161 ++++
 .../storage/entity/package-info.java | 28 +
 .../flow/AggregationCompactionDimension.java | 63 ++
 .../storage/flow/AggregationOperation.java | 94 +++
 .../timelineservice/storage/flow/Attribute.java | 39 +
 .../storage/flow/FlowActivityColumnFamily.java | 55 ++
 .../storage/flow/FlowActivityColumnPrefix.java | 277 +++++++
 .../storage/flow/FlowActivityRowKey.java | 196 +++
 .../storage/flow/FlowActivityRowKeyPrefix.java | 60 ++
 .../storage/flow/FlowActivityTable.java | 108 +++
 .../storage/flow/FlowRunColumn.java | 182 +++++
 .../storage/flow/FlowRunColumnFamily.java | 54 ++
 .../storage/flow/FlowRunColumnPrefix.java | 268 +++++++
 .../storage/flow/FlowRunCoprocessor.java | 274 +++++++
 .../storage/flow/FlowRunRowKey.java | 190 +++++
 .../storage/flow/FlowRunRowKeyPrefix.java | 54 ++
 .../storage/flow/FlowRunTable.java | 150 ++++
 .../storage/flow/FlowScanner.java | 728 +++++++++++++++++++
 .../storage/flow/FlowScannerOperation.java | 46 ++
 .../storage/flow/package-info.java | 29 +
 .../timelineservice/storage/package-info.java | 28 +
 .../reader/AbstractTimelineStorageReader.java | 158 ++++
 .../storage/reader/ApplicationEntityReader.java | 502 +++++++++++++
 .../storage/reader/EntityTypeReader.java | 180 +++++
 .../reader/FlowActivityEntityReader.java | 163 +++++
 .../storage/reader/FlowRunEntityReader.java | 290 ++++++++
 .../storage/reader/GenericEntityReader.java | 628 ++++++++++++++++
 .../storage/reader/TimelineEntityReader.java | 458 ++++++++++++
 .../reader/TimelineEntityReaderFactory.java | 102 +++
 .../storage/reader/package-info.java | 28 +
 .../storage/common/TestKeyConverters.java | 130 ++++
 .../storage/common/TestRowKeys.java | 250 +++++++
 .../storage/common/TestSeparator.java | 215 ++++++
 .../hadoop-yarn-server-timelineservice/pom.xml | 66 --
 .../reader/filter/TimelineFilterUtils.java | 307 --------
 .../storage/HBaseTimelineReaderImpl.java | 96 ---
 .../storage/HBaseTimelineWriterImpl.java | 547 --------------
 .../storage/TimelineSchemaCreator.java | 251 -------
 .../storage/application/ApplicationColumn.java | 156 ----
 .../application/ApplicationColumnFamily.java | 65 --
 .../application/ApplicationColumnPrefix.java | 288 --------
 .../storage/application/ApplicationRowKey.java | 206 ------
 .../application/ApplicationRowKeyPrefix.java | 69 --
 .../storage/application/ApplicationTable.java | 161 ----
 .../storage/application/package-info.java | 28 -
 .../apptoflow/AppToFlowColumnFamily.java | 51 --
 .../apptoflow/AppToFlowColumnPrefix.java | 206 ------
 .../storage/apptoflow/AppToFlowRowKey.java | 58 --
 .../storage/apptoflow/AppToFlowTable.java | 124 ----
 .../storage/apptoflow/package-info.java | 28 -
 .../storage/common/AppIdKeyConverter.java | 96 ---
 .../storage/common/BaseTable.java | 167 -----
 .../common/BufferedMutatorDelegator.java | 73 --
 .../timelineservice/storage/common/Column.java | 80 --
 .../storage/common/ColumnFamily.java | 34 -
 .../storage/common/ColumnHelper.java | 389 ----------
 .../storage/common/ColumnPrefix.java | 145 ----
 .../storage/common/EventColumnName.java | 63 --
 .../common/EventColumnNameConverter.java | 99 ---
 .../storage/common/GenericConverter.java | 48 --
 .../common/HBaseTimelineStorageUtils.java | 306 --------
 .../storage/common/KeyConverter.java | 41 --
 .../storage/common/LongConverter.java | 94 ---
 .../storage/common/LongKeyConverter.java | 68 --
 .../storage/common/NumericValueConverter.java | 39 -
 .../timelineservice/storage/common/Range.java | 62 --
 .../storage/common/RowKeyPrefix.java | 42 --
 .../storage/common/Separator.java | 575 ---------------
 .../storage/common/StringKeyConverter.java | 54 --
 .../common/TimelineHBaseSchemaConstants.java | 71 --
 .../storage/common/TimestampGenerator.java | 116 ---
 .../storage/common/TypedBufferedMutator.java | 28 -
 .../storage/common/ValueConverter.java | 47 --
 .../storage/entity/EntityColumn.java | 160 ----
 .../storage/entity/EntityColumnFamily.java | 65 --
 .../storage/entity/EntityColumnPrefix.java | 300 --------
 .../storage/entity/EntityRowKey.java | 249 -------
 .../storage/entity/EntityRowKeyPrefix.java | 77 --
 .../storage/entity/EntityTable.java | 161 ----
 .../storage/entity/package-info.java | 28 -
 .../flow/AggregationCompactionDimension.java | 63 --
 .../storage/flow/AggregationOperation.java | 94 ---
 .../timelineservice/storage/flow/Attribute.java | 39 -
 .../storage/flow/FlowActivityColumnFamily.java | 55 --
 .../storage/flow/FlowActivityColumnPrefix.java | 277 -------
 .../storage/flow/FlowActivityRowKey.java | 196 -----
 .../storage/flow/FlowActivityRowKeyPrefix.java | 60 --
 .../storage/flow/FlowActivityTable.java | 108 ---
 .../storage/flow/FlowRunColumn.java | 182 -----
 .../storage/flow/FlowRunColumnFamily.java | 54 --
 .../storage/flow/FlowRunColumnPrefix.java | 268 -------
 .../storage/flow/FlowRunCoprocessor.java | 274 -------
 .../storage/flow/FlowRunRowKey.java | 190 -----
 .../storage/flow/FlowRunRowKeyPrefix.java | 54 --
 .../storage/flow/FlowRunTable.java | 150 ----
 .../storage/flow/FlowScanner.java | 728 -------------------
 .../storage/flow/FlowScannerOperation.java | 46 --
 .../storage/flow/package-info.java | 29 -
 .../reader/AbstractTimelineStorageReader.java | 158 ----
 .../storage/reader/ApplicationEntityReader.java | 502 -------------
 .../storage/reader/EntityTypeReader.java | 180 -----
 .../reader/FlowActivityEntityReader.java | 163 -----
 .../storage/reader/FlowRunEntityReader.java | 290 --------
 .../storage/reader/GenericEntityReader.java | 628 ----------------
 .../storage/reader/TimelineEntityReader.java | 458 ------------
 .../reader/TimelineEntityReaderFactory.java | 102 ---
 .../storage/reader/package-info.java | 28 -
 .../storage/common/TestKeyConverters.java | 130 ----
 .../storage/common/TestRowKeys.java | 250 -------
 .../storage/common/TestSeparator.java | 215 ------
 .../hadoop-yarn/hadoop-yarn-server/pom.xml | 1 +
 .../src/site/markdown/TimelineServiceV2.md | 2 +-
 hadoop-yarn-project/pom.xml | 4 +
 165 files changed, 12701 insertions(+), 12463 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 887e8cb..01184c9 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -319,6 +319,12 @@
         <type>test-jar</type>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-server-timelineservice-hbase</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-yarn-applications-distributedshell</artifactId>
@@ -1152,13 +1158,6 @@
       </dependency>
       <dependency>
         <groupId>org.apache.hbase</groupId>
-        <artifactId>hbase-it</artifactId>
-        <version>${hbase.version}</version>
-        <scope>test</scope>
-        <classifier>tests</classifier>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hbase</groupId>
         <artifactId>hbase-testing-util</artifactId>
         <version>${hbase.version}</version>
         <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java
index a54ff34..fa0d318 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -47,8 +49,12 @@ public class TestRMHATimelineCollectors extends RMHATestBase {
     super.setup();
     confForRM1.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
     confForRM2.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    confForRM1.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class, TimelineWriter.class);
     confForRM1.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
     confForRM2.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    confForRM2.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class, TimelineWriter.class);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index a98a124..4760f62 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -112,6 +112,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.Level;
@@ -147,6 +149,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
     conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
     conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
     conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class, TimelineWriter.class);
     rmAddr = new InetSocketAddress("localhost", 8032);
     Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 4df4996..5ed3278 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -80,6 +80,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
@@ -993,6 +995,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     conf.set(YarnConfiguration.NM_AUX_SERVICES + "."
         + "timeline_collector" + ".class",
         PerNodeTimelineCollectorsAuxService.class.getName());
+    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class, TimelineWriter.class);
     rm = new MockRM(conf);
     rm.start();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
index ed8d09a..de99b95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
@@ -61,6 +61,18 @@
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice-hbase</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${hbase-compatible-hadoop.version}</version>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
new file mode 100644
index 0000000..a2ba4e8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
@@ -0,0 +1,190 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>hadoop-yarn-server</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>3.0.0-alpha2-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hadoop-yarn-server-timelineservice-hbase</artifactId>
+  <name>Apache Hadoop YARN TimelineService HBase Backend</name>
+
+  <properties>
+    <!-- Needed for generating FindBugs warnings using parent pom -->
+    <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-sslengine</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <configuration>
+          <additionnalDependencies>
+            <additionnalDependency>
+              <groupId>junit</groupId>
+              <artifactId>junit</artifactId>
+              <version>4.11</version>
+            </additionnalDependency>
+          </additionnalDependencies>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
new file mode 100644
index 0000000..8e38e95
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+
+/**
+ * Set of utility methods used by timeline filter classes.
+ */
+public final class TimelineFilterUtils {
+
+  private static final Log LOG = LogFactory.getLog(TimelineFilterUtils.class);
+
+  private TimelineFilterUtils() {
+  }
+
+  /**
+   * Returns the equivalent HBase filter list's {@link Operator}.
+   *
+   * @param op timeline filter list operator.
+   * @return HBase filter list's Operator.
+   */
+  private static Operator getHBaseOperator(TimelineFilterList.Operator op) {
+    switch (op) {
+    case AND:
+      return Operator.MUST_PASS_ALL;
+    case OR:
+      return Operator.MUST_PASS_ONE;
+    default:
+      throw new IllegalArgumentException("Invalid operator");
+    }
+  }
+
+  /**
+   * Returns the equivalent HBase compare filter's {@link CompareOp}.
+   *
+   * @param op timeline compare op.
+   * @return HBase compare filter's CompareOp.
+   */
+  private static CompareOp getHBaseCompareOp(
+      TimelineCompareOp op) {
+    switch (op) {
+    case LESS_THAN:
+      return CompareOp.LESS;
+    case LESS_OR_EQUAL:
+      return CompareOp.LESS_OR_EQUAL;
+    case EQUAL:
+      return CompareOp.EQUAL;
+    case NOT_EQUAL:
+      return CompareOp.NOT_EQUAL;
+    case GREATER_OR_EQUAL:
+      return CompareOp.GREATER_OR_EQUAL;
+    case GREATER_THAN:
+      return CompareOp.GREATER;
+    default:
+      throw new IllegalArgumentException("Invalid compare operator");
+    }
+  }
+
+  /**
+   * Converts a {@link TimelinePrefixFilter} to an equivalent HBase
+   * {@link QualifierFilter}.
+   * @param colPrefix
+   * @param filter
+   * @return a {@link QualifierFilter} object
+   */
+  private static <T> Filter createHBaseColQualPrefixFilter(
+      ColumnPrefix<T> colPrefix, TimelinePrefixFilter filter) {
+    return new QualifierFilter(getHBaseCompareOp(filter.getCompareOp()),
+        new BinaryPrefixComparator(
+            colPrefix.getColumnPrefixBytes(filter.getPrefix())));
+  }
+
+  /**
+   * Create a HBase {@link QualifierFilter} for the passed column prefix and
+   * compare op.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param compareOp compare op.
+   * @param columnPrefix column prefix.
+   * @return a column qualifier filter.
+   */
+  public static <T> Filter createHBaseQualifierFilter(CompareOp compareOp,
+      ColumnPrefix<T> columnPrefix) {
+    return new QualifierFilter(compareOp,
+        new BinaryPrefixComparator(
+            columnPrefix.getColumnPrefixBytes("")));
+  }
+
+  /**
+   * Create filters for confs or metrics to retrieve. This list includes a
+   * configs/metrics family filter and relevant filters for confs/metrics to
+   * retrieve, if present.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param confsOrMetricToRetrieve configs/metrics to retrieve.
+   * @param columnFamily config or metric column family.
+   * @param columnPrefix config or metric column prefix.
+   * @return a filter list.
+   * @throws IOException if any problem occurs while creating the filters.
+   */
+  public static <T> Filter createFilterForConfsOrMetricsToRetrieve(
+      TimelineFilterList confsOrMetricToRetrieve, ColumnFamily<T> columnFamily,
+      ColumnPrefix<T> columnPrefix) throws IOException {
+    Filter familyFilter = new FamilyFilter(CompareOp.EQUAL,
+        new BinaryComparator(columnFamily.getBytes()));
+    if (confsOrMetricToRetrieve != null &&
+        !confsOrMetricToRetrieve.getFilterList().isEmpty()) {
+      // If confsOrMetricsToRetrive are specified, create a filter list based
+      // on it and family filter.
+      FilterList filter = new FilterList(familyFilter);
+      filter.addFilter(
+          createHBaseFilterList(columnPrefix, confsOrMetricToRetrieve));
+      return filter;
+    } else {
+      // Only the family filter needs to be added.
+      return familyFilter;
+    }
+  }
+
+  /**
+   * Create 2 HBase {@link SingleColumnValueFilter} filters for the specified
+   * value range represented by start and end value and wraps them inside a
+   * filter list. Start and end value should not be null.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param column Column for which single column value filter is to be created.
+   * @param startValue Start value.
+   * @param endValue End value.
+   * @return 2 single column value filters wrapped in a filter list.
+   * @throws IOException if any problem is encountered while encoding value.
+   */
+  public static <T> FilterList createSingleColValueFiltersByRange(
+      Column<T> column, Object startValue, Object endValue) throws IOException {
+    FilterList list = new FilterList();
+    Filter singleColValFilterStart = createHBaseSingleColValueFilter(
+        column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
+        column.getValueConverter().encodeValue(startValue),
+        CompareOp.GREATER_OR_EQUAL, true);
+    list.addFilter(singleColValFilterStart);
+
+    Filter singleColValFilterEnd = createHBaseSingleColValueFilter(
+        column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
+        column.getValueConverter().encodeValue(endValue),
+        CompareOp.LESS_OR_EQUAL, true);
+    list.addFilter(singleColValFilterEnd);
+    return list;
+  }
+
+  /**
+   * Creates a HBase {@link SingleColumnValueFilter} with specified column.
+   * @param <T> Describes the type of column prefix.
+   * @param column Column which value to be filtered.
+   * @param value Value to be filtered.
+   * @param op Compare operator
+   * @return a SingleColumnValue Filter
+   * @throws IOException if any exception.
+   */
+  public static <T> Filter createHBaseSingleColValueFilter(Column<T> column,
+      Object value, CompareOp op) throws IOException {
+    Filter singleColValFilter = createHBaseSingleColValueFilter(
+        column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
+        column.getValueConverter().encodeValue(value), op, true);
+    return singleColValFilter;
+  }
+
+  /**
+   * Creates a HBase {@link SingleColumnValueFilter}.
+   *
+   * @param columnFamily Column Family represented as bytes.
+   * @param columnQualifier Column Qualifier represented as bytes.
+   * @param value Value.
+   * @param compareOp Compare operator.
+   * @param filterIfMissing This flag decides if we should filter the row if
+   *     the specified column is missing. This is based on the filter's
+   *     keyMustExist field.
+   * @return a {@link SingleColumnValueFilter} object
+   * @throws IOException
+   */
+  private static SingleColumnValueFilter createHBaseSingleColValueFilter(
+      byte[] columnFamily, byte[] columnQualifier, byte[] value,
+      CompareOp compareOp, boolean filterIfMissing) throws IOException {
+    SingleColumnValueFilter singleColValFilter =
+        new SingleColumnValueFilter(columnFamily, columnQualifier, compareOp,
+            new BinaryComparator(value));
+    singleColValFilter.setLatestVersionOnly(true);
+    singleColValFilter.setFilterIfMissing(filterIfMissing);
+    return singleColValFilter;
+  }
+
+  /**
+   * Fetch columns from filter list containing exists and multivalue equality
+   * filters. This is done to fetch only required columns from back-end and
+   * then match event filters or relationships in reader.
+   *
+   * @param filterList filter list.
+   * @return set of columns.
+   */
+  public static Set<String> fetchColumnsFromFilterList(
+      TimelineFilterList filterList) {
+    Set<String> strSet = new HashSet<String>();
+    for (TimelineFilter filter : filterList.getFilterList()) {
+      switch(filter.getFilterType()) {
+      case LIST:
+        strSet.addAll(fetchColumnsFromFilterList((TimelineFilterList)filter));
+        break;
+      case KEY_VALUES:
+        strSet.add(((TimelineKeyValuesFilter)filter).getKey());
+        break;
+      case EXISTS:
+        strSet.add(((TimelineExistsFilter)filter).getValue());
+        break;
+      default:
+        LOG.info("Unexpected filter type " + filter.getFilterType());
+        break;
+      }
+    }
+    return strSet;
+  }
+
+  /**
+   * Creates equivalent HBase {@link FilterList} from {@link TimelineFilterList}
+   * while converting different timeline filters(of type {@link TimelineFilter})
+   * into their equivalent HBase filters.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param colPrefix column prefix which will be used for conversion.
+   * @param filterList timeline filter list which has to be converted.
+   * @return A {@link FilterList} object.
+   * @throws IOException if any problem occurs while creating the filter list.
+   */
+  public static <T> FilterList createHBaseFilterList(ColumnPrefix<T> colPrefix,
+      TimelineFilterList filterList) throws IOException {
+    FilterList list =
+        new FilterList(getHBaseOperator(filterList.getOperator()));
+    for (TimelineFilter filter : filterList.getFilterList()) {
+      switch(filter.getFilterType()) {
+      case LIST:
+        list.addFilter(createHBaseFilterList(colPrefix,
+            (TimelineFilterList)filter));
+        break;
+      case PREFIX:
+        list.addFilter(createHBaseColQualPrefixFilter(colPrefix,
+            (TimelinePrefixFilter)filter));
+        break;
+      case COMPARE:
+        TimelineCompareFilter compareFilter = (TimelineCompareFilter)filter;
+        list.addFilter(
+            createHBaseSingleColValueFilter(
+                colPrefix.getColumnFamilyBytes(),
+                colPrefix.getColumnPrefixBytes(compareFilter.getKey()),
+                colPrefix.getValueConverter().
+                    encodeValue(compareFilter.getValue()),
+                getHBaseCompareOp(compareFilter.getCompareOp()),
+                compareFilter.getKeyMustExist()));
+        break;
+      case KEY_VALUE:
+        TimelineKeyValueFilter kvFilter = (TimelineKeyValueFilter)filter;
+        list.addFilter(
+            createHBaseSingleColValueFilter(
+                colPrefix.getColumnFamilyBytes(),
+                colPrefix.getColumnPrefixBytes(kvFilter.getKey()),
+                colPrefix.getValueConverter().encodeValue(kvFilter.getValue()),
+                getHBaseCompareOp(kvFilter.getCompareOp()),
+                kvFilter.getKeyMustExist()));
+        break;
+      default:
+        LOG.info("Unexpected filter type " + filter.getFilterType());
+        break;
+      }
+    }
+    return list;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
new file mode 100644
index 0000000..f7c0705
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.server.timelineservice.reader.filter stores
+ * timeline filter implementations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
new file mode 100644
index 0000000..ce20113
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.EntityTypeReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
+
+/**
+ * HBase based implementation for {@link TimelineReader}.
+ */
+public class HBaseTimelineReaderImpl
+    extends AbstractService implements TimelineReader {
+
+  private static final Log LOG = LogFactory
+      .getLog(HBaseTimelineReaderImpl.class);
+
+  private Configuration hbaseConf = null;
+  private Connection conn;
+
+  public HBaseTimelineReaderImpl() {
+    super(HBaseTimelineReaderImpl.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
+    conn = ConnectionFactory.createConnection(hbaseConf);
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (conn != null) {
+      LOG.info("closing the hbase Connection");
+      conn.close();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public TimelineEntity getEntity(TimelineReaderContext context,
+      TimelineDataToRetrieve dataToRetrieve) throws IOException {
+    TimelineEntityReader reader =
+        TimelineEntityReaderFactory.createSingleEntityReader(context,
+            dataToRetrieve);
+    return reader.readEntity(hbaseConf, conn);
+  }
+
+  @Override
+  public Set<TimelineEntity> getEntities(TimelineReaderContext context,
+      TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
+      throws IOException {
+    TimelineEntityReader reader =
+        TimelineEntityReaderFactory.createMultipleEntitiesReader(context,
+            filters, dataToRetrieve);
+    return reader.readEntities(hbaseConf, conn);
+  }
+
+  @Override
+  public Set<String> getEntityTypes(TimelineReaderContext context)
+      throws IOException {
+    EntityTypeReader reader = new EntityTypeReader(context);
+    return reader.readEntityTypes(hbaseConf, conn);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
new file mode 100644
index 0000000..642df63
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -0,0 +1,547 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+
+/**
+ * This implements a hbase based backend for storing the timeline entity
+ * information.
+ * It writes to multiple tables at the backend
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HBaseTimelineWriterImpl extends AbstractService implements
+    TimelineWriter {
+
+  private static final Log LOG = LogFactory
+      .getLog(HBaseTimelineWriterImpl.class);
+
+  private Connection conn;
+  private TypedBufferedMutator<EntityTable> entityTable;
+  private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
+  private TypedBufferedMutator<ApplicationTable> applicationTable;
+  private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
+  private TypedBufferedMutator<FlowRunTable> flowRunTable;
+
+  /**
+   * Used to convert strings key components to and from storage format.
+   */
+  private final KeyConverter<String> stringKeyConverter =
+      new StringKeyConverter();
+
+  /**
+   * Used to convert Long key components to and from storage format.
+   */
+  private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
+
+  public HBaseTimelineWriterImpl() {
+    super(HBaseTimelineWriterImpl.class.getName());
+  }
+
+  public HBaseTimelineWriterImpl(Configuration conf) throws IOException {
+    super(conf.get("yarn.application.id",
+        HBaseTimelineWriterImpl.class.getName()));
+  }
+
+  /**
+   * initializes the hbase connection to write to the entity table.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    Configuration hbaseConf =
+        HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
+    conn = ConnectionFactory.createConnection(hbaseConf);
+    entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
+    appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
+    applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn);
+    flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn);
+    flowActivityTable =
+        new FlowActivityTable().getTableMutator(hbaseConf, conn);
+  }
+
+  /**
+   * Stores the entire information in TimelineEntities to the timeline store.
+   */
+  @Override
+  public TimelineWriteResponse write(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntities data) throws IOException {
+
+    TimelineWriteResponse putStatus = new TimelineWriteResponse();
+    // defensive coding to avoid NPE during row key construction
+    if ((flowName == null) || (appId == null) || (clusterId == null)
+        || (userId == null)) {
+      LOG.warn("Found null for one of: flowName=" + flowName
+          + " appId=" + appId + " userId=" + userId + " clusterId=" + clusterId
+          + " . Not proceeding with writing to hbase");
+      return putStatus;
+    }
+
+    for (TimelineEntity te : data.getEntities()) {
+
+      // a set can have at most 1 null
+      if (te == null) {
+        continue;
+      }
+
+      // if the entity is the application, the destination is the application
+      // table
+      boolean isApplication = ApplicationEntity.isApplicationEntity(te);
+      byte[] rowKey;
+      if (isApplication) {
+        ApplicationRowKey applicationRowKey =
+            new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
+                appId);
+        rowKey = applicationRowKey.getRowKey();
+      } else {
+        EntityRowKey entityRowKey =
+            new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
+                te.getType(), te.getIdPrefix(), te.getId());
+        rowKey = entityRowKey.getRowKey();
+      }
+
+      storeInfo(rowKey, te, flowVersion, isApplication);
+      storeEvents(rowKey, te.getEvents(), isApplication);
+      storeConfig(rowKey, te.getConfigs(), isApplication);
+      storeMetrics(rowKey, te.getMetrics(), isApplication);
+      storeRelations(rowKey, te, isApplication);
+
+      if (isApplication) {
+        TimelineEvent event =
+            ApplicationEntity.getApplicationEvent(te,
+                ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+        FlowRunRowKey flowRunRowKey =
+            new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
+        if (event != null) {
+          onApplicationCreated(flowRunRowKey, clusterId, appId, userId,
+              flowVersion, te, event.getTimestamp());
+        }
+        // if it's an application entity, store metrics
+        storeFlowMetricsAppRunning(flowRunRowKey, appId, te);
+        // if application has finished, store it's finish time and write final
+        // values of all metrics
+        event = ApplicationEntity.getApplicationEvent(te,
+            ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+        if (event != null) {
+          onApplicationFinished(flowRunRowKey, flowVersion, appId, te,
+              event.getTimestamp());
+        }
+      }
+    }
+    return putStatus;
+  }
+
+  private void onApplicationCreated(FlowRunRowKey flowRunRowKey,
+      String clusterId, String appId, String userId, String flowVersion,
+      TimelineEntity te, long appCreatedTimeStamp)
+      throws IOException {
+
+    String flowName = flowRunRowKey.getFlowName();
+    Long flowRunId = flowRunRowKey.getFlowRunId();
+
+    // store in App to flow table
+    AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId);
+    byte[] rowKey = appToFlowRowKey.getRowKey();
+    AppToFlowColumnPrefix.FLOW_NAME.store(rowKey, appToFlowTable, clusterId,
+        null, flowName);
+    AppToFlowColumnPrefix.FLOW_RUN_ID.store(rowKey, appToFlowTable, clusterId,
+        null, flowRunId);
+    AppToFlowColumnPrefix.USER_ID.store(rowKey, appToFlowTable, clusterId,
+        null, userId);
+
+    // store in flow run table
+    storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te);
+
+    // store in flow activity table
+    byte[] flowActivityRowKeyBytes =
+        new FlowActivityRowKey(flowRunRowKey.getClusterId(),
+            appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName)
+            .getRowKey();
+    byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+    FlowActivityColumnPrefix.RUN_ID.store(flowActivityRowKeyBytes,
+        flowActivityTable, qualifier, null, flowVersion,
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+  }
+
+  /*
+   * updates the {@link FlowRunTable} with Application Created information
+   */
+  private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey,
+      String appId, TimelineEntity te) throws IOException {
+    byte[] rowKey = flowRunRowKey.getRowKey();
+    FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null,
+        te.getCreatedTime(),
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+  }
+
+
+  /*
+   * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an
+   * application has finished
+   */
+  private void onApplicationFinished(FlowRunRowKey flowRunRowKey,
+      String flowVersion, String appId, TimelineEntity te,
+      long appFinishedTimeStamp) throws IOException {
+    // store in flow run table
+    storeAppFinishedInFlowRunTable(flowRunRowKey, appId, te,
+        appFinishedTimeStamp);
+
+    // indicate in the flow activity table that the app has finished
+    byte[] rowKey =
+        new FlowActivityRowKey(flowRunRowKey.getClusterId(),
+            appFinishedTimeStamp, flowRunRowKey.getUserId(),
+            flowRunRowKey.getFlowName()).getRowKey();
+    byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+    FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
+        null, flowVersion,
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+  }
+
+  /*
+   * Update the {@link FlowRunTable} with Application Finished information
+   */
+  private void storeAppFinishedInFlowRunTable(FlowRunRowKey flowRunRowKey,
+      String appId, TimelineEntity te, long appFinishedTimeStamp)
+      throws IOException {
+    byte[] rowKey = flowRunRowKey.getRowKey();
+    Attribute attributeAppId =
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId);
+    FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
+        appFinishedTimeStamp, attributeAppId);
+
+    // store the final value of metrics since application has finished
+    Set<TimelineMetric> metrics = te.getMetrics();
+    if (metrics != null) {
+      storeFlowMetrics(rowKey, metrics, attributeAppId,
+          AggregationOperation.SUM_FINAL.getAttribute());
+    }
+  }
+
+  /*
+   * Updates the {@link FlowRunTable} with Application Metrics
+   */
+  private void storeFlowMetricsAppRunning(FlowRunRowKey flowRunRowKey,
+      String appId, TimelineEntity te) throws IOException {
+    Set<TimelineMetric> metrics = te.getMetrics();
+    if (metrics != null) {
+      byte[] rowKey = flowRunRowKey.getRowKey();
+      storeFlowMetrics(rowKey, metrics,
+          AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId),
+          AggregationOperation.SUM.getAttribute());
+    }
+  }
+
+  private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+      Attribute... attributes) throws IOException {
+    for (TimelineMetric metric : metrics) {
+      byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId());
+      Map<Long, Number> timeseries = metric.getValues();
+      for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+        Long timestamp = timeseriesEntry.getKey();
+        FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable,
+            metricColumnQualifier, timestamp, timeseriesEntry.getValue(),
+            attributes);
+      }
+    }
+  }
+
+  private void storeRelations(byte[] rowKey, TimelineEntity te,
+      boolean isApplication) throws IOException {
+    if (isApplication) {
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          ApplicationColumnPrefix.RELATES_TO, applicationTable);
+    } else {
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          EntityColumnPrefix.IS_RELATED_TO, entityTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          EntityColumnPrefix.RELATES_TO, entityTable);
+    }
+  }
+
+  /**
+   * Stores the Relations from the {@linkplain TimelineEntity} object.
+  /**
+   * Stores the Relations from the {@linkplain TimelineEntity} object.
+   */
+  private <T> void storeRelations(byte[] rowKey,
+      Map<String, Set<String>> connectedEntities,
+      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+      throws IOException {
+    for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
+        .entrySet()) {
+      // id3?id4?id5
+      String compoundValue =
+          Separator.VALUES.joinEncoded(connectedEntity.getValue());
+      columnPrefix.store(rowKey, table,
+          stringKeyConverter.encode(connectedEntity.getKey()), null,
+          compoundValue);
+    }
+  }
+
+  /**
+   * Stores information from the {@linkplain TimelineEntity} object.
+   */
+  private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion,
+      boolean isApplication) throws IOException {
+
+    if (isApplication) {
+      ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId());
+      ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null,
+          te.getCreatedTime());
+      ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null,
+          flowVersion);
+      Map<String, Object> info = te.getInfo();
+      if (info != null) {
+        for (Map.Entry<String, Object> entry : info.entrySet()) {
+          ApplicationColumnPrefix.INFO.store(rowKey, applicationTable,
+              stringKeyConverter.encode(entry.getKey()), null,
+              entry.getValue());
+        }
+      }
+    } else {
+      EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
+      EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
+      EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
+          te.getCreatedTime());
+      EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
+      Map<String, Object> info = te.getInfo();
+      if (info != null) {
+        for (Map.Entry<String, Object> entry : info.entrySet()) {
+          EntityColumnPrefix.INFO.store(rowKey, entityTable,
+              stringKeyConverter.encode(entry.getKey()), null,
+              entry.getValue());
+        }
+      }
+    }
+  }
+
+  /**
+   * Stores the config information from the {@linkplain TimelineEntity}.
+   */
+  private void storeConfig(byte[] rowKey, Map<String, String> config,
+      boolean isApplication) throws IOException {
+    if (config == null) {
+      return;
+    }
+    for (Map.Entry<String, String> entry : config.entrySet()) {
+      byte[] configKey = stringKeyConverter.encode(entry.getKey());
+      if (isApplication) {
+        ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable,
+            configKey, null, entry.getValue());
+      } else {
+        EntityColumnPrefix.CONFIG.store(rowKey, entityTable, configKey,
+            null, entry.getValue());
+      }
+    }
+  }
+
+  /**
+   * Stores the {@linkplain TimelineMetric} information from the
+   * {@linkplain TimelineEntity} object.
+   */
+  private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+      boolean isApplication) throws IOException {
+    if (metrics != null) {
+      for (TimelineMetric metric : metrics) {
+        byte[] metricColumnQualifier =
+            stringKeyConverter.encode(metric.getId());
+        Map<Long, Number> timeseries = metric.getValues();
+        for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+          Long timestamp = timeseriesEntry.getKey();
+          if (isApplication) {
+            ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable,
+                metricColumnQualifier, timestamp, timeseriesEntry.getValue());
+          } else {
+            EntityColumnPrefix.METRIC.store(rowKey, entityTable,
+                metricColumnQualifier, timestamp, timeseriesEntry.getValue());
+          }
+        }
+      }
+    }
+  }
+
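+  // Illustrative note, added as commentary and not part of this patch:
+  // events fan out into one cell per event info entry; the column qualifier
+  // is a compound of the event id, the event timestamp and the info key
+  // (built by EventColumnName below), so a single event with several info
+  // keys occupies several columns in the same row.
+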
+  /**
+   * Stores the events from the {@linkplain TimelineEvent} object.
+   */
+  private void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
+      boolean isApplication) throws IOException {
+    if (events != null) {
+      for (TimelineEvent event : events) {
+        if (event != null) {
+          String eventId = event.getId();
+          if (eventId != null) {
+            long eventTimestamp = event.getTimestamp();
+            // if the timestamp is not set, use the current timestamp
+            if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) {
+              LOG.warn("timestamp is not set for event " + eventId
+                  + "! Using the current timestamp");
+              eventTimestamp = System.currentTimeMillis();
+            }
+            Map<String, Object> eventInfo = event.getInfo();
+            if ((eventInfo == null) || (eventInfo.size() == 0)) {
+              byte[] columnQualifierBytes =
+                  new EventColumnName(eventId, eventTimestamp, null)
+                      .getColumnQualifier();
+              if (isApplication) {
+                ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
+                    columnQualifierBytes, null, Separator.EMPTY_BYTES);
+              } else {
+                EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                    columnQualifierBytes, null, Separator.EMPTY_BYTES);
+              }
+            } else {
+              for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
+                // eventId=infoKey
+                byte[] columnQualifierBytes =
+                    new EventColumnName(eventId, eventTimestamp, info.getKey())
+                        .getColumnQualifier();
+                if (isApplication) {
+                  ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
+                      columnQualifierBytes, null, info.getValue());
+                } else {
+                  EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                      columnQualifierBytes, null, info.getValue());
+                }
+              } // for info: eventInfo
+            }
+          }
+        }
+      } // event : events
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage
+   * .TimelineWriter#aggregate
+   * (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity,
+   * org.apache
+   * .hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack)
+   */
+  @Override
+  public TimelineWriteResponse aggregate(TimelineEntity data,
+      TimelineAggregationTrack track) throws IOException {
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter#flush
+   * ()
+   */
+  @Override
+  public void flush() throws IOException {
+    // flush all buffered mutators
+    entityTable.flush();
+    appToFlowTable.flush();
+    applicationTable.flush();
+    flowRunTable.flush();
+    flowActivityTable.flush();
+  }
+
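+  // Illustrative note, added as commentary and not part of this patch: each
+  // table field above wraps an HBase BufferedMutator, so puts are buffered
+  // client-side for throughput; flush() pushes any pending mutations to the
+  // region servers and is expected to be invoked periodically by the caller.
+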
+  /**
+   * Closes the HBase connections. The close APIs perform flushing and
+   * release any resources held.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (entityTable != null) {
+      LOG.info("closing the entity table");
+      // The close API performs flushing and releases any resources held
+      entityTable.close();
+    }
+    if (appToFlowTable != null) {
+      LOG.info("closing the app_flow table");
+      // The close API performs flushing and releases any resources held
+      appToFlowTable.close();
+    }
+    if (applicationTable != null) {
+      LOG.info("closing the application table");
+      applicationTable.close();
+    }
+    if (flowRunTable != null) {
+      LOG.info("closing the flow run table");
+      // The close API performs flushing and releases any resources held
+      flowRunTable.close();
+    }
+    if (flowActivityTable != null) {
+      LOG.info("closing the flow activity table");
+      // The close API performs flushing and releases any resources held
+      flowActivityTable.close();
+    }
+    if (conn != null) {
+      LOG.info("closing the hbase Connection");
+      conn.close();
+    }
+    super.serviceStop();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
new file mode 100644
index 0000000..dd87169
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This creates the schema for an HBase-based backend for storing application
+ * timeline information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class TimelineSchemaCreator {
+  private TimelineSchemaCreator() {
+  }
+
+  final static String NAME = TimelineSchemaCreator.class.getSimpleName();
+  private static final Log LOG =
+      LogFactory.getLog(TimelineSchemaCreator.class);
+  private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s";
+  private static final String APP_TABLE_NAME_SHORT = "a";
+  private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f";
+  private static final String TTL_OPTION_SHORT = "m";
+  private static final String ENTITY_TABLE_NAME_SHORT = "e";
+
+  public static void main(String[] args) throws Exception {
+
+    Configuration hbaseConf =
+        HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(null);
+    // Grab input args and allow for -Dxyz style arguments
+    String[] otherArgs = new GenericOptionsParser(hbaseConf, args)
+        .getRemainingArgs();
+
+    // Grab the arguments we're looking for.
+    CommandLine commandLine = parseArgs(otherArgs);
+
+    // Grab the entityTableName argument
+    String entityTableName =
+        commandLine.getOptionValue(ENTITY_TABLE_NAME_SHORT);
+    if (StringUtils.isNotBlank(entityTableName)) {
+      hbaseConf.set(EntityTable.TABLE_NAME_CONF_NAME, entityTableName);
+    }
+    String entityTableTTLMetrics =
+        commandLine.getOptionValue(TTL_OPTION_SHORT);
+    if (StringUtils.isNotBlank(entityTableTTLMetrics)) {
+      int metricsTTL = Integer.parseInt(entityTableTTLMetrics);
+      new EntityTable().setMetricsTTL(metricsTTL, hbaseConf);
+    }
+    // Grab the appToflowTableName argument
+    String appToflowTableName = commandLine.getOptionValue(
+        APP_TO_FLOW_TABLE_NAME_SHORT);
+    if (StringUtils.isNotBlank(appToflowTableName)) {
+      hbaseConf.set(AppToFlowTable.TABLE_NAME_CONF_NAME, appToflowTableName);
+    }
+    // Grab the applicationTableName argument
+    String applicationTableName = commandLine.getOptionValue(
+        APP_TABLE_NAME_SHORT);
+    if (StringUtils.isNotBlank(applicationTableName)) {
+      hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME,
+          applicationTableName);
+    }
+
+    List<Exception> exceptions = new ArrayList<>();
+    try {
+      boolean skipExisting =
+          commandLine.hasOption(SKIP_EXISTING_TABLE_OPTION_SHORT);
+      if (skipExisting) {
+        LOG.info("Will skip existing tables and continue on htable creation "
+            + "exceptions!");
+      }
+      createAllTables(hbaseConf, skipExisting);
+      LOG.info("Successfully created HBase schema.");
+    } catch (IOException e) {
+      LOG.error("Error in creating hbase tables: " + e.getMessage());
+      exceptions.add(e);
+    }
+
+    if (exceptions.size() > 0) {
+      LOG.warn("Schema creation finished with the following exceptions");
+      for (Exception e : exceptions) {
+        LOG.warn(e.getMessage());
+      }
+      System.exit(-1);
+    } else {
+      LOG.info("Schema creation finished successfully");
+    }
+  }
+
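+  // Illustrative usage, added as commentary and not part of this patch (the
+  // exact launcher invocation is an assumption):
+  //
+  //   hadoop org.apache.hadoop.yarn.server.timelineservice.storage.\
+  //       TimelineSchemaCreator -m 2592000 -s
+  //
+  // would create all timeline service tables, set a TTL of 2592000 seconds
+  // (30 days) on the entity table's metrics column family, and skip any
+  // tables that already exist.
+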
+  /**
+   * Parse command-line arguments.
+   *
+   * @param args command line arguments passed to program.
+   * @return parsed command line.
+   * @throws ParseException if the command-line arguments cannot be parsed.
+   */
+  private static CommandLine parseArgs(String[] args) throws ParseException {
+    Options options = new Options();
+
+    // Input
+    Option o = new Option(ENTITY_TABLE_NAME_SHORT, "entityTableName", true,
+        "entity table name");
+    o.setArgName("entityTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(TTL_OPTION_SHORT, "metricsTTL", true,
+        "TTL for metrics column family");
+    o.setArgName("metricsTTL");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(APP_TO_FLOW_TABLE_NAME_SHORT, "appToflowTableName", true,
+        "app to flow table name");
+    o.setArgName("appToflowTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(APP_TABLE_NAME_SHORT, "applicationTableName", true,
+        "application table name");
+    o.setArgName("applicationTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
+    // Options without an argument
+    // No need to set arg name since we do not need an argument here
+    o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable",
+        false, "skip existing HBase tables and continue to create new tables");
+    o.setRequired(false);
+    options.addOption(o);
+
+    CommandLineParser parser = new PosixParser();
+    CommandLine commandLine = null;
+    try {
+      commandLine = parser.parse(options, args);
+    } catch (Exception e) {
+      LOG.error("ERROR: " + e.getMessage() + "\n");
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp(NAME + " ", options, true);
+      System.exit(-1);
+    }
+
+    return commandLine;
+  }
+
+  @VisibleForTesting
+  public static void createAllTables(Configuration hbaseConf,
+      boolean skipExisting) throws IOException {
+
+    Connection conn = null;
+    try {
+      conn = ConnectionFactory.createConnection(hbaseConf);
+      Admin admin = conn.getAdmin();
+      if (admin == null) {
+        throw new IOException("Cannot create table since admin is null");
+      }
+      try {
+        new EntityTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new AppToFlowTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new ApplicationTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new FlowRunTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new FlowActivityTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+    } finally {
+      if (conn != null) {
+        conn.close();
+      }
+    }
+  }
+
+}
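For reference, a minimal sketch (added for illustration, not part of this
commit) of driving the same schema creation programmatically rather than
through main():

    // Assumes an hbase-site.xml (or equivalent) is on the classpath.
    Configuration conf = HBaseConfiguration.create();
    // Create all five timeline service tables, skipping any that exist.
    TimelineSchemaCreator.createAllTables(conf, true);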
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
new file mode 100644
index 0000000..dde3911
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+
+/**
+ * Identifies fully qualified columns for the {@link ApplicationTable}.
+ */
+public enum ApplicationColumn implements Column<ApplicationTable> {
+
+  /**
+   * App id.
+   */
+  ID(ApplicationColumnFamily.INFO, "id"),
+
+  /**
+   * When the application was created.
+   */
+  CREATED_TIME(ApplicationColumnFamily.INFO, "created_time",
+      new LongConverter()),
+
+  /**
+   * The version of the flow that this app belongs to.
+   */
+  FLOW_VERSION(ApplicationColumnFamily.INFO, "flow_version");
+
+  private final ColumnHelper<ApplicationTable> column;
+  private final ColumnFamily<ApplicationTable> columnFamily;
+  private final String columnQualifier;
+  private final byte[] columnQualifierBytes;
+
+  private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily,
+      String columnQualifier) {
+    this(columnFamily, columnQualifier, GenericConverter.getInstance());
+  }
+
+  private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily,
+      String columnQualifier, ValueConverter converter) {
+    this.columnFamily = columnFamily;
+    this.columnQualifier = columnQualifier;
+    // Future-proof by ensuring the right column prefix hygiene.
+    this.columnQualifierBytes =
+        Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
+    this.column = new ColumnHelper<ApplicationTable>(columnFamily, converter);
+  }
+
+  /**
+   * @return the column name value
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp,
+      Object inputValue, Attribute... attributes) throws IOException {
+    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
+        inputValue, attributes);
+  }
+
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+
+  @Override
+  public byte[] getColumnQualifierBytes() {
+    return columnQualifierBytes.clone();
+  }
+
+  @Override
+  public byte[] getColumnFamilyBytes() {
+    return columnFamily.getBytes();
+  }
+
+  @Override
+  public ValueConverter getValueConverter() {
+    return column.getValueConverter();
+  }
+
+  /**
+   * Retrieve an {@link ApplicationColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}.
+   *
+   * @param columnQualifier Name of the column to retrieve
+   * @return the corresponding {@link ApplicationColumn} or null
+   */
+  public static final ApplicationColumn columnFor(String columnQualifier) {
+
+    // Match column based on value, assume column family matches.
+    for (ApplicationColumn ac : ApplicationColumn.values()) {
+      // Find a match based only on name.
+      if (ac.getColumnQualifier().equals(columnQualifier)) {
+        return ac;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link ApplicationColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code a.equals(b) && x.equals(y)} or
+   * {@code (x == y == null)}.
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param name Name of the column to retrieve
+   * @return the corresponding {@link ApplicationColumn} or null if both
+   *         arguments don't match.
+   */
+  public static final ApplicationColumn columnFor(
+      ApplicationColumnFamily columnFamily, String name) {
+
+    for (ApplicationColumn ac : ApplicationColumn.values()) {
+      // Find a match based on column family and name.
+      if (ac.columnFamily.equals(columnFamily)
+          && ac.getColumnQualifier().equals(name)) {
+        return ac;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+}
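For illustration, a short sketch (added commentary, not part of this commit)
of how this enum resolves columns by qualifier name:

    // columnFor matches on the column qualifier string:
    ApplicationColumn col = ApplicationColumn.columnFor("created_time");
    assert col == ApplicationColumn.CREATED_TIME;
    // An unknown name yields null rather than an exception:
    assert ApplicationColumn.columnFor("no_such_column") == null;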