This is an automated email from the ASF dual-hosted git repository.

payert pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new a4990d9  AMBARI-25569 Reassess Ambari Metrics data migration - 2nd part (#3254)
a4990d9 is described below

commit a4990d9431643b82d1ec35ae1cd0c0739b5f7035
Author: Tamas Payer <35402259+pay...@users.noreply.github.com>
AuthorDate: Fri Dec 4 11:21:43 2020 +0100

    AMBARI-25569 Reassess Ambari Metrics data migration - 2nd part (#3254)

    * Introduce --allmetrics to enforce migration of all metrics regardless of other settings.

      Due to the suboptimal argument handling, if one wants to set an argument that comes
      after the 'whitelist file' argument - like 'starttime' - the whitelist file cannot be
      left empty, so the --allmetrics argument can be provided in its place (see the example
      invocation after the summary below).

      Change-Id: I1df6eb7ecdddb412c08c7cc48781da2679d2b75d

    * Improve performance by reducing synchronization granularity and leveraging BufferedWriter

      Change-Id: I26dced203a49e69e428ec7802fc3fb7dd7a68baf

    * Fix erroneous startTime handling

      When an external starttime was specified, it was not subtracted from the current time.

      Change-Id: I52815fc04548ed9cf11505f311f0f893db9bf352

    * Adding a log message about the migration time frame.

      Change-Id: Ibdfd747239ffe2153ec4293b68513a417aee8cb2

    * Fix review change requests

      Change-Id: I714d3f98fbade93ba69e8ef0dd75e630a744710a

    Co-authored-by: Tamas Payer <tpa...@cloudera.com>
---
 .../upgrade/core/AbstractPhoenixMetricsCopier.java | 48 +++++++----------
 .../upgrade/core/MetricsDataMigrationLauncher.java | 62 ++++++++++++++++------
 .../upgrade/core/PhoenixClusterMetricsCopier.java  |  4 +-
 .../upgrade/core/PhoenixHostMetricsCopier.java     |  5 +-
 4 files changed, 69 insertions(+), 50 deletions(-)
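Note: since the launcher's arguments are purely positional, here is a minimal, hypothetical
invocation sketch. The whitelist slot and the day count are made-up examples; only the
progress-file path matches DEFAULT_PROCESSED_METRICS_FILE_LOCATION from this patch.

    import org.apache.ambari.metrics.core.timeline.upgrade.core.MetricsDataMigrationLauncher;

    // Illustrative only: how the positional arguments line up.
    public class MigrationLaunchExample {
      public static void main(String[] unused) throws Exception {
        MetricsDataMigrationLauncher.main(new String[] {
            "/var/log/ambari-metrics-collector/ambari-metrics-migration-state.txt", // args[0]: progress file
            "--allmetrics", // args[1]: fills the whitelist-file slot, forces migration of all metrics
            "7"             // args[2]: startDay, i.e. migrate the last 7 days
        });
      }
    }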
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
index d69f28a..8d075fe 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/AbstractPhoenixMetricsCopier.java
@@ -22,8 +22,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import java.io.FileWriter;
 import java.io.IOException;
+import java.io.Writer;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -32,16 +32,16 @@ import java.util.Set;
 
 public abstract class AbstractPhoenixMetricsCopier implements Runnable {
   private static final Log LOG = LogFactory.getLog(AbstractPhoenixMetricsCopier.class);
-  private static final Long DEFAULT_NATIVE_TIME_RANGE_DELAY = 120000L;
-  private final Long startTime;
-  protected final FileWriter processedMetricsFile;
+  private static final long DEFAULT_NATIVE_TIME_RANGE_DELAY = 120000L;
+  private final long startTime;
+  protected final Writer processedMetricsFile;
   protected String inputTable;
   protected String outputTable;
   protected Set<String> metricNames;
   protected PhoenixHBaseAccessor hBaseAccessor;
 
   public AbstractPhoenixMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor,
-                                      Set<String> metricNames, Long startTime, FileWriter outputStream) {
+                                      Set<String> metricNames, long startTime, Writer outputStream) {
     this.inputTable = inputTableName;
     this.outputTable = outputTableName;
     this.hBaseAccessor = hBaseAccessor;
@@ -53,8 +53,8 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable {
   @Override
   public void run(){
     LOG.info(String.format("Copying %s metrics from %s to %s", metricNames, inputTable, outputTable));
-    final long startTimer = System.currentTimeMillis();
-    final String query = String.format("SELECT %s %s FROM %s WHERE %s AND SERVER_TIME > %s ORDER BY METRIC_NAME, SERVER_TIME",
+    long timerStart = System.currentTimeMillis();
+    String query = String.format("SELECT %s %s FROM %s WHERE %s AND SERVER_TIME > %s ORDER BY METRIC_NAME, SERVER_TIME",
       getQueryHint(startTime), getColumnsClause(), inputTable, getMetricNamesLikeClause(), startTime);
 
     runPhoenixQueryAndAddToResults(query);
@@ -64,24 +64,19 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable {
     } catch (SQLException e) {
       LOG.error(e);
     } finally {
-      final long estimatedTime = System.currentTimeMillis() - startTimer;
-      LOG.debug(String.format("Copying took %s seconds from table %s to table %s for metric names %s", estimatedTime / 1000.0, inputTable, outputTable, metricNames));
+      long timerDelta = System.currentTimeMillis() - timerStart;
+      LOG.debug(String.format("Copying took %s seconds from table %s to table %s for metric names %s", timerDelta / 1000.0, inputTable, outputTable, metricNames));
       saveMetricsProgress();
     }
   }
 
   private String getMetricNamesLikeClause() {
-    StringBuilder sb = new StringBuilder();
+    StringBuilder sb = new StringBuilder(256);
     sb.append('(');
     int i = 0;
     for (String metricName : metricNames) {
-      sb.append("METRIC_NAME");
-      sb.append(" LIKE ");
-      sb.append("'");
-      sb.append(metricName);
-      sb.append("'");
-
+      sb.append("METRIC_NAME LIKE '").append(metricName).append("'");
       if (i < metricNames.size() - 1) {
         sb.append(" OR ");
       }
@@ -116,25 +111,20 @@ public abstract class AbstractPhoenixMetricsCopier implements Runnable {
       LOG.info("Skipping metrics progress save as the file is null");
       return;
     }
-    synchronized (this.processedMetricsFile) {
-      for (String metricName : metricNames) {
-        try {
+
+    for (String metricName : metricNames) {
+      try {
+        synchronized (this.processedMetricsFile) {
           this.processedMetricsFile.append(inputTable).append(":").append(metricName).append(System.lineSeparator());
-        } catch (IOException e) {
-          LOG.error(e);
         }
+      } catch (IOException e) {
+        LOG.error(e);
       }
     }
   }
 
-  protected String getQueryHint(Long startTime) {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("/*+ ");
-    sb.append("NATIVE_TIME_RANGE(");
-    sb.append(startTime - DEFAULT_NATIVE_TIME_RANGE_DELAY);
-    sb.append(") ");
-    sb.append("*/");
-    return sb.toString();
+  protected String getQueryHint(long startTime) {
+    return new StringBuilder().append("/*+ NATIVE_TIME_RANGE(").append(startTime - DEFAULT_NATIVE_TIME_RANGE_DELAY).append(") */").toString();
   }
 
   protected MetricHostAggregate extractMetricHostAggregate(ResultSet rs) throws SQLException {
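Note: the saveMetricsProgress() rework above holds the monitor for one append at a time
instead of across the whole loop, while the Writer handed in (see MetricsDataMigrationLauncher
below) is now buffered. A minimal standalone sketch of that pattern; the ProgressLog class
name is illustrative, not part of the patch:

    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.Writer;

    // Illustrative only: several copier threads share one Writer. Each progress
    // line is appended under a short-lived lock, and BufferedWriter coalesces
    // the many small appends into fewer writes to the underlying file.
    class ProgressLog {
      private final Writer out;

      ProgressLog(String path) throws IOException {
        this.out = new BufferedWriter(new FileWriter(path, true)); // append mode
      }

      void record(String table, String metricName) {
        try {
          synchronized (out) { // lock per line, not per batch
            out.append(table).append(":").append(metricName).append(System.lineSeparator());
          }
        } catch (IOException e) {
          e.printStackTrace(); // the copier logs and continues; same idea here
        }
      }
    }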
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
index 889158f..0c2f8e6 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/MetricsDataMigrationLauncher.java
@@ -28,17 +28,22 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.FileInputStream;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.Writer;
 import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -68,12 +73,12 @@
   private static final Long DEFAULT_TIMEOUT_MINUTES = 60*24L;
   private static final String PATTERN_PREFIX = "._p_";
   private static final int DEFAULT_BATCH_SIZE = 5;
+  private static final String MIGRATE_ALL_METRICS_ARG = "--allmetrics";
 
   public static final Map<String, String> CLUSTER_AGGREGATE_TABLES_MAPPING = new HashMap<>();
   public static final Map<String, String> HOST_AGGREGATE_TABLES_MAPPING = new HashMap<>();
   public static final String DEFAULT_PROCESSED_METRICS_FILE_LOCATION = "/var/log/ambari-metrics-collector/ambari-metrics-migration-state.txt";
   public static final int DEFAULT_NUMBER_OF_THREADS = 3;
-  public static final long ONE_MONTH_MILLIS = 2592000000L;
-  public static final long DEFAULT_START_TIME = System.currentTimeMillis() - ONE_MONTH_MILLIS; //Last month
+  public static final int DEFAULT_START_DAYS = 30; // 30 Days, Last month
 
   static {
     CLUSTER_AGGREGATE_TABLES_MAPPING.put(METRICS_CLUSTER_AGGREGATE_MINUTE_V1_TABLE_NAME, METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME);
@@ -88,15 +93,15 @@
   private final Set<Set<String>> metricNamesBatches;
   private final String processedMetricsFilePath;
-  private final Long startTime;
-  private final Integer numberOfThreads;
+  private final long startTimeEpoch;
+  private final int numberOfThreads;
 
   private TimelineMetricConfiguration timelineMetricConfiguration;
   private PhoenixHBaseAccessor hBaseAccessor;
   private TimelineMetricMetadataManager timelineMetricMetadataManager;
   private Map<String, Set<String>> processedMetrics;
 
-  public MetricsDataMigrationLauncher(String whitelistedFilePath, String processedMetricsFilePath, Long startTime, Integer numberOfThreads, Integer batchSize) throws Exception {
-    this.startTime = (startTime == null) ? DEFAULT_START_TIME : startTime;
+  public MetricsDataMigrationLauncher(String whitelistedFilePath, String processedMetricsFilePath, Long startDay, Integer numberOfThreads, Integer batchSize) throws Exception {
+    this.startTimeEpoch = calculateStartEpochTime(startDay);
     this.numberOfThreads = (numberOfThreads == null) ? DEFAULT_NUMBER_OF_THREADS : numberOfThreads;
     this.processedMetricsFilePath = (processedMetricsFilePath == null) ?
      DEFAULT_PROCESSED_METRICS_FILE_LOCATION : processedMetricsFilePath;
@@ -114,8 +119,29 @@
     LOG.info(String.format("Split metric names into %s batches with size of %s", metricNamesBatches.size(), batchSize));
   }
 
+  private long calculateStartEpochTime(Long startDay) {
+    final long days;
+    if (startDay == null) {
+      LOG.info(String.format("No starting day have been provided, using default: %d", DEFAULT_START_DAYS));
+      days = DEFAULT_START_DAYS;
+    } else {
+      LOG.info(String.format("%d days have been provided as migration starting day.", startDay));
+      days = startDay;
+    }
+    LOG.info(String.format("The last %d days' data will be migrated.", days));
+
+    return LocalDateTime.now().minusDays(days).toEpochSecond(ZoneOffset.UTC);
+  }
+
   private Set<String> getMetricNames(String whitelistedFilePath) throws MalformedURLException, URISyntaxException, SQLException {
-    if(whitelistedFilePath != null) {
+    if(StringUtils.isNotEmpty(whitelistedFilePath) && whitelistedFilePath.equalsIgnoreCase(MIGRATE_ALL_METRICS_ARG)) {
+      LOG.info("Migration of all metrics has been requested by the " + MIGRATE_ALL_METRICS_ARG + " argument.");
+      LOG.info("Looking for all the metric names in the Metrics Database...");
+      return this.hBaseAccessor.getTimelineMetricMetadataV1().keySet().stream()
+        .map(TimelineMetricMetadataKey::getMetricName).collect(Collectors.toSet());
+    }
+
+    if(StringUtils.isNotEmpty(whitelistedFilePath)) {
       LOG.info(String.format("Whitelist file %s has been provided.", whitelistedFilePath));
       LOG.info("Looking for whitelisted metric names based on the file content...");
       return readMetricWhitelistFromFile(whitelistedFilePath);
@@ -164,7 +190,7 @@
   }
 
   public void runMigration(Long timeoutInMinutes) throws IOException {
-    try (FileWriter processedMetricsFileWriter = new FileWriter(this.processedMetricsFilePath, true)) {
+    try (Writer processedMetricsFileWriter = new BufferedWriter(new FileWriter(this.processedMetricsFilePath, true))) {
       LOG.info("Setting up copiers...");
       Set<AbstractPhoenixMetricsCopier> copiers = new HashSet<>();
       for (Set<String> batch : metricNamesBatches) {
@@ -172,7 +198,7 @@
           Set<String> filteredMetrics = filterProcessedMetrics(batch, this.processedMetrics, entry.getKey());
           if (!filteredMetrics.isEmpty()) {
             copiers.add(new PhoenixClusterMetricsCopier(entry.getKey(), entry.getValue(), this.hBaseAccessor,
-              filteredMetrics, this.startTime, processedMetricsFileWriter));
+              filteredMetrics, this.startTimeEpoch, processedMetricsFileWriter));
           }
         }
 
@@ -180,7 +206,7 @@
           Set<String> filteredMetrics = filterProcessedMetrics(batch, this.processedMetrics, entry.getKey());
           if (!filteredMetrics.isEmpty()) {
             copiers.add(new PhoenixHostMetricsCopier(entry.getKey(), entry.getValue(), this.hBaseAccessor,
-              filteredMetrics, this.startTime, processedMetricsFileWriter));
+              filteredMetrics, this.startTimeEpoch, processedMetricsFileWriter));
           }
         }
       }
@@ -191,7 +217,7 @@
       }
 
       LOG.info("Running the copy threads...");
-      long startTimer = System.currentTimeMillis();
+      long timerStart = System.currentTimeMillis();
       ExecutorService executorService = null;
       try {
         executorService = Executors.newFixedThreadPool(this.numberOfThreads);
@@ -209,8 +235,8 @@
         }
       }
 
-      long estimatedTime = System.currentTimeMillis() - startTimer;
-      LOG.info(String.format("Copying took %s seconds", estimatedTime / 1000.0));
+      long timerDelta = System.currentTimeMillis() - timerStart;
+      LOG.info(String.format("Copying took %s seconds", timerDelta / 1000.0));
     }
   }
 
@@ -279,7 +305,9 @@
    *                file location will be used if configured
    *                if not provided and AMS whitelisting is disabled then no whitelisting
    *                will be used and all the metrics will be migrated
-   * args[2] - startTime - default value is set to the last 30 days
+   *                if --allmetrics switch is provided then all the metrics will be migrated
+   *                regardless to other settings
+   * args[2] - startDay - default value is set to the last 30 days
    * args[3] - numberOfThreads - default value is 3
    * args[4] - batchSize - default value is 5
    * args[5] - timeoutInMinutes - default value is set to the equivalent of 24 hours
@@ -287,7 +315,7 @@
    */
   public static void main(String[] args) {
     String processedMetricsFilePath = null;
     String whitelistedFilePath = null;
-    Long startTime = null;
+    Long startDay = null;
     Integer numberOfThreads = null;
     Integer batchSize = null;
     Long timeoutInMinutes = DEFAULT_TIMEOUT_MINUTES;
@@ -299,7 +327,7 @@
       whitelistedFilePath = args[1];
     }
     if (args.length>2) {
-      startTime = Long.valueOf(args[2]);
+      startDay = Long.valueOf(args[2]);
     }
     if (args.length>3) {
       numberOfThreads = Integer.valueOf(args[3]);
@@ -314,7 +342,7 @@
     MetricsDataMigrationLauncher dataMigrationLauncher = null;
     try {
       LOG.info("Initializing system...");
-      dataMigrationLauncher = new MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath, startTime, numberOfThreads, batchSize);
+      dataMigrationLauncher = new MetricsDataMigrationLauncher(whitelistedFilePath, processedMetricsFilePath, startDay, numberOfThreads, batchSize);
     } catch (Exception e) {
       LOG.error("Exception during system setup, exiting...", e);
       System.exit(1);
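Note: the corrected start-time handling above treats args[2] as a number of days to look
back rather than an absolute timestamp. A small sketch of the same arithmetic as
calculateStartEpochTime(); the value 7 is just an example:

    import java.time.LocalDateTime;
    import java.time.ZoneOffset;

    public class StartTimeDemo {
      public static void main(String[] args) {
        long days = 7; // e.g. args[2] = "7" means "migrate the last 7 days"
        // Subtract the window from 'now' and express the boundary as epoch
        // seconds in UTC, exactly as calculateStartEpochTime() does.
        long startTimeEpoch = LocalDateTime.now().minusDays(days).toEpochSecond(ZoneOffset.UTC);
        System.out.println("Migrating data newer than " + startTimeEpoch);
      }
    }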
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
index 037b3d2..177d202 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixClusterMetricsCopier.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 
-import java.io.FileWriter;
+import java.io.Writer;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.HashMap;
@@ -34,7 +34,7 @@ public class PhoenixClusterMetricsCopier extends AbstractPhoenixMetricsCopier {
   private static final Log LOG = LogFactory.getLog(PhoenixClusterMetricsCopier.class);
   private final Map<TimelineClusterMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
 
-  PhoenixClusterMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, FileWriter processedMetricsFileWriter) {
+  PhoenixClusterMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, long startTime, Writer processedMetricsFileWriter) {
     super(inputTableName, outputTableName, hBaseAccessor, metricNames, startTime, processedMetricsFileWriter);
   }
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
index 11c1df9..5964a3a 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/upgrade/core/PhoenixHostMetricsCopier.java
@@ -23,7 +23,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 
-import java.io.FileWriter;
+
+import java.io.Writer;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.HashMap;
@@ -34,7 +34,7 @@ public class PhoenixHostMetricsCopier extends AbstractPhoenixMetricsCopier {
   private static final Log LOG = LogFactory.getLog(PhoenixHostMetricsCopier.class);
   private final Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
 
-  PhoenixHostMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, Long startTime, FileWriter processedMetricsFileWriter) {
+  PhoenixHostMetricsCopier(String inputTableName, String outputTableName, PhoenixHBaseAccessor hBaseAccessor, Set<String> metricNames, long startTime, Writer processedMetricsFileWriter) {
     super(inputTableName, outputTableName, hBaseAccessor, metricNames, startTime, processedMetricsFileWriter);
   }
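Note: for reference, a self-contained sketch of the WHERE-clause shape that the streamlined
AbstractPhoenixMetricsCopier.getMetricNamesLikeClause() produces; the metric names are
illustrative:

    import java.util.LinkedHashSet;
    import java.util.Set;

    public class LikeClauseDemo {
      public static void main(String[] args) {
        Set<String> metricNames = new LinkedHashSet<>();
        metricNames.add("regionserver.Server.readRequestCount");  // example name
        metricNames.add("regionserver.Server.writeRequestCount"); // example name

        // Same construction as the refactored method:
        StringBuilder sb = new StringBuilder(256);
        sb.append('(');
        int i = 0;
        for (String metricName : metricNames) {
          sb.append("METRIC_NAME LIKE '").append(metricName).append("'");
          if (i < metricNames.size() - 1) {
            sb.append(" OR ");
          }
          i++;
        }
        sb.append(')');

        // Prints:
        // (METRIC_NAME LIKE 'regionserver.Server.readRequestCount' OR METRIC_NAME LIKE 'regionserver.Server.writeRequestCount')
        System.out.println(sb);
      }
    }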