HIVE-19602: Refactor inplace progress code in Hive-on-spark progress monitor to 
use ProgressMonitor instance (Bharathkrishna Guruvayoor Murali, reviewed by 
Sahil Takiar, Rui Li)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c89cf6d5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c89cf6d5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c89cf6d5

Branch: refs/heads/master-txnstats
Commit: c89cf6d5de0343493dc629a0073b5c8e88359a6e
Parents: 3a6ad26
Author: Bharathkrishna Guruvayoor Murali <bhar...@cloudera.com>
Authored: Mon Jun 18 10:03:01 2018 -0500
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Mon Jun 18 10:03:01 2018 -0500

----------------------------------------------------------------------
 .../ql/exec/spark/status/SparkJobMonitor.java   | 166 +------------------
 .../exec/spark/status/SparkProgressMonitor.java | 155 +++++++++++++++++
 2 files changed, 160 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c89cf6d5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index e78b1cd..3531ac2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -22,13 +22,9 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.common.log.InPlaceUpdate;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.fusesource.jansi.Ansi;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.PrintStream;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashSet;
@@ -38,8 +34,6 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
-import static org.fusesource.jansi.Ansi.ansi;
-
 abstract class SparkJobMonitor {
 
   protected static final String CLASS_NAME = SparkJobMonitor.class.getName();
@@ -48,6 +42,7 @@ abstract class SparkJobMonitor {
   protected final PerfLogger perfLogger = SessionState.getPerfLogger();
   protected final int checkInterval = 1000;
   protected final long monitorTimeoutInterval;
+  private final InPlaceUpdate inPlaceUpdateFn;
 
   private final Set<String> completed = new HashSet<String>();
   private final int printInterval = 3000;
@@ -61,94 +56,20 @@ abstract class SparkJobMonitor {
     FINISHED
   }
 
-  // in-place progress update related variables
   protected final boolean inPlaceUpdate;
-  private int lines = 0;
-  private final PrintStream out;
-
-  private static final int COLUMN_1_WIDTH = 16;
-  private static final String HEADER_FORMAT = "%16s%10s %13s  %5s  %9s  %7s  %7s  %6s  ";
-  private static final String STAGE_FORMAT = "%-16s%10s %13s  %5s  %9s  %7s  %7s  %6s  ";
-  private static final String HEADER = String.format(HEADER_FORMAT,
-      "STAGES", "ATTEMPT", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED");
-  private static final int SEPARATOR_WIDTH = 86;
-  private static final String SEPARATOR = new String(new char[SEPARATOR_WIDTH]).replace("\0", "-");
-  private static final String FOOTER_FORMAT = "%-15s  %-30s %-4s  %-25s";
-  private static final int progressBarChars = 30;
-
-  private final NumberFormat secondsFormat = new DecimalFormat("#0.00");
 
   protected SparkJobMonitor(HiveConf hiveConf) {
     monitorTimeoutInterval = hiveConf.getTimeVar(
         HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
     inPlaceUpdate = InPlaceUpdate.canRenderInPlace(hiveConf) && !SessionState.getConsole().getIsSilent();
     console = new SessionState.LogHelper(LOG);
-    out = SessionState.LogHelper.getInfoStream();
+    inPlaceUpdateFn = new InPlaceUpdate(SessionState.LogHelper.getInfoStream());
   }
 
   public abstract int startMonitor();
 
   private void printStatusInPlace(Map<SparkStage, SparkStageProgress> progressMap) {
-
-    StringBuilder reportBuffer = new StringBuilder();
-
-    // Num of total and completed tasks
-    int sumTotal = 0;
-    int sumComplete = 0;
-
-    // position the cursor to line 0
-    repositionCursor();
-
-    // header
-    reprintLine(SEPARATOR);
-    reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN);
-    reprintLine(SEPARATOR);
-
-    SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet());
-    int idx = 0;
-    final int numKey = keys.size();
-    for (SparkStage stage : keys) {
-      SparkStageProgress progress = progressMap.get(stage);
-      final int complete = progress.getSucceededTaskCount();
-      final int total = progress.getTotalTaskCount();
-      final int running = progress.getRunningTaskCount();
-      final int failed = progress.getFailedTaskCount();
-      sumTotal += total;
-      sumComplete += complete;
-
-      String s = stage.toString();
-      StageState state = total > 0 ? StageState.PENDING : StageState.FINISHED;
-      if (complete > 0 || running > 0 || failed > 0) {
-        if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
-          perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
-        }
-        if (complete < total) {
-          state = StageState.RUNNING;
-        } else {
-          state = StageState.FINISHED;
-          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
-          completed.add(s);
-        }
-      }
-
-      String attempt = String.valueOf(stage.getAttemptId());
-      String stageName = "Stage-" + String.valueOf(stage.getStageId());
-      String nameWithProgress = getNameWithProgress(stageName, complete, total);
-
-      final int pending = total - complete - running;
-      String stageStr = String.format(STAGE_FORMAT,
-          nameWithProgress, attempt, state, total, complete, running, pending, failed);
-      reportBuffer.append(stageStr);
-      if (idx++ != numKey - 1) {
-        reportBuffer.append("\n");
-      }
-    }
-    reprintMultiLine(reportBuffer.toString());
-    reprintLine(SEPARATOR);
-    final float progress = (sumTotal == 0) ? 1.0f : (float) sumComplete / (float) sumTotal;
-    String footer = getFooter(numKey, completed.size(), progress, startTime);
-    reprintLineWithColorAsBold(footer, Ansi.Color.RED);
-    reprintLine(SEPARATOR);
+    inPlaceUpdateFn.render(getProgressMonitor(progressMap));
   }
 
   protected void printStatus(Map<SparkStage, SparkStageProgress> progressMap,
@@ -293,84 +214,7 @@ abstract class SparkJobMonitor {
     return true;
   }
 
-  private void repositionCursor() {
-    if (lines > 0) {
-      out.print(ansi().cursorUp(lines).toString());
-      out.flush();
-      lines = 0;
-    }
-  }
-
-  private void reprintLine(String line) {
-    InPlaceUpdate.reprintLine(out, line);
-    lines++;
-  }
-
-  private void reprintLineWithColorAsBold(String line, Ansi.Color color) {
-    out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset()
-        .toString());
-    out.flush();
-    lines++;
-  }
-
-  private String getNameWithProgress(String s, int complete, int total) {
-    String result = "";
-    if (s != null) {
-      float percent = total == 0 ? 1.0f : (float) complete / (float) total;
-      // lets use the remaining space in column 1 as progress bar
-      int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1;
-      String trimmedVName = s;
-
-      // if the vertex name is longer than column 1 width, trim it down
-      if (s.length() > COLUMN_1_WIDTH) {
-        trimmedVName = s.substring(0, COLUMN_1_WIDTH - 2);
-        result = trimmedVName + "..";
-      } else {
-        result = trimmedVName + " ";
-      }
-
-      int toFill = (int) (spaceRemaining * percent);
-      for (int i = 0; i < toFill; i++) {
-        result += ".";
-      }
-    }
-    return result;
-  }
-
-  // STAGES: 03/04            [==================>>-----] 86%  ELAPSED TIME: 1.71 s
-  private String getFooter(int keySize, int completedSize, float progress, long startTime) {
-    String verticesSummary = String.format("STAGES: %02d/%02d", completedSize, keySize);
-    String progressBar = getInPlaceProgressBar(progress);
-    final int progressPercent = (int) (progress * 100);
-    String progressStr = "" + progressPercent + "%";
-    float et = (float) (System.currentTimeMillis() - startTime) / (float) 1000;
-    String elapsedTime = "ELAPSED TIME: " + secondsFormat.format(et) + " s";
-    String footer = String.format(FOOTER_FORMAT,
-        verticesSummary, progressBar, progressStr, elapsedTime);
-    return footer;
-  }
-
-  // [==================>>-----]
-  private String getInPlaceProgressBar(float percent) {
-    StringBuilder bar = new StringBuilder("[");
-    int remainingChars = progressBarChars - 4;
-    int completed = (int) (remainingChars * percent);
-    int pending = remainingChars - completed;
-    for (int i = 0; i < completed; i++) {
-      bar.append("=");
-    }
-    bar.append(">>");
-    for (int i = 0; i < pending; i++) {
-      bar.append("-");
-    }
-    bar.append("]");
-    return bar.toString();
-  }
-
-  private void reprintMultiLine(String line) {
-    int numLines = line.split("\r\n|\r|\n").length;
-    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
-    out.flush();
-    lines += numLines;
+  private SparkProgressMonitor getProgressMonitor(Map<SparkStage, SparkStageProgress> progressMap) {
+    return new SparkProgressMonitor(progressMap, startTime);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c89cf6d5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgressMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgressMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgressMonitor.java
new file mode 100644
index 0000000..0c33db0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgressMonitor.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * This class defines various parts of the progress update bar.
+ * Progressbar is displayed in hive-cli and typically rendered using InPlaceUpdate.
+ */
+class SparkProgressMonitor implements ProgressMonitor {
+
+  private Map<SparkStage, SparkStageProgress> progressMap;
+  private long startTime;
+  private static final int COLUMN_1_WIDTH = 16;
+
+  SparkProgressMonitor(Map<SparkStage, SparkStageProgress> progressMap, long startTime) {
+    this.progressMap = progressMap;
+    this.startTime = startTime;
+  }
+
+  @Override
+  public List<String> headers() {
+    return Arrays.asList("STAGES", "ATTEMPT", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "");
+  }
+
+  @Override
+  public List<List<String>> rows() {
+    List<List<String>> progressRows = new ArrayList<>();
+    SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet());
+    for (SparkStage stage : keys) {
+      SparkStageProgress progress = progressMap.get(stage);
+      final int complete = progress.getSucceededTaskCount();
+      final int total = progress.getTotalTaskCount();
+      final int running = progress.getRunningTaskCount();
+      final int failed = progress.getFailedTaskCount();
+
+      SparkJobMonitor.StageState state =
+          total > 0 ? SparkJobMonitor.StageState.PENDING : SparkJobMonitor.StageState.FINISHED;
+      if (complete > 0 || running > 0 || failed > 0) {
+        if (complete < total) {
+          state = SparkJobMonitor.StageState.RUNNING;
+        } else {
+          state = SparkJobMonitor.StageState.FINISHED;
+        }
+      }
+      String attempt = String.valueOf(stage.getAttemptId());
+      String stageName = "Stage-" +String.valueOf(stage.getStageId());
+      String nameWithProgress = getNameWithProgress(stageName, complete, total);
+      final int pending = total - complete - running;
+
+      progressRows.add(Arrays
+          .asList(nameWithProgress, attempt, state.toString(), String.valueOf(total), String.valueOf(complete),
+              String.valueOf(running), String.valueOf(pending), String.valueOf(failed), ""));
+    }
+    return progressRows;
+  }
+
+  @Override
+  public String footerSummary() {
+    return String.format("STAGES: %02d/%02d", getCompletedStages(), progressMap.keySet().size());
+  }
+
+  @Override
+  public long startTime() {
+    return startTime;
+  }
+
+  @Override
+  public String executionStatus() {
+    if (getCompletedStages() == progressMap.keySet().size()) {
+      return SparkJobMonitor.StageState.FINISHED.toString();
+    } else {
+      return SparkJobMonitor.StageState.RUNNING.toString();
+    }
+  }
+
+  @Override
+  public double progressedPercentage() {
+
+    SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet());
+    int sumTotal = 0;
+    int sumComplete = 0;
+    for (SparkStage stage : keys) {
+      SparkStageProgress progress = progressMap.get(stage);
+      final int complete = progress.getSucceededTaskCount();
+      final int total = progress.getTotalTaskCount();
+      sumTotal += total;
+      sumComplete += complete;
+    }
+    double progress = (sumTotal == 0) ? 1.0f : (float) sumComplete / (float) sumTotal;
+    return progress;
+  }
+
+  private int getCompletedStages() {
+    int completed = 0;
+    SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet());
+    for (SparkStage stage : keys) {
+      SparkStageProgress progress = progressMap.get(stage);
+      final int complete = progress.getSucceededTaskCount();
+      final int total = progress.getTotalTaskCount();
+      if (total > 0 && complete == total) {
+        completed++;
+      }
+    }
+    return completed;
+  }
+
+  private String getNameWithProgress(String s, int complete, int total) {
+
+    if (s == null) {
+      return "";
+    }
+    float percent = total == 0 ? 1.0f : (float) complete / (float) total;
+    // lets use the remaining space in column 1 as progress bar
+    int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1;
+    String trimmedVName = s;
+
+    // if the vertex name is longer than column 1 width, trim it down
+    if (s.length() > COLUMN_1_WIDTH) {
+      trimmedVName = s.substring(0, COLUMN_1_WIDTH - 2);
+      trimmedVName += "..";
+    } else {
+      trimmedVName += " ";
+    }
+    StringBuilder result = new StringBuilder(trimmedVName);
+    int toFill = (int) (spaceRemaining * percent);
+    for (int i = 0; i < toFill; i++) {
+      result.append(".");
+    }
+    return result.toString();
+  }
+}

Reply via email to