[ https://issues.apache.org/jira/browse/HIVE-23365?focusedWorklogId=437920&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-437920 ]
ASF GitHub Bot logged work on HIVE-23365:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/May/20 17:18
            Start Date: 27/May/20 17:18
    Worklog Time Spent: 10m
      Work Description:
jcamachor commented on a change in pull request #1035:
URL: https://github.com/apache/hive/pull/1035#discussion_r431276400


##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
##########
@@ -113,13 +116,36 @@ public static boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minR
    * If parent RS has not been assigned any partitioning column, we will use
    * partitioning columns (if exist) of child RS.
    */
-  public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+  public static boolean merge(HiveConf hiveConf, ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
       throws SemanticException {
     int[] result = extractMergeDirections(cRS, pRS, minReducer);
     if (result == null) {
       return false;
     }
+    // The partitioning columns of the child RS will replace the columns of the
+    // parent RS in two cases:
+    // - Parent RS columns are more specific than those of the child RS,
+    //   and child columns are assigned;
+    // - Child RS columns are more specific than those of the parent RS,
+    //   and parent columns are not assigned.
+    List<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+    List<ExprNodeDesc> parentPCs = pRS.getConf().getPartitionCols();
+    boolean useChildsPartitionColumns =
+        result[1] < 0 && (childPCs != null && !childPCs.isEmpty()) ||
+        result[1] > 0 && (parentPCs == null || parentPCs.isEmpty());
+
+    if (useChildsPartitionColumns) {
+      List<ExprNodeDesc> newPartitionCols = ExprNodeDescUtils.backtrack(childPCs, cRS, pRS);
+      long oldParallelism = estimateMaxPartitions(hiveConf, pRS, parentPCs);
+      long newParallelism = estimateMaxPartitions(hiveConf, pRS, newPartitionCols);
+      long threshold = hiveConf.getLongVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONPARALLELISMDECTHRESHOLD);
+      if (oldParallelism / newParallelism > threshold) {
+        return false;
+      }

Review comment:
I think you are right, adding the check using the existing config seems to be the correct approach. We could still add on/off config for the new behavior optimization (default true... but in case we need to disable it). Could you make those changes?


##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
##########
@@ -113,13 +116,36 @@ public static boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minR
    * If parent RS has not been assigned any partitioning column, we will use
    * partitioning columns (if exist) of child RS.
    */
-  public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+  public static boolean merge(HiveConf hiveConf, ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
       throws SemanticException {
     int[] result = extractMergeDirections(cRS, pRS, minReducer);
     if (result == null) {
       return false;
     }
+    // The partitioning columns of the child RS will replace the columns of the
+    // parent RS in two cases:
+    // - Parent RS columns are more specific than those of the child RS,
+    //   and child columns are assigned;
+    // - Child RS columns are more specific than those of the parent RS,
+    //   and parent columns are not assigned.
+    List<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+    List<ExprNodeDesc> parentPCs = pRS.getConf().getPartitionCols();
+    boolean useChildsPartitionColumns =
+        result[1] < 0 && (childPCs != null && !childPCs.isEmpty()) ||
+        result[1] > 0 && (parentPCs == null || parentPCs.isEmpty());
+
+    if (useChildsPartitionColumns) {
+      List<ExprNodeDesc> newPartitionCols = ExprNodeDescUtils.backtrack(childPCs, cRS, pRS);
+      long oldParallelism = estimateMaxPartitions(hiveConf, pRS, parentPCs);
+      long newParallelism = estimateMaxPartitions(hiveConf, pRS, newPartitionCols);
+      long threshold = hiveConf.getLongVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONPARALLELISMDECTHRESHOLD);
+      if (oldParallelism / newParallelism > threshold) {
+        return false;

Review comment:
Do you think it makes sense to add these checks to the `extractMergeDirections` method? It seems the rest of checks are done within that method; if `extractMergeDirections` was successful, this method was only modifying the operators accordingly. I think keeping that separation may make the code more clear.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 437920)
    Time Spent: 0.5h  (was: 20m)

> Put RS deduplication optimization under cost based decision
> -----------------------------------------------------------
>
>                 Key: HIVE-23365
>                 URL: https://issues.apache.org/jira/browse/HIVE-23365
>             Project: Hive
>          Issue Type: Improvement
>          Components: Physical Optimizer
>            Reporter: Jesus Camacho Rodriguez
>            Assignee: Stamatis Zampetakis
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-23365.01.patch, HIVE-23365.02.patch,
> HIVE-23365.03.patch, HIVE-23365.04.patch
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently, RS deduplication is always executed whenever it is semantically
> correct. However, it could be beneficial to leave both RS operators in the
> plan, e.g., if the NDV of the second RS is very low. Thus, we would like this
> decision to be cost-based. We could use a simple heuristic that would work
> fine for most of the cases without introducing regressions for existing
> cases, e.g., if NDV for partition column is less than estimated parallelism
> in the second RS, do not execute deduplication.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
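
The two decisions quoted in the review hunk (when the child RS partition columns replace the parent's, and when the resulting drop in parallelism is too large to merge) can be sketched in isolation as below. This is a minimal illustration and not Hive code: the class and method names (`RsDedupHeuristic`, `useChildPartitionColumns`, `parallelismDropAcceptable`) are hypothetical, the estimates are passed in as plain `long` values instead of being computed from operators, and the guard against a non-positive new estimate is an added assumption that the quoted patch does not contain.

```java
// Hypothetical, standalone sketch of the checks discussed in the review.
// None of these names exist in Hive; they only mirror the logic of the diff.
final class RsDedupHeuristic {

  private RsDedupHeuristic() {
  }

  /**
   * Mirrors the boolean from the diff. partitionDirection stands in for
   * result[1]: negative means the parent columns are more specific,
   * positive means the child columns are more specific.
   */
  static boolean useChildPartitionColumns(int partitionDirection,
                                          boolean childColumnsAssigned,
                                          boolean parentColumnsAssigned) {
    return (partitionDirection < 0 && childColumnsAssigned)
        || (partitionDirection > 0 && !parentColumnsAssigned);
  }

  /**
   * Returns true when merging is still acceptable, i.e. the estimated
   * parallelism does not shrink by more than the threshold factor.
   * Uses integer division, as the quoted patch does.
   */
  static boolean parallelismDropAcceptable(long oldParallelism,
                                           long newParallelism,
                                           long threshold) {
    if (newParallelism <= 0) {
      // Assumption (not in the patch): without a positive estimate the
      // ratio is undefined, so conservatively refuse to merge.
      return false;
    }
    return oldParallelism / newParallelism <= threshold;
  }
}
```

With a threshold of 5, for example, `parallelismDropAcceptable(100, 10, 5)` rejects the merge because the estimate shrinks tenfold, while `parallelismDropAcceptable(100, 50, 5)` allows it. Keeping both checks as side-effect-free predicates is also one way to read the second review comment: they could be evaluated alongside the other checks before any operator is modified.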