[ https://issues.apache.org/jira/browse/HIVE-21217?focusedWorklogId=199295&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-199295 ]
ASF GitHub Bot logged work on HIVE-21217: ----------------------------------------- Author: ASF GitHub Bot Created on: 15/Feb/19 15:39 Start Date: 15/Feb/19 15:39 Worklog Time Spent: 10m Work Description: pvary commented on pull request #538: HIVE-21217: Optimize range calculation for PTF URL: https://github.com/apache/hive/pull/538#discussion_r257280122 ########## File path: ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/ValueBoundaryScanner.java ########## @@ -44,10 +49,207 @@ public ValueBoundaryScanner(BoundaryDef start, BoundaryDef end, boolean nullsLas this.nullsLast = nullsLast; } + public abstract Object computeValue(Object row) throws HiveException; + + /** + * Checks if the distance of v2 to v1 is greater than the given amt. + * @return True if the value of v1 - v2 is greater than amt or either value is null. + */ + public abstract boolean isDistanceGreater(Object v1, Object v2, int amt); + + /** + * Checks if the values of v1 or v2 are the same. + * @return True if both values are the same or both are nulls. + */ + public abstract boolean isEqual(Object v1, Object v2); + public abstract int computeStart(int rowIdx, PTFPartition p) throws HiveException; public abstract int computeEnd(int rowIdx, PTFPartition p) throws HiveException; + /** + * Checks and maintains cache content - optimizes cache window to always be around current row + * thereby makes it follow the current progress. + * @param rowIdx current row + * @param p current partition for the PTF operator + * @throws HiveException + */ + public void handleCache(int rowIdx, PTFPartition p) throws HiveException { + BoundaryCache cache = p.getBoundaryCache(); + if (cache == null) { + return; + } + + //Start of partition + if (rowIdx == 0) { + cache.clear(); + } + if (cache.isComplete()) { + return; + } + + int cachePos = cache.approxCachePositionOf(rowIdx); + + if (cache.isEmpty()) { + fillCacheUntilEndOrFull(rowIdx, p); + } else if (cachePos > 50 && cachePos <= 75) { + if (!start.isPreceding() && end.isFollowing()) { + cache.evictHalf(); + fillCacheUntilEndOrFull(rowIdx, p); + } + } else if (cachePos > 75 && cachePos <= 95) { + if (start.isPreceding() && end.isFollowing()) { + cache.evictHalf(); + fillCacheUntilEndOrFull(rowIdx, p); + } + } else if (cachePos >= 95) { + if (start.isPreceding() && !end.isFollowing()) { + cache.evictHalf(); + fillCacheUntilEndOrFull(rowIdx, p); + } + + } + } + + /** + * Inserts values into cache starting from rowIdx in the current partition p. Stops if cache + * reaches its maximum size or we get out of rows in p. + * @param rowIdx + * @param p + * @throws HiveException + */ + private void fillCacheUntilEndOrFull(int rowIdx, PTFPartition p) throws HiveException { + BoundaryCache cache = p.getBoundaryCache(); + if (cache == null || p.size() <= 0) { + return; + } + + //If we continue building cache + Map.Entry<Integer, Object> ceilingEntry = cache.getMaxEntry(); + if (ceilingEntry != null) { + rowIdx = ceilingEntry.getKey(); + } + + Object rowVal = null; + Object lastRowVal = null; + + while (rowIdx < p.size()) { + rowVal = computeValue(p.getAt(rowIdx)); + if (!isEqual(rowVal, lastRowVal)){ + if (!cache.putIfNotFull(rowIdx, rowVal)){ + break; + } + } + lastRowVal = rowVal; + ++rowIdx; + + } + //Signaling end of all rows in a partition + if (cache.putIfNotFull(rowIdx, null)) { + cache.setComplete(true); + } + } + + /** + * Uses cache content to jump backwards if possible. If not, it steps one back. + * @param r + * @param p + * @return pair of (row we stepped/jumped onto ; row value at this position) + * @throws HiveException + */ + protected Pair<Integer, Object> skipOrStepBack(int r, PTFPartition p) + throws HiveException { + Object rowVal = null; + BoundaryCache cache = p.getBoundaryCache(); + + Map.Entry<Integer, Object> floorEntry = null; + Map.Entry<Integer, Object> ceilingEntry = null; + + if (cache != null) { + floorEntry = cache.floorEntry(r); + ceilingEntry = cache.ceilingEntry(r); + } + + if (floorEntry != null && ceilingEntry != null) { + r = floorEntry.getKey() - 1; + floorEntry = cache.floorEntry(r); + if (floorEntry != null) { + rowVal = floorEntry.getValue(); + } else if (r >= 0){ + rowVal = computeValue(p.getAt(r)); + } + } else { + r--; + if (r >= 0) { + rowVal = computeValue(p.getAt(r)); + } + } + return new ImmutablePair<>(r, rowVal); + } + + /** + * Uses cache content to jump forward if possible. If not, it steps one forward. + * @param r + * @param p + * @return pair of (row we stepped/jumped onto ; row value at this position) + * @throws HiveException + */ + protected Pair<Integer, Object> skipOrStepForward(int r, PTFPartition p) + throws HiveException { + Object rowVal = null; + BoundaryCache cache = p.getBoundaryCache(); + + Map.Entry<Integer, Object> floorEntry = null; + Map.Entry<Integer, Object> ceilingEntry = null; + + if (cache != null) { + floorEntry = cache.floorEntry(r); + ceilingEntry = cache.ceilingEntry(r); + } + + if (ceilingEntry != null && ceilingEntry.getKey().equals(r)){ + ceilingEntry = cache.ceilingEntry(r + 1); + } + if (floorEntry != null && ceilingEntry != null) { + r = ceilingEntry.getKey(); + rowVal = ceilingEntry.getValue(); + } else { + r++; + if (r < p.size()) { + rowVal = computeValue(p.getAt(r)); + } + } + return new ImmutablePair<>(r, rowVal); + } + + /** + * Uses cache to lookup row value. Computes it on the fly on cache miss. + * @param r + * @param p + * @return row value. + * @throws HiveException + */ + protected Object computeValueUseCache(int r, PTFPartition p) throws HiveException { + BoundaryCache cache = p.getBoundaryCache(); Review comment: Looks like copy-pate. Might worth to put to a separate method? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 199295) Time Spent: 0.5h (was: 20m) > Optimize range calculation for PTF > ---------------------------------- > > Key: HIVE-21217 > URL: https://issues.apache.org/jira/browse/HIVE-21217 > Project: Hive > Issue Type: Improvement > Reporter: Adam Szita > Assignee: Adam Szita > Priority: Major > Labels: pull-request-available > Attachments: HIVE-21217.0.patch, HIVE-21217.1.patch, > HIVE-21217.2.patch > > Time Spent: 0.5h > Remaining Estimate: 0h > > During window function execution Hive has to iterate on neighbouring rows of > the current row to find the beginning and end of the proper range (on which > the aggregation will be executed). > When we're using range based windows and have many rows with a certain key > value this can take a lot of time. (e.g. partition size of 80M, in which we > have 2 ranges of 40M rows according to the orderby column: within these 40M > rowsets we're doing 40M x 40M/2 steps.. which is of n^2 time complexity) > I propose to introduce a cache that keeps track of already calculated range > ends so it can be reused in future scans. -- This message was sent by Atlassian JIRA (v7.6.3#76005)