HIVE-19602: Refactor in-place progress code in Hive-on-Spark progress monitor to use a ProgressMonitor instance (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar, Rui Li)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c89cf6d5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c89cf6d5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c89cf6d5 Branch: refs/heads/master-txnstats Commit: c89cf6d5de0343493dc629a0073b5c8e88359a6e Parents: 3a6ad26 Author: Bharathkrishna Guruvayoor Murali <bhar...@cloudera.com> Authored: Mon Jun 18 10:03:01 2018 -0500 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Mon Jun 18 10:03:01 2018 -0500 ---------------------------------------------------------------------- .../ql/exec/spark/status/SparkJobMonitor.java | 166 +------------------ .../exec/spark/status/SparkProgressMonitor.java | 155 +++++++++++++++++ 2 files changed, 160 insertions(+), 161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c89cf6d5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java index e78b1cd..3531ac2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java @@ -22,13 +22,9 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.common.log.InPlaceUpdate; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.session.SessionState; -import org.fusesource.jansi.Ansi; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.PrintStream; -import java.text.DecimalFormat; -import java.text.NumberFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashSet; @@ -38,8 +34,6 @@ import java.util.SortedSet; 
import java.util.TreeSet; import java.util.concurrent.TimeUnit; -import static org.fusesource.jansi.Ansi.ansi; - abstract class SparkJobMonitor { protected static final String CLASS_NAME = SparkJobMonitor.class.getName(); @@ -48,6 +42,7 @@ abstract class SparkJobMonitor { protected final PerfLogger perfLogger = SessionState.getPerfLogger(); protected final int checkInterval = 1000; protected final long monitorTimeoutInterval; + private final InPlaceUpdate inPlaceUpdateFn; private final Set<String> completed = new HashSet<String>(); private final int printInterval = 3000; @@ -61,94 +56,20 @@ abstract class SparkJobMonitor { FINISHED } - // in-place progress update related variables protected final boolean inPlaceUpdate; - private int lines = 0; - private final PrintStream out; - - private static final int COLUMN_1_WIDTH = 16; - private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s "; - private static final String STAGE_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s "; - private static final String HEADER = String.format(HEADER_FORMAT, - "STAGES", "ATTEMPT", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED"); - private static final int SEPARATOR_WIDTH = 86; - private static final String SEPARATOR = new String(new char[SEPARATOR_WIDTH]).replace("\0", "-"); - private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s"; - private static final int progressBarChars = 30; - - private final NumberFormat secondsFormat = new DecimalFormat("#0.00"); protected SparkJobMonitor(HiveConf hiveConf) { monitorTimeoutInterval = hiveConf.getTimeVar( HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS); inPlaceUpdate = InPlaceUpdate.canRenderInPlace(hiveConf) && !SessionState.getConsole().getIsSilent(); console = new SessionState.LogHelper(LOG); - out = SessionState.LogHelper.getInfoStream(); + inPlaceUpdateFn = new InPlaceUpdate(SessionState.LogHelper.getInfoStream()); } public abstract int startMonitor(); private void 
printStatusInPlace(Map<SparkStage, SparkStageProgress> progressMap) { - - StringBuilder reportBuffer = new StringBuilder(); - - // Num of total and completed tasks - int sumTotal = 0; - int sumComplete = 0; - - // position the cursor to line 0 - repositionCursor(); - - // header - reprintLine(SEPARATOR); - reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN); - reprintLine(SEPARATOR); - - SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet()); - int idx = 0; - final int numKey = keys.size(); - for (SparkStage stage : keys) { - SparkStageProgress progress = progressMap.get(stage); - final int complete = progress.getSucceededTaskCount(); - final int total = progress.getTotalTaskCount(); - final int running = progress.getRunningTaskCount(); - final int failed = progress.getFailedTaskCount(); - sumTotal += total; - sumComplete += complete; - - String s = stage.toString(); - StageState state = total > 0 ? StageState.PENDING : StageState.FINISHED; - if (complete > 0 || running > 0 || failed > 0) { - if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s); - } - if (complete < total) { - state = StageState.RUNNING; - } else { - state = StageState.FINISHED; - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s); - completed.add(s); - } - } - - String attempt = String.valueOf(stage.getAttemptId()); - String stageName = "Stage-" + String.valueOf(stage.getStageId()); - String nameWithProgress = getNameWithProgress(stageName, complete, total); - - final int pending = total - complete - running; - String stageStr = String.format(STAGE_FORMAT, - nameWithProgress, attempt, state, total, complete, running, pending, failed); - reportBuffer.append(stageStr); - if (idx++ != numKey - 1) { - reportBuffer.append("\n"); - } - } - reprintMultiLine(reportBuffer.toString()); - reprintLine(SEPARATOR); - final float progress = (sumTotal == 0) ? 
1.0f : (float) sumComplete / (float) sumTotal; - String footer = getFooter(numKey, completed.size(), progress, startTime); - reprintLineWithColorAsBold(footer, Ansi.Color.RED); - reprintLine(SEPARATOR); + inPlaceUpdateFn.render(getProgressMonitor(progressMap)); } protected void printStatus(Map<SparkStage, SparkStageProgress> progressMap, @@ -293,84 +214,7 @@ abstract class SparkJobMonitor { return true; } - private void repositionCursor() { - if (lines > 0) { - out.print(ansi().cursorUp(lines).toString()); - out.flush(); - lines = 0; - } - } - - private void reprintLine(String line) { - InPlaceUpdate.reprintLine(out, line); - lines++; - } - - private void reprintLineWithColorAsBold(String line, Ansi.Color color) { - out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset() - .toString()); - out.flush(); - lines++; - } - - private String getNameWithProgress(String s, int complete, int total) { - String result = ""; - if (s != null) { - float percent = total == 0 ? 
1.0f : (float) complete / (float) total; - // lets use the remaining space in column 1 as progress bar - int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1; - String trimmedVName = s; - - // if the vertex name is longer than column 1 width, trim it down - if (s.length() > COLUMN_1_WIDTH) { - trimmedVName = s.substring(0, COLUMN_1_WIDTH - 2); - result = trimmedVName + ".."; - } else { - result = trimmedVName + " "; - } - - int toFill = (int) (spaceRemaining * percent); - for (int i = 0; i < toFill; i++) { - result += "."; - } - } - return result; - } - - // STAGES: 03/04 [==================>>-----] 86% ELAPSED TIME: 1.71 s - private String getFooter(int keySize, int completedSize, float progress, long startTime) { - String verticesSummary = String.format("STAGES: %02d/%02d", completedSize, keySize); - String progressBar = getInPlaceProgressBar(progress); - final int progressPercent = (int) (progress * 100); - String progressStr = "" + progressPercent + "%"; - float et = (float) (System.currentTimeMillis() - startTime) / (float) 1000; - String elapsedTime = "ELAPSED TIME: " + secondsFormat.format(et) + " s"; - String footer = String.format(FOOTER_FORMAT, - verticesSummary, progressBar, progressStr, elapsedTime); - return footer; - } - - // [==================>>-----] - private String getInPlaceProgressBar(float percent) { - StringBuilder bar = new StringBuilder("["); - int remainingChars = progressBarChars - 4; - int completed = (int) (remainingChars * percent); - int pending = remainingChars - completed; - for (int i = 0; i < completed; i++) { - bar.append("="); - } - bar.append(">>"); - for (int i = 0; i < pending; i++) { - bar.append("-"); - } - bar.append("]"); - return bar.toString(); - } - - private void reprintMultiLine(String line) { - int numLines = line.split("\r\n|\r|\n").length; - out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString()); - out.flush(); - lines += numLines; + private SparkProgressMonitor getProgressMonitor(Map<SparkStage, 
SparkStageProgress> progressMap) { + return new SparkProgressMonitor(progressMap, startTime); } } http://git-wip-us.apache.org/repos/asf/hive/blob/c89cf6d5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgressMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgressMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgressMonitor.java new file mode 100644 index 0000000..0c33db0 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgressMonitor.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.status; + +import org.apache.hadoop.hive.common.log.ProgressMonitor; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * This class defines various parts of the progress update bar. + * Progressbar is displayed in hive-cli and typically rendered using InPlaceUpdate. 
+ */ +class SparkProgressMonitor implements ProgressMonitor { + + private Map<SparkStage, SparkStageProgress> progressMap; + private long startTime; + private static final int COLUMN_1_WIDTH = 16; + + SparkProgressMonitor(Map<SparkStage, SparkStageProgress> progressMap, long startTime) { + this.progressMap = progressMap; + this.startTime = startTime; + } + + @Override + public List<String> headers() { + return Arrays.asList("STAGES", "ATTEMPT", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", ""); + } + + @Override + public List<List<String>> rows() { + List<List<String>> progressRows = new ArrayList<>(); + SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet()); + for (SparkStage stage : keys) { + SparkStageProgress progress = progressMap.get(stage); + final int complete = progress.getSucceededTaskCount(); + final int total = progress.getTotalTaskCount(); + final int running = progress.getRunningTaskCount(); + final int failed = progress.getFailedTaskCount(); + + SparkJobMonitor.StageState state = + total > 0 ? 
SparkJobMonitor.StageState.PENDING : SparkJobMonitor.StageState.FINISHED; + if (complete > 0 || running > 0 || failed > 0) { + if (complete < total) { + state = SparkJobMonitor.StageState.RUNNING; + } else { + state = SparkJobMonitor.StageState.FINISHED; + } + } + String attempt = String.valueOf(stage.getAttemptId()); + String stageName = "Stage-" +String.valueOf(stage.getStageId()); + String nameWithProgress = getNameWithProgress(stageName, complete, total); + final int pending = total - complete - running; + + progressRows.add(Arrays + .asList(nameWithProgress, attempt, state.toString(), String.valueOf(total), String.valueOf(complete), + String.valueOf(running), String.valueOf(pending), String.valueOf(failed), "")); + } + return progressRows; + } + + @Override + public String footerSummary() { + return String.format("STAGES: %02d/%02d", getCompletedStages(), progressMap.keySet().size()); + } + + @Override + public long startTime() { + return startTime; + } + + @Override + public String executionStatus() { + if (getCompletedStages() == progressMap.keySet().size()) { + return SparkJobMonitor.StageState.FINISHED.toString(); + } else { + return SparkJobMonitor.StageState.RUNNING.toString(); + } + } + + @Override + public double progressedPercentage() { + + SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet()); + int sumTotal = 0; + int sumComplete = 0; + for (SparkStage stage : keys) { + SparkStageProgress progress = progressMap.get(stage); + final int complete = progress.getSucceededTaskCount(); + final int total = progress.getTotalTaskCount(); + sumTotal += total; + sumComplete += complete; + } + double progress = (sumTotal == 0) ? 
1.0f : (float) sumComplete / (float) sumTotal; + return progress; + } + + private int getCompletedStages() { + int completed = 0; + SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet()); + for (SparkStage stage : keys) { + SparkStageProgress progress = progressMap.get(stage); + final int complete = progress.getSucceededTaskCount(); + final int total = progress.getTotalTaskCount(); + if (total > 0 && complete == total) { + completed++; + } + } + return completed; + } + + private String getNameWithProgress(String s, int complete, int total) { + + if (s == null) { + return ""; + } + float percent = total == 0 ? 1.0f : (float) complete / (float) total; + // lets use the remaining space in column 1 as progress bar + int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1; + String trimmedVName = s; + + // if the vertex name is longer than column 1 width, trim it down + if (s.length() > COLUMN_1_WIDTH) { + trimmedVName = s.substring(0, COLUMN_1_WIDTH - 2); + trimmedVName += ".."; + } else { + trimmedVName += " "; + } + StringBuilder result = new StringBuilder(trimmedVName); + int toFill = (int) (spaceRemaining * percent); + for (int i = 0; i < toFill; i++) { + result.append("."); + } + return result.toString(); + } +}