[ 
https://issues.apache.org/jira/browse/TAJO-774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14034901#comment-14034901
 ] 

ASF GitHub Bot commented on TAJO-774:
-------------------------------------

Github user jihoonson commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/13#discussion_r13901665
  
    --- Diff: 
tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
 ---
    @@ -0,0 +1,336 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.engine.planner.physical;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.tajo.catalog.Column;
    +import org.apache.tajo.catalog.Schema;
    +import org.apache.tajo.catalog.SortSpec;
    +import org.apache.tajo.datum.Datum;
    +import org.apache.tajo.engine.eval.WindowFunctionEval;
    +import org.apache.tajo.engine.function.FunctionContext;
    +import org.apache.tajo.engine.planner.logical.WindowAggNode;
    +import org.apache.tajo.engine.planner.logical.WindowSpec;
    +import org.apache.tajo.storage.Tuple;
    +import org.apache.tajo.storage.TupleComparator;
    +import org.apache.tajo.storage.VTuple;
    +import org.apache.tajo.worker.TaskAttemptContext;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +/**
    + * The sort-based window aggregation operator
    + */
    +public class WindowAggExec extends UnaryPhysicalExec {
    +  // plan information
    +  protected final int outputColumnNum;
    +  protected final int nonFunctionColumnNum;
    +  protected final int nonFunctionColumns[];
    +
    +  protected final int functionNum;
    +  protected final WindowFunctionEval functions[];
    +
    +  protected Schema schemaForOrderBy;
    +  protected int sortKeyColumns[];
    +  protected final boolean hasPartitionKeys;
    +  protected final int partitionKeyNum;
    +  protected final int partitionKeyIds[];
    +
    +  // for evaluation
    +  protected FunctionContext contexts [];
    +  protected Tuple lastKey = null;
    +  protected boolean noMoreTuples = false;
    +  private boolean [] orderedFuncFlags;
    +  private boolean [] aggFuncFlags;
    +  private boolean [] windowFuncFlags;
    +  private boolean [] endUnboundedFollowingFlags;
    +  private boolean [] endCurrentRowFlags;
    +
    +  private boolean endCurrentRow = false;
    +
    +  // operator state
    +  enum WindowState {
    +    NEW_WINDOW,
    +    ACCUMULATING_WINDOW,
    +    EVALUATION,
    +    RETRIEVING_FROM_WINDOW,
    +    END_OF_TUPLE
    +  }
    +
    +  // Transient state
    +  boolean firstTime = true;
    +  List<Tuple> evaluatedTuples = null;
    +  List<Tuple> accumulatedInTuples = null;
    +  List<Tuple> nextAccumulatedProjected = null;
    +  List<Tuple> nextAccumulatedInTuples = null;
    +  WindowState state = WindowState.NEW_WINDOW;
    +  Iterator<Tuple> tupleInFrameIterator = null;
    +
    +  public WindowAggExec(TaskAttemptContext context, WindowAggNode plan, 
PhysicalExec child) throws IOException {
    +    super(context, plan.getInSchema(), plan.getOutSchema(), child);
    +
    +    if (plan.hasPartitionKeys()) {
    +      final Column[] keyColumns = plan.getPartitionKeys();
    +      partitionKeyNum = keyColumns.length;
    +      partitionKeyIds = new int[partitionKeyNum];
    +      Column col;
    +      for (int idx = 0; idx < plan.getPartitionKeys().length; idx++) {
    +        col = keyColumns[idx];
    +        partitionKeyIds[idx] = 
inSchema.getColumnId(col.getQualifiedName());
    +      }
    +      hasPartitionKeys = true;
    +    } else {
    +      partitionKeyNum = 0;
    +      partitionKeyIds = null;
    +      hasPartitionKeys = false;
    +    }
    +
    +    if (plan.hasAggFunctions()) {
    +      functions = plan.getWindowFunctions();
    +      functionNum = functions.length;
    +
    +      orderedFuncFlags = new boolean[functions.length];
    +      windowFuncFlags = new boolean[functions.length];
    +      aggFuncFlags = new boolean[functions.length];
    +
    +      endUnboundedFollowingFlags = new boolean[functions.length];
    +      endCurrentRowFlags = new boolean[functions.length];
    +
    +      List<Column> additionalSortKeyColumns = Lists.newArrayList();
    +      Schema rewrittenSchema = new Schema(outSchema);
    +      for (int i = 0; i < functions.length; i++) {
    +        WindowSpec.WindowEndBound endBound = 
functions[i].getWindowFrame().getEndBound();
    +        switch (endBound.getBoundType()) {
    +        case CURRENT_ROW:
    +          endCurrentRowFlags[i] = true; break;
    +        case UNBOUNDED_FOLLOWING:
    +          endUnboundedFollowingFlags[i] = true; break;
    +        default:
    +        }
    +
    +        switch (functions[i].getFuncDesc().getFuncType()) {
    +        case AGGREGATION:
    +        case DISTINCT_AGGREGATION:
    +          aggFuncFlags[i] = true; break;
    +        case WINDOW:
    +          windowFuncFlags[i] = true; break;
    +        default:
    +        }
    +
    +        if (functions[i].hasSortSpecs()) {
    +          orderedFuncFlags[i] = true;
    +
    +          for (SortSpec sortSpec : functions[i].getSortSpecs()) {
    +            if (!rewrittenSchema.contains(sortSpec.getSortKey())) {
    +              additionalSortKeyColumns.add(sortSpec.getSortKey());
    +            }
    +          }
    +        }
    +      }
    +
    +      sortKeyColumns = new int[additionalSortKeyColumns.size()];
    +      schemaForOrderBy = new Schema(outSchema);
    +      for (int i = 0; i < additionalSortKeyColumns.size(); i++) {
    +        sortKeyColumns[i] = i;
    +        schemaForOrderBy.addColumn(additionalSortKeyColumns.get(i));
    +      }
    +    } else {
    +      functions = new WindowFunctionEval[0];
    +      functionNum = 0;
    +      schemaForOrderBy = outSchema;
    +    }
    +
    +
    +    nonFunctionColumnNum = plan.getTargets().length - functionNum;
    +    nonFunctionColumns = new int[nonFunctionColumnNum];
    +    for (int idx = 0; idx < plan.getTargets().length - functionNum; idx++) 
{
    +      nonFunctionColumns[idx] = 
inSchema.getColumnId(plan.getTargets()[idx].getCanonicalName());
    +    }
    +
    +    outputColumnNum = nonFunctionColumnNum + functionNum;
    +  }
    +
    +  private void transition(WindowState state) {
    +    this.state = state;
    +  }
    +
    +  @Override
    +  public Tuple next() throws IOException {
    +    Tuple currentKey = null;
    +    Tuple readTuple = null;
    +
    +    while(!context.isStopped() && state != WindowState.END_OF_TUPLE) {
    +
    +      if (state == WindowState.NEW_WINDOW) {
    +        initWindow();
    +        transition(WindowState.ACCUMULATING_WINDOW);
    +      }
    +
    +      if (state != WindowState.RETRIEVING_FROM_WINDOW) { // read an input 
tuple and build a partition key
    +        readTuple = child.next();
    +
    +        if (readTuple == null) { // the end of tuple
    +          noMoreTuples = true;
    +          transition(WindowState.EVALUATION);
    +        }
    +
    +        if (readTuple != null && hasPartitionKeys) { // get a key tuple
    +          currentKey = new VTuple(partitionKeyIds.length);
    +          for (int i = 0; i < partitionKeyIds.length; i++) {
    +            currentKey.put(i, readTuple.get(partitionKeyIds[i]));
    +          }
    +        }
    +      }
    +
    +      if (state == WindowState.ACCUMULATING_WINDOW) {
    +        accumulatingWindow(currentKey, readTuple);
    +      }
    +
    +      if (state == WindowState.EVALUATION) {
    +        evaluationWindowFrame();
    +
    +        tupleInFrameIterator = evaluatedTuples.iterator();
    +        transition(WindowState.RETRIEVING_FROM_WINDOW);
    +      }
    +
    +      if (state == WindowState.RETRIEVING_FROM_WINDOW) {
    +        if (tupleInFrameIterator.hasNext()) {
    +          return tupleInFrameIterator.next();
    +        } else {
    +          finalizeWindow();
    +        }
    +      }
    +    }
    +
    +    return null;
    +  }
    +
    +  private void initWindow() {
    +    if (firstTime) {
    +      accumulatedInTuples = Lists.newArrayList();
    +
    +      contexts = new FunctionContext[functionNum];
    +      for(int evalIdx = 0; evalIdx < functionNum; evalIdx++) {
    +        contexts[evalIdx] = functions[evalIdx].newContext();
    +      }
    +      firstTime = false;
    +    }
    +  }
    +
    +  private void accumulatingWindow(Tuple currentKey, Tuple inTuple) {
    +    if (lastKey == null || lastKey.equals(currentKey)) {
    +      accumulatedInTuples.add(new VTuple(inTuple));
    +
    +    } else {
    +      preAccumulatingNextWindow(inTuple);
    +      transition(WindowState.EVALUATION);
    +    }
    +
    +    lastKey = currentKey;
    +  }
    +
    +  private void preAccumulatingNextWindow(Tuple inTuple) {
    +    Tuple projectedTuple = new VTuple(outSchema.size());
    +    for(int idx = 0; idx < nonFunctionColumnNum; idx++) {
    +      projectedTuple.put(idx, inTuple.get(nonFunctionColumns[idx]));
    +    }
    +    nextAccumulatedProjected = Lists.newArrayList();
    +    nextAccumulatedProjected.add(projectedTuple);
    +    nextAccumulatedInTuples = Lists.newArrayList();
    +    nextAccumulatedInTuples.add(new VTuple(inTuple));
    +  }
    +
    --- End diff --
    
    Would you add some description of this function?


> Implement logical plan part and physical executor for window function.
> ----------------------------------------------------------------------
>
>                 Key: TAJO-774
>                 URL: https://issues.apache.org/jira/browse/TAJO-774
>             Project: Tajo
>          Issue Type: Sub-task
>          Components: planner/optimizer
>            Reporter: Hyunsik Choi
>            Assignee: Hyunsik Choi
>             Fix For: 0.9.0
>
>
> See the title. The main objective of this issue is to implement the logical 
> planning part for window function support.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to