Repository: hive Updated Branches: refs/heads/master 8e7c3b340 -> f09db52fd
HIVE-20170: Improve JoinOperator "rows for join key" Logging (BELUGA BEHR via Peter Vary) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f09db52f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f09db52f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f09db52f Branch: refs/heads/master Commit: f09db52fdd3e1fc4ebadc41c36b8347b9a5723a5 Parents: 8e7c3b3 Author: BELUGA BEHR <dam6...@gmail.com> Authored: Thu Jan 17 18:19:04 2019 +0100 Committer: Peter Vary <pv...@cloudera.com> Committed: Thu Jan 17 18:19:04 2019 +0100 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/CommonJoinOperator.java | 26 ++++++++++++++++---- .../hadoop/hive/ql/exec/JoinOperator.java | 24 +++++++++--------- 2 files changed, 32 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f09db52f/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java index 3762ee5..1c32588 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java @@ -45,6 +45,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + /** * Join operator implementation. 
*/ @@ -145,6 +147,7 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends int joinCacheSize = 0; long nextSz = 0; transient Byte lastAlias = null; + private long logEveryNRows = 0L; transient boolean handleSkewJoin = false; @@ -170,6 +173,7 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends this.joinEmitInterval = clone.joinEmitInterval; this.joinCacheSize = clone.joinCacheSize; this.nextSz = clone.nextSz; + this.logEveryNRows = clone.logEveryNRows; this.childOperators = clone.childOperators; this.parentOperators = clone.parentOperators; this.done = clone.done; @@ -294,6 +298,9 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends joinCacheSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEJOINCACHESIZE); + logEveryNRows = HiveConf.getLongVar(hconf, + HiveConf.ConfVars.HIVE_LOG_N_RECORDS); + // construct dummy null row (indicating empty table) and // construct spill table serde which is used if input is too // large to fit into main memory. @@ -394,13 +401,22 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends super.startGroup(); } + /** + * Determine the frequency with which to emit a log message instead of + * one for every event.
+ * + * @param sz The current number of events + * @return The next event count to emit a log message + */ protected long getNextSize(long sz) { - // A very simple counter to keep track of join entries for a key - if (sz >= 100000) { - return sz + 100000; + Preconditions.checkArgument(sz >= 0L); + // If no logging is configured, log every 1, 10, 100, 1000, ..., 100000 + if (this.logEveryNRows == 0L) { + final long next = (long) Math.pow(10.0, Math.ceil(Math.log10(sz + 1))); + return Math.min(100000L, next); } - - return 2 * sz; + // Log every N rows + return ((sz / this.logEveryNRows) + 1L) * this.logEveryNRows; } protected transient Byte alias; http://git-wip-us.apache.org/repos/asf/hive/blob/f09db52f/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java index e995ab7..451ba1f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java @@ -82,10 +82,6 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial lastAlias = alias; alias = (byte) tag; - if (!alias.equals(lastAlias)) { - nextSz = joinEmitInterval; - } - List<Object> nr = getFilteredValue(alias, row); if (handleSkewJoin) { @@ -93,7 +89,7 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial } // number of rows for the key in the given table - long sz = storage[alias].rowCount(); + final long sz = storage[alias].rowCount(); StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag]; StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY .toString()); @@ -112,14 +108,16 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial checkAndGenObject(); storage[alias].clearRows(); } - } else { - if 
(LOG.isInfoEnabled() && (sz == nextSz)) { - // Print a message if we reached at least 1000 rows for a join operand - // We won't print a message for the last join operand since the size - // will never goes to joinEmitInterval. - LOG.info("table " + alias + " has " + sz + " rows for join key " + keyObject); - nextSz = getNextSize(nextSz); - } + } + + // The input is sorted by alias, so when an alias change is detected, + // reset the counter for the next join key in the stream + if (!alias.equals(lastAlias)) { + nextSz = getNextSize(0L); + } + if (sz == nextSz) { + LOG.info("Table {} has {} rows for join key {}", alias, sz, keyObject); + nextSz = getNextSize(nextSz); } // Add the value to the vector