YARN-5976. Update hbase version to 1.2. Contributed by Vrushali C. (cherry picked from commit f945008d1cf5730bdebeae501ed0e42477ad219e)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9bcfbf5e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9bcfbf5e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9bcfbf5e Branch: refs/heads/YARN-5355-branch-2 Commit: 9bcfbf5ec172715c652d7dfe4900fd73631139bb Parents: cf8e3a8 Author: Sangjin Lee <sj...@apache.org> Authored: Wed Dec 21 09:53:07 2016 -0800 Committer: Sangjin Lee <sj...@apache.org> Committed: Wed Dec 21 10:04:03 2016 -0800 ---------------------------------------------------------------------- LICENSE.txt | 8 +- hadoop-project/pom.xml | 26 +- .../pom.xml | 142 +------- ...TestPhoenixOfflineAggregationWriterImpl.java | 161 --------- .../hadoop-yarn-server-timelineservice/pom.xml | 26 +- .../PhoenixOfflineAggregationWriterImpl.java | 358 ------------------- .../storage/TimelineSchemaCreator.java | 22 -- 7 files changed, 18 insertions(+), 725 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bcfbf5e/LICENSE.txt ---------------------------------------------------------------------- diff --git a/LICENSE.txt b/LICENSE.txt index ee5d528..fee4ae4 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -1542,12 +1542,6 @@ JLine 0.9.94 leveldbjni-all 1.8 Hamcrest Core 1.3 xmlenc Library 0.52 -StringTemplate 4 4.0.7 -ANTLR 3 Tool 3.5 -ANTLR 3 Runtime 3.5 -ANTLR StringTemplate 3.2.1 -ASM All 5.0.2 -sqlline 1.1.8 -------------------------------------------------------------------------------- Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: @@ -1778,7 +1772,7 @@ the Licensor and You. The binary distribution of this product bundles these dependencies under the following license: -jamon-runtime 2.3.1 +jamon-runtime 2.4.1 -------------------------------------------------------------------------------- MOZILLA PUBLIC LICENSE Version 1.1 http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bcfbf5e/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 771d3e4..6c6de28 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -50,8 +50,7 @@ <kafka.version>0.8.2.1</kafka.version> - <hbase.version>1.1.3</hbase.version> - <phoenix.version>4.7.0-HBase-1.1</phoenix.version> + <hbase.version>1.2.4</hbase.version> <hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version> <hadoop.assemblies.version>${project.version}</hadoop.assemblies.version> @@ -1095,29 +1094,6 @@ <classifier>tests</classifier> </dependency> <dependency> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-core</artifactId> - <version>${phoenix.version}</version> - <exclusions> - <!-- Exclude jline from here --> - <exclusion> - <artifactId>jline</artifactId> - <groupId>jline</groupId> - </exclusion> - <exclusion> - <artifactId>joda-time</artifactId> - <groupId>joda-time</groupId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-core</artifactId> - <type>test-jar</type> - <version>${phoenix.version}</version> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-it</artifactId> <version>${hbase.version}</version> http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bcfbf5e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml index c627112..ed014de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml @@ -56,10 +56,6 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> </exclusion> - <exclusion> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-core</artifactId> - </exclusion> </exclusions> </dependency> @@ -76,6 +72,8 @@ </exclusions> </dependency> + <!-- 'mvn dependency:analyze' fails to detect use of this direct + dependency --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-auth</artifactId> @@ -111,20 +109,6 @@ </exclusions> </dependency> - <!-- 'mvn dependency:analyze' fails to detect use of this direct - dependency --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-common</artifactId> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId> @@ -145,14 +129,14 @@ <dependency> <groupId>com.sun.jersey</groupId> - <artifactId>jersey-core</artifactId> + <artifactId>jersey-client</artifactId> <scope>test</scope> </dependency> <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-client</artifactId> - <scope>test</scope> + <groupId>javax.ws.rs</groupId> + <artifactId>jsr311-api</artifactId> + <version>1.1.1</version> </dependency> <dependency> @@ -225,23 +209,6 @@ <dependency> <groupId>org.apache.hbase</groupId> - <artifactId>hbase-common</artifactId> - <classifier>tests</classifier> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <classifier>tests</classifier> <scope>test</scope> @@ -276,99 +243,6 @@ <!-- 'mvn dependency:analyze' fails to detect use of this direct dependency --> <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-it</artifactId> - <scope>test</scope> - <classifier>tests</classifier> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-auth</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-core</artifactId> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-auth</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </exclusion> - <exclusion> - <groupId>net.sourceforge.findbugs</groupId> - <artifactId>annotations</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-core</artifactId> - <type>test-jar</type> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-auth</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </exclusion> - <exclusion> - <groupId>net.sourceforge.findbugs</groupId> - <artifactId>annotations</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- for runtime dependencies --> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <scope>test</scope> - </dependency> - - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hbase-compatible-hadoop.version}</version> @@ -382,6 +256,8 @@ </exclusions> </dependency> + <!-- 'mvn dependency:analyze' fails to detect use of this direct + dependency --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> @@ -389,6 +265,8 @@ <scope>test</scope> </dependency> + <!-- 'mvn dependency:analyze' fails to detect use of this direct + dependency --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bcfbf5e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java deleted file mode 100644 index e34ae90..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import org.apache.hadoop.hbase.IntegrationTestingUtility; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.apache.phoenix.hbase.index.write.IndexWriterUtils; -import org.apache.phoenix.query.BaseTest; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.ReadOnlyProps; - -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; - -public class TestPhoenixOfflineAggregationWriterImpl extends BaseTest { - private static PhoenixOfflineAggregationWriterImpl storage; - private static final int BATCH_SIZE = 3; - - @BeforeClass - public static void setup() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); - storage = setupPhoenixClusterAndWriterForTest(conf); - } - - @Test(timeout = 90000) - public void testFlowLevelAggregationStorage() throws Exception { - testAggregator(OfflineAggregationInfo.FLOW_AGGREGATION); - } - - @Test(timeout = 90000) - public void testUserLevelAggregationStorage() throws Exception { - testAggregator(OfflineAggregationInfo.USER_AGGREGATION); - } - - @AfterClass - public static void cleanup() throws Exception { - storage.dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME); - storage.dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME); - tearDownMiniCluster(); - } - - private static PhoenixOfflineAggregationWriterImpl - setupPhoenixClusterAndWriterForTest(YarnConfiguration conf) - throws Exception { - Map<String, String> props = new HashMap<>(); - // Must update config before starting server - props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, - Boolean.FALSE.toString()); - props.put("java.security.krb5.realm", ""); - props.put("java.security.krb5.kdc", ""); - props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, - Boolean.FALSE.toString()); - props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000)); - props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100)); - // Make a small batch size to test multiple calls to reserve sequences - props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, - Long.toString(BATCH_SIZE)); - // Must update config before starting server - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - - // Change connection settings for test - conf.set( - YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR, - getUrl()); - PhoenixOfflineAggregationWriterImpl - myWriter = new PhoenixOfflineAggregationWriterImpl(TEST_PROPERTIES); - myWriter.init(conf); - myWriter.start(); - myWriter.createPhoenixTables(); - return myWriter; - } - - private static TimelineEntity getTestAggregationTimelineEntity() { - TimelineEntity entity = new TimelineEntity(); - String id = "hello1"; - String type = "testAggregationType"; - entity.setId(id); - entity.setType(type); - entity.setCreatedTime(1425016501000L); - - TimelineMetric metric = new TimelineMetric(); - metric.setId("HDFS_BYTES_READ"); - metric.addValue(1425016501100L, 8000); - entity.addMetric(metric); - - return entity; - } - - private void testAggregator(OfflineAggregationInfo aggregationInfo) - throws Exception { - // Set up a list of timeline entities and write them back to Phoenix - int numEntity = 1; - TimelineEntities te = new TimelineEntities(); - te.addEntity(getTestAggregationTimelineEntity()); - TimelineCollectorContext context = new TimelineCollectorContext("cluster_1", - "user1", "testFlow", null, 0L, null); - storage.writeAggregatedEntity(context, te, - aggregationInfo); - - // Verify if we're storing all entities - String[] primaryKeyList = aggregationInfo.getPrimaryKeyList(); - String sql = "SELECT COUNT(" + primaryKeyList[primaryKeyList.length - 1] - +") FROM " + aggregationInfo.getTableName(); - verifySQLWithCount(sql, numEntity, "Number of entities should be "); - // Check metric - sql = "SELECT COUNT(m.HDFS_BYTES_READ) FROM " - + aggregationInfo.getTableName() + "(m.HDFS_BYTES_READ VARBINARY) "; - verifySQLWithCount(sql, numEntity, - "Number of entities with info should be "); - } - - - private void verifySQLWithCount(String sql, int targetCount, String message) - throws Exception { - try ( - Statement stmt = - storage.getConnection().createStatement(); - ResultSet rs = stmt.executeQuery(sql)) { - assertTrue("Result set empty on statement " + sql, rs.next()); - assertNotNull("Fail to execute query " + sql, rs); - assertEquals(message + " " + targetCount, targetCount, rs.getInt(1)); - } catch (SQLException se) { - fail("SQL exception on query: " + sql - + " With exception message: " + se.getLocalizedMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bcfbf5e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml index 1eca09f..7a5e38a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml @@ -93,11 +93,6 @@ <dependency> <groupId>com.sun.jersey</groupId> - <artifactId>jersey-core</artifactId> - </dependency> - - <dependency> - <groupId>com.sun.jersey</groupId> <artifactId>jersey-client</artifactId> </dependency> @@ -132,6 +127,12 @@ </dependency> <dependency> + <groupId>javax.ws.rs</groupId> + <artifactId>jsr311-api</artifactId> + <version>1.1.1</version> + </dependency> + + <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <exclusions> @@ -176,21 +177,6 @@ </exclusions> </dependency> - <dependency> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-core</artifactId> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </exclusion> - <exclusion> - <groupId>net.sourceforge.findbugs</groupId> - <artifactId>annotations</artifactId> - </exclusion> - </exclusions> - </dependency> - <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <dependency> <groupId>org.apache.hadoop</groupId> http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bcfbf5e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java deleted file mode 100644 index 130cb6c..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java +++ /dev/null @@ -1,358 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; -import org.apache.phoenix.util.PropertiesUtil; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -/** - * Offline aggregation Phoenix storage. This storage currently consists of two - * aggregation tables, one for flow level aggregation and one for user level - * aggregation. - * - * Example table record: - * - * <pre> - * |---------------------------| - * | Primary | Column Family| - * | key | metrics | - * |---------------------------| - * | row_key | metricId1: | - * | | metricValue1 | - * | | @timestamp1 | - * | | | - * | | metriciD1: | - * | | metricValue2 | - * | | @timestamp2 | - * | | | - * | | metricId2: | - * | | metricValue1 | - * | | @timestamp2 | - * | | | - * | | | - * | | | - * | | | - * | | | - * | | | - * | | | - * | | | - * | | | - * | | | - * | | | - * | | | - * | | | - * | | | - * |---------------------------| - * </pre> - * - * For the flow aggregation table, the primary key contains user, cluster, and - * flow id. For user aggregation table,the primary key is user. - * - * Metrics column family stores all aggregated metrics for each record. - */ -@Private -@Unstable -public class PhoenixOfflineAggregationWriterImpl - extends OfflineAggregationWriter { - - private static final Log LOG - = LogFactory.getLog(PhoenixOfflineAggregationWriterImpl.class); - private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER - = "timeline_cf_placeholder"; - - /** Default Phoenix JDBC driver name. */ - private static final String DRIVER_CLASS_NAME - = "org.apache.phoenix.jdbc.PhoenixDriver"; - - /** Default Phoenix timeline config column family. */ - private static final String METRIC_COLUMN_FAMILY = "m."; - /** Default Phoenix timeline info column family. */ - private static final String INFO_COLUMN_FAMILY = "i."; - /** Default separator for Phoenix storage. */ - private static final String AGGREGATION_STORAGE_SEPARATOR = ";"; - - /** Connection string to the deployed Phoenix cluster. */ - private String connString = null; - private Properties connProperties = new Properties(); - - public PhoenixOfflineAggregationWriterImpl(Properties prop) { - super(PhoenixOfflineAggregationWriterImpl.class.getName()); - connProperties = PropertiesUtil.deepCopy(prop); - } - - public PhoenixOfflineAggregationWriterImpl() { - super(PhoenixOfflineAggregationWriterImpl.class.getName()); - } - - @Override - public void serviceInit(Configuration conf) throws Exception { - Class.forName(DRIVER_CLASS_NAME); - // so check it here and only read in the config if it's not overridden. - connString = - conf.get(YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR, - YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT); - super.init(conf); - } - - @Override - public TimelineWriteResponse writeAggregatedEntity( - TimelineCollectorContext context, TimelineEntities entities, - OfflineAggregationInfo info) throws IOException { - TimelineWriteResponse response = new TimelineWriteResponse(); - String sql = "UPSERT INTO " + info.getTableName() - + " (" + StringUtils.join(info.getPrimaryKeyList(), ",") - + ", created_time, metric_names) " - + "VALUES (" - + StringUtils.repeat("?,", info.getPrimaryKeyList().length) - + "?, ?)"; - if (LOG.isDebugEnabled()) { - LOG.debug("TimelineEntity write SQL: " + sql); - } - - try (Connection conn = getConnection(); - PreparedStatement ps = conn.prepareStatement(sql)) { - for (TimelineEntity entity : entities.getEntities()) { - HashMap<String, TimelineMetric> formattedMetrics = new HashMap<>(); - if (entity.getMetrics() != null) { - for (TimelineMetric m : entity.getMetrics()) { - formattedMetrics.put(m.getId(), m); - } - } - int idx = info.setStringsForPrimaryKey(ps, context, null, 1); - ps.setLong(idx++, entity.getCreatedTime()); - ps.setString(idx++, - StringUtils.join(formattedMetrics.keySet().toArray(), - AGGREGATION_STORAGE_SEPARATOR)); - ps.execute(); - - storeEntityVariableLengthFields(entity, formattedMetrics, context, conn, - info); - - conn.commit(); - } - } catch (SQLException se) { - LOG.error("Failed to add entity to Phoenix " + se.getMessage()); - throw new IOException(se); - } catch (Exception e) { - LOG.error("Exception on getting connection: " + e.getMessage()); - throw new IOException(e); - } - return response; - } - - /** - * Create Phoenix tables for offline aggregation storage if the tables do not - * exist. - * - * @throws IOException if any problem happens while creating Phoenix tables. - */ - public void createPhoenixTables() throws IOException { - // Create tables if necessary - try (Connection conn = getConnection(); - Statement stmt = conn.createStatement()) { - // Table schema defined as in YARN-3817. - String sql = "CREATE TABLE IF NOT EXISTS " - + OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME - + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, " - + "flow_name VARCHAR NOT NULL, " - + "created_time UNSIGNED_LONG, " - + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER - + " VARBINARY, " - + "metric_names VARCHAR, info_keys VARCHAR " - + "CONSTRAINT pk PRIMARY KEY(" - + "user, cluster, flow_name))"; - stmt.executeUpdate(sql); - sql = "CREATE TABLE IF NOT EXISTS " - + OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME - + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, " - + "created_time UNSIGNED_LONG, " - + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER - + " VARBINARY, " - + "metric_names VARCHAR, info_keys VARCHAR " - + "CONSTRAINT pk PRIMARY KEY(user, cluster))"; - stmt.executeUpdate(sql); - conn.commit(); - } catch (SQLException se) { - LOG.error("Failed in init data " + se.getLocalizedMessage()); - throw new IOException(se); - } - return; - } - - // Utility functions - @Private - @VisibleForTesting - Connection getConnection() throws IOException { - Connection conn; - try { - conn = DriverManager.getConnection(connString, connProperties); - conn.setAutoCommit(false); - } catch (SQLException se) { - LOG.error("Failed to connect to phoenix server! " - + se.getLocalizedMessage()); - throw new IOException(se); - } - return conn; - } - - // WARNING: This method will permanently drop a table! - @Private - @VisibleForTesting - void dropTable(String tableName) throws Exception { - try (Connection conn = getConnection(); - Statement stmt = conn.createStatement()) { - String sql = "DROP TABLE " + tableName; - stmt.executeUpdate(sql); - } catch (SQLException se) { - LOG.error("Failed in dropping entity table " + se.getLocalizedMessage()); - throw se; - } - } - - private static class DynamicColumns<K> { - static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY"; - static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR"; - private String columnFamilyPrefix; - private String type; - private Set<K> columns; - - public DynamicColumns(String columnFamilyPrefix, String type, - Set<K> keyValues) { - this.columnFamilyPrefix = columnFamilyPrefix; - this.columns = keyValues; - this.type = type; - } - } - - private static <K> StringBuilder appendColumnsSQL( - StringBuilder colNames, DynamicColumns<K> cfInfo) { - // Prepare the sql template by iterating through all keys - for (K key : cfInfo.columns) { - colNames.append(",").append(cfInfo.columnFamilyPrefix) - .append(key.toString()).append(cfInfo.type); - } - return colNames; - } - - private static <K, V> int setValuesForColumnFamily( - PreparedStatement ps, Map<K, V> keyValues, int startPos, - boolean converToBytes) throws SQLException { - int idx = startPos; - for (Map.Entry<K, V> entry : keyValues.entrySet()) { - V value = entry.getValue(); - if (value instanceof Collection) { - ps.setString(idx++, StringUtils.join( - (Collection) value, AGGREGATION_STORAGE_SEPARATOR)); - } else { - if (converToBytes) { - try { - ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue())); - } catch (IOException ie) { - LOG.error("Exception in converting values into bytes " - + ie.getMessage()); - throw new SQLException(ie); - } - } else { - ps.setString(idx++, value.toString()); - } - } - } - return idx; - } - - private static <K, V> int setBytesForColumnFamily( - PreparedStatement ps, Map<K, V> keyValues, int startPos) - throws SQLException { - return setValuesForColumnFamily(ps, keyValues, startPos, true); - } - - private static <K, V> int setStringsForColumnFamily( - PreparedStatement ps, Map<K, V> keyValues, int startPos) - throws SQLException { - return setValuesForColumnFamily(ps, keyValues, startPos, false); - } - - private static void storeEntityVariableLengthFields(TimelineEntity entity, - Map<String, TimelineMetric> formattedMetrics, - TimelineCollectorContext context, Connection conn, - OfflineAggregationInfo aggregationInfo) throws SQLException { - int numPlaceholders = 0; - StringBuilder columnDefs = new StringBuilder( - StringUtils.join(aggregationInfo.getPrimaryKeyList(), ",")); - if (formattedMetrics != null && formattedMetrics.size() > 0) { - appendColumnsSQL(columnDefs, new DynamicColumns<>( - METRIC_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES, - formattedMetrics.keySet())); - numPlaceholders += formattedMetrics.keySet().size(); - } - if (numPlaceholders == 0) { - return; - } - StringBuilder placeholders = new StringBuilder(); - placeholders.append( - StringUtils.repeat("?,", aggregationInfo.getPrimaryKeyList().length)); - // numPlaceholders >= 1 now - placeholders.append("?") - .append(StringUtils.repeat(",?", numPlaceholders - 1)); - String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ") - .append(aggregationInfo.getTableName()).append(" (").append(columnDefs) - .append(") VALUES(").append(placeholders).append(")").toString(); - if (LOG.isDebugEnabled()) { - LOG.debug("SQL statement for variable length fields: " - + sqlVariableLengthFields); - } - // Use try with resource statement for the prepared statement - try (PreparedStatement psVariableLengthFields = - conn.prepareStatement(sqlVariableLengthFields)) { - int idx = aggregationInfo.setStringsForPrimaryKey( - psVariableLengthFields, context, null, 1); - if (formattedMetrics != null && formattedMetrics.size() > 0) { - idx = setBytesForColumnFamily( - psVariableLengthFields, formattedMetrics, idx); - } - psVariableLengthFields.execute(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bcfbf5e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java index fa0d479..dd87169 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java @@ -59,7 +59,6 @@ public final class TimelineSchemaCreator { final static String NAME = TimelineSchemaCreator.class.getSimpleName(); private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class); - private static final String PHOENIX_OPTION_SHORT = "p"; private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s"; private static final String APP_TABLE_NAME_SHORT = "a"; private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f"; @@ -117,22 +116,6 @@ public final class TimelineSchemaCreator { exceptions.add(e); } - // Create Phoenix data schema if needed - if (commandLine.hasOption(PHOENIX_OPTION_SHORT)) { - Configuration phoenixConf = new Configuration(); - try { - PhoenixOfflineAggregationWriterImpl phoenixWriter = - new PhoenixOfflineAggregationWriterImpl(); - phoenixWriter.init(phoenixConf); - phoenixWriter.start(); - phoenixWriter.createPhoenixTables(); - phoenixWriter.stop(); - LOG.info("Successfully created Phoenix offline aggregation schema. "); - } catch (IOException e) { - LOG.error("Error in creating phoenix tables: " + e.getMessage()); - exceptions.add(e); - } - } if (exceptions.size() > 0) { LOG.warn("Schema creation finished with the following exceptions"); for (Exception e : exceptions) { @@ -182,11 +165,6 @@ public final class TimelineSchemaCreator { // Options without an argument // No need to set arg name since we do not need an argument here - o = new Option(PHOENIX_OPTION_SHORT, "usePhoenix", false, - "create Phoenix offline aggregation tables"); - o.setRequired(false); - options.addOption(o); - o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable", false, "skip existing Hbase tables and continue to create new tables"); o.setRequired(false); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org