[ 
https://issues.apache.org/jira/browse/TAJO-774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14034870#comment-14034870
 ] 

ASF GitHub Bot commented on TAJO-774:
-------------------------------------

Github user jihoonson commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/13#discussion_r13900969
  
    --- Diff: tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java ---
    @@ -359,62 +415,241 @@ private EvalExprNode 
buildPlanForNoneFromStatement(PlanContext context, Stack<Ex
         return targets;
       }
     
    +  /**
    +   * It checks if all targets of Projectable plan node can be evaluated 
from the child node.
    +   * It can avoid potential errors which possibly occur in physical 
operators.
    +   *
    +   * @param block QueryBlock which includes the Projectable node
    +   * @param projectable Projectable node to be validated
    +   * @throws PlanningException
    +   */
       public static void verifyProjectedFields(QueryBlock block, Projectable 
projectable) throws PlanningException {
    -    if (projectable instanceof ProjectionNode && 
block.hasNode(NodeType.GROUP_BY)) {
    -      for (Target target : projectable.getTargets()) {
    -        Set<Column> columns = 
EvalTreeUtil.findUniqueColumns(target.getEvalTree());
    -        for (Column c : columns) {
    -          if (!projectable.getInSchema().contains(c)) {
    -            throw new PlanningException(c.getQualifiedName()
    -                + " must appear in the GROUP BY clause or be used in an 
aggregate function at node ("
    -                + projectable.getPID() + ")" );
    -          }
    -        }
    -      }
    -    } else  if (projectable instanceof GroupbyNode) {
    +    if (projectable instanceof GroupbyNode) {
           GroupbyNode groupbyNode = (GroupbyNode) projectable;
    -      // It checks if all column references within each target can be 
evaluated with the input schema.
    -      int groupingColumnNum = groupbyNode.getGroupingColumns().length;
    -      for (int i = 0; i < groupingColumnNum; i++) {
    -        Set<Column> columns = 
EvalTreeUtil.findUniqueColumns(groupbyNode.getTargets()[i].getEvalTree());
    -        if (!projectable.getInSchema().containsAll(columns)) {
    -          throw new PlanningException(String.format("Cannot get the 
field(s) \"%s\" at node (%d)",
    -              TUtil.collectionToString(columns), projectable.getPID()));
    +
    +      if (!groupbyNode.isEmptyGrouping()) { // it should be targets 
instead of
    +        int groupingKeyNum = groupbyNode.getGroupingColumns().length;
    +
    +        for (int i = 0; i < groupingKeyNum; i++) {
    +          Target target = groupbyNode.getTargets()[i];
    +          if (groupbyNode.getTargets()[i].getEvalTree().getType() == 
EvalType.FIELD) {
    +            FieldEval grpKeyEvalNode = target.getEvalTree();
    +            if 
(!groupbyNode.getInSchema().contains(grpKeyEvalNode.getColumnRef())) {
    +              throwCannotEvaluateException(projectable, 
grpKeyEvalNode.getName());
    +            }
    +          }
             }
           }
    +
           if (groupbyNode.hasAggFunctions()) {
    -        for (AggregationFunctionCallEval f : 
groupbyNode.getAggFunctions()) {
    -          Set<Column> columns = EvalTreeUtil.findUniqueColumns(f);
    -          for (Column c : columns) {
    -            if (!projectable.getInSchema().contains(c)) {
    -              throw new PlanningException(String.format("Cannot get the 
field \"%s\" at node (%d)",
    -                  c, projectable.getPID()));
    -            }
    +        verifyIfEvalNodesCanBeEvaluated(projectable, 
groupbyNode.getAggFunctions());
    +      }
    +
    +    } else if (projectable instanceof WindowAggNode) {
    +      WindowAggNode windowAggNode = (WindowAggNode) projectable;
    +
    +      if (windowAggNode.hasPartitionKeys()) {
    +        verifyIfColumnCanBeEvaluated(projectable.getInSchema(), 
projectable, windowAggNode.getPartitionKeys());
    +      }
    +
    +      if (windowAggNode.hasAggFunctions()) {
    +        verifyIfEvalNodesCanBeEvaluated(projectable, 
windowAggNode.getWindowFunctions());
    +      }
    +
    +      if (windowAggNode.hasSortSpecs()) {
    +        Column [] sortKeys = 
PlannerUtil.sortSpecsToSchema(windowAggNode.getSortSpecs()).toArray();
    +        verifyIfColumnCanBeEvaluated(projectable.getInSchema(), 
projectable, sortKeys);
    +      }
    +
    +      // verify targets except for function slots
    +      for (int i = 0; i < windowAggNode.getTargets().length - 
windowAggNode.getWindowFunctions().length; i++) {
    +        Target target = windowAggNode.getTargets()[i];
    +        Set<Column> columns = 
EvalTreeUtil.findUniqueColumns(target.getEvalTree());
    +        for (Column c : columns) {
    +          if (!windowAggNode.getInSchema().contains(c)) {
    +            throwCannotEvaluateException(projectable, 
c.getQualifiedName());
               }
             }
           }
    +
         } else if (projectable instanceof RelationNode) {
           RelationNode relationNode = (RelationNode) projectable;
    -      for (Target target : projectable.getTargets()) {
    -        Set<Column> columns = 
EvalTreeUtil.findUniqueColumns(target.getEvalTree());
    -        for (Column c : columns) {
    -          if (!relationNode.getTableSchema().contains(c)) {
    -            throw new PlanningException(String.format("Cannot get the 
field \"%s\" at node (%d)",
    -                c, projectable.getPID()));
    -          }
    +      verifyIfTargetsCanBeEvaluated(relationNode.getTableSchema(), 
(Projectable) relationNode);
    +
    +    } else {
    +      verifyIfTargetsCanBeEvaluated(projectable.getInSchema(), 
projectable);
    +    }
    +  }
    +
    +  public static void verifyIfEvalNodesCanBeEvaluated(Projectable 
projectable, EvalNode[] evalNodes)
    +      throws PlanningException {
    +    for (EvalNode e : evalNodes) {
    +      Set<Column> columns = EvalTreeUtil.findUniqueColumns(e);
    +      for (Column c : columns) {
    +        if (!projectable.getInSchema().contains(c)) {
    +          throwCannotEvaluateException(projectable, c.getQualifiedName());
    +        }
    +      }
    +    }
    +  }
    +
    +  public static void verifyIfTargetsCanBeEvaluated(Schema baseSchema, 
Projectable projectable)
    +      throws PlanningException {
    +    for (Target target : projectable.getTargets()) {
    +      Set<Column> columns = 
EvalTreeUtil.findUniqueColumns(target.getEvalTree());
    +      for (Column c : columns) {
    +        if (!baseSchema.contains(c)) {
    +          throwCannotEvaluateException(projectable, c.getQualifiedName());
             }
           }
    +    }
    +  }
    +
    +  public static void verifyIfColumnCanBeEvaluated(Schema baseSchema, 
Projectable projectable, Column [] columns)
    +      throws PlanningException {
    +    for (Column c : columns) {
    +      if (!baseSchema.contains(c)) {
    +        throwCannotEvaluateException(projectable, c.getQualifiedName());
    +      }
    +    }
    +  }
    +
    +  public static void throwCannotEvaluateException(Projectable projectable, 
String columnName) throws PlanningException {
    +    if (projectable instanceof UnaryNode && ((UnaryNode) 
projectable).getChild().getType() == NodeType.GROUP_BY) {
    +      throw new PlanningException(columnName
    +          + " must appear in the GROUP BY clause or be used in an 
aggregate function at node ("
    +          + projectable.getPID() + ")");
         } else {
    -      for (Target target : projectable.getTargets()) {
    -        Set<Column> columns = 
EvalTreeUtil.findUniqueColumns(target.getEvalTree());
    -        for (Column c : columns) {
    -          if (!projectable.getInSchema().contains(c)) {
    -            throw new PlanningException(String.format("Cannot get the 
field \"%s\" at node (%d)",
    -                c, projectable.getPID()));
    +      throw new PlanningException(String.format("Cannot evaluate the field 
\"%s\" at node (%d)",
    +          columnName, projectable.getPID()));
    +    }
    +  }
    +
    +  private LogicalNode insertWindowAggNode(PlanContext context, LogicalNode 
child, Stack<Expr> stack,
    +                                          String [] referenceNames,
    +                                          
ExprNormalizer.WindowSpecReferences [] windowSpecReferenceses)
    +      throws PlanningException {
    +    LogicalPlan plan = context.plan;
    +    QueryBlock block = context.queryBlock;
    +    WindowAggNode windowAggNode = 
context.plan.createNode(WindowAggNode.class);
    +    if (child.getType() == NodeType.LIMIT) {
    +      LimitNode limitNode = (LimitNode) child;
    +      windowAggNode.setChild(limitNode.getChild());
    +      windowAggNode.setInSchema(limitNode.getChild().getOutSchema());
    +      limitNode.setChild(windowAggNode);
    +    } else if (child.getType() == NodeType.SORT) {
    +      SortNode sortNode = (SortNode) child;
    +      windowAggNode.setChild(sortNode.getChild());
    +      windowAggNode.setInSchema(sortNode.getChild().getOutSchema());
    +      sortNode.setChild(windowAggNode);
    +    } else {
    +      windowAggNode.setChild(child);
    +      windowAggNode.setInSchema(child.getOutSchema());
    +    }
    +
    +    List<String> winFuncRefs = new ArrayList<String>();
    +    List<WindowFunctionEval> winFuncs = new 
ArrayList<WindowFunctionEval>();
    +    List<WindowSpec> rawWindowSpecs = Lists.newArrayList();
    +    for (Iterator<NamedExpr> it = 
block.namedExprsMgr.getIteratorForUnevaluatedExprs(); it.hasNext();) {
    +      NamedExpr rawTarget = it.next();
    +      try {
    +        EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, 
context.queryBlock, rawTarget.getExpr());
    +        if (evalNode.getType() == EvalType.WINDOW_FUNCTION) {
    +          winFuncRefs.add(rawTarget.getAlias());
    +          winFuncs.add((WindowFunctionEval) evalNode);
    +          block.namedExprsMgr.markAsEvaluated(rawTarget.getAlias(), 
evalNode);
    +
    +          // TODO - Later, we also consider the possibility that a window 
function contains only a window name.
    +          rawWindowSpecs.add(((WindowFunctionExpr) 
(rawTarget.getExpr())).getWindowSpec());
    +        }
    +      } catch (VerifyException ve) {
    --- End diff --
    
    Don't we need to handle this exception?


> Implement logical plan part and physical executor for window function.
> ----------------------------------------------------------------------
>
>                 Key: TAJO-774
>                 URL: https://issues.apache.org/jira/browse/TAJO-774
>             Project: Tajo
>          Issue Type: Sub-task
>          Components: planner/optimizer
>            Reporter: Hyunsik Choi
>            Assignee: Hyunsik Choi
>             Fix For: 0.9.0
>
>
> See the title. The main objective of this issue is to implement the logical 
> planning part for window function support.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to