This is an automated email from the ASF dual-hosted git repository.

lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a21e94  HIVE-22862: Remove unnecessary calls to isEnoughToCompact (Karen Coppage via Laszlo Pinter)
5a21e94 is described below

commit 5a21e946b1adbca25859e3c57516ba691162ae4c
Author: Karen Coppage <karen.copp...@cloudera.com>
AuthorDate: Mon Feb 24 11:16:13 2020 +0100

    HIVE-22862: Remove unnecessary calls to isEnoughToCompact (Karen Coppage via Laszlo Pinter)
---
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  |  3 --
 .../hive/ql/txn/compactor/MinorQueryCompactor.java |  3 --
 .../ql/txn/compactor/MmMajorQueryCompactor.java    |  4 --
 .../hive/ql/txn/compactor/QueryCompactor.java      | 57 --------------------
 .../hadoop/hive/ql/txn/compactor/Worker.java       | 63 +++++++++++++++++++++-
 5 files changed, 61 insertions(+), 69 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index b4bc3c7..25c14e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -231,9 +231,6 @@ public class CompactorMR {
     }
 
     JobConf job = createBaseJobConf(conf, jobName, t, sd, writeIds, ci);
-    if (!QueryCompactor.Util.isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
-      return;
-    }
 
     List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
     int maxDeltasToHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
index f96a048..59dcf2c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
@@ -56,9 +56,6 @@ final class MinorQueryCompactor extends QueryCompactor {
     AcidUtils.Directory dir = AcidUtils
         .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
             table.getParameters(), false);
-    if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) {
-      return;
-    }
     // Set up the session for driver.
     HiveConf conf = new HiveConf(hiveConf);
     conf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index 48387c9..41fdd7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -54,10 +54,6 @@ final class MmMajorQueryCompactor extends QueryCompactor {
             table.getParameters(), false);
     MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir);
 
-    if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) {
-      return;
-    }
-
     String tmpLocation = Util.generateTmpPath(storageDescriptor);
     Path baseLocation = new Path(tmpLocation, "_base");
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index d234910..3ce4dde 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -40,10 +40,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.stream.Collectors;
 
 /**
  * Common interface for query based compactions.
@@ -144,61 +142,6 @@ abstract class QueryCompactor {
    */
   static class Util {
     /**
-     * Determine if compaction can run in a specified directory.
-     * @param isMajorCompaction type of compaction.
-     * @param dir the delta directory
-     * @param sd resolved storage descriptor
-     * @return true, if compaction can run.
-     */
-    static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, StorageDescriptor sd) {
-      int deltaCount = dir.getCurrentDirectories().size();
-      int origCount = dir.getOriginalFiles().size();
-
-      StringBuilder deltaInfo = new StringBuilder().append(deltaCount);
-      boolean isEnoughToCompact;
-
-      if (isMajorCompaction) {
-        isEnoughToCompact = (origCount > 0 || deltaCount + (dir.getBaseDirectory() == null ? 0 : 1) > 1);
-
-      } else {
-        isEnoughToCompact = (deltaCount > 1);
-
-        if (deltaCount == 2) {
-          Map<String, Long> deltaByType = dir.getCurrentDirectories().stream().collect(Collectors
-              .groupingBy(delta -> (delta.isDeleteDelta() ? AcidUtils.DELETE_DELTA_PREFIX : AcidUtils.DELTA_PREFIX),
-                  Collectors.counting()));
-
-          isEnoughToCompact = (deltaByType.size() != deltaCount);
-          deltaInfo.append(" ").append(deltaByType);
-        }
-      }
-
-      if (!isEnoughToCompact) {
-        LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}", sd.getLocation(),
-            dir.getBaseDirectory(), deltaInfo, origCount);
-      }
-      return isEnoughToCompact;
-    }
-
-    /**
-     * Check for obsolete directories, and return true if any exist and Cleaner should be
-     * run. For example if we insert overwrite into a table with only deltas, a new base file with
-     * the highest writeId is created so there will be no live delta directories, only obsolete
-     * ones. Compaction is not needed, but the cleaner should still be run.
-     *
-     * @return true if cleaning is needed
-     */
-    public static boolean needsCleaning(AcidUtils.Directory dir, StorageDescriptor sd) {
-      int numObsoleteDirs = dir.getObsolete().size();
-      boolean needsJustCleaning = numObsoleteDirs > 0;
-      if (needsJustCleaning) {
-        LOG.debug("{} obsolete directories in {} found; marked for cleaning.",
-            numObsoleteDirs, sd.getLocation());
-      }
-      return needsJustCleaning;
-    }
-
-    /**
      * Generate a random tmp path, under the provided storage.
      * @param sd storage descriptor, must be not null.
      * @return path, always not null
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 383969a..90c699a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -58,6 +58,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 /**
  * A class to do compactions.  This will run in a separate thread.  It will spin on the
@@ -188,8 +189,8 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
         // Don't start compaction or cleaning if not necessary
         AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf,
             tblValidWriteIds, Ref.from(false), true, null, false);
-        if (!QueryCompactor.Util.isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
-          if (QueryCompactor.Util.needsCleaning(dir, sd)) {
+        if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
+          if (needsCleaning(dir, sd)) {
             msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
           } else {
             // do nothing
@@ -436,4 +437,62 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
       }
     }
   }
+
+  /**
+   * Determine if compaction can run in a specified directory.
+   * @param isMajorCompaction type of compaction.
+   * @param dir the delta directory
+   * @param sd resolved storage descriptor
+   * @return true, if compaction can run.
+   */
+  static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir,
+      StorageDescriptor sd) {
+    int deltaCount = dir.getCurrentDirectories().size();
+    int origCount = dir.getOriginalFiles().size();
+
+    StringBuilder deltaInfo = new StringBuilder().append(deltaCount);
+    boolean isEnoughToCompact;
+
+    if (isMajorCompaction) {
+      isEnoughToCompact =
+          (origCount > 0 || deltaCount + (dir.getBaseDirectory() == null ? 0 : 1) > 1);
+
+    } else {
+      isEnoughToCompact = (deltaCount > 1);
+
+      if (deltaCount == 2) {
+        Map<String, Long> deltaByType = dir.getCurrentDirectories().stream().collect(Collectors
+            .groupingBy(delta -> (delta
+                    .isDeleteDelta() ? AcidUtils.DELETE_DELTA_PREFIX : AcidUtils.DELTA_PREFIX),
+                Collectors.counting()));
+
+        isEnoughToCompact = (deltaByType.size() != deltaCount);
+        deltaInfo.append(" ").append(deltaByType);
+      }
+    }
+
+    if (!isEnoughToCompact) {
+      LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}",
+          sd.getLocation(), dir.getBaseDirectory(), deltaInfo, origCount);
+    }
+    return isEnoughToCompact;
+  }
+
+  /**
+   * Check for obsolete directories, and return true if any exist and Cleaner should be
+   * run. For example if we insert overwrite into a table with only deltas, a new base file with
+   * the highest writeId is created so there will be no live delta directories, only obsolete
+   * ones. Compaction is not needed, but the cleaner should still be run.
+   *
+   * @return true if cleaning is needed
+   */
+  public static boolean needsCleaning(AcidUtils.Directory dir, StorageDescriptor sd) {
+    int numObsoleteDirs = dir.getObsolete().size();
+    boolean needsJustCleaning = numObsoleteDirs > 0;
+    if (needsJustCleaning) {
+      LOG.debug("{} obsolete directories in {} found; marked for cleaning.", numObsoleteDirs,
+          sd.getLocation());
+    }
+    return needsJustCleaning;
+  }
 }

Reply via email to