This is an automated email from the ASF dual-hosted git repository. avijayan pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
commit c29159a97646cc0a6019bbf2b3a3620c76da5789 Author: Aravindan Vijayan <avija...@hortonworks.com> AuthorDate: Mon Apr 16 15:10:18 2018 -0700 AMBARI-22740 : Rename ambari metrics collector package to org.apache.ambari.metrics. (Commit 2) --- .../core/timeline/HBaseTimelineMetricsService.java | 9 ++++---- .../core/timeline/TimelineMetricsIgniteCache.java | 26 ++++++++++++++-------- .../aggregators/TimelineMetricAppAggregator.java | 11 +++++---- .../TimelineMetricFilteringHostAggregator.java | 4 +++- .../aggregators/TimelineMetricHostAggregator.java | 4 +++- .../v2/TimelineMetricHostAggregator.java | 4 +++- .../discovery/TimelineMetricMetadataManager.java | 16 ++++++++----- .../core/timeline/query/PhoenixTransactSQL.java | 1 - .../core/timeline/sink/DefaultFSSinkProvider.java | 4 +++- .../core/timeline/sink/KafkaSinkProvider.java | 25 ++++++++++++++------- 10 files changed, 69 insertions(+), 35 deletions(-) diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java index 4165b1e..d21edfc 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java @@ -17,6 +17,8 @@ */ package org.apache.ambari.metrics.core.timeline; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES; import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES; import java.io.IOException; @@ -92,7 +94,6 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time /** * Construct the service. - * */ public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) { super(HBaseTimelineMetricsService.class.getName()); @@ -159,8 +160,8 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time } } - defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT, "20")); - if (Boolean.parseBoolean(metricsConf.get(TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) { + defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20")); + if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) { LOG.info("Using group by aggregators for aggregating host and cluster metrics."); } @@ -587,7 +588,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time aggregator.getSleepIntervalMillis(), TimeUnit.MILLISECONDS); LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " + - +aggregator.getSleepIntervalMillis() + " milliseconds."); + + aggregator.getSleepIntervalMillis() + " milliseconds."); } else { LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled."); } diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java index a7793c0..0a8dcc5 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java @@ -18,6 +18,14 @@ package org.apache.ambari.metrics.core.timeline; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY; import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getJavaRegexFromSqlRegex; import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis; import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices; @@ -84,7 +92,7 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach //TODO add config to disable logging //enable ssl for ignite requests - if (metricConf.get(TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY) != null && metricConf.get(TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY).equalsIgnoreCase("HTTPS_ONLY")) { + if (metricConf.get(TIMELINE_SERVICE_HTTP_POLICY) != null && metricConf.get(TIMELINE_SERVICE_HTTP_POLICY).equalsIgnoreCase("HTTPS_ONLY")) { SslContextFactory sslContextFactory = new SslContextFactory(); String keyStorePath = sslConf.get("ssl.server.keystore.location"); String keyStorePassword = sslConf.get("ssl.server.keystore.password"); @@ -100,11 +108,11 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach //aggregation parameters appIdsToAggregate = timelineMetricConfiguration.getAppIdsForHostAggregation(); - interpolationEnabled = Boolean.parseBoolean(metricConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true")); - cacheSliceIntervalMillis = SECONDS.toMillis(metricConf.getInt(TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30)); - Long aggregationInterval = metricConf.getLong(TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120L); + interpolationEnabled = Boolean.parseBoolean(metricConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true")); + cacheSliceIntervalMillis = SECONDS.toMillis(metricConf.getInt(CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30)); + Long aggregationInterval = metricConf.getLong(CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120L); - String filteredMetricPatterns = metricConf.get(TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS); + String filteredMetricPatterns = metricConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS); if (!StringUtils.isEmpty(filteredMetricPatterns)) { LOG.info("Skipping aggregation for metric patterns : " + filteredMetricPatterns); for (String patternString : filteredMetricPatterns.split(",")) { @@ -113,10 +121,10 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach } } - if (metricConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES) != null) { + if (metricConf.get(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES) != null) { TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi(); TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); - ipFinder.setAddresses(Arrays.asList(metricConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES).split(","))); + ipFinder.setAddresses(Arrays.asList(metricConf.get(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES).split(","))); LOG.info("Setting ignite nodes to : " + ipFinder.getRegisteredAddresses()); discoverySpi.setIpFinder(ipFinder); igniteConfiguration.setDiscoverySpi(discoverySpi); @@ -143,7 +151,7 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach cacheConfiguration.setName("metrics_cache"); //set cache mode to partitioned with # of backups cacheConfiguration.setCacheMode(CacheMode.PARTITIONED); - cacheConfiguration.setBackups(metricConf.getInt(TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS, 1)); + cacheConfiguration.setBackups(metricConf.getInt(TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS, 1)); //disable throttling due to cpu impact cacheConfiguration.setRebalanceThrottle(0); //enable locks @@ -214,7 +222,7 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach putMetricIntoCache(metricDoubleEntry.getKey(), newMetricClusterAggregate); if (hostMetadata != null) { //calculate app host metric - if (metric.getAppId().equalsIgnoreCase(TimelineMetricConfiguration.HOST_APP_ID)) { + if (metric.getAppId().equalsIgnoreCase(HOST_APP_ID)) { // Candidate metric, update app aggregates if (hostMetadata.containsKey(metric.getHostName())) { updateAppAggregatesFromHostMetric(metricDoubleEntry.getKey(), newMetricClusterAggregate, hostMetadata.get(metric.getHostName())); diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java index 190ad9a..4c62366 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java @@ -36,6 +36,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID; + /** * Aggregator responsible for providing app level host aggregates. This task * is accomplished without doing a round trip to storage, rather @@ -91,7 +94,7 @@ public class TimelineMetricAppAggregator { } // If metric is a host metric and host has apps on it - if (appId.equalsIgnoreCase(TimelineMetricConfiguration.HOST_APP_ID)) { + if (appId.equalsIgnoreCase(HOST_APP_ID)) { // Candidate metric, update app aggregates if (hostMetadata.containsKey(hostname)) { updateAppAggregatesFromHostMetric(clusterMetric, hostname, metricValue); @@ -128,7 +131,7 @@ public class TimelineMetricAppAggregator { return; } - TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), TimelineMetricConfiguration.HOST_APP_ID, clusterMetric.getInstanceId()); + TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID, clusterMetric.getInstanceId()); ConcurrentHashMap<String, String> apps = hostMetadata.get(hostname).getHostedApps(); for (String appId : apps.keySet()) { if (appIdsToAggregate.contains(appId)) { @@ -136,7 +139,7 @@ public class TimelineMetricAppAggregator { appKey.setAppId(appId); TimelineMetricMetadata appMetadata = metadataManagerInstance.getMetadataCacheValue(appKey); if (appMetadata == null) { - TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), TimelineMetricConfiguration.HOST_APP_ID, clusterMetric.getInstanceId()); + TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID, clusterMetric.getInstanceId()); TimelineMetricMetadata hostMetricMetadata = metadataManagerInstance.getMetadataCacheValue(key); if (hostMetricMetadata != null) { @@ -178,7 +181,7 @@ public class TimelineMetricAppAggregator { } private List<String> getAppIdsForHostAggregation(Configuration metricsConf) { - String appIds = metricsConf.get(TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS); + String appIds = metricsConf.get(CLUSTER_AGGREGATOR_APP_IDS); if (!StringUtils.isEmpty(appIds)) { return Arrays.asList(StringUtils.stripAll(appIds.split(","))); } diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java index 371d9fa..b0aec2f 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricFilteringHostAggregator.java @@ -33,6 +33,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; + public class TimelineMetricFilteringHostAggregator extends TimelineMetricHostAggregator { private static final Log LOG = LogFactory.getLog(TimelineMetricFilteringHostAggregator.class); private TimelineMetricMetadataManager metricMetadataManager; @@ -84,7 +86,7 @@ public class TimelineMetricFilteringHostAggregator extends TimelineMetricHostAgg endTime, null, null, true); condition.setNoLimit(); condition.setFetchSize(resultsetFetchSize); - condition.setStatement(String.format(PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL, tableName)); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, tableName)); // Retaining order of the row-key avoids client side merge sort. condition.addOrderByColumn("UUID"); condition.addOrderByColumn("SERVER_TIME"); diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java index c25d6ce..6f2351b 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java @@ -36,6 +36,8 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor; import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; + public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class); TimelineMetricReadHelper readHelper; @@ -73,7 +75,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { endTime, null, null, true); condition.setNoLimit(); condition.setFetchSize(resultsetFetchSize); - condition.setStatement(String.format(PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL, tableName)); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, tableName)); // Retaining order of the row-key avoids client side merge sort. condition.addOrderByColumn("UUID"); condition.addOrderByColumn("SERVER_TIME"); diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java index 9e8df6d..f8757a4 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java @@ -31,6 +31,8 @@ import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL; import org.apache.hadoop.conf.Configuration; import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL; + public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { public TimelineMetricHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName, @@ -62,7 +64,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { EmptyCondition condition = new EmptyCondition(); condition.setDoUpdate(true); - condition.setStatement(String.format(PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL, + condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL, outputTableName, endTime, tableName, getDownsampledMetricSkipClause(), startTime, endTime)); diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java index 97eb7b1..ff24c10 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataManager.java @@ -50,6 +50,12 @@ import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor; import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric; import org.apache.ambari.metrics.core.timeline.uuid.HashBasedUuidGenStrategy; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DISABLE_METRIC_METADATA_MGMT; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_UUID_GEN_STRATEGY; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_METADATA_FILTERS; + public class TimelineMetricMetadataManager { private static final Log LOG = LogFactory.getLog(TimelineMetricMetadataManager.class); private boolean isDisabled = false; @@ -82,7 +88,7 @@ public class TimelineMetricMetadataManager { public TimelineMetricMetadataManager(Configuration metricsConf, PhoenixHBaseAccessor hBaseAccessor) { this.metricsConf = metricsConf; this.hBaseAccessor = hBaseAccessor; - String patternStrings = metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRIC_METADATA_FILTERS); + String patternStrings = metricsConf.get(TIMELINE_METRIC_METADATA_FILTERS); if (!StringUtils.isEmpty(patternStrings)) { metricNameFilters.addAll(Arrays.asList(patternStrings.split(","))); } @@ -98,14 +104,14 @@ public class TimelineMetricMetadataManager { * Initialize Metadata from the store */ public void initializeMetadata() { - if (metricsConf.getBoolean(TimelineMetricConfiguration.DISABLE_METRIC_METADATA_MGMT, false)) { + if (metricsConf.getBoolean(DISABLE_METRIC_METADATA_MGMT, false)) { isDisabled = true; } else { metricMetadataSync = new TimelineMetricMetadataSync(this); // Schedule the executor to sync to store executorService.scheduleWithFixedDelay(metricMetadataSync, - metricsConf.getInt(TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes - metricsConf.getInt(TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes + metricsConf.getInt(METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes + metricsConf.getInt(METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes TimeUnit.SECONDS); // Read from store and initialize map try { @@ -330,7 +336,7 @@ public class TimelineMetricMetadataManager { * @return */ private MetricUuidGenStrategy getUuidStrategy(Configuration configuration) { - String strategy = configuration.get(TimelineMetricConfiguration.TIMELINE_METRICS_UUID_GEN_STRATEGY, ""); + String strategy = configuration.get(TIMELINE_METRICS_UUID_GEN_STRATEGY, ""); if ("random".equalsIgnoreCase(strategy)) { return new RandomUuidGenStrategy(); } else { diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java index fda6214..819a20e 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java @@ -345,7 +345,6 @@ public class PhoenixTransactSQL { "MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE METRIC_NAME LIKE %s AND SERVER_TIME > %s AND " + "SERVER_TIME <= %s GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS"; - public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD"; public static final String CONTAINER_METRICS_TABLE_NAME = "CONTAINER_METRICS"; diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java index 52abc1e..4c0920c 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java @@ -31,6 +31,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL; + public class DefaultFSSinkProvider implements ExternalSinkProvider { private static final Log LOG = LogFactory.getLog(DefaultFSSinkProvider.class); TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance(); @@ -64,7 +66,7 @@ public class DefaultFSSinkProvider implements ExternalSinkProvider { @Override public int getFlushSeconds() { try { - return conf.getMetricsConf().getInt(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3); + return conf.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3); } catch (Exception e) { LOG.warn("Cannot read cache commit interval."); } diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java index 1ce624b..9935d38 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java @@ -35,6 +35,15 @@ import org.apache.kafka.clients.producer.RecordMetadata; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.KAFKA_ACKS; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.KAFKA_BATCH_SIZE; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.KAFKA_BUFFER_MEM; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.KAFKA_LINGER_MS; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.KAFKA_RETRIES; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.KAFKA_SERVERS; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.KAFKA_SINK_TIMEOUT_SECONDS; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL; + /* This will be used by the single Metrics committer thread. Hence it is important to make this non-blocking export. @@ -54,15 +63,15 @@ public class KafkaSinkProvider implements ExternalSinkProvider { Properties configProperties = new Properties(); try { - configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getMetricsConf().getTrimmed(TimelineMetricConfiguration.KAFKA_SERVERS)); - configProperties.put(ProducerConfig.ACKS_CONFIG, configuration.getMetricsConf().getTrimmed(TimelineMetricConfiguration.KAFKA_ACKS, "all")); + configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getMetricsConf().getTrimmed(KAFKA_SERVERS)); + configProperties.put(ProducerConfig.ACKS_CONFIG, configuration.getMetricsConf().getTrimmed(KAFKA_ACKS, "all")); // Avoid duplicates - No transactional semantics - configProperties.put(ProducerConfig.RETRIES_CONFIG, configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_RETRIES, 0)); - configProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_BATCH_SIZE, 128)); - configProperties.put(ProducerConfig.LINGER_MS_CONFIG, configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_LINGER_MS, 1)); - configProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configuration.getMetricsConf().getLong(TimelineMetricConfiguration.KAFKA_BUFFER_MEM, 33554432)); // 32 MB - FLUSH_SECONDS = configuration.getMetricsConf().getInt(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3); - TIMEOUT_SECONDS = configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_SINK_TIMEOUT_SECONDS, 10); + configProperties.put(ProducerConfig.RETRIES_CONFIG, configuration.getMetricsConf().getInt(KAFKA_RETRIES, 0)); + configProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, configuration.getMetricsConf().getInt(KAFKA_BATCH_SIZE, 128)); + configProperties.put(ProducerConfig.LINGER_MS_CONFIG, configuration.getMetricsConf().getInt(KAFKA_LINGER_MS, 1)); + configProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configuration.getMetricsConf().getLong(KAFKA_BUFFER_MEM, 33554432)); // 32 MB + FLUSH_SECONDS = configuration.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3); + TIMEOUT_SECONDS = configuration.getMetricsConf().getInt(KAFKA_SINK_TIMEOUT_SECONDS, 10); } catch (Exception e) { LOG.error("Configuration error!", e); throw new ExceptionInInitializerError(e); -- To stop receiving notification emails like this one, please contact avija...@apache.org.