This is an automated email from the ASF dual-hosted git repository. wuzhiguo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ambari-metrics.git
The following commit(s) were added to refs/heads/master by this push: new 85fbf27 AMBARI-25569: Reassess Ambari Metrics data migration (#62) 85fbf27 is described below commit 85fbf2730e776c29b5f80795fdccdd9ebff3f600 Author: lucasbak <lucas.bakal...@gmail.com> AuthorDate: Tue Nov 15 15:55:34 2022 +0100 AMBARI-25569: Reassess Ambari Metrics data migration (#62) --- .../upgrade/core/AbstractPhoenixMetricsCopier.java | 84 +++---- .../upgrade/core/MetricsDataMigrationLauncher.java | 272 ++++++++++++--------- .../upgrade/core/PhoenixClusterMetricsCopier.java | 17 +- .../upgrade/core/PhoenixHostMetricsCopier.java | 17 +- 4 files changed, 207 insertions(+), 183 deletions(-) diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java index 3d2002b..f70dc3f 100644 --- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java +++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java @@ -20,9 +20,10 @@ package org.apache.ambari.metrics.core.timeline.upgrade.core; import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; -import java.io.FileWriter; import java.io.IOException; +import java.io.Writer; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -31,16 +32,16 @@ import java.util.Set; public abstract class AbstractPhoenixMetricsCopier implements Runnable { private static final Log LOG = LogFactory.getLog(AbstractPhoenixMetricsCopier.class); - private static final Long DEFAULT_NATIVE_TIME_RANGE_DELAY = 
120000L; - private final Long startTime; - protected final FileWriter processedMetricsFile; + private static final long DEFAULT_NATIVE_TIME_RANGE_DELAY = 120000L; + private final long startTime; + protected final Writer processedMetricsFile; protected String inputTable; protected String outputTable; protected Set<String> metricNames; protected PhoenixHBaseAccessor hBaseAccessor; public AbstractPhoenixMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, - Set<String> metricNames, Long startTime, FileWriter outputStream) { + Set<String> metricNames, long startTime, Writer outputStream) { this.inputTable = inputTableName; this.outputTable = outputTableName; this.hBaseAccessor = hBaseAccessor; @@ -52,7 +53,7 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable { @Override public void run(){ LOG.info(String.format("Copying %s metrics from %s to %s", metricNames, inputTable, outputTable)); - long startTimer = System.currentTimeMillis(); + long timerStart = System.currentTimeMillis(); String query = String.format("SELECT %s %s FROM %s WHERE %s AND SERVER_TIME > %s ORDER BY METRIC_NAME, SERVER_TIME", getQueryHint(startTime), getColumnsClause(), inputTable, getMetricNamesLikeClause(), startTime); @@ -62,23 +63,20 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable { saveMetrics(); } catch (SQLException e) { LOG.error(e); - } - long estimatedTime = System.currentTimeMillis() - startTimer; - LOG.debug(String.format("Copying took %s seconds from table %s to table %s for metric names %s", estimatedTime/ 1000.0, inputTable, outputTable, metricNames)); + } finally { + long timerDelta = System.currentTimeMillis() - timerStart; + LOG.debug(String.format("Copying took %s seconds from table %s to table %s for metric names %s", timerDelta/ 1000.0, inputTable, outputTable, metricNames)); - saveMetricsProgress(); + saveMetricsProgress(); + } } private String getMetricNamesLikeClause() { - StringBuilder sb = new StringBuilder(); 
+ StringBuilder sb = new StringBuilder(256); sb.append('('); int i = 0; for (String metricName : metricNames) { - sb.append("METRIC_NAME"); - sb.append(" LIKE "); - sb.append("'"); - sb.append(metricName); - sb.append("'"); + sb.append("METRIC_NAME LIKE '").append(metricName).append("'"); if (i < metricNames.size() - 1) { sb.append(" OR "); @@ -94,32 +92,15 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable { private void runPhoenixQueryAndAddToResults(String query) { LOG.debug(String.format("Running query: %s", query)); - Connection conn = null; - PreparedStatement stmt = null; - try { - conn = hBaseAccessor.getConnection(); - stmt = conn.prepareStatement(query); - ResultSet rs = stmt.executeQuery(); - while (rs.next()) { - addToResults(rs); + try (Connection conn = hBaseAccessor.getConnection(); + PreparedStatement stmt = conn.prepareStatement(query)) { + try (ResultSet rs = stmt.executeQuery()) { + while (rs.next()) { + addToResults(rs); + } } } catch (SQLException e) { LOG.error(String.format("Exception during running phoenix query %s", query), e); - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException e) { - // Ignore - } - } } } @@ -127,29 +108,34 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable { * Saves processed metric names info provided file in format TABLE_NAME:METRIC_NAME */ private void saveMetricsProgress() { - if (processedMetricsFile == null) { + if (this.processedMetricsFile == null) { LOG.info("Skipping metrics progress save as the file is null"); return; } + for (String metricName : metricNames) { try { - processedMetricsFile.append(inputTable + ":" + metricName + System.lineSeparator()); + synchronized (this.processedMetricsFile) { + this.processedMetricsFile.append(inputTable).append(":").append(metricName).append(System.lineSeparator()); + } } catch (IOException e) { 
LOG.error(e); } } } - protected String getQueryHint(Long startTime) { - StringBuilder sb = new StringBuilder(); - sb.append("/*+ "); - sb.append("NATIVE_TIME_RANGE("); - sb.append(startTime - DEFAULT_NATIVE_TIME_RANGE_DELAY); - sb.append(") "); - sb.append("*/"); - return sb.toString(); + protected String getQueryHint(long startTime) { + return new StringBuilder().append("/*+ NATIVE_TIME_RANGE(").append(startTime - DEFAULT_NATIVE_TIME_RANGE_DELAY).append(") */").toString(); } + protected MetricHostAggregate extractMetricHostAggregate(ResultSet rs) throws SQLException { + MetricHostAggregate metricHostAggregate = new MetricHostAggregate(); + metricHostAggregate.setSum(rs.getDouble("METRIC_SUM")); + metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT")); + metricHostAggregate.setMax(rs.getDouble("METRIC_MAX")); + metricHostAggregate.setMin(rs.getDouble("METRIC_MIN")); + return metricHostAggregate; + } /** * Saves aggregated metrics to the Hbase * @throws SQLException diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java index 3a25aee..0c2f8e6 100644 --- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java +++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java @@ -21,21 +21,29 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor; import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey; import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager; 
import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.FileInputStream; import java.io.FileWriter; import java.io.IOException; import java.io.InputStreamReader; +import java.io.Writer; import java.net.MalformedURLException; import java.net.URISyntaxException; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.sql.SQLException; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -44,6 +52,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; @@ -62,14 +71,14 @@ import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.M public class MetricsDataMigrationLauncher { private static final Log LOG = LogFactory.getLog(MetricsDataMigrationLauncher.class); private static final Long DEFAULT_TIMEOUT_MINUTES = 60*24L; - private static String patternPrefix = "._p_"; + private static final String PATTERN_PREFIX = "._p_"; private static final int DEFAULT_BATCH_SIZE = 5; + private static final String MIGRATE_ALL_METRICS_ARG = "--allmetrics"; public static final Map<String, String> CLUSTER_AGGREGATE_TABLES_MAPPING = new HashMap<>(); public static final Map<String, String> HOST_AGGREGATE_TABLES_MAPPING = new HashMap<>(); public static final String DEFAULT_PROCESSED_METRICS_FILE_LOCATION = "/var/log/ambari-metrics-collector/ambari-metrics-migration-state.txt"; public static final int DEFAULT_NUMBER_OF_THREADS = 3; - public static final long 
ONE_MONTH_MILLIS = 2592000000L; - public static final long DEFAULT_START_TIME = System.currentTimeMillis() - ONE_MONTH_MILLIS; //Last month + public static final int DEFAULT_START_DAYS = 30; // 30 Days, Last month static { CLUSTER_AGGREGATE_TABLES_MAPPING.put(METRICS_CLUSTER_AGGREGATE_MINUTE_V1_TABLE_NAME, METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME); @@ -83,127 +92,163 @@ public class MetricsDataMigrationLauncher { private final Set<Set<String>> metricNamesBatches; private final String processedMetricsFilePath; - private Set<String> metricNames; - private Long startTime; - private Integer batchSize; - private Integer numberOfThreads; + private final long startTimeEpoch; + private final int numberOfThreads; private TimelineMetricConfiguration timelineMetricConfiguration; private PhoenixHBaseAccessor hBaseAccessor; private TimelineMetricMetadataManager timelineMetricMetadataManager; private Map<String, Set<String>> processedMetrics; - public MetricsDataMigrationLauncher(String whitelistedFilePath, String processedMetricsFilePath, Long startTime, Integer numberOfThreads, Integer batchSize) throws Exception { - this.startTime = startTime == null? DEFAULT_START_TIME : startTime; - this.numberOfThreads = numberOfThreads == null? DEFAULT_NUMBER_OF_THREADS : numberOfThreads; - this.batchSize = batchSize == null? DEFAULT_BATCH_SIZE : batchSize; - this.processedMetricsFilePath = processedMetricsFilePath == null? DEFAULT_PROCESSED_METRICS_FILE_LOCATION : processedMetricsFilePath; + public MetricsDataMigrationLauncher(String whitelistedFilePath, String processedMetricsFilePath, Long startDay, Integer numberOfThreads, Integer batchSize) throws Exception { + this.startTimeEpoch = calculateStartEpochTime(startDay); + this.numberOfThreads = (numberOfThreads == null) ? DEFAULT_NUMBER_OF_THREADS : numberOfThreads; + this.processedMetricsFilePath = (processedMetricsFilePath == null) ? 
DEFAULT_PROCESSED_METRICS_FILE_LOCATION : processedMetricsFilePath; initializeHbaseAccessor(); + readProcessedMetricsMap(); + + final Set<String> metricNames = getMetricNames(whitelistedFilePath); - LOG.info("Looking for whitelisted metric names..."); + LOG.info("Setting up batches..."); + if (batchSize == null) batchSize = DEFAULT_BATCH_SIZE; + this.metricNamesBatches = new HashSet<>(batchSize); + + Iterables.partition(metricNames, batchSize) + .forEach(batch -> metricNamesBatches.add(new HashSet<>(batch))); + LOG.info(String.format("Split metric names into %s batches with size of %s", metricNamesBatches.size(), batchSize)); + } - if (whitelistedFilePath != null) { - this.metricNames = readMetricWhitelistFromFile(whitelistedFilePath); + private long calculateStartEpochTime(Long startDay) { + final long days; + if (startDay == null) { + LOG.info(String.format("No starting day have been provided, using default: %d", DEFAULT_START_DAYS)); + days = DEFAULT_START_DAYS; } else { - String whitelistFile = timelineMetricConfiguration.getMetricsConf().get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE, TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT); - metricNames = readMetricWhitelistFromFile(whitelistFile); + LOG.info(String.format("%d days have been provided as migration starting day.", startDay)); + days = startDay; } + LOG.info(String.format("The last %d days' data will be migrated.", days)); - readProcessedMetricsMap(); + return LocalDateTime.now().minusDays(days).toEpochSecond(ZoneOffset.UTC); + } - LOG.info("Setting up batches..."); - this.metricNamesBatches = new HashSet<>(); + private Set<String> getMetricNames(String whitelistedFilePath) throws MalformedURLException, URISyntaxException, SQLException { + if(StringUtils.isNotEmpty(whitelistedFilePath) && whitelistedFilePath.equalsIgnoreCase(MIGRATE_ALL_METRICS_ARG)) { + LOG.info("Migration of all metrics has been requested by the " + MIGRATE_ALL_METRICS_ARG + " 
argument."); + LOG.info("Looking for all the metric names in the Metrics Database..."); + return this.hBaseAccessor.getTimelineMetricMetadataV1().keySet().stream() + .map(TimelineMetricMetadataKey::getMetricName).collect(Collectors.toSet()); + } - Iterables.partition(metricNames, this.batchSize) - .forEach(batch -> metricNamesBatches.add(new HashSet<>(batch))); - LOG.info(String.format("Split metric names into %s batches with size of %s", metricNamesBatches.size(), this.batchSize)); - } + if(StringUtils.isNotEmpty(whitelistedFilePath)) { + LOG.info(String.format("Whitelist file %s has been provided.", whitelistedFilePath)); + LOG.info("Looking for whitelisted metric names based on the file content..."); + return readMetricWhitelistFromFile(whitelistedFilePath); + } + + final Configuration conf = this.timelineMetricConfiguration.getMetricsConf(); + if (Boolean.parseBoolean(conf.get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_ENABLED))) { + whitelistedFilePath = conf.get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE, + TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT); + LOG.info(String.format("No whitelist file has been provided but Ambari Metrics Whitelisting is enabled. 
" + + "Using %s as whitelist file.", whitelistedFilePath)); + LOG.info("Looking for whitelisted metric names based on the file content..."); + return readMetricWhitelistFromFile(whitelistedFilePath); + } + LOG.info("No whitelist file has been provided and Ambari Metrics Whitelisting is disabled."); + LOG.info("Looking for all the metric names in the Metrics Database..."); + return this.hBaseAccessor.getTimelineMetricMetadataV1().keySet().stream() + .map(TimelineMetricMetadataKey::getMetricName).collect(Collectors.toSet()); + } private void readProcessedMetricsMap() { - Map<String, Set<String>> result = new HashMap<>(); - if (!Files.exists(Paths.get(processedMetricsFilePath))) { - LOG.info(String.format("The processed metrics file %s is missing, assuming there were no metrics processed.", processedMetricsFilePath)); - this.processedMetrics = new HashMap<>(); - } - LOG.info(String.format("Reading the list of already copied metrics from %s", processedMetricsFilePath)); - try { - try (Stream<String> stream = Files.lines(Paths.get(processedMetricsFilePath))) { - stream.forEach( line -> { - String [] lineSplit = line.split(":"); - if (!result.containsKey(lineSplit[0])) { - result.put(lineSplit[0], new HashSet<>(Collections.singletonList(lineSplit[1]))); - } else { - result.get(lineSplit[0]).add(lineSplit[1]); - } - }); + final Map<String, Set<String>> result = new HashMap<>(); + final Path path = Paths.get(this.processedMetricsFilePath); + + if (Files.notExists(path)) { + LOG.info(String.format("The processed metrics file %s is missing, assuming there were no metrics processed.", this.processedMetricsFilePath)); + } else { + LOG.info(String.format("Reading the list of already copied metrics from %s", this.processedMetricsFilePath)); + try { + try (Stream<String> stream = Files.lines(path)) { + stream.forEach(line -> { + String[] lineSplit = line.split(":"); + if (!result.containsKey(lineSplit[0])) { + result.put(lineSplit[0], new 
HashSet<>(Collections.singletonList(lineSplit[1]))); + } else { + result.get(lineSplit[0]).add(lineSplit[1]); + } + }); + } + } catch (IOException e) { + LOG.error(e); } - } catch (IOException e) { - LOG.error(e); } this.processedMetrics = result; } public void runMigration(Long timeoutInMinutes) throws IOException { - - FileWriter processedMetricsFileWriter = new FileWriter(processedMetricsFilePath, true); - LOG.info("Setting up copiers..."); - Set<AbstractPhoenixMetricsCopier> copiers = new HashSet<>(); - for (Set<String> batch : metricNamesBatches) { - for (Map.Entry<String, String> entry : CLUSTER_AGGREGATE_TABLES_MAPPING.entrySet()) { - Set<String> filteredMetrics = filterProcessedMetrics(batch, this.processedMetrics, entry.getKey()); - if (!filteredMetrics.isEmpty()) { - copiers.add(new PhoenixClusterMetricsCopier(entry.getKey(), entry.getValue(), hBaseAccessor, - filteredMetrics, startTime, processedMetricsFileWriter)); + try (Writer processedMetricsFileWriter = new BufferedWriter(new FileWriter(this.processedMetricsFilePath, true))) { + LOG.info("Setting up copiers..."); + Set<AbstractPhoenixMetricsCopier> copiers = new HashSet<>(); + for (Set<String> batch : metricNamesBatches) { + for (Map.Entry<String, String> entry : CLUSTER_AGGREGATE_TABLES_MAPPING.entrySet()) { + Set<String> filteredMetrics = filterProcessedMetrics(batch, this.processedMetrics, entry.getKey()); + if (!filteredMetrics.isEmpty()) { + copiers.add(new PhoenixClusterMetricsCopier(entry.getKey(), entry.getValue(), this.hBaseAccessor, + filteredMetrics, this.startTimeEpoch, processedMetricsFileWriter)); + } } - } - for (Map.Entry<String, String> entry : HOST_AGGREGATE_TABLES_MAPPING.entrySet()) { - Set<String> filteredMetrics = filterProcessedMetrics(batch, processedMetrics, entry.getKey()); - if (!filteredMetrics.isEmpty()) { - copiers.add(new PhoenixHostMetricsCopier(entry.getKey(), entry.getValue(), hBaseAccessor, - filteredMetrics, startTime, processedMetricsFileWriter)); + for 
(Map.Entry<String, String> entry : HOST_AGGREGATE_TABLES_MAPPING.entrySet()) { + Set<String> filteredMetrics = filterProcessedMetrics(batch, this.processedMetrics, entry.getKey()); + if (!filteredMetrics.isEmpty()) { + copiers.add(new PhoenixHostMetricsCopier(entry.getKey(), entry.getValue(), this.hBaseAccessor, + filteredMetrics, this.startTimeEpoch, processedMetricsFileWriter)); + } } } - } - if (copiers.isEmpty()) { - LOG.info("No copy threads to run, looks like all metrics have been copied."); - processedMetricsFileWriter.close(); - return; - } - - LOG.info("Running the copy threads..."); - long startTimer = System.currentTimeMillis(); - ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads == null ? DEFAULT_NUMBER_OF_THREADS : numberOfThreads); - for (AbstractPhoenixMetricsCopier copier : copiers) { - executorService.submit(copier); - } + if (copiers.isEmpty()) { + LOG.info("No copy threads to run, looks like all metrics have been copied."); + return; + } - executorService.shutdown(); + LOG.info("Running the copy threads..."); + long timerStart = System.currentTimeMillis(); + ExecutorService executorService = null; + try { + executorService = Executors.newFixedThreadPool(this.numberOfThreads); + for (AbstractPhoenixMetricsCopier copier : copiers) { + executorService.submit(copier); + } + } finally { + if (executorService != null) { + executorService.shutdown(); + try { + executorService.awaitTermination(timeoutInMinutes, TimeUnit.MINUTES); + } catch (InterruptedException e) { + LOG.error(e); + } + } + } - try { - executorService.awaitTermination(timeoutInMinutes, TimeUnit.MINUTES); - } catch (InterruptedException e) { - LOG.error(e); + long timerDelta = System.currentTimeMillis() - timerStart; + LOG.info(String.format("Copying took %s seconds", timerDelta / 1000.0)); } - - long estimatedTime = System.currentTimeMillis() - startTimer; - LOG.info(String.format("Copying took %s seconds", estimatedTime/1000.0)); - - 
processedMetricsFileWriter.close(); } private void initializeHbaseAccessor() throws MalformedURLException, URISyntaxException { this.hBaseAccessor = new PhoenixHBaseAccessor(null); this.timelineMetricConfiguration = TimelineMetricConfiguration.getInstance(); - timelineMetricConfiguration.initialize(); + this.timelineMetricConfiguration.initialize(); - timelineMetricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor); - timelineMetricMetadataManager.initializeMetadata(false); + this.timelineMetricMetadataManager = new TimelineMetricMetadataManager(this.hBaseAccessor); + this.timelineMetricMetadataManager.initializeMetadata(false); - hBaseAccessor.setMetadataInstance(timelineMetricMetadataManager); + this.hBaseAccessor.setMetadataInstance(this.timelineMetricMetadataManager); } private static Set<String> filterProcessedMetrics(Set<String> metricNames, Map<String, Set<String>> processedMetrics, String tableName) { @@ -219,21 +264,18 @@ public class MetricsDataMigrationLauncher { */ private static Set<String> readMetricWhitelistFromFile(String whitelistFile) { LOG.info(String.format("Reading metric names from %s", whitelistFile)); - Set<String> whitelistedMetrics = new HashSet<>(); + final Set<String> whitelistedMetrics = new HashSet<>(); - BufferedReader br = null; String strLine; - try(FileInputStream fstream = new FileInputStream(whitelistFile)) { - br = new BufferedReader(new InputStreamReader(fstream)); - + try(BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(whitelistFile)))) { while ((strLine = br.readLine()) != null) { strLine = strLine.trim(); if (StringUtils.isEmpty(strLine)) { continue; } - if (strLine.startsWith(patternPrefix)) { - strLine = strLine.replace(patternPrefix, ""); + if (strLine.startsWith(PATTERN_PREFIX)) { + strLine = strLine.replace(PATTERN_PREFIX, ""); } if (strLine.contains("*")) { strLine = strLine.replaceAll("\\*", "%"); @@ -248,21 +290,24 @@ public class MetricsDataMigrationLauncher { private 
void saveMetadata() throws SQLException { LOG.info("Saving metadata to store..."); - timelineMetricMetadataManager.updateMetadataCacheUsingV1Tables(); - timelineMetricMetadataManager.forceMetricsMetadataSync(); + this.timelineMetricMetadataManager.updateMetadataCacheUsingV1Tables(); + this.timelineMetricMetadataManager.forceMetricsMetadataSync(); LOG.info("Metadata was saved."); } - /** * * @param args * REQUIRED args[0] - processedMetricsFilePath - full path to the file where processed metric are/will be stored * * OPTIONAL args[1] - whitelistedFilePath - full path to the file with whitelisted metrics filenames - * if not provided the default whitelist file location will be used if configured - * if not configured - will result in error - * args[2] - startTime - default value is set to the last 30 days + * if not provided and AMS whitelisting is enabled the default whitelist + * file location will be used if configured + * if not provided and AMS whitelisting is disabled then no whitelisting + * will be used and all the metrics will be migrated + * if --allmetrics switch is provided then all the metrics will be migrated + * regardless to other settings + * args[2] - startDay - default value is set to the last 30 days * args[3] - numberOfThreads - default value is 3 * args[4] - batchSize - default value is 5 * args[5] - timeoutInMinutes - default value is set to the equivalent of 24 hours @@ -270,7 +315,7 @@ public class MetricsDataMigrationLauncher { public static void main(String[] args) { String processedMetricsFilePath = null; String whitelistedFilePath = null; - Long startTime = null; + Long startDay = null; Integer numberOfThreads = null; Integer batchSize = null; Long timeoutInMinutes = DEFAULT_TIMEOUT_MINUTES; @@ -282,7 +327,7 @@ public class MetricsDataMigrationLauncher { whitelistedFilePath = args[1]; } if (args.length>2) { - startTime = Long.valueOf(args[2]); + startDay = Long.valueOf(args[2]); } if (args.length>3) { numberOfThreads = 
Integer.valueOf(args[3]); @@ -297,30 +342,29 @@ public class MetricsDataMigrationLauncher { MetricsDataMigrationLauncher dataMigrationLauncher = null; try { LOG.info("Initializing system..."); - dataMigrationLauncher = new MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath, startTime, numberOfThreads, batchSize); + dataMigrationLauncher = new MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath, startDay, numberOfThreads, batchSize); } catch (Exception e) { LOG.error("Exception during system setup, exiting...", e); System.exit(1); } + int exitCode = 0; try { - //Setup shutdown hook for metadata save. - MetricsDataMigrationLauncher finalDataMigrationLauncher = dataMigrationLauncher; - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - finalDataMigrationLauncher.saveMetadata(); - } catch (SQLException e) { - LOG.error("Exception during metadata saving, exiting...", e); - } - })); - dataMigrationLauncher.runMigration(timeoutInMinutes); - } catch (IOException e) { + } catch (Throwable e) { + exitCode = 1; LOG.error("Exception during data migration, exiting...", e); - System.exit(1); + } finally { + try { + dataMigrationLauncher.saveMetadata(); + } catch (SQLException e) { + exitCode = 1; + LOG.error("Exception while saving the Metadata, exiting...", e); + } } - System.exit(0); + if(exitCode == 0) LOG.info("Data migration finished successfully."); + System.exit(exitCode); } } diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java index ee65f00..0840be8 100644 --- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java +++ 
b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java @@ -23,7 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; -import java.io.FileWriter; +import java.io.Writer; import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; @@ -32,9 +32,9 @@ import java.util.Set; public class PhoenixClusterMetricsCopier extends AbstractPhoenixMetricsCopier { private static final Log LOG = LogFactory.getLog(PhoenixClusterMetricsCopier.class); - private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMap = new HashMap<>(); + private final Map<TimelineClusterMetric, MetricHostAggregate> aggregateMap = new HashMap<>(); - PhoenixClusterMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, FileWriter processedMetricsFileWriter) { + PhoenixClusterMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, long startTime, Writer processedMetricsFileWriter) { super(inputTableName, outputTableName, hBaseAccessor, metricNames, startTime, processedMetricsFileWriter); } @@ -53,7 +53,7 @@ public class PhoenixClusterMetricsCopier extends AbstractPhoenixMetricsCopier { @Override protected void saveMetrics() throws SQLException { LOG.debug(String.format("Saving %s results read from %s into %s", aggregateMap.size(), inputTable, outputTable)); - hBaseAccessor.saveClusterAggregateRecordsSecond(aggregateMap, outputTable); + this.hBaseAccessor.saveClusterAggregateRecordsSecond(aggregateMap, outputTable); } @Override @@ -62,13 +62,10 @@ public class PhoenixClusterMetricsCopier extends AbstractPhoenixMetricsCopier { rs.getString("METRIC_NAME"), rs.getString("APP_ID"), rs.getString("INSTANCE_ID"), rs.getLong("SERVER_TIME")); - 
MetricHostAggregate metricHostAggregate = new MetricHostAggregate(); - metricHostAggregate.setSum(rs.getDouble("METRIC_SUM")); - metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT")); - metricHostAggregate.setMax(rs.getDouble("METRIC_MAX")); - metricHostAggregate.setMin(rs.getDouble("METRIC_MIN")); + MetricHostAggregate metricHostAggregate = extractMetricHostAggregate(rs); - aggregateMap.put(timelineMetric, metricHostAggregate); + this.aggregateMap.put(timelineMetric, metricHostAggregate); + } } } diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java index a4f0c23..5964a3a 100644 --- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java +++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java @@ -23,7 +23,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import java.io.FileWriter; + +import java.io.Writer; import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; @@ -32,9 +33,9 @@ import java.util.Set; public class PhoenixHostMetricsCopier extends AbstractPhoenixMetricsCopier { private static final Log LOG = LogFactory.getLog(PhoenixHostMetricsCopier.class); - private Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>(); + private final Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>(); - PhoenixHostMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, FileWriter processedMetricsFileWriter) { + 
PhoenixHostMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, long startTime, Writer processedMetricsFileWriter) { super(inputTableName, outputTableName, hBaseAccessor, metricNames, startTime, processedMetricsFileWriter); } @@ -54,7 +55,7 @@ public class PhoenixHostMetricsCopier extends AbstractPhoenixMetricsCopier { @Override protected void saveMetrics() throws SQLException { LOG.debug(String.format("Saving %s results read from %s into %s", aggregateMap.size(), inputTable, outputTable)); - hBaseAccessor.saveHostAggregateRecords(aggregateMap, outputTable); + this.hBaseAccessor.saveHostAggregateRecords(aggregateMap, outputTable); } @Override @@ -66,12 +67,8 @@ public class PhoenixHostMetricsCopier extends AbstractPhoenixMetricsCopier { timelineMetric.setInstanceId(rs.getString("INSTANCE_ID")); timelineMetric.setStartTime(rs.getLong("SERVER_TIME")); - MetricHostAggregate metricHostAggregate = new MetricHostAggregate(); - metricHostAggregate.setSum(rs.getDouble("METRIC_SUM")); - metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT")); - metricHostAggregate.setMax(rs.getDouble("METRIC_MAX")); - metricHostAggregate.setMin(rs.getDouble("METRIC_MIN")); + MetricHostAggregate metricHostAggregate = extractMetricHostAggregate(rs); - aggregateMap.put(timelineMetric, metricHostAggregate); + this.aggregateMap.put(timelineMetric, metricHostAggregate); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@ambari.apache.org For additional commands, e-mail: commits-h...@ambari.apache.org