This is an automated email from the ASF dual-hosted git repository.

wuzhiguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-metrics.git


The following commit(s) were added to refs/heads/master by this push:
     new 85fbf27  AMBARI-25569: Reassess Ambari Metrics data migration (#62)
85fbf27 is described below

commit 85fbf2730e776c29b5f80795fdccdd9ebff3f600
Author: lucasbak <lucas.bakal...@gmail.com>
AuthorDate: Tue Nov 15 15:55:34 2022 +0100

    AMBARI-25569: Reassess Ambari Metrics data migration (#62)
---
 .../upgrade/core/AbstractPhoenixMetricsCopier.java |  84 +++----
 .../upgrade/core/MetricsDataMigrationLauncher.java | 272 ++++++++++++---------
 .../upgrade/core/PhoenixClusterMetricsCopier.java  |  17 +-
 .../upgrade/core/PhoenixHostMetricsCopier.java     |  17 +-
 4 files changed, 207 insertions(+), 183 deletions(-)

diff --git 
a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
 
b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
index 3d2002b..f70dc3f 100644
--- 
a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
+++ 
b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
@@ -20,9 +20,10 @@ package org.apache.ambari.metrics.core.timeline.upgrade.core;
 import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 
-import java.io.FileWriter;
 import java.io.IOException;
+import java.io.Writer;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -31,16 +32,16 @@ import java.util.Set;
 
 public abstract class AbstractPhoenixMetricsCopier implements Runnable {
   private static final Log LOG = 
LogFactory.getLog(AbstractPhoenixMetricsCopier.class);
-  private static final Long DEFAULT_NATIVE_TIME_RANGE_DELAY = 120000L;
-  private final Long startTime;
-  protected final FileWriter processedMetricsFile;
+  private static final long DEFAULT_NATIVE_TIME_RANGE_DELAY = 120000L;
+  private final long startTime;
+  protected final Writer processedMetricsFile;
   protected String inputTable;
   protected String outputTable;
   protected Set<String> metricNames;
   protected PhoenixHBaseAccessor hBaseAccessor;
 
   public AbstractPhoenixMetricsCopier(String inputTableName, String 
outputTableName, PhoenixHBaseAccessor hBaseAccessor,
-                                      Set<String> metricNames, Long startTime, 
FileWriter outputStream) {
+                                      Set<String> metricNames, long startTime, 
Writer outputStream) {
     this.inputTable = inputTableName;
     this.outputTable = outputTableName;
     this.hBaseAccessor = hBaseAccessor;
@@ -52,7 +53,7 @@ public abstract class AbstractPhoenixMetricsCopier implements 
Runnable {
   @Override
   public void run(){
     LOG.info(String.format("Copying %s metrics from %s to %s", metricNames, 
inputTable, outputTable));
-    long startTimer = System.currentTimeMillis();
+    long timerStart = System.currentTimeMillis();
     String query = String.format("SELECT %s %s FROM %s WHERE %s AND 
SERVER_TIME > %s ORDER BY METRIC_NAME, SERVER_TIME",
       getQueryHint(startTime), getColumnsClause(), inputTable, 
getMetricNamesLikeClause(), startTime);
 
@@ -62,23 +63,20 @@ public abstract class AbstractPhoenixMetricsCopier 
implements Runnable {
       saveMetrics();
     } catch (SQLException e) {
       LOG.error(e);
-    }
-    long estimatedTime = System.currentTimeMillis() - startTimer;
-    LOG.debug(String.format("Copying took %s seconds from table %s to table %s 
for metric names %s", estimatedTime/ 1000.0, inputTable, outputTable, 
metricNames));
+    } finally {
+      long timerDelta = System.currentTimeMillis() - timerStart;
+      LOG.debug(String.format("Copying took %s seconds from table %s to table 
%s for metric names %s", timerDelta/ 1000.0, inputTable,
 
-    saveMetricsProgress();
+      saveMetricsProgress();
+    }
   }
 
   private String getMetricNamesLikeClause() {
-    StringBuilder sb = new StringBuilder();
+    StringBuilder sb = new StringBuilder(256);
     sb.append('(');
     int i = 0;
     for (String metricName : metricNames) {
-      sb.append("METRIC_NAME");
-      sb.append(" LIKE ");
-      sb.append("'");
-      sb.append(metricName);
-      sb.append("'");
+      sb.append("METRIC_NAME LIKE '").append(metricName).append("'");
 
       if (i < metricNames.size() - 1) {
           sb.append(" OR ");
@@ -94,32 +92,15 @@ public abstract class AbstractPhoenixMetricsCopier 
implements Runnable {
 
   private void runPhoenixQueryAndAddToResults(String query) {
     LOG.debug(String.format("Running query: %s", query));
-    Connection conn = null;
-    PreparedStatement stmt = null;
-    try {
-      conn = hBaseAccessor.getConnection();
-      stmt = conn.prepareStatement(query);
-      ResultSet rs = stmt.executeQuery();
-      while (rs.next()) {
-        addToResults(rs);
+    try (Connection conn = hBaseAccessor.getConnection();
+         PreparedStatement stmt = conn.prepareStatement(query)) {
+      try (ResultSet rs = stmt.executeQuery()) {
+        while (rs.next()) {
+          addToResults(rs);
+        }
       }
     } catch (SQLException e) {
       LOG.error(String.format("Exception during running phoenix query %s", 
query), e);
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
     }
   }
 
@@ -127,29 +108,34 @@ public abstract class AbstractPhoenixMetricsCopier 
implements Runnable {
    * Saves processed metric names info provided file in format 
TABLE_NAME:METRIC_NAME
    */
   private void saveMetricsProgress() {
-    if (processedMetricsFile == null) {
+    if (this.processedMetricsFile == null) {
       LOG.info("Skipping metrics progress save as the file is null");
       return;
     }
+    
     for (String metricName : metricNames) {
       try {
-        processedMetricsFile.append(inputTable + ":" + metricName + 
System.lineSeparator());
+        synchronized (this.processedMetricsFile) {
+        
this.processedMetricsFile.append(inputTable).append(":").append(metricName).append(System.lineSeparator());
+        }
       } catch (IOException e) {
         LOG.error(e);
       }
     }
   }
 
-  protected String getQueryHint(Long startTime) {
-    StringBuilder sb = new StringBuilder();
-    sb.append("/*+ ");
-    sb.append("NATIVE_TIME_RANGE(");
-    sb.append(startTime - DEFAULT_NATIVE_TIME_RANGE_DELAY);
-    sb.append(") ");
-    sb.append("*/");
-    return sb.toString();
+  protected String getQueryHint(long startTime) {
+    return new StringBuilder().append("/*+ 
NATIVE_TIME_RANGE(").append(startTime - 
DEFAULT_NATIVE_TIME_RANGE_DELAY).append(") */").toString();
   }
 
+  protected MetricHostAggregate extractMetricHostAggregate(ResultSet rs) 
throws SQLException {
+    MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
+    metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
+    metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
+    metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
+    metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+    return metricHostAggregate;
+  }
   /**
    * Saves aggregated metrics to the Hbase
    * @throws SQLException
diff --git 
a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
 
b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
index 3a25aee..0c2f8e6 100644
--- 
a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
+++ 
b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
@@ -21,21 +21,29 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
 import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import 
org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
 import 
org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.FileInputStream;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.Writer;
 import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -44,6 +52,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
@@ -62,14 +71,14 @@ import static 
org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.M
 public class MetricsDataMigrationLauncher {
   private static final Log LOG = 
LogFactory.getLog(MetricsDataMigrationLauncher.class);
   private static final Long DEFAULT_TIMEOUT_MINUTES = 60*24L;
-  private static String patternPrefix = "._p_";
+  private static final String PATTERN_PREFIX = "._p_";
   private static final int DEFAULT_BATCH_SIZE = 5;
+  private static final String MIGRATE_ALL_METRICS_ARG = "--allmetrics";
   public static final Map<String, String> CLUSTER_AGGREGATE_TABLES_MAPPING = 
new HashMap<>();
   public static final Map<String, String> HOST_AGGREGATE_TABLES_MAPPING = new 
HashMap<>();
   public static final String DEFAULT_PROCESSED_METRICS_FILE_LOCATION = 
"/var/log/ambari-metrics-collector/ambari-metrics-migration-state.txt";
   public static final int DEFAULT_NUMBER_OF_THREADS = 3;
-  public static final long ONE_MONTH_MILLIS = 2592000000L;
-  public static final long DEFAULT_START_TIME = System.currentTimeMillis() - 
ONE_MONTH_MILLIS; //Last month
+  public static final int DEFAULT_START_DAYS = 30; // 30 Days, Last month
 
   static {
     
CLUSTER_AGGREGATE_TABLES_MAPPING.put(METRICS_CLUSTER_AGGREGATE_MINUTE_V1_TABLE_NAME,
 METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME);
@@ -83,127 +92,163 @@ public class MetricsDataMigrationLauncher {
 
   private final Set<Set<String>> metricNamesBatches;
   private final String processedMetricsFilePath;
-  private Set<String> metricNames;
 
-  private Long startTime;
-  private Integer batchSize;
-  private Integer numberOfThreads;
+  private final long startTimeEpoch;
+  private final int numberOfThreads;
   private TimelineMetricConfiguration timelineMetricConfiguration;
   private PhoenixHBaseAccessor hBaseAccessor;
   private TimelineMetricMetadataManager timelineMetricMetadataManager;
   private Map<String, Set<String>> processedMetrics;
 
-  public MetricsDataMigrationLauncher(String whitelistedFilePath, String 
processedMetricsFilePath, Long startTime, Integer numberOfThreads, Integer 
batchSize) throws Exception {
-    this.startTime = startTime == null? DEFAULT_START_TIME : startTime;
-    this.numberOfThreads = numberOfThreads == null? DEFAULT_NUMBER_OF_THREADS 
: numberOfThreads;
-    this.batchSize = batchSize == null? DEFAULT_BATCH_SIZE : batchSize;
-    this.processedMetricsFilePath = processedMetricsFilePath == null? 
DEFAULT_PROCESSED_METRICS_FILE_LOCATION : processedMetricsFilePath;
+  public MetricsDataMigrationLauncher(String whitelistedFilePath, String 
processedMetricsFilePath, Long startDay, Integer numberOfThreads, Integer 
batchSize) throws Exception {
+    this.startTimeEpoch = calculateStartEpochTime(startDay);
+    this.numberOfThreads = (numberOfThreads == null) ? 
DEFAULT_NUMBER_OF_THREADS : numberOfThreads;
+    this.processedMetricsFilePath = (processedMetricsFilePath == null) ? 
DEFAULT_PROCESSED_METRICS_FILE_LOCATION : processedMetricsFilePath;
 
     initializeHbaseAccessor();
+    readProcessedMetricsMap();
+
+    final Set<String> metricNames = getMetricNames(whitelistedFilePath);
 
-    LOG.info("Looking for whitelisted metric names...");
+    LOG.info("Setting up batches...");
+    if (batchSize == null) batchSize = DEFAULT_BATCH_SIZE;
+    this.metricNamesBatches = new HashSet<>(batchSize);
+
+    Iterables.partition(metricNames, batchSize)
+      .forEach(batch -> metricNamesBatches.add(new HashSet<>(batch)));
+    LOG.info(String.format("Split metric names into %s batches with size of 
%s", metricNamesBatches.size(), batchSize));
+  }
 
-    if (whitelistedFilePath != null) {
-      this.metricNames = readMetricWhitelistFromFile(whitelistedFilePath);
+  private long calculateStartEpochTime(Long startDay) {
+    final long days;
+    if (startDay == null) {
+      LOG.info(String.format("No starting day have been provided, using 
default: %d", DEFAULT_START_DAYS));
+      days = DEFAULT_START_DAYS;
     } else {
-      String whitelistFile = 
timelineMetricConfiguration.getMetricsConf().get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE,
 TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT);
-      metricNames = readMetricWhitelistFromFile(whitelistFile);
+      LOG.info(String.format("%d days have been provided as migration starting 
day.", startDay));
+      days = startDay;
     }
+    LOG.info(String.format("The last %d days' data will be migrated.", days));
 
-    readProcessedMetricsMap();
+    return LocalDateTime.now().minusDays(days).toEpochSecond(ZoneOffset.UTC);
+  }
 
-    LOG.info("Setting up batches...");
-    this.metricNamesBatches = new HashSet<>();
+  private Set<String> getMetricNames(String whitelistedFilePath) throws 
MalformedURLException, URISyntaxException, SQLException {
+    if(StringUtils.isNotEmpty(whitelistedFilePath) && 
whitelistedFilePath.equalsIgnoreCase(MIGRATE_ALL_METRICS_ARG)) {
+      LOG.info("Migration of all metrics has been requested by the " + 
MIGRATE_ALL_METRICS_ARG + " argument.");
+      LOG.info("Looking for all the metric names in the Metrics Database...");
+      return this.hBaseAccessor.getTimelineMetricMetadataV1().keySet().stream()
+          
.map(TimelineMetricMetadataKey::getMetricName).collect(Collectors.toSet());
+    }
 
-    Iterables.partition(metricNames, this.batchSize)
-      .forEach(batch -> metricNamesBatches.add(new HashSet<>(batch)));
-    LOG.info(String.format("Split metric names into %s batches with size of 
%s", metricNamesBatches.size(), this.batchSize));
-  }
+    if(StringUtils.isNotEmpty(whitelistedFilePath)) {
+      LOG.info(String.format("Whitelist file %s has been provided.", 
whitelistedFilePath));
+      LOG.info("Looking for whitelisted metric names based on the file 
content...");
+      return readMetricWhitelistFromFile(whitelistedFilePath);
+    }
+
+    final Configuration conf = 
this.timelineMetricConfiguration.getMetricsConf();
+    if 
(Boolean.parseBoolean(conf.get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_ENABLED)))
 {
+      whitelistedFilePath = 
conf.get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE,
+              
TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT);
+      LOG.info(String.format("No whitelist file has been provided but Ambari 
Metrics Whitelisting is enabled. " +
+              "Using %s as whitelist file.", whitelistedFilePath));
+      LOG.info("Looking for whitelisted metric names based on the file 
content...");
+      return readMetricWhitelistFromFile(whitelistedFilePath);
+    }
 
+    LOG.info("No whitelist file has been provided and Ambari Metrics 
Whitelisting is disabled.");
+    LOG.info("Looking for all the metric names in the Metrics Database...");
+    return this.hBaseAccessor.getTimelineMetricMetadataV1().keySet().stream()
+        
.map(TimelineMetricMetadataKey::getMetricName).collect(Collectors.toSet());
+  }
 
   private void readProcessedMetricsMap() {
-    Map<String, Set<String>> result = new HashMap<>();
-    if (!Files.exists(Paths.get(processedMetricsFilePath))) {
-      LOG.info(String.format("The processed metrics file %s is missing, 
assuming there were no metrics processed.", processedMetricsFilePath));
-      this.processedMetrics = new HashMap<>();
-    }
-    LOG.info(String.format("Reading the list of already copied metrics from 
%s", processedMetricsFilePath));
-    try {
-      try (Stream<String> stream = 
Files.lines(Paths.get(processedMetricsFilePath))) {
-        stream.forEach( line -> {
-          String [] lineSplit = line.split(":");
-          if (!result.containsKey(lineSplit[0])) {
-            result.put(lineSplit[0], new 
HashSet<>(Collections.singletonList(lineSplit[1])));
-          } else {
-            result.get(lineSplit[0]).add(lineSplit[1]);
-          }
-        });
+    final Map<String, Set<String>> result = new HashMap<>();
+    final Path path = Paths.get(this.processedMetricsFilePath);
+
+    if (Files.notExists(path)) {
+      LOG.info(String.format("The processed metrics file %s is missing, 
assuming there were no metrics processed.", this.processedMetricsFilePath));
+    } else {
+      LOG.info(String.format("Reading the list of already copied metrics from 
%s", this.processedMetricsFilePath));
+      try {
+        try (Stream<String> stream = Files.lines(path)) {
+          stream.forEach(line -> {
+            String[] lineSplit = line.split(":");
+            if (!result.containsKey(lineSplit[0])) {
+              result.put(lineSplit[0], new 
HashSet<>(Collections.singletonList(lineSplit[1])));
+            } else {
+              result.get(lineSplit[0]).add(lineSplit[1]);
+            }
+          });
+        }
+      } catch (IOException e) {
+        LOG.error(e);
       }
-    } catch (IOException e) {
-      LOG.error(e);
     }
     this.processedMetrics = result;
   }
 
   public void runMigration(Long timeoutInMinutes) throws IOException {
-
-    FileWriter processedMetricsFileWriter = new 
FileWriter(processedMetricsFilePath, true);
-    LOG.info("Setting up copiers...");
-    Set<AbstractPhoenixMetricsCopier> copiers = new HashSet<>();
-    for (Set<String> batch : metricNamesBatches) {
-      for (Map.Entry<String, String> entry : 
CLUSTER_AGGREGATE_TABLES_MAPPING.entrySet()) {
-        Set<String> filteredMetrics = filterProcessedMetrics(batch, 
this.processedMetrics, entry.getKey());
-        if (!filteredMetrics.isEmpty()) {
-          copiers.add(new PhoenixClusterMetricsCopier(entry.getKey(), 
entry.getValue(), hBaseAccessor,
-            filteredMetrics, startTime, processedMetricsFileWriter));
+    try (Writer processedMetricsFileWriter = new BufferedWriter(new 
FileWriter(this.processedMetricsFilePath, true))) {
+      LOG.info("Setting up copiers...");
+      Set<AbstractPhoenixMetricsCopier> copiers = new HashSet<>();
+      for (Set<String> batch : metricNamesBatches) {
+        for (Map.Entry<String, String> entry : 
CLUSTER_AGGREGATE_TABLES_MAPPING.entrySet()) {
+          Set<String> filteredMetrics = filterProcessedMetrics(batch, 
this.processedMetrics, entry.getKey());
+          if (!filteredMetrics.isEmpty()) {
+            copiers.add(new PhoenixClusterMetricsCopier(entry.getKey(), 
entry.getValue(), this.hBaseAccessor,
+                filteredMetrics, this.startTimeEpoch, 
processedMetricsFileWriter));
+          }
         }
-      }
 
-      for (Map.Entry<String, String> entry : 
HOST_AGGREGATE_TABLES_MAPPING.entrySet()) {
-        Set<String> filteredMetrics = filterProcessedMetrics(batch, 
processedMetrics, entry.getKey());
-        if (!filteredMetrics.isEmpty()) {
-          copiers.add(new PhoenixHostMetricsCopier(entry.getKey(), 
entry.getValue(), hBaseAccessor,
-            filteredMetrics, startTime, processedMetricsFileWriter));
+        for (Map.Entry<String, String> entry : 
HOST_AGGREGATE_TABLES_MAPPING.entrySet()) {
+          Set<String> filteredMetrics = filterProcessedMetrics(batch, 
this.processedMetrics, entry.getKey());
+          if (!filteredMetrics.isEmpty()) {
+            copiers.add(new PhoenixHostMetricsCopier(entry.getKey(), 
entry.getValue(), this.hBaseAccessor,
+                filteredMetrics, this.startTimeEpoch, 
processedMetricsFileWriter));
+          }
         }
       }
-    }
 
-    if (copiers.isEmpty()) {
-      LOG.info("No copy threads to run, looks like all metrics have been 
copied.");
-      processedMetricsFileWriter.close();
-      return;
-    }
-
-    LOG.info("Running the copy threads...");
-    long startTimer = System.currentTimeMillis();
-    ExecutorService executorService = 
Executors.newFixedThreadPool(numberOfThreads == null ? 
DEFAULT_NUMBER_OF_THREADS : numberOfThreads);
-    for (AbstractPhoenixMetricsCopier copier : copiers) {
-      executorService.submit(copier);
-    }
+      if (copiers.isEmpty()) {
+        LOG.info("No copy threads to run, looks like all metrics have been 
copied.");
+        return;
+      }
 
-    executorService.shutdown();
+      LOG.info("Running the copy threads...");
+      long timerStart = System.currentTimeMillis();
+      ExecutorService executorService = null;
+      try {
+        executorService = Executors.newFixedThreadPool(this.numberOfThreads);
+        for (AbstractPhoenixMetricsCopier copier : copiers) {
+          executorService.submit(copier);
+        }
+      } finally {
+        if (executorService != null) {
+          executorService.shutdown();
+          try {
+            executorService.awaitTermination(timeoutInMinutes, 
TimeUnit.MINUTES);
+          } catch (InterruptedException e) {
+            LOG.error(e);
+          }
+        }
+      }
 
-    try {
-      executorService.awaitTermination(timeoutInMinutes, TimeUnit.MINUTES);
-    } catch (InterruptedException e) {
-      LOG.error(e);
+      long timerDelta = System.currentTimeMillis() - timerStart;
+      LOG.info(String.format("Copying took %s seconds", timerDelta / 1000.0));
     }
-
-    long estimatedTime = System.currentTimeMillis() - startTimer;
-    LOG.info(String.format("Copying took %s seconds", estimatedTime/1000.0));
-
-    processedMetricsFileWriter.close();
   }
 
   private void initializeHbaseAccessor() throws MalformedURLException, 
URISyntaxException {
     this.hBaseAccessor = new PhoenixHBaseAccessor(null);
     this.timelineMetricConfiguration = 
TimelineMetricConfiguration.getInstance();
-    timelineMetricConfiguration.initialize();
+    this.timelineMetricConfiguration.initialize();
 
-    timelineMetricMetadataManager = new 
TimelineMetricMetadataManager(hBaseAccessor);
-    timelineMetricMetadataManager.initializeMetadata(false);
+    this.timelineMetricMetadataManager = new 
TimelineMetricMetadataManager(this.hBaseAccessor);
+    this.timelineMetricMetadataManager.initializeMetadata(false);
 
-    hBaseAccessor.setMetadataInstance(timelineMetricMetadataManager);
+    this.hBaseAccessor.setMetadataInstance(this.timelineMetricMetadataManager);
   }
 
   private static Set<String> filterProcessedMetrics(Set<String> metricNames, 
Map<String, Set<String>> processedMetrics, String tableName) {
@@ -219,21 +264,18 @@ public class MetricsDataMigrationLauncher {
    */
   private static Set<String> readMetricWhitelistFromFile(String whitelistFile) 
{
     LOG.info(String.format("Reading metric names from %s", whitelistFile));
-    Set<String> whitelistedMetrics = new HashSet<>();
+    final Set<String> whitelistedMetrics = new HashSet<>();
 
-    BufferedReader br = null;
     String strLine;
 
-    try(FileInputStream fstream = new FileInputStream(whitelistFile)) {
-      br = new BufferedReader(new InputStreamReader(fstream));
-
+    try(BufferedReader br = new BufferedReader(new InputStreamReader(new 
FileInputStream(whitelistFile)))) {
       while ((strLine = br.readLine()) != null)   {
         strLine = strLine.trim();
         if (StringUtils.isEmpty(strLine)) {
           continue;
         }
-        if (strLine.startsWith(patternPrefix)) {
-          strLine = strLine.replace(patternPrefix, "");
+        if (strLine.startsWith(PATTERN_PREFIX)) {
+          strLine = strLine.replace(PATTERN_PREFIX, "");
         }
         if (strLine.contains("*")) {
           strLine = strLine.replaceAll("\\*", "%");
@@ -248,21 +290,24 @@ public class MetricsDataMigrationLauncher {
 
   private void saveMetadata() throws SQLException {
     LOG.info("Saving metadata to store...");
-    timelineMetricMetadataManager.updateMetadataCacheUsingV1Tables();
-    timelineMetricMetadataManager.forceMetricsMetadataSync();
+    this.timelineMetricMetadataManager.updateMetadataCacheUsingV1Tables();
+    this.timelineMetricMetadataManager.forceMetricsMetadataSync();
     LOG.info("Metadata was saved.");
   }
 
-
   /**
    *
    * @param args
    * REQUIRED args[0] - processedMetricsFilePath - full path to the file where 
processed metric are/will be stored
    *
    * OPTIONAL args[1] - whitelistedFilePath      - full path to the file with 
whitelisted metrics filenames
-   *                                               if not provided the default 
whitelist file location will be used if configured
-   *                                               if not configured - will 
result in error
-   *          args[2] - startTime                - default value is set to the 
last 30 days
+   *                                               if not provided and AMS 
whitelisting is enabled the default whitelist
+   *                                               file location will be used 
if configured
+   *                                               if not provided and AMS 
whitelisting is disabled then no whitelisting
+   *                                               will be used and all the 
metrics will be migrated
+   *                                               if --allmetrics switch is 
provided then all the metrics will be migrated
+   *                                               regardless of other settings
+   *          args[2] - startDay                 - default value is set to the 
last 30 days
    *          args[3] - numberOfThreads          - default value is 3
    *          args[4] - batchSize                - default value is 5
    *          args[5] - timeoutInMinutes         - default value is set to the 
equivalent of 24 hours
@@ -270,7 +315,7 @@ public class MetricsDataMigrationLauncher {
   public static void main(String[] args) {
     String processedMetricsFilePath = null;
     String whitelistedFilePath = null;
-    Long startTime = null;
+    Long startDay = null;
     Integer numberOfThreads = null;
     Integer batchSize = null;
     Long timeoutInMinutes = DEFAULT_TIMEOUT_MINUTES;
@@ -282,7 +327,7 @@ public class MetricsDataMigrationLauncher {
       whitelistedFilePath = args[1];
     }
     if (args.length>2) {
-      startTime = Long.valueOf(args[2]);
+      startDay = Long.valueOf(args[2]);
     }
     if (args.length>3) {
       numberOfThreads = Integer.valueOf(args[3]);
@@ -297,30 +342,29 @@ public class MetricsDataMigrationLauncher {
     MetricsDataMigrationLauncher dataMigrationLauncher = null;
     try {
       LOG.info("Initializing system...");
-      dataMigrationLauncher = new 
MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath, 
startTime, numberOfThreads, batchSize);
+      dataMigrationLauncher = new 
MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath, 
startDay, numberOfThreads, batchSize);
     } catch (Exception e) {
       LOG.error("Exception during system setup, exiting...", e);
       System.exit(1);
     }
 
+    int exitCode = 0;
     try {
-      //Setup shutdown hook for metadata save.
-      MetricsDataMigrationLauncher finalDataMigrationLauncher = 
dataMigrationLauncher;
-      Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-        try {
-          finalDataMigrationLauncher.saveMetadata();
-        } catch (SQLException e) {
-          LOG.error("Exception during metadata saving, exiting...", e);
-        }
-      }));
-
       dataMigrationLauncher.runMigration(timeoutInMinutes);
-    } catch (IOException e) {
+    } catch (Throwable e) {
+      exitCode = 1;
       LOG.error("Exception during data migration, exiting...", e);
-      System.exit(1);
+    } finally {
+      try {
+        dataMigrationLauncher.saveMetadata();
+      } catch (SQLException e) {
+        exitCode = 1;
+        LOG.error("Exception while saving the Metadata, exiting...", e);
+      }
     }
 
-    System.exit(0);
+    if(exitCode == 0) LOG.info("Data migration finished successfully.");
 
+    System.exit(exitCode);
   }
 }
diff --git 
a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
 
b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
index ee65f00..0840be8 100644
--- 
a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
+++ 
b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 
-import java.io.FileWriter;
+import java.io.Writer;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.HashMap;
@@ -32,9 +32,9 @@ import java.util.Set;
 
 public class PhoenixClusterMetricsCopier extends AbstractPhoenixMetricsCopier {
   private static final Log LOG = 
LogFactory.getLog(PhoenixClusterMetricsCopier.class);
-  private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMap = new 
HashMap<>();
+  private final Map<TimelineClusterMetric, MetricHostAggregate> aggregateMap = 
new HashMap<>();
 
-  PhoenixClusterMetricsCopier(String inputTableName, String outputTableName, 
PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, 
FileWriter processedMetricsFileWriter) {
+  PhoenixClusterMetricsCopier(String inputTableName, String outputTableName, 
PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, long startTime, 
Writer processedMetricsFileWriter) {
     super(inputTableName, outputTableName, hBaseAccessor, metricNames, 
startTime, processedMetricsFileWriter);
   }
 
@@ -53,7 +53,7 @@ public class PhoenixClusterMetricsCopier extends 
AbstractPhoenixMetricsCopier {
   @Override
   protected void saveMetrics() throws SQLException {
     LOG.debug(String.format("Saving %s results read from %s into %s", 
aggregateMap.size(), inputTable, outputTable));
-    hBaseAccessor.saveClusterAggregateRecordsSecond(aggregateMap, outputTable);
+    this.hBaseAccessor.saveClusterAggregateRecordsSecond(aggregateMap, 
outputTable);
   }
 
   @Override
@@ -62,13 +62,10 @@ public class PhoenixClusterMetricsCopier extends 
AbstractPhoenixMetricsCopier {
             rs.getString("METRIC_NAME"), rs.getString("APP_ID"),
             rs.getString("INSTANCE_ID"), rs.getLong("SERVER_TIME"));
 
-    MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
-    metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
-    metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
-    metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
-    metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+    MetricHostAggregate metricHostAggregate = extractMetricHostAggregate(rs);
 
-    aggregateMap.put(timelineMetric, metricHostAggregate);
+    this.aggregateMap.put(timelineMetric, metricHostAggregate);
+   }
 
   }
 }
diff --git 
a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
 
b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
index a4f0c23..5964a3a 100644
--- 
a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
+++ 
b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
@@ -23,7 +23,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 
-import java.io.FileWriter;
+
+import java.io.Writer;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.HashMap;
@@ -32,9 +33,9 @@ import java.util.Set;
 
 public class PhoenixHostMetricsCopier extends AbstractPhoenixMetricsCopier {
   private static final Log LOG = 
LogFactory.getLog(PhoenixHostMetricsCopier.class);
-  private Map<TimelineMetric, MetricHostAggregate> aggregateMap = new 
HashMap<>();
+  private final Map<TimelineMetric, MetricHostAggregate> aggregateMap = new 
HashMap<>();
 
-  PhoenixHostMetricsCopier(String inputTableName, String outputTableName, 
PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, 
FileWriter processedMetricsFileWriter) {
+  PhoenixHostMetricsCopier(String inputTableName, String outputTableName, 
PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, long startTime, 
Writer processedMetricsFileWriter) {
     super(inputTableName, outputTableName, hBaseAccessor, metricNames, 
startTime, processedMetricsFileWriter);
   }
 
@@ -54,7 +55,7 @@ public class PhoenixHostMetricsCopier extends 
AbstractPhoenixMetricsCopier {
   @Override
   protected void saveMetrics() throws SQLException {
     LOG.debug(String.format("Saving %s results read from %s into %s", 
aggregateMap.size(), inputTable, outputTable));
-    hBaseAccessor.saveHostAggregateRecords(aggregateMap, outputTable);
+    this.hBaseAccessor.saveHostAggregateRecords(aggregateMap, outputTable);
   }
 
   @Override
@@ -66,12 +67,8 @@ public class PhoenixHostMetricsCopier extends 
AbstractPhoenixMetricsCopier {
     timelineMetric.setInstanceId(rs.getString("INSTANCE_ID"));
     timelineMetric.setStartTime(rs.getLong("SERVER_TIME"));
 
-    MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
-    metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
-    metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
-    metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
-    metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
+    MetricHostAggregate metricHostAggregate = extractMetricHostAggregate(rs);
 
-    aggregateMap.put(timelineMetric, metricHostAggregate);
+    this.aggregateMap.put(timelineMetric, metricHostAggregate);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@ambari.apache.org
For additional commands, e-mail: commits-h...@ambari.apache.org


Reply via email to