This is an automated email from the ASF dual-hosted git repository.

payert pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new a4990d9  AMBARI-25569 Reassess Ambari Metrics data migration - 2nd 
part (#3254)
a4990d9 is described below

commit a4990d9431643b82d1ec35ae1cd0c0739b5f7035
Author: Tamas Payer <35402259+pay...@users.noreply.github.com>
AuthorDate: Fri Dec 4 11:21:43 2020 +0100

    AMBARI-25569 Reassess Ambari Metrics data migration - 2nd part (#3254)
    
    * Introduce --allmetrics to enforce migration of all metrics regardless 
of other settings.
    
    Due to the suboptimal argument handling, if one wants to define an 
argument that comes after the 'whitelist file'
    argument - like the 'starttime' - the whitelist file cannot be left empty, 
so the --allmetrics argument can be provided instead.
    
    Change-Id: I1df6eb7ecdddb412c08c7cc48781da2679d2b75d
    
    * Improve performance by reducing synchronization granularity and 
leveraging BufferedWriter
    
    Change-Id: I26dced203a49e69e428ec7802fc3fb7dd7a68baf
    
    * Fix erroneous startTime handling
    
    When an external starttime was specified, it was not subtracted from the 
current time.
    
    Change-Id: I52815fc04548ed9cf11505f311f0f893db9bf352
    
    * Adding log message about the migration time frame.
    
    Change-Id: Ibdfd747239ffe2153ec4293b68513a417aee8cb2
    
    * Fix review change requests
    
    Change-Id: I714d3f98fbade93ba69e8ef0dd75e630a744710a
    
    Co-authored-by: Tamas Payer <tpa...@cloudera.com>
---
 .../upgrade/core/AbstractPhoenixMetricsCopier.java | 48 +++++++----------
 .../upgrade/core/MetricsDataMigrationLauncher.java | 62 ++++++++++++++++------
 .../upgrade/core/PhoenixClusterMetricsCopier.java  |  4 +-
 .../upgrade/core/PhoenixHostMetricsCopier.java     |  5 +-
 4 files changed, 69 insertions(+), 50 deletions(-)

diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
index d69f28a..8d075fe 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
@@ -22,8 +22,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 
-import java.io.FileWriter;
 import java.io.IOException;
+import java.io.Writer;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -32,16 +32,16 @@ import java.util.Set;
 
 public abstract class AbstractPhoenixMetricsCopier implements Runnable {
   private static final Log LOG = 
LogFactory.getLog(AbstractPhoenixMetricsCopier.class);
-  private static final Long DEFAULT_NATIVE_TIME_RANGE_DELAY = 120000L;
-  private final Long startTime;
-  protected final FileWriter processedMetricsFile;
+  private static final long DEFAULT_NATIVE_TIME_RANGE_DELAY = 120000L;
+  private final long startTime;
+  protected final Writer processedMetricsFile;
   protected String inputTable;
   protected String outputTable;
   protected Set<String> metricNames;
   protected PhoenixHBaseAccessor hBaseAccessor;
 
   public AbstractPhoenixMetricsCopier(String inputTableName, String 
outputTableName, PhoenixHBaseAccessor hBaseAccessor,
-                                      Set<String> metricNames, Long startTime, 
FileWriter outputStream) {
+                                      Set<String> metricNames, long startTime, 
Writer outputStream) {
     this.inputTable = inputTableName;
     this.outputTable = outputTableName;
     this.hBaseAccessor = hBaseAccessor;
@@ -53,8 +53,8 @@ public abstract class AbstractPhoenixMetricsCopier implements 
Runnable {
   @Override
   public void run(){
     LOG.info(String.format("Copying %s metrics from %s to %s", metricNames, 
inputTable, outputTable));
-    final long startTimer = System.currentTimeMillis();
-    final String query = String.format("SELECT %s %s FROM %s WHERE %s AND 
SERVER_TIME > %s ORDER BY METRIC_NAME, SERVER_TIME",
+    long timerStart = System.currentTimeMillis();
+    String query = String.format("SELECT %s %s FROM %s WHERE %s AND 
SERVER_TIME > %s ORDER BY METRIC_NAME, SERVER_TIME",
       getQueryHint(startTime), getColumnsClause(), inputTable, 
getMetricNamesLikeClause(), startTime);
 
     runPhoenixQueryAndAddToResults(query);
@@ -64,24 +64,19 @@ public abstract class AbstractPhoenixMetricsCopier 
implements Runnable {
     } catch (SQLException e) {
       LOG.error(e);
     } finally {
-      final long estimatedTime = System.currentTimeMillis() - startTimer;
-      LOG.debug(String.format("Copying took %s seconds from table %s to table 
%s for metric names %s", estimatedTime/ 1000.0, inputTable, outputTable, 
metricNames));
+      long timerDelta = System.currentTimeMillis() - timerStart;
+      LOG.debug(String.format("Copying took %s seconds from table %s to table 
%s for metric names %s", timerDelta/ 1000.0, inputTable, outputTable, 
metricNames));
 
       saveMetricsProgress();
     }
   }
 
   private String getMetricNamesLikeClause() {
-    StringBuilder sb = new StringBuilder();
+    StringBuilder sb = new StringBuilder(256);
     sb.append('(');
     int i = 0;
     for (String metricName : metricNames) {
-      sb.append("METRIC_NAME");
-      sb.append(" LIKE ");
-      sb.append("'");
-      sb.append(metricName);
-      sb.append("'");
-
+      sb.append("METRIC_NAME LIKE '").append(metricName).append("'");
       if (i < metricNames.size() - 1) {
           sb.append(" OR ");
         }
@@ -116,25 +111,20 @@ public abstract class AbstractPhoenixMetricsCopier 
implements Runnable {
       LOG.info("Skipping metrics progress save as the file is null");
       return;
     }
-    synchronized (this.processedMetricsFile) {
-      for (String metricName : metricNames) {
-        try {
+
+    for (String metricName : metricNames) {
+      try {
+        synchronized (this.processedMetricsFile) {
           
this.processedMetricsFile.append(inputTable).append(":").append(metricName).append(System.lineSeparator());
-        } catch (IOException e) {
-          LOG.error(e);
         }
+      } catch (IOException e) {
+        LOG.error(e);
       }
     }
   }
 
-  protected String getQueryHint(Long startTime) {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("/*+ ");
-    sb.append("NATIVE_TIME_RANGE(");
-    sb.append(startTime - DEFAULT_NATIVE_TIME_RANGE_DELAY);
-    sb.append(") ");
-    sb.append("*/");
-    return sb.toString();
+  protected String getQueryHint(long startTime) {
+    return new StringBuilder().append("/*+ 
NATIVE_TIME_RANGE(").append(startTime - 
DEFAULT_NATIVE_TIME_RANGE_DELAY).append(") */").toString();
   }
 
   protected MetricHostAggregate extractMetricHostAggregate(ResultSet rs) 
throws SQLException {
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
index 889158f..0c2f8e6 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
@@ -28,17 +28,22 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 
+
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.FileInputStream;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.Writer;
 import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -68,12 +73,12 @@ public class MetricsDataMigrationLauncher {
   private static final Long DEFAULT_TIMEOUT_MINUTES = 60*24L;
   private static final String PATTERN_PREFIX = "._p_";
   private static final int DEFAULT_BATCH_SIZE = 5;
+  private static final String MIGRATE_ALL_METRICS_ARG = "--allmetrics";
   public static final Map<String, String> CLUSTER_AGGREGATE_TABLES_MAPPING = 
new HashMap<>();
   public static final Map<String, String> HOST_AGGREGATE_TABLES_MAPPING = new 
HashMap<>();
   public static final String DEFAULT_PROCESSED_METRICS_FILE_LOCATION = 
"/var/log/ambari-metrics-collector/ambari-metrics-migration-state.txt";
   public static final int DEFAULT_NUMBER_OF_THREADS = 3;
-  public static final long ONE_MONTH_MILLIS = 2592000000L;
-  public static final long DEFAULT_START_TIME = System.currentTimeMillis() - 
ONE_MONTH_MILLIS; //Last month
+  public static final int DEFAULT_START_DAYS = 30; // 30 Days, Last month
 
   static {
     
CLUSTER_AGGREGATE_TABLES_MAPPING.put(METRICS_CLUSTER_AGGREGATE_MINUTE_V1_TABLE_NAME,
 METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME);
@@ -88,15 +93,15 @@ public class MetricsDataMigrationLauncher {
   private final Set<Set<String>> metricNamesBatches;
   private final String processedMetricsFilePath;
 
-  private final Long startTime;
-  private final Integer numberOfThreads;
+  private final long startTimeEpoch;
+  private final int numberOfThreads;
   private TimelineMetricConfiguration timelineMetricConfiguration;
   private PhoenixHBaseAccessor hBaseAccessor;
   private TimelineMetricMetadataManager timelineMetricMetadataManager;
   private Map<String, Set<String>> processedMetrics;
 
-  public MetricsDataMigrationLauncher(String whitelistedFilePath, String 
processedMetricsFilePath, Long startTime, Integer numberOfThreads, Integer 
batchSize) throws Exception {
-    this.startTime = (startTime == null) ? DEFAULT_START_TIME : startTime;
+  public MetricsDataMigrationLauncher(String whitelistedFilePath, String 
processedMetricsFilePath, Long startDay, Integer numberOfThreads, Integer 
batchSize) throws Exception {
+    this.startTimeEpoch = calculateStartEpochTime(startDay);
     this.numberOfThreads = (numberOfThreads == null) ? 
DEFAULT_NUMBER_OF_THREADS : numberOfThreads;
     this.processedMetricsFilePath = (processedMetricsFilePath == null) ? 
DEFAULT_PROCESSED_METRICS_FILE_LOCATION : processedMetricsFilePath;
 
@@ -114,8 +119,29 @@ public class MetricsDataMigrationLauncher {
     LOG.info(String.format("Split metric names into %s batches with size of 
%s", metricNamesBatches.size(), batchSize));
   }
 
+  private long calculateStartEpochTime(Long startDay) {
+    final long days;
+    if (startDay == null) {
+      LOG.info(String.format("No starting day have been provided, using 
default: %d", DEFAULT_START_DAYS));
+      days = DEFAULT_START_DAYS;
+    } else {
+      LOG.info(String.format("%d days have been provided as migration starting 
day.", startDay));
+      days = startDay;
+    }
+    LOG.info(String.format("The last %d days' data will be migrated.", days));
+
+    return LocalDateTime.now().minusDays(days).toEpochSecond(ZoneOffset.UTC);
+  }
+
   private Set<String> getMetricNames(String whitelistedFilePath) throws 
MalformedURLException, URISyntaxException, SQLException {
-    if(whitelistedFilePath != null) {
+    if(StringUtils.isNotEmpty(whitelistedFilePath) && 
whitelistedFilePath.equalsIgnoreCase(MIGRATE_ALL_METRICS_ARG)) {
+      LOG.info("Migration of all metrics has been requested by the " + 
MIGRATE_ALL_METRICS_ARG + " argument.");
+      LOG.info("Looking for all the metric names in the Metrics Database...");
+      return this.hBaseAccessor.getTimelineMetricMetadataV1().keySet().stream()
+          
.map(TimelineMetricMetadataKey::getMetricName).collect(Collectors.toSet());
+    }
+
+    if(StringUtils.isNotEmpty(whitelistedFilePath)) {
       LOG.info(String.format("Whitelist file %s has been provided.", 
whitelistedFilePath));
       LOG.info("Looking for whitelisted metric names based on the file 
content...");
       return readMetricWhitelistFromFile(whitelistedFilePath);
@@ -164,7 +190,7 @@ public class MetricsDataMigrationLauncher {
   }
 
   public void runMigration(Long timeoutInMinutes) throws IOException {
-    try (FileWriter processedMetricsFileWriter = new 
FileWriter(this.processedMetricsFilePath, true)) {
+    try (Writer processedMetricsFileWriter = new BufferedWriter(new 
FileWriter(this.processedMetricsFilePath, true))) {
       LOG.info("Setting up copiers...");
       Set<AbstractPhoenixMetricsCopier> copiers = new HashSet<>();
       for (Set<String> batch : metricNamesBatches) {
@@ -172,7 +198,7 @@ public class MetricsDataMigrationLauncher {
           Set<String> filteredMetrics = filterProcessedMetrics(batch, 
this.processedMetrics, entry.getKey());
           if (!filteredMetrics.isEmpty()) {
             copiers.add(new PhoenixClusterMetricsCopier(entry.getKey(), 
entry.getValue(), this.hBaseAccessor,
-                filteredMetrics, this.startTime, processedMetricsFileWriter));
+                filteredMetrics, this.startTimeEpoch, 
processedMetricsFileWriter));
           }
         }
 
@@ -180,7 +206,7 @@ public class MetricsDataMigrationLauncher {
           Set<String> filteredMetrics = filterProcessedMetrics(batch, 
this.processedMetrics, entry.getKey());
           if (!filteredMetrics.isEmpty()) {
             copiers.add(new PhoenixHostMetricsCopier(entry.getKey(), 
entry.getValue(), this.hBaseAccessor,
-                filteredMetrics, this.startTime, processedMetricsFileWriter));
+                filteredMetrics, this.startTimeEpoch, 
processedMetricsFileWriter));
           }
         }
       }
@@ -191,7 +217,7 @@ public class MetricsDataMigrationLauncher {
       }
 
       LOG.info("Running the copy threads...");
-      long startTimer = System.currentTimeMillis();
+      long timerStart = System.currentTimeMillis();
       ExecutorService executorService = null;
       try {
         executorService = Executors.newFixedThreadPool(this.numberOfThreads);
@@ -209,8 +235,8 @@ public class MetricsDataMigrationLauncher {
         }
       }
 
-      long estimatedTime = System.currentTimeMillis() - startTimer;
-      LOG.info(String.format("Copying took %s seconds", estimatedTime / 
1000.0));
+      long timerDelta = System.currentTimeMillis() - timerStart;
+      LOG.info(String.format("Copying took %s seconds", timerDelta / 1000.0));
     }
   }
 
@@ -279,7 +305,9 @@ public class MetricsDataMigrationLauncher {
    *                                               file location will be used 
if configured
    *                                               if not provided and AMS 
whitelisting is disabled then no whitelisting
    *                                               will be used and all the 
metrics will be migrated
-   *          args[2] - startTime                - default value is set to the 
last 30 days
+   *                                               if --allmetrics switch is 
provided then all the metrics will be migrated
+   *                                               regardless to other settings
+   *          args[2] - startDay                 - default value is set to the 
last 30 days
    *          args[3] - numberOfThreads          - default value is 3
    *          args[4] - batchSize                - default value is 5
    *          args[5] - timeoutInMinutes         - default value is set to the 
equivalent of 24 hours
@@ -287,7 +315,7 @@ public class MetricsDataMigrationLauncher {
   public static void main(String[] args) {
     String processedMetricsFilePath = null;
     String whitelistedFilePath = null;
-    Long startTime = null;
+    Long startDay = null;
     Integer numberOfThreads = null;
     Integer batchSize = null;
     Long timeoutInMinutes = DEFAULT_TIMEOUT_MINUTES;
@@ -299,7 +327,7 @@ public class MetricsDataMigrationLauncher {
       whitelistedFilePath = args[1];
     }
     if (args.length>2) {
-      startTime = Long.valueOf(args[2]);
+      startDay = Long.valueOf(args[2]);
     }
     if (args.length>3) {
       numberOfThreads = Integer.valueOf(args[3]);
@@ -314,7 +342,7 @@ public class MetricsDataMigrationLauncher {
     MetricsDataMigrationLauncher dataMigrationLauncher = null;
     try {
       LOG.info("Initializing system...");
-      dataMigrationLauncher = new 
MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath, 
startTime, numberOfThreads, batchSize);
+      dataMigrationLauncher = new 
MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath, 
startDay, numberOfThreads, batchSize);
     } catch (Exception e) {
       LOG.error("Exception during system setup, exiting...", e);
       System.exit(1);
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
index 037b3d2..177d202 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 
-import java.io.FileWriter;
+import java.io.Writer;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.HashMap;
@@ -34,7 +34,7 @@ public class PhoenixClusterMetricsCopier extends 
AbstractPhoenixMetricsCopier {
   private static final Log LOG = 
LogFactory.getLog(PhoenixClusterMetricsCopier.class);
   private final Map<TimelineClusterMetric, MetricHostAggregate> aggregateMap = 
new HashMap<>();
 
-  PhoenixClusterMetricsCopier(String inputTableName, String outputTableName, 
PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, 
FileWriter processedMetricsFileWriter) {
+  PhoenixClusterMetricsCopier(String inputTableName, String outputTableName, 
PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, long startTime, 
Writer processedMetricsFileWriter) {
     super(inputTableName, outputTableName, hBaseAccessor, metricNames, 
startTime, processedMetricsFileWriter);
   }
 
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
index 11c1df9..5964a3a 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
@@ -23,7 +23,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 
-import java.io.FileWriter;
+
+import java.io.Writer;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.HashMap;
@@ -34,7 +35,7 @@ public class PhoenixHostMetricsCopier extends 
AbstractPhoenixMetricsCopier {
   private static final Log LOG = 
LogFactory.getLog(PhoenixHostMetricsCopier.class);
   private final Map<TimelineMetric, MetricHostAggregate> aggregateMap = new 
HashMap<>();
 
-  PhoenixHostMetricsCopier(String inputTableName, String outputTableName, 
PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, 
FileWriter processedMetricsFileWriter) {
+  PhoenixHostMetricsCopier(String inputTableName, String outputTableName, 
PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, long startTime, 
Writer processedMetricsFileWriter) {
     super(inputTableName, outputTableName, hBaseAccessor, metricNames, 
startTime, processedMetricsFileWriter);
   }
 

Reply via email to