[jira] [Work logged] (HIVE-23365) Put RS deduplication optimization under cost based decision

2020-06-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-23365?focusedWorklogId=439774&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-439774
 ]

ASF GitHub Bot logged work on HIVE-23365:
-----------------------------------------

Author: ASF GitHub Bot
Created on: 01/Jun/20 22:26
Start Date: 01/Jun/20 22:26
Worklog Time Spent: 10m 
  Work Description: asfgit closed pull request #1035:
URL: https://github.com/apache/hive/pull/1035


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

Worklog Id: (was: 439774)
Time Spent: 1h  (was: 50m)

> Put RS deduplication optimization under cost based decision
> -----------------------------------------------------------
>
> Key: HIVE-23365
> URL: https://issues.apache.org/jira/browse/HIVE-23365
> Project: Hive
>  Issue Type: Improvement
>  Components: Physical Optimizer
>Reporter: Jesus Camacho Rodriguez
>Assignee: Stamatis Zampetakis
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-23365.01.patch, HIVE-23365.02.patch, 
> HIVE-23365.03.patch, HIVE-23365.04.patch, HIVE-23365.05.patch
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Currently, RS deduplication is always executed whenever it is semantically 
> correct. However, it could be beneficial to leave both RS operators in the 
> plan, e.g., if the NDV of the second RS is very low. Thus, we would like this 
> decision to be cost-based. We could use a simple heuristic that would work 
> fine for most of the cases without introducing regressions for existing 
> cases, e.g., if NDV for partition column is less than estimated parallelism 
> in the second RS, do not execute deduplication.
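
To make the proposed heuristic concrete, here is a minimal Java sketch; the
method and its parameters are illustrative only, not part of the Hive codebase:

```java
/**
 * Minimal sketch of the heuristic described above (illustrative, not Hive
 * code). Assumes the NDV of the partition column and the estimated
 * parallelism of the second ReduceSink were already derived from column stats.
 */
static boolean shouldDeduplicate(long partitionColumnNdv, long estimatedParallelism) {
  // Fewer distinct partition-key values than reducers means the merged plan
  // would leave reducers idle, so keep both RS operators in that case.
  return partitionColumnNdv >= estimatedParallelism;
}
```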



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (HIVE-23365) Put RS deduplication optimization under cost based decision

2020-06-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-23365?focusedWorklogId=439759&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-439759
 ]

ASF GitHub Bot logged work on HIVE-23365:
-----------------------------------------

Author: ASF GitHub Bot
Created on: 01/Jun/20 21:48
Start Date: 01/Jun/20 21:48
Worklog Time Spent: 10m 
  Work Description: zabetak commented on a change in pull request #1035:
URL: https://github.com/apache/hive/pull/1035#discussion_r433508844



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
##########
@@ -197,6 +186,27 @@ public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
     return true;
   }
 
+  private static long estimateReducers(HiveConf conf, ReduceSinkOperator rs) {
+    // TODO: Check if we can somehow exploit the logic in SetReducerParallelism
+    if (rs.getConf().getNumReducers() > 0) {
+      return rs.getConf().getNumReducers();
+    }
+    int constantReducers = conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
+    if (constantReducers > 0) {
+      return constantReducers;
+    }
+    long inputTotalBytes = 0;
+    List<Operator<? extends OperatorDesc>> rsSiblings = rs.getChildOperators().get(0).getParentOperators();
+    for (Operator<? extends OperatorDesc> sibling : rsSiblings) {
+      if (sibling.getStatistics() != null) {

Review comment:
   In the caller of this method, before checking whether the new parallelism is
very low, we check whether the parallelism changes at all (`newParallelism <
oldParallelism`). If the parallelism does not change, we proceed with the
deduplication as usual. If we don't have stats, then the snippet below returns
1. The same happens for both the parent RS and the child RS, so the
parallelism does not change, since `1 < 1` is false.

   If you prefer an explicit check for the presence/absence of stats, I can
try to add it. Let me know.
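
For illustration, a condensed sketch of the guard being described; the helper
name and shape are mine, not the exact patch:

```java
// Condensed sketch (illustrative, not the exact patch) of the caller-side guard.
static boolean parallelismAllowsMerge(long oldParallelism, long newParallelism, int minReducer) {
  if (newParallelism < oldParallelism) {
    // Parallelism would drop after deduplication: reject the merge only if it drops too low.
    return newParallelism >= minReducer;
  }
  // Without stats both estimates degrade to 1, so `1 < 1` is false and we merge as before.
  return true;
}
```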





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

Worklog Id: (was: 439759)
Time Spent: 50m  (was: 40m)

> Put RS deduplication optimization under cost based decision
> -----------------------------------------------------------
>
> Key: HIVE-23365
> URL: https://issues.apache.org/jira/browse/HIVE-23365
> Project: Hive
>  Issue Type: Improvement
>  Components: Physical Optimizer
>Reporter: Jesus Camacho Rodriguez
>Assignee: Stamatis Zampetakis
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-23365.01.patch, HIVE-23365.02.patch, 
> HIVE-23365.03.patch, HIVE-23365.04.patch, HIVE-23365.05.patch
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Currently, RS deduplication is always executed whenever it is semantically 
> correct. However, it could be beneficial to leave both RS operators in the 
> plan, e.g., if the NDV of the second RS is very low. Thus, we would like this 
> decision to be cost-based. We could use a simple heuristic that would work 
> fine for most of the cases without introducing regressions for existing 
> cases, e.g., if NDV for partition column is less than estimated parallelism 
> in the second RS, do not execute deduplication.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (HIVE-23365) Put RS deduplication optimization under cost based decision

2020-06-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-23365?focusedWorklogId=439593&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-439593
 ]

ASF GitHub Bot logged work on HIVE-23365:
-----------------------------------------

Author: ASF GitHub Bot
Created on: 01/Jun/20 16:34
Start Date: 01/Jun/20 16:34
Worklog Time Spent: 10m 
  Work Description: jcamachor commented on a change in pull request #1035:
URL: https://github.com/apache/hive/pull/1035#discussion_r433349779



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
##########
@@ -197,6 +186,27 @@ public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
     return true;
   }
 
+  private static long estimateReducers(HiveConf conf, ReduceSinkOperator rs) {
+    // TODO: Check if we can somehow exploit the logic in SetReducerParallelism
+    if (rs.getConf().getNumReducers() > 0) {
+      return rs.getConf().getNumReducers();
+    }
+    int constantReducers = conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
+    if (constantReducers > 0) {
+      return constantReducers;
+    }
+    long inputTotalBytes = 0;
+    List<Operator<? extends OperatorDesc>> rsSiblings = rs.getChildOperators().get(0).getParentOperators();
+    for (Operator<? extends OperatorDesc> sibling : rsSiblings) {
+      if (sibling.getStatistics() != null) {

Review comment:
   I am wondering: if stats are not available for any of the siblings, we
should probably skip the parallelism check and, thus, fall back to the
previous behavior.
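
A sketch of what such an explicit check might look like, using Hive's
`Operator`/`OperatorDesc` types; the helper below is hypothetical, not part of
the patch:

```java
// Hypothetical helper (not in the patch): report whether every sibling of the
// RS carries statistics; if not, the caller can skip the parallelism
// comparison and fall back to the previous always-deduplicate behavior.
private static boolean allSiblingsHaveStats(List<Operator<? extends OperatorDesc>> rsSiblings) {
  for (Operator<? extends OperatorDesc> sibling : rsSiblings) {
    if (sibling.getStatistics() == null) {
      return false;  // any missing stats make the reducer estimate unreliable
    }
  }
  return true;
}
```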





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

Worklog Id: (was: 439593)
Time Spent: 40m  (was: 0.5h)

> Put RS deduplication optimization under cost based decision
> -----------------------------------------------------------
>
> Key: HIVE-23365
> URL: https://issues.apache.org/jira/browse/HIVE-23365
> Project: Hive
>  Issue Type: Improvement
>  Components: Physical Optimizer
>Reporter: Jesus Camacho Rodriguez
>Assignee: Stamatis Zampetakis
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-23365.01.patch, HIVE-23365.02.patch, 
> HIVE-23365.03.patch, HIVE-23365.04.patch, HIVE-23365.05.patch
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently, RS deduplication is always executed whenever it is semantically 
> correct. However, it could be beneficial to leave both RS operators in the 
> plan, e.g., if the NDV of the second RS is very low. Thus, we would like this 
> decision to be cost-based. We could use a simple heuristic that would work 
> fine for most of the cases without introducing regressions for existing 
> cases, e.g., if NDV for partition column is less than estimated parallelism 
> in the second RS, do not execute deduplication.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (HIVE-23365) Put RS deduplication optimization under cost based decision

2020-05-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-23365?focusedWorklogId=437920&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-437920
 ]

ASF GitHub Bot logged work on HIVE-23365:
-----------------------------------------

Author: ASF GitHub Bot
Created on: 27/May/20 17:18
Start Date: 27/May/20 17:18
Worklog Time Spent: 10m 
  Work Description: jcamachor commented on a change in pull request #1035:
URL: https://github.com/apache/hive/pull/1035#discussion_r431276400



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
##########
@@ -113,13 +116,36 @@ public static boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minReducer)
    * If parent RS has not been assigned any partitioning column, we will use
    * partitioning columns (if exist) of child RS.
    */
-  public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+  public static boolean merge(HiveConf hiveConf, ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
       throws SemanticException {
     int[] result = extractMergeDirections(cRS, pRS, minReducer);
     if (result == null) {
       return false;
     }
 
+    // The partitioning columns of the child RS will replace the columns of the
+    // parent RS in two cases:
+    // - Parent RS columns are more specific than those of the child RS,
+    //   and child columns are assigned;
+    // - Child RS columns are more specific than those of the parent RS,
+    //   and parent columns are not assigned.
+    List<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+    List<ExprNodeDesc> parentPCs = pRS.getConf().getPartitionCols();
+    boolean useChildsPartitionColumns =
+        result[1] < 0 && (childPCs != null && !childPCs.isEmpty()) ||
+        result[1] > 0 && (parentPCs == null || parentPCs.isEmpty());
+
+    if (useChildsPartitionColumns) {
+      List<ExprNodeDesc> newPartitionCols = ExprNodeDescUtils.backtrack(childPCs, cRS, pRS);
+      long oldParallelism = estimateMaxPartitions(hiveConf, pRS, parentPCs);
+      long newParallelism = estimateMaxPartitions(hiveConf, pRS, newPartitionCols);
+      long threshold = hiveConf.getLongVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONPARALLELISMDECTHRESHOLD);
+      if (oldParallelism / newParallelism > threshold) {
+        return false;
+      }

Review comment:
   I think you are right; adding the check using the existing config seems to
be the correct approach. We could still add an on/off config for the new
optimization (default true, but available in case we need to disable it).
Could you make those changes?
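
Sketched as it might sit inside `merge`, such a switch could look roughly like
this; the ConfVar name below is invented for illustration and does not exist
in HiveConf:

```java
// Hypothetical on/off gate for the new cost-based check; the ConfVar name is
// illustrative only, with a default of true so the new behavior stays enabled.
boolean costBasedCheckEnabled =
    hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONCOSTCHECK);
if (costBasedCheckEnabled && oldParallelism / newParallelism > threshold) {
  return false;  // keep both RS operators; the merge is rejected
}
```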

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
##########
@@ -113,13 +116,36 @@ public static boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minReducer)
    * If parent RS has not been assigned any partitioning column, we will use
    * partitioning columns (if exist) of child RS.
    */
-  public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+  public static boolean merge(HiveConf hiveConf, ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
       throws SemanticException {
     int[] result = extractMergeDirections(cRS, pRS, minReducer);
     if (result == null) {
       return false;
     }
 
+    // The partitioning columns of the child RS will replace the columns of the
+    // parent RS in two cases:
+    // - Parent RS columns are more specific than those of the child RS,
+    //   and child columns are assigned;
+    // - Child RS columns are more specific than those of the parent RS,
+    //   and parent columns are not assigned.
+    List<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+    List<ExprNodeDesc> parentPCs = pRS.getConf().getPartitionCols();
+    boolean useChildsPartitionColumns =
+        result[1] < 0 && (childPCs != null && !childPCs.isEmpty()) ||
+        result[1] > 0 && (parentPCs == null || parentPCs.isEmpty());
+
+    if (useChildsPartitionColumns) {
+      List<ExprNodeDesc> newPartitionCols = ExprNodeDescUtils.backtrack(childPCs, cRS, pRS);
+      long oldParallelism = estimateMaxPartitions(hiveConf, pRS, parentPCs);
+      long newParallelism = estimateMaxPartitions(hiveConf, pRS, newPartitionCols);
+      long threshold = hiveConf.getLongVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONPARALLELISMDECTHRESHOLD);
+      if (oldParallelism / newParallelism > threshold) {
+        return false;

Review comment:
   Do you think it makes sense to add these checks to the
`extractMergeDirections` method? It seems the rest of the checks are done
within that method; if `extractMergeDirections` was successful, this method
previously only modified the operators accordingly. I think keeping that
separation may make the code clearer.
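
To illustrate the separation being suggested, a rough sketch of the resulting
shape of `merge`; `applyMergeDirections` is a hypothetical name for the
operator-rewriting half:

```java
// Rough sketch (not the actual patch): all validity and cost checks, including
// the new parallelism check, live in extractMergeDirections, which returns
// null when any of them fails; merge then only rewrites the operators.
int[] result = extractMergeDirections(cRS, pRS, minReducer);
if (result == null) {
  return false;  // some check failed; leave the plan untouched
}
applyMergeDirections(cRS, pRS, result);  // hypothetical helper for the mechanical rewrite
return true;
```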





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

Worklog Id: (was: 437920)
Time Spent: 0.5h  (was: 20m)

[jira] [Work logged] (HIVE-23365) Put RS deduplication optimization under cost based decision

2020-05-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-23365?focusedWorklogId=437802&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-437802
 ]

ASF GitHub Bot logged work on HIVE-23365:
-----------------------------------------

Author: ASF GitHub Bot
Created on: 27/May/20 14:21
Start Date: 27/May/20 14:21
Worklog Time Spent: 10m 
  Work Description: zabetak commented on a change in pull request #1035:
URL: https://github.com/apache/hive/pull/1035#discussion_r431170080



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
##########
@@ -113,13 +116,36 @@ public static boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minReducer)
    * If parent RS has not been assigned any partitioning column, we will use
    * partitioning columns (if exist) of child RS.
    */
-  public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+  public static boolean merge(HiveConf hiveConf, ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
       throws SemanticException {
     int[] result = extractMergeDirections(cRS, pRS, minReducer);
     if (result == null) {
       return false;
     }
 
+    // The partitioning columns of the child RS will replace the columns of the
+    // parent RS in two cases:
+    // - Parent RS columns are more specific than those of the child RS,
+    //   and child columns are assigned;
+    // - Child RS columns are more specific than those of the parent RS,
+    //   and parent columns are not assigned.
+    List<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+    List<ExprNodeDesc> parentPCs = pRS.getConf().getPartitionCols();
+    boolean useChildsPartitionColumns =
+        result[1] < 0 && (childPCs != null && !childPCs.isEmpty()) ||
+        result[1] > 0 && (parentPCs == null || parentPCs.isEmpty());
+
+    if (useChildsPartitionColumns) {
+      List<ExprNodeDesc> newPartitionCols = ExprNodeDescUtils.backtrack(childPCs, cRS, pRS);
+      long oldParallelism = estimateMaxPartitions(hiveConf, pRS, parentPCs);
+      long newParallelism = estimateMaxPartitions(hiveConf, pRS, newPartitionCols);
+      long threshold = hiveConf.getLongVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONPARALLELISMDECTHRESHOLD);
+      if (oldParallelism / newParallelism > threshold) {
+        return false;
+      }

Review comment:
   If we don't care about comparing parallelism before/after, we could
possibly use the existing `hive.optimize.reducededuplication.min.reducer`
config parameter and avoid introducing a new one.
   ```
   long newParallelism = estimateMaxPartitions(hiveConf, pRS, newPartitionCols);
   if (newParallelism < minReducer) {
     return false;
   }
   ```
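
As a note on the design choice: `minReducer` already reaches `merge` as a
parameter, so reusing it needs no new plumbing and keeps a single knob for how
few reducers are acceptable. A one-line sketch of the read, assuming the usual
ConfVars mapping for that property:

```java
// Sketch, assuming the existing ConfVars mapping for the property
// hive.optimize.reducededuplication.min.reducer.
int minReducer = hiveConf.getIntVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
```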





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

Worklog Id: (was: 437802)
Time Spent: 20m  (was: 10m)

> Put RS deduplication optimization under cost based decision
> -----------------------------------------------------------
>
> Key: HIVE-23365
> URL: https://issues.apache.org/jira/browse/HIVE-23365
> Project: Hive
>  Issue Type: Improvement
>  Components: Physical Optimizer
>Reporter: Jesus Camacho Rodriguez
>Assignee: Stamatis Zampetakis
>Priority: Major
>  Labels: pull-request-available
> Attachments: HIVE-23365.01.patch, HIVE-23365.02.patch, 
> HIVE-23365.03.patch
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, RS deduplication is always executed whenever it is semantically 
> correct. However, it could be beneficial to leave both RS operators in the 
> plan, e.g., if the NDV of the second RS is very low. Thus, we would like this 
> decision to be cost-based. We could use a simple heuristic that would work 
> fine for most of the cases without introducing regressions for existing 
> cases, e.g., if NDV for partition column is less than estimated parallelism 
> in the second RS, do not execute deduplication.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (HIVE-23365) Put RS deduplication optimization under cost based decision

2020-05-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HIVE-23365?focusedWorklogId=437553&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-437553
 ]

ASF GitHub Bot logged work on HIVE-23365:
-----------------------------------------

Author: ASF GitHub Bot
Created on: 27/May/20 04:07
Start Date: 27/May/20 04:07
Worklog Time Spent: 10m 
  Work Description: zabetak opened a new pull request #1035:
URL: https://github.com/apache/hive/pull/1035


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

Worklog Id: (was: 437553)
Remaining Estimate: 0h
Time Spent: 10m

> Put RS deduplication optimization under cost based decision
> -----------------------------------------------------------
>
> Key: HIVE-23365
> URL: https://issues.apache.org/jira/browse/HIVE-23365
> Project: Hive
>  Issue Type: Improvement
>  Components: Physical Optimizer
>Reporter: Jesus Camacho Rodriguez
>Assignee: Stamatis Zampetakis
>Priority: Major
> Attachments: HIVE-23365.01.patch, HIVE-23365.02.patch, 
> HIVE-23365.03.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, RS deduplication is always executed whenever it is semantically 
> correct. However, it could be beneficial to leave both RS operators in the 
> plan, e.g., if the NDV of the second RS is very low. Thus, we would like this 
> decision to be cost-based. We could use a simple heuristic that would work 
> fine for most of the cases without introducing regressions for existing 
> cases, e.g., if NDV for partition column is less than estimated parallelism 
> in the second RS, do not execute deduplication.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)