Repository: ambari
Updated Branches:
  refs/heads/branch-2.2 16907f6e5 -> 33f600f00
AMBARI-15361. Fix ordering of Alter table calls which could result in Region Close issue. (swagle)

Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/33f600f0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/33f600f0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/33f600f0

Branch: refs/heads/branch-2.2
Commit: 33f600f008266d97a94596a5cc1a9691dc35192f
Parents: 16907f6
Author: Siddharth Wagle <swa...@hortonworks.com>
Authored: Wed Mar 9 13:42:05 2016 -0800
Committer: Siddharth Wagle <swa...@hortonworks.com>
Committed: Wed Mar 9 13:42:05 2016 -0800

----------------------------------------------------------------------
 .../timeline/HBaseTimelineMetricStore.java | 7 +-
 .../metrics/timeline/PhoenixHBaseAccessor.java | 134 ++++++++++++-------
 2 files changed, 94 insertions(+), 47 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/33f600f0/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index f460292..465fe95 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -80,12 +80,15 @@ public class 
HBaseTimelineMetricStore extends AbstractService implements Timelin Configuration metricsConf) { if (!isInitialized) { hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf); + // Initialize schema hBaseAccessor.initMetricSchema(); // Initialize metadata from store metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor, metricsConf); metricMetadataManager.initializeMetadata(); - + // Initialize policies before TTL update hBaseAccessor.initPolicies(); + // Alter TTL on tables + hBaseAccessor.alterMetricTableTTL(); if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) { LOG.info("Using group by aggregators for aggregating host and cluster metrics."); @@ -96,7 +99,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf, metricMetadataManager); scheduleAggregatorThread(secondClusterAggregator, metricsConf); -// // Start the minute cluster aggregator + // Start the minute cluster aggregator TimelineMetricAggregator minuteClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf); scheduleAggregatorThread(minuteClusterAggregator, metricsConf); http://git-wip-us.apache.org/repos/asf/ambari/blob/33f600f0/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 4afb722..48be4ee 100644 --- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -148,6 +148,15 @@ public class PhoenixHBaseAccessor { static final String BLOCKING_STORE_FILES_KEY = "hbase.hstore.blockingStoreFiles"; + private final String precisionTtl; + private final String hostMinTtl; + private final String hostHourTtl; + private final String hostDailyTtl; + private final String clusterSecTtl; + private final String clusterMinTtl; + private final String clusterHourTtl; + private final String clusterDailyTtl; + public PhoenixHBaseAccessor(Configuration hbaseConf, Configuration metricsConf){ this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf)); @@ -171,6 +180,15 @@ public class PhoenixHBaseAccessor { (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 5))); this.outOfBandTimeAllowance = metricsConf.getLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE, DEFAULT_OUT_OF_BAND_TIME_ALLOWANCE); + + precisionTtl = getDaysInSeconds(metricsConf.get(PRECISION_TABLE_TTL, "1")); //1 day + hostMinTtl = getDaysInSeconds(metricsConf.get(HOST_MINUTE_TABLE_TTL, "7")); //7 days + hostHourTtl = getDaysInSeconds(metricsConf.get(HOST_HOUR_TABLE_TTL, "30")); //30 days + hostDailyTtl = getDaysInSeconds(metricsConf.get(HOST_DAILY_TABLE_TTL, "365")); //1 year + clusterSecTtl = getDaysInSeconds(metricsConf.get(CLUSTER_SECOND_TABLE_TTL, "7")); //7 days + clusterMinTtl = getDaysInSeconds(metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "30")); //30 days + clusterHourTtl = getDaysInSeconds(metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "365")); //1 year + clusterDailyTtl = getDaysInSeconds(metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "730")); //2 years } private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs) @@ -234,20 +252,80 
@@ public class PhoenixHBaseAccessor { return dataSource.getHBaseAdmin(); } + /** + * Set TTL on tables based on user settings + */ + protected void alterMetricTableTTL() { + Connection conn = null; + Statement stmt = null; + + try { + LOG.info("Initializing metrics schema..."); + conn = getConnectionRetryingOnException(); + stmt = conn.createStatement(); + + //alter TTL options to update tables + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_RECORD_TABLE_NAME, + precisionTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_AGGREGATE_MINUTE_TABLE_NAME, + hostMinTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_AGGREGATE_HOURLY_TABLE_NAME, + hostHourTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_AGGREGATE_DAILY_TABLE_NAME, + hostDailyTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_CLUSTER_AGGREGATE_TABLE_NAME, + clusterSecTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, + clusterMinTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, + clusterHourTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, + clusterDailyTtl)); + + conn.commit(); + + + } catch (InterruptedException e) { + LOG.warn("Error updating TTL on tables.", e); + } catch (SQLException sql) { + if (sql.getErrorCode() == SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE.getErrorCode()) { + LOG.warn("Update TTL on tables is unsupported by the phoenix version. 
" + sql.getMessage()); + } else { + LOG.warn("Error updating TTL on tables.", sql); + } + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + // Ignore + } + } + } + } + protected void initMetricSchema() { Connection conn = null; Statement stmt = null; String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING); String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION); - String precisionTtl = getDaysInSeconds(metricsConf.get(PRECISION_TABLE_TTL, "1")); //1 day - String hostMinTtl = getDaysInSeconds(metricsConf.get(HOST_MINUTE_TABLE_TTL, "7")); //7 days - String hostHourTtl = getDaysInSeconds(metricsConf.get(HOST_HOUR_TABLE_TTL, "30")); //30 days - String hostDailyTtl = getDaysInSeconds(metricsConf.get(HOST_DAILY_TABLE_TTL, "365")); //1 year - String clusterSecTtl = getDaysInSeconds(metricsConf.get(CLUSTER_SECOND_TABLE_TTL, "7")); //7 days - String clusterMinTtl = getDaysInSeconds(metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "30")); //30 days - String clusterHourTtl = getDaysInSeconds(metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "365")); //1 year - String clusterDailyTtl = getDaysInSeconds(metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "730")); //2 years + try { LOG.info("Initializing metrics schema..."); @@ -292,48 +370,14 @@ public class PhoenixHBaseAccessor { stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL, METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding, clusterDailyTtl, compression)); - //alter TTL options to update tables - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_RECORD_TABLE_NAME, - precisionTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_AGGREGATE_MINUTE_TABLE_NAME, - hostMinTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_AGGREGATE_HOURLY_TABLE_NAME, - hostHourTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, 
- METRICS_AGGREGATE_DAILY_TABLE_NAME, - hostDailyTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_CLUSTER_AGGREGATE_TABLE_NAME, - clusterSecTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, - clusterMinTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, - clusterHourTtl)); - stmt.executeUpdate(String.format(ALTER_SQL, - METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, - clusterDailyTtl)); conn.commit(); LOG.info("Metrics schema initialized."); - } catch (SQLException sql) { - if (sql.getErrorCode() == - SQLExceptionCode.SET_UNSUPPORTED_PROP_ON_ALTER_TABLE.getErrorCode()) { - LOG.warn("Cannot update TTL on tables. " + sql.getMessage()); - } else { - LOG.error("Error creating Metrics Schema in HBase using Phoenix.", sql); - throw new MetricsSystemInitializationException( - "Error creating Metrics Schema in HBase using Phoenix.", sql); - } - } catch (InterruptedException e) { - LOG.error("Error creating Metrics Schema in HBase using Phoenix.", e); + } catch (SQLException | InterruptedException sql) { + LOG.error("Error creating Metrics Schema in HBase using Phoenix.", sql); throw new MetricsSystemInitializationException( - "Error creating Metrics Schema in HBase using Phoenix.", e); + "Error creating Metrics Schema in HBase using Phoenix.", sql); } finally { if (stmt != null) { try {