dengzhhu653 commented on a change in pull request #1070:
URL: https://github.com/apache/hive/pull/1070#discussion_r437062135
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
##########
@@ -733,6 +736,97 @@ private void applyFilterTransitivity(JoinOperator join,
int targetPos, OpWalkerI
}
}
+ public static class GroupByPPD extends DefaultPPD implements
SemanticNodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ super.process(nd, stack, procCtx, nodeOutputs);
+ OpWalkerInfo owi = (OpWalkerInfo) procCtx;
+ GroupByDesc groupByDesc = ((GroupByOperator)nd).getConf();
+ ExprWalkerInfo prunedPred = owi.getPrunedPreds((Operator<? extends
OperatorDesc>) nd);
+ if (prunedPred == null || !prunedPred.hasAnyCandidates() ||
+ !groupByDesc.isGroupingSetsPresent()) {
+ return null;
+ }
+
+ List<Long> groupingSets = groupByDesc.getListGroupingSets();
+ Map<String, List<ExprNodeDesc>> candidates =
prunedPred.getFinalCandidates();
+ FastBitSet[] fastBitSets = new FastBitSet[groupingSets.size()];
+ int groupingSetPosition = groupByDesc.getGroupingSetPosition();
+ for (int pos = 0; pos < fastBitSets.length; pos ++) {
+ fastBitSets[pos] =
GroupByOperator.groupingSet2BitSet(groupingSets.get(pos),
+ groupingSetPosition);
+ }
+ List<ExprNodeDesc> groupByKeys =
((GroupByOperator)nd).getConf().getKeys();
+ Map<ExprNodeDesc, ExprNodeDesc> newToOldExprMap =
prunedPred.getNewToOldExprMap();
+ Map<String, List<ExprNodeDesc>> nonFinalCandidates = new HashMap<String,
List<ExprNodeDesc>>();
+ for (Iterator<Map.Entry<String, List<ExprNodeDesc>>>
Review comment:
Done. Thanks @jcamachor for the review!
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
##########
@@ -733,6 +736,97 @@ private void applyFilterTransitivity(JoinOperator join,
int targetPos, OpWalkerI
}
}
+ public static class GroupByPPD extends DefaultPPD implements
SemanticNodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ super.process(nd, stack, procCtx, nodeOutputs);
+ OpWalkerInfo owi = (OpWalkerInfo) procCtx;
+ GroupByDesc groupByDesc = ((GroupByOperator)nd).getConf();
+ ExprWalkerInfo prunedPred = owi.getPrunedPreds((Operator<? extends
OperatorDesc>) nd);
+ if (prunedPred == null || !prunedPred.hasAnyCandidates() ||
+ !groupByDesc.isGroupingSetsPresent()) {
+ return null;
+ }
+
+ List<Long> groupingSets = groupByDesc.getListGroupingSets();
+ Map<String, List<ExprNodeDesc>> candidates =
prunedPred.getFinalCandidates();
+ FastBitSet[] fastBitSets = new FastBitSet[groupingSets.size()];
+ int groupingSetPosition = groupByDesc.getGroupingSetPosition();
+ for (int pos = 0; pos < fastBitSets.length; pos ++) {
+ fastBitSets[pos] =
GroupByOperator.groupingSet2BitSet(groupingSets.get(pos),
+ groupingSetPosition);
+ }
+ List<ExprNodeDesc> groupByKeys =
((GroupByOperator)nd).getConf().getKeys();
+ Map<ExprNodeDesc, ExprNodeDesc> newToOldExprMap =
prunedPred.getNewToOldExprMap();
+ Map<String, List<ExprNodeDesc>> nonFinalCandidates = new HashMap<String,
List<ExprNodeDesc>>();
+ for (Iterator<Map.Entry<String, List<ExprNodeDesc>>>
+ iter = candidates.entrySet().iterator(); iter.hasNext(); ) {
+ Map.Entry<String, List<ExprNodeDesc>> entry = iter.next();
+ List<ExprNodeDesc> residualExprs = new ArrayList<ExprNodeDesc>();
+ List<ExprNodeDesc> finalCandidates = new ArrayList<ExprNodeDesc>();
+ List<ExprNodeDesc> exprs = entry.getValue();
+ for (ExprNodeDesc expr : exprs) {
+ if (canPredPushdown(expr, groupByKeys, fastBitSets,
groupingSetPosition)) {
+ finalCandidates.add(expr);
+ } else {
+ residualExprs.add(newToOldExprMap.get(expr));
+ }
+ }
+ if (!residualExprs.isEmpty()) {
+ nonFinalCandidates.put(entry.getKey(), residualExprs);
+ }
+
+ if (finalCandidates.isEmpty()) {
+ iter.remove();
+ } else {
+ exprs.clear();
+ exprs.addAll(finalCandidates);
+ }
+ }
+
+ if (!nonFinalCandidates.isEmpty()) {
+ createFilter((Operator) nd, nonFinalCandidates, owi);
+ }
+ return null;
+ }
+
+ private boolean canPredPushdown(ExprNodeDesc expr, List<ExprNodeDesc>
groupByKeys,
+ FastBitSet[] bitSets, int groupingSetPosition) {
+ List<ExprNodeDesc> columns = new ArrayList<ExprNodeDesc>();
+ extractCols(expr, columns);
+ for (ExprNodeDesc col : columns) {
+ int index = groupByKeys.indexOf(col);
+ assert index >= 0;
+ for (FastBitSet bitset : bitSets) {
+ int keyPos = bitset.nextClearBit(0);
+ for (; keyPos < groupingSetPosition && keyPos != index;
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]