Dmitry Lychagin has submitted this change and it was merged. ( 
https://asterix-gerrit.ics.uci.edu/3369 )

Change subject: [NO ISSUE][COMP] Refactor physical window operator
......................................................................

[NO ISSUE][COMP] Refactor physical window operator

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Create a new physical operator (WindowStreamPOperator)
  for window operators that do not require partition materialization
- Create AbstractWindowPOperator which is now a base
  class for both physical window operators
- Rename WindowSimpleRuntime* to WindowStreamRuntime*

Change-Id: I3863fa3d298aef53d4098be9fc17b0451eb2c23e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3369
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
M 
asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
M 
asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
M 
asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
A 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
A 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
M 
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
R 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
R 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
13 files changed, 439 insertions(+), 309 deletions(-)

Approvals:
  Jenkins: Verified; Verified
  Anon. E. Moose (1000171):
  Ali Alsuliman: Looks good to me, approved

Objections:
  Jenkins: Violations found



diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index b26eaca..69eecfd 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -56,8 +56,10 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowStreamPOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 import 
org.apache.hyracks.algebricks.rewriter.rules.SetAlgebricksPhysicalOperatorsRule;
@@ -241,19 +243,24 @@
         }

         @Override
-        public WindowPOperator createWindowPOperator(WindowOperator winOp) 
throws AlgebricksException {
-            boolean partitionMaterialization = winOp.hasNestedPlans() || 
AnalysisUtil.hasFunctionWithProperty(winOp,
-                    
BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION);
-            boolean frameStartIsMonotonic = AnalysisUtil
-                    
.isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(), 
winOp.getFrameValueExpressions());
-            boolean frameEndIsMonotonic = 
AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameEndExpressions(),
-                    winOp.getFrameValueExpressions());
-            boolean nestedTrivialAggregates = winOp.hasNestedPlans()
-                    && 
winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
-
-            return new WindowPOperator(winOp.getPartitionVarList(), 
partitionMaterialization,
-                    winOp.getOrderColumnList(), frameStartIsMonotonic, 
frameEndIsMonotonic, nestedTrivialAggregates,
-                    
context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+        public AbstractWindowPOperator createWindowPOperator(WindowOperator 
winOp) throws AlgebricksException {
+            if (winOp.hasNestedPlans()) {
+                boolean frameStartIsMonotonic = 
AnalysisUtil.isWindowFrameBoundaryMonotonic(
+                        winOp.getFrameStartExpressions(), 
winOp.getFrameValueExpressions());
+                boolean frameEndIsMonotonic = 
AnalysisUtil.isWindowFrameBoundaryMonotonic(
+                        winOp.getFrameEndExpressions(), 
winOp.getFrameValueExpressions());
+                boolean nestedTrivialAggregates =
+                        
winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
+                return new WindowPOperator(winOp.getPartitionVarList(), 
winOp.getOrderColumnList(),
+                        frameStartIsMonotonic, frameEndIsMonotonic, 
nestedTrivialAggregates,
+                        
context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+            } else if (AnalysisUtil.hasFunctionWithProperty(winOp,
+                    
BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION)) {
+                return new WindowPOperator(winOp.getPartitionVarList(), 
winOp.getOrderColumnList(), false, false, false,
+                        
context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+            } else {
+                return new WindowStreamPOperator(winOp.getPartitionVarList(), 
winOp.getOrderColumnList());
+            }
         }
     }
 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
index 6cba1b1..a2c1c33 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -25,7 +25,6 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;

 public class OperatorResourcesComputer {

@@ -146,10 +145,10 @@
     }

     private long getWindowRequiredMemory(WindowOperator op) {
-        WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator();
         // memory budget configuration only applies to window operators that 
materialize partitions (non-streaming)
         // streaming window operators only need 2 frames: output + 
(conservative estimate) last frame partition columns
-        long memorySize = physOp.isPartitionMaterialization() ? 
windowMemorySize : 2 * frameSize;
+        long memorySize = op.getPhysicalOperator().getOperatorTag() == 
PhysicalOperatorTag.WINDOW_STREAM ? 2 * frameSize
+                : windowMemorySize;
         return getOperatorRequiredMemory(op, memorySize);
     }
 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
index c0fca94..024a13e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
@@ -318,7 +318,7 @@
         WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator();
         visitInternal(op, true);
         addOutputBuffer(op); // + previous frame
-        if (physOp.isPartitionMaterialization()) {
+        if (physOp.getOperatorTag() == PhysicalOperatorTag.WINDOW) {
             addOutputBuffer(op); // + run frame
         }
         return null;
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
index e452d03..c12faf5 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
@@ -11,7 +11,7 @@
                           -- AGGREGATE  |LOCAL|
                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                         }
-                  -- WINDOW  |PARTITIONED|
+                  -- WINDOW_STREAM  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                       -- STABLE_SORT [$$34(ASC), $$48(ASC)]  |PARTITIONED|
                         -- HASH_PARTITION_EXCHANGE [$$34]  |PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
index ab78ecc..a1e04ad 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
@@ -3,7 +3,7 @@
     -- STREAM_PROJECT  |LOCAL|
       -- ASSIGN  |LOCAL|
         -- WINDOW  |LOCAL|
-          -- WINDOW  |LOCAL|
+          -- WINDOW_STREAM  |LOCAL|
             -- ONE_TO_ONE_EXCHANGE  |LOCAL|
               -- STABLE_SORT [$$m(ASC), $$t(ASC)]  |LOCAL|
                 -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
index 5b3d480..b111336 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
@@ -8,7 +8,7 @@
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- WINDOW  |LOCAL|
+            -- WINDOW_STREAM  |LOCAL|
               -- ONE_TO_ONE_EXCHANGE  |LOCAL|
                 -- STABLE_SORT [$$m(ASC), $$t(ASC)]  |LOCAL|
                   -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 8e1f77f..84d19c1 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -78,5 +78,6 @@
     UPDATE,
     WRITE_RESULT,
     INTERSECT,
-    WINDOW
+    WINDOW,
+    WINDOW_STREAM
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
new file mode 100644
index 0000000..7065b70
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.win.WindowAggregatorDescriptorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+
+public abstract class AbstractWindowPOperator extends AbstractPhysicalOperator 
{
+
+    private final List<LogicalVariable> partitionColumns;
+
+    private final List<OrderColumn> orderColumns;
+
+    AbstractWindowPOperator(List<LogicalVariable> partitionColumns, 
List<OrderColumn> orderColumns) {
+        this.partitionColumns = partitionColumns;
+        this.orderColumns = orderColumns;
+    }
+
+    @Override
+    public PhysicalRequirements 
getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext 
context) throws AlgebricksException {
+        IPartitioningProperty pp;
+        switch (op.getExecutionMode()) {
+            case PARTITIONED:
+                pp = new UnorderedPartitionedProperty(new 
ListSet<>(partitionColumns),
+                        context.getComputationNodeDomain());
+                break;
+            case UNPARTITIONED:
+                pp = IPartitioningProperty.UNPARTITIONED;
+                break;
+            case LOCAL:
+                pp = null;
+                break;
+            default:
+                throw new IllegalStateException(op.getExecutionMode().name());
+        }
+
+        // require local order property [pc1, ... pcN, oc1, ... ocN]
+        // accounting for cases where there's an overlap between order and 
partition columns
+        // TODO replace with required local grouping on partition columns + 
local order on order columns
+        List<OrderColumn> lopColumns = new ArrayList<>();
+        ListSet<LogicalVariable> pcVars = new ListSet<>();
+        pcVars.addAll(partitionColumns);
+        for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
+            OrderColumn oc = orderColumns.get(oIdx);
+            LogicalVariable ocVar = oc.getColumn();
+            if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, 
pcVars)) {
+                throw new AlgebricksException(ErrorCode.HYRACKS, 
ErrorCode.UNSUPPORTED_WINDOW_SPEC,
+                        op.getSourceLocation(), 
String.valueOf(partitionColumns), String.valueOf(orderColumns));
+            }
+            lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
+        }
+        int pIdx = 0;
+        for (LogicalVariable pColumn : pcVars) {
+            lopColumns.add(pIdx++, new OrderColumn(pColumn, 
OrderOperator.IOrder.OrderKind.ASC));
+        }
+        List<ILocalStructuralProperty> localProps =
+                lopColumns.isEmpty() ? null : Collections.singletonList(new 
LocalOrderProperty(lopColumns));
+
+        return new PhysicalRequirements(
+                new StructuralPropertiesVector[] { new 
StructuralPropertiesVector(pp, localProps) },
+                IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, 
IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
op.getInputs().get(0).getValue();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        WindowOperator winOp = (WindowOperator) op;
+
+        int[] partitionColumnsList = 
JobGenHelper.projectVariables(inputSchemas[0], partitionColumns);
+
+        IVariableTypeEnvironment opTypeEnv = context.getTypeEnvironment(op);
+        IBinaryComparatorFactory[] partitionComparatorFactories =
+                
JobGenHelper.variablesToAscBinaryComparatorFactories(partitionColumns, 
opTypeEnv, context);
+
+        //TODO not all functions need order comparators
+        IBinaryComparatorFactory[] orderComparatorFactories =
+                
JobGenHelper.variablesToBinaryComparatorFactories(orderColumns, opTypeEnv, 
context);
+
+        IVariableTypeEnvironment inputTypeEnv = 
context.getTypeEnvironment(op.getInputs().get(0).getValue());
+        IExpressionRuntimeProvider exprRuntimeProvider = 
context.getExpressionRuntimeProvider();
+        IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider = 
context.getBinaryComparatorFactoryProvider();
+
+        List<Mutable<ILogicalExpression>> frameStartExprList = 
winOp.getFrameStartExpressions();
+        IScalarEvaluatorFactory[] frameStartExprEvals =
+                createEvaluatorFactories(frameStartExprList, inputSchemas, 
inputTypeEnv, exprRuntimeProvider, context);
+
+        List<Mutable<ILogicalExpression>> frameStartValidationExprList = 
winOp.getFrameStartValidationExpressions();
+        IScalarEvaluatorFactory[] frameStartValidationExprEvals = 
createEvaluatorFactories(frameStartValidationExprList,
+                inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+
+        List<Mutable<ILogicalExpression>> frameEndExprList = 
winOp.getFrameEndExpressions();
+        IScalarEvaluatorFactory[] frameEndExprEvals =
+                createEvaluatorFactories(frameEndExprList, inputSchemas, 
inputTypeEnv, exprRuntimeProvider, context);
+
+        List<Mutable<ILogicalExpression>> frameEndValidationExprList = 
winOp.getFrameEndValidationExpressions();
+        IScalarEvaluatorFactory[] frameEndValidationExprEvals = 
createEvaluatorFactories(frameEndValidationExprList,
+                inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+
+        List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> 
frameValueExprList =
+                winOp.getFrameValueExpressions();
+        Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> 
frameValueExprEvalsAndComparators =
+                createEvaluatorAndComparatorFactories(frameValueExprList, 
Pair::getSecond, Pair::getFirst, inputSchemas,
+                        inputTypeEnv, exprRuntimeProvider, 
binaryComparatorFactoryProvider, context);
+
+        List<Mutable<ILogicalExpression>> frameExcludeExprList = 
winOp.getFrameExcludeExpressions();
+        Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> 
frameExcludeExprEvalsAndComparators =
+                createEvaluatorAndComparatorFactories(frameExcludeExprList, v 
-> v, v -> OrderOperator.ASC_ORDER,
+                        inputSchemas, inputTypeEnv, exprRuntimeProvider, 
binaryComparatorFactoryProvider, context);
+
+        IScalarEvaluatorFactory frameOffsetExprEval = null;
+        ILogicalExpression frameOffsetExpr = winOp.getFrameOffset().getValue();
+        if (frameOffsetExpr != null) {
+            frameOffsetExprEval =
+                    
exprRuntimeProvider.createEvaluatorFactory(frameOffsetExpr, inputTypeEnv, 
inputSchemas, context);
+        }
+
+        int[] projectionColumnsExcludingSubplans = 
JobGenHelper.projectAllVariables(opSchema);
+
+        int[] runningAggOutColumns = JobGenHelper.projectVariables(opSchema, 
winOp.getVariables());
+
+        List<Mutable<ILogicalExpression>> runningAggExprs = 
winOp.getExpressions();
+        int runningAggExprCount = runningAggExprs.size();
+        IRunningAggregateEvaluatorFactory[] runningAggFactories =
+                new IRunningAggregateEvaluatorFactory[runningAggExprCount];
+        for (int i = 0; i < runningAggExprCount; i++) {
+            StatefulFunctionCallExpression expr = 
(StatefulFunctionCallExpression) runningAggExprs.get(i).getValue();
+            runningAggFactories[i] = 
exprRuntimeProvider.createRunningAggregateFunctionFactory(expr, inputTypeEnv,
+                    inputSchemas, context);
+        }
+
+        int nestedAggOutSchemaSize = 0;
+        WindowAggregatorDescriptorFactory nestedAggFactory = null;
+        if (winOp.hasNestedPlans()) {
+            int opSchemaSizePreSubplans = opSchema.getSize();
+            AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], 
winOp, opSchema, context);
+            nestedAggOutSchemaSize = opSchema.getSize() - 
opSchemaSizePreSubplans;
+            nestedAggFactory = new WindowAggregatorDescriptorFactory(subplans);
+            nestedAggFactory.setSourceLocation(winOp.getSourceLocation());
+        }
+
+        AbstractWindowRuntimeFactory runtime = createRuntimeFactory(winOp, 
partitionColumnsList,
+                partitionComparatorFactories, orderComparatorFactories, 
frameValueExprEvalsAndComparators.first,
+                frameValueExprEvalsAndComparators.second, frameStartExprEvals, 
frameStartValidationExprEvals,
+                frameEndExprEvals, frameEndValidationExprEvals, 
frameExcludeExprEvalsAndComparators.first,
+                frameExcludeExprEvalsAndComparators.second, 
frameOffsetExprEval, projectionColumnsExcludingSubplans,
+                runningAggOutColumns, runningAggFactories, 
nestedAggOutSchemaSize, nestedAggFactory, context);
+        runtime.setSourceLocation(winOp.getSourceLocation());
+
+        // contribute one Asterix framewriter
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(opTypeEnv, 
opSchema, context);
+        builder.contributeMicroOperator(winOp, runtime, recDesc);
+        // and contribute one edge from its child
+        ILogicalOperator src = winOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, winOp, 0);
+    }
+
+    protected abstract AbstractWindowRuntimeFactory 
createRuntimeFactory(WindowOperator winOp,
+            int[] partitionColumnsList, IBinaryComparatorFactory[] 
partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, 
IScalarEvaluatorFactory[] frameValueExprEvals,
+            IBinaryComparatorFactory[] frameValueComparatorFactories, 
IScalarEvaluatorFactory[] frameStartExprEvals,
+            IScalarEvaluatorFactory[] frameStartValidationExprEvals, 
IScalarEvaluatorFactory[] frameEndExprEvals,
+            IScalarEvaluatorFactory[] frameEndValidationExprEvals, 
IScalarEvaluatorFactory[] frameExcludeExprEvals,
+            IBinaryComparatorFactory[] frameExcludeComparatorFactories, 
IScalarEvaluatorFactory frameOffsetExprEval,
+            int[] projectionColumnsExcludingSubplans, int[] 
runningAggOutColumns,
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, int 
nestedAggOutSchemaSize,
+            WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext 
context);
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+
+    private IScalarEvaluatorFactory[] 
createEvaluatorFactories(List<Mutable<ILogicalExpression>> exprList,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment 
inputTypeEnv,
+            IExpressionRuntimeProvider exprRuntimeProvider, JobGenContext 
context) throws AlgebricksException {
+        if (exprList.isEmpty()) {
+            return null;
+        }
+        int ln = exprList.size();
+        IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
+        for (int i = 0; i < ln; i++) {
+            ILogicalExpression expr = exprList.get(i).getValue();
+            evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, 
inputTypeEnv, inputSchemas, context);
+        }
+        return evals;
+    }
+
+    private <T> Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> 
createEvaluatorAndComparatorFactories(
+            List<T> exprList, Function<T, Mutable<ILogicalExpression>> 
exprGetter,
+            Function<T, OrderOperator.IOrder> orderGetter, IOperatorSchema[] 
inputSchemas,
+            IVariableTypeEnvironment inputTypeEnv, IExpressionRuntimeProvider 
exprRuntimeProvider,
+            IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider, 
JobGenContext context)
+            throws AlgebricksException {
+        if (exprList.isEmpty()) {
+            return new Pair<>(null, null);
+        }
+        int ln = exprList.size();
+        IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
+        IBinaryComparatorFactory[] comparators = new 
IBinaryComparatorFactory[ln];
+        for (int i = 0; i < ln; i++) {
+            T exprObj = exprList.get(i);
+            ILogicalExpression expr = exprGetter.apply(exprObj).getValue();
+            OrderOperator.IOrder order = orderGetter.apply(exprObj);
+            evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, 
inputTypeEnv, inputSchemas, context);
+            comparators[i] = 
binaryComparatorFactoryProvider.getBinaryComparatorFactory(inputTypeEnv.getType(expr),
+                    order.getKind() == OrderOperator.IOrder.OrderKind.ASC);
+        }
+        return new Pair<>(evals, comparators);
+    }
+
+    private static boolean containsAny(List<OrderColumn> ocList, int startIdx, 
Set<LogicalVariable> varSet) {
+        for (int i = startIdx, ln = ocList.size(); i < ln; i++) {
+            if (varSet.contains(ocList.get(i).getColumn())) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index 8bd4610..23853e8 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -19,42 +19,13 @@

 package org.apache.hyracks.algebricks.core.algebra.operators.physical;

-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.Set;
-import java.util.function.Function;

-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.ListSet;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
@@ -63,18 +34,9 @@
 import 
org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRunningRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansUnboundedRuntimeFactory;
-import 
org.apache.hyracks.algebricks.runtime.operators.win.WindowSimpleRuntimeFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.ErrorCode;

-public class WindowPOperator extends AbstractPhysicalOperator {
-
-    private final List<LogicalVariable> partitionColumns;
-
-    private final boolean partitionMaterialization;
-
-    private final List<OrderColumn> orderColumns;
+public final class WindowPOperator extends AbstractWindowPOperator {

     private final boolean frameStartIsMonotonic;

@@ -85,12 +47,10 @@
     // The maximum number of in-memory frames that this operator can use.
     private final int memSizeInFrames;

-    public WindowPOperator(List<LogicalVariable> partitionColumns, boolean 
partitionMaterialization,
-            List<OrderColumn> orderColumns, boolean frameStartIsMonotonic, 
boolean frameEndIsMonotonic,
-            boolean nestedTrivialAggregates, int memSizeInFrames) {
-        this.partitionColumns = partitionColumns;
-        this.partitionMaterialization = partitionMaterialization;
-        this.orderColumns = orderColumns;
+    public WindowPOperator(List<LogicalVariable> partitionColumns, 
List<OrderColumn> orderColumns,
+            boolean frameStartIsMonotonic, boolean frameEndIsMonotonic, 
boolean nestedTrivialAggregates,
+            int memSizeInFrames) {
+        super(partitionColumns, orderColumns);
         this.frameStartIsMonotonic = frameStartIsMonotonic;
         this.frameEndIsMonotonic = frameEndIsMonotonic;
         this.nestedTrivialAggregates = nestedTrivialAggregates;
@@ -103,245 +63,55 @@
     }

     @Override
-    public PhysicalRequirements 
getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext 
context) throws AlgebricksException {
-        IPartitioningProperty pp;
-        switch (op.getExecutionMode()) {
-            case PARTITIONED:
-                pp = new UnorderedPartitionedProperty(new 
ListSet<>(partitionColumns),
-                        context.getComputationNodeDomain());
-                break;
-            case UNPARTITIONED:
-                pp = IPartitioningProperty.UNPARTITIONED;
-                break;
-            case LOCAL:
-                pp = null;
-                break;
-            default:
-                throw new IllegalStateException(op.getExecutionMode().name());
-        }
+    protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator 
winOp, int[] partitionColumnsList,
+            IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, 
IScalarEvaluatorFactory[] frameValueExprEvals,
+            IBinaryComparatorFactory[] frameValueComparatorFactories, 
IScalarEvaluatorFactory[] frameStartExprEvals,
+            IScalarEvaluatorFactory[] frameStartValidationExprEvals, 
IScalarEvaluatorFactory[] frameEndExprEvals,
+            IScalarEvaluatorFactory[] frameEndValidationExprEvals, 
IScalarEvaluatorFactory[] frameExcludeExprEvals,
+            IBinaryComparatorFactory[] frameExcludeComparatorFactories, 
IScalarEvaluatorFactory frameOffsetExprEval,
+            int[] projectionColumnsExcludingSubplans, int[] 
runningAggOutColumns,
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, int 
nestedAggOutSchemaSize,
+            WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext 
context) {

-        // require local order property [pc1, ... pcN, oc1, ... ocN]
-        // accounting for cases where there's an overlap between order and 
partition columns
-        // TODO replace with required local grouping on partition columns + 
local order on order columns
-        List<OrderColumn> lopColumns = new ArrayList<>();
-        ListSet<LogicalVariable> pcVars = new ListSet<>();
-        pcVars.addAll(partitionColumns);
-        for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
-            OrderColumn oc = orderColumns.get(oIdx);
-            LogicalVariable ocVar = oc.getColumn();
-            if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, 
pcVars)) {
-                throw new AlgebricksException(ErrorCode.HYRACKS, 
ErrorCode.UNSUPPORTED_WINDOW_SPEC,
-                        op.getSourceLocation(), 
String.valueOf(partitionColumns), String.valueOf(orderColumns));
-            }
-            lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
-        }
-        int pIdx = 0;
-        for (LogicalVariable pColumn : pcVars) {
-            lopColumns.add(pIdx++, new OrderColumn(pColumn, 
OrderOperator.IOrder.OrderKind.ASC));
-        }
-        List<ILocalStructuralProperty> localProps =
-                lopColumns.isEmpty() ? null : Collections.singletonList(new 
LocalOrderProperty(lopColumns));
-
-        return new PhysicalRequirements(
-                new StructuralPropertiesVector[] { new 
StructuralPropertiesVector(pp, localProps) },
-                IPartitioningRequirementsCoordinator.NO_COORDINATION);
-    }
-
-    @Override
-    public void computeDeliveredProperties(ILogicalOperator op, 
IOptimizationContext context) {
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
op.getInputs().get(0).getValue();
-        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
-    }
-
-    @Override
-    public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
-            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
-        WindowOperator winOp = (WindowOperator) op;
-
-        int[] partitionColumnsList = 
JobGenHelper.projectVariables(inputSchemas[0], partitionColumns);
-
-        IVariableTypeEnvironment opTypeEnv = context.getTypeEnvironment(op);
-        IBinaryComparatorFactory[] partitionComparatorFactories =
-                
JobGenHelper.variablesToAscBinaryComparatorFactories(partitionColumns, 
opTypeEnv, context);
-
-        //TODO not all functions need order comparators
-        IBinaryComparatorFactory[] orderComparatorFactories =
-                
JobGenHelper.variablesToBinaryComparatorFactories(orderColumns, opTypeEnv, 
context);
-
-        IVariableTypeEnvironment inputTypeEnv = 
context.getTypeEnvironment(op.getInputs().get(0).getValue());
-        IExpressionRuntimeProvider exprRuntimeProvider = 
context.getExpressionRuntimeProvider();
-        IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider = 
context.getBinaryComparatorFactoryProvider();
-
-        List<Mutable<ILogicalExpression>> frameStartExprList = 
winOp.getFrameStartExpressions();
-        IScalarEvaluatorFactory[] frameStartExprEvals =
-                createEvaluatorFactories(frameStartExprList, inputSchemas, 
inputTypeEnv, exprRuntimeProvider, context);
-
-        List<Mutable<ILogicalExpression>> frameStartValidationExprList = 
winOp.getFrameStartValidationExpressions();
-        IScalarEvaluatorFactory[] frameStartValidationExprEvals = 
createEvaluatorFactories(frameStartValidationExprList,
-                inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
-        List<Mutable<ILogicalExpression>> frameEndExprList = 
winOp.getFrameEndExpressions();
-        IScalarEvaluatorFactory[] frameEndExprEvals =
-                createEvaluatorFactories(frameEndExprList, inputSchemas, 
inputTypeEnv, exprRuntimeProvider, context);
-
-        List<Mutable<ILogicalExpression>> frameEndValidationExprList = 
winOp.getFrameEndValidationExpressions();
-        IScalarEvaluatorFactory[] frameEndValidationExprEvals = 
createEvaluatorFactories(frameEndValidationExprList,
-                inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
-        List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> 
frameValueExprList =
-                winOp.getFrameValueExpressions();
-        Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> 
frameValueExprEvalsAndComparators =
-                createEvaluatorAndComparatorFactories(frameValueExprList, 
Pair::getSecond, Pair::getFirst, inputSchemas,
-                        inputTypeEnv, exprRuntimeProvider, 
binaryComparatorFactoryProvider, context);
-
-        List<Mutable<ILogicalExpression>> frameExcludeExprList = 
winOp.getFrameExcludeExpressions();
-        Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> 
frameExcludeExprEvalsAndComparators =
-                createEvaluatorAndComparatorFactories(frameExcludeExprList, v 
-> v, v -> OrderOperator.ASC_ORDER,
-                        inputSchemas, inputTypeEnv, exprRuntimeProvider, 
binaryComparatorFactoryProvider, context);
-
-        IScalarEvaluatorFactory frameOffsetExprEval = null;
-        ILogicalExpression frameOffsetExpr = winOp.getFrameOffset().getValue();
-        if (frameOffsetExpr != null) {
-            frameOffsetExprEval =
-                    
exprRuntimeProvider.createEvaluatorFactory(frameOffsetExpr, inputTypeEnv, 
inputSchemas, context);
-        }
-
-        int[] projectionColumnsExcludingSubplans = 
JobGenHelper.projectAllVariables(opSchema);
-
-        int[] runningAggOutColumns = JobGenHelper.projectVariables(opSchema, 
winOp.getVariables());
-
-        List<Mutable<ILogicalExpression>> runningAggExprs = 
winOp.getExpressions();
-        int runningAggExprCount = runningAggExprs.size();
-        IRunningAggregateEvaluatorFactory[] runningAggFactories =
-                new IRunningAggregateEvaluatorFactory[runningAggExprCount];
-        for (int i = 0; i < runningAggExprCount; i++) {
-            StatefulFunctionCallExpression expr = 
(StatefulFunctionCallExpression) runningAggExprs.get(i).getValue();
-            runningAggFactories[i] = 
exprRuntimeProvider.createRunningAggregateFunctionFactory(expr, inputTypeEnv,
-                    inputSchemas, context);
-        }
-
-        AbstractWindowRuntimeFactory runtime = null;
-        if (winOp.hasNestedPlans()) {
-            int opSchemaSizePreSubplans = opSchema.getSize();
-            AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], 
winOp, opSchema, context);
-            int aggregatorOutputSchemaSize = opSchema.getSize() - 
opSchemaSizePreSubplans;
-            WindowAggregatorDescriptorFactory nestedAggFactory = new 
WindowAggregatorDescriptorFactory(subplans);
-            nestedAggFactory.setSourceLocation(winOp.getSourceLocation());
-
-            int frameMaxObjects = winOp.getFrameMaxObjects();
-
-            // special cases
-            if (frameStartExprList.isEmpty() && frameExcludeExprList.isEmpty() 
&& frameOffsetExpr == null) {
-                if (frameEndExprList.isEmpty()) {
-                    // special case #1: frame == whole partition, no 
exclusions, no offset
-                    runtime = new 
WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList,
-                            partitionComparatorFactories, 
orderComparatorFactories, frameMaxObjects,
-                            projectionColumnsExcludingSubplans, 
runningAggOutColumns, runningAggFactories,
-                            aggregatorOutputSchemaSize, nestedAggFactory, 
memSizeInFrames);
-                } else if (frameEndIsMonotonic && nestedTrivialAggregates) {
-                    // special case #2: accumulating frame from beginning of 
the partition, no exclusions, no offset,
-                    //                  trivial aggregate subplan ( aggregate 
+ nts )
-                    nestedAggFactory.setPartialOutputEnabled(true);
-                    runtime = new 
WindowNestedPlansRunningRuntimeFactory(partitionColumnsList,
-                            partitionComparatorFactories, 
orderComparatorFactories,
-                            frameValueExprEvalsAndComparators.first, 
frameValueExprEvalsAndComparators.second,
-                            frameEndExprEvals, frameEndValidationExprEvals, 
frameMaxObjects,
-                            context.getBinaryBooleanInspectorFactory(), 
projectionColumnsExcludingSubplans,
-                            runningAggOutColumns, runningAggFactories, 
aggregatorOutputSchemaSize, nestedAggFactory,
-                            memSizeInFrames);
-                }
-            }
-            // default case
-            if (runtime == null) {
-                runtime = new 
WindowNestedPlansRuntimeFactory(partitionColumnsList, 
partitionComparatorFactories,
-                        orderComparatorFactories, 
frameValueExprEvalsAndComparators.first,
-                        frameValueExprEvalsAndComparators.second, 
frameStartExprEvals, frameStartValidationExprEvals,
-                        frameStartIsMonotonic, frameEndExprEvals, 
frameEndValidationExprEvals,
-                        frameExcludeExprEvalsAndComparators.first, 
winOp.getFrameExcludeNegationStartIdx(),
-                        frameExcludeExprEvalsAndComparators.second, 
frameOffsetExprEval, frameMaxObjects,
-                        context.getBinaryBooleanInspectorFactory(), 
context.getBinaryIntegerInspectorFactory(),
-                        projectionColumnsExcludingSubplans, 
runningAggOutColumns, runningAggFactories,
-                        aggregatorOutputSchemaSize, nestedAggFactory, 
memSizeInFrames);
-            }
-        } else if (partitionMaterialization) {
-            runtime = new 
WindowMaterializingRuntimeFactory(partitionColumnsList, 
partitionComparatorFactories,
+        // special cases
+        if (!winOp.hasNestedPlans()) {
+            return new WindowMaterializingRuntimeFactory(partitionColumnsList, 
partitionComparatorFactories,
                     orderComparatorFactories, 
projectionColumnsExcludingSubplans, runningAggOutColumns,
                     runningAggFactories, memSizeInFrames);
-        } else {
-            runtime = new WindowSimpleRuntimeFactory(partitionColumnsList, 
partitionComparatorFactories,
-                    orderComparatorFactories, 
projectionColumnsExcludingSubplans, runningAggOutColumns,
-                    runningAggFactories);
         }
-        runtime.setSourceLocation(winOp.getSourceLocation());

-        // contribute one Asterix framewriter
-        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(opTypeEnv, 
opSchema, context);
-        builder.contributeMicroOperator(winOp, runtime, recDesc);
-        // and contribute one edge from its child
-        ILogicalOperator src = winOp.getInputs().get(0).getValue();
-        builder.contributeGraphEdge(src, 0, winOp, 0);
-    }
-
-    @Override
-    public boolean isMicroOperator() {
-        return true;
-    }
-
-    @Override
-    public boolean expensiveThanMaterialization() {
-        return true;
-    }
-
-    public boolean isPartitionMaterialization() {
-        return partitionMaterialization;
-    }
-
-    private IScalarEvaluatorFactory[] 
createEvaluatorFactories(List<Mutable<ILogicalExpression>> exprList,
-            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment 
inputTypeEnv,
-            IExpressionRuntimeProvider exprRuntimeProvider, JobGenContext 
context) throws AlgebricksException {
-        if (exprList.isEmpty()) {
-            return null;
-        }
-        int ln = exprList.size();
-        IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
-        for (int i = 0; i < ln; i++) {
-            ILogicalExpression expr = exprList.get(i).getValue();
-            evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, 
inputTypeEnv, inputSchemas, context);
-        }
-        return evals;
-    }
-
-    private <T> Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> 
createEvaluatorAndComparatorFactories(
-            List<T> exprList, Function<T, Mutable<ILogicalExpression>> 
exprGetter,
-            Function<T, OrderOperator.IOrder> orderGetter, IOperatorSchema[] 
inputSchemas,
-            IVariableTypeEnvironment inputTypeEnv, IExpressionRuntimeProvider 
exprRuntimeProvider,
-            IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider, 
JobGenContext context)
-            throws AlgebricksException {
-        if (exprList.isEmpty()) {
-            return new Pair<>(null, null);
-        }
-        int ln = exprList.size();
-        IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
-        IBinaryComparatorFactory[] comparators = new 
IBinaryComparatorFactory[ln];
-        for (int i = 0; i < ln; i++) {
-            T exprObj = exprList.get(i);
-            ILogicalExpression expr = exprGetter.apply(exprObj).getValue();
-            OrderOperator.IOrder order = orderGetter.apply(exprObj);
-            evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, 
inputTypeEnv, inputSchemas, context);
-            comparators[i] = 
binaryComparatorFactoryProvider.getBinaryComparatorFactory(inputTypeEnv.getType(expr),
-                    order.getKind() == OrderOperator.IOrder.OrderKind.ASC);
-        }
-        return new Pair<>(evals, comparators);
-    }
-
-    private boolean containsAny(List<OrderColumn> ocList, int startIdx, 
Set<LogicalVariable> varSet) {
-        for (int i = startIdx, ln = ocList.size(); i < ln; i++) {
-            if (varSet.contains(ocList.get(i).getColumn())) {
-                return true;
+        boolean hasFrameStart = frameStartExprEvals != null && 
frameStartExprEvals.length > 0;
+        boolean hasFrameEnd = frameEndExprEvals != null && 
frameEndExprEvals.length > 0;
+        boolean hasFrameExclude = frameExcludeExprEvals != null && 
frameExcludeExprEvals.length > 0;
+        boolean hasFrameOffset = frameOffsetExprEval != null;
+        if (!hasFrameStart && !hasFrameExclude && !hasFrameOffset) {
+            if (!hasFrameEnd) {
+                // special case #1: frame == whole partition, no exclusions, 
no offset
+                return new 
WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList, 
partitionComparatorFactories,
+                        orderComparatorFactories, winOp.getFrameMaxObjects(), 
projectionColumnsExcludingSubplans,
+                        runningAggOutColumns, runningAggFactories, 
nestedAggOutSchemaSize, nestedAggFactory,
+                        memSizeInFrames);
+            } else if (frameEndIsMonotonic && nestedTrivialAggregates) {
+                // special case #2: accumulating frame from beginning of the 
partition, no exclusions, no offset,
+                //                  trivial aggregate subplan ( aggregate + 
nts )
+                nestedAggFactory.setPartialOutputEnabled(true);
+                return new 
WindowNestedPlansRunningRuntimeFactory(partitionColumnsList, 
partitionComparatorFactories,
+                        orderComparatorFactories, frameValueExprEvals, 
frameValueComparatorFactories, frameEndExprEvals,
+                        frameEndValidationExprEvals, 
winOp.getFrameMaxObjects(),
+                        context.getBinaryBooleanInspectorFactory(), 
projectionColumnsExcludingSubplans,
+                        runningAggOutColumns, runningAggFactories, 
nestedAggOutSchemaSize, nestedAggFactory,
+                        memSizeInFrames);
             }
         }
-        return false;
+
+        // default case
+        return new WindowNestedPlansRuntimeFactory(partitionColumnsList, 
partitionComparatorFactories,
+                orderComparatorFactories, frameValueExprEvals, 
frameValueComparatorFactories, frameStartExprEvals,
+                frameStartValidationExprEvals, frameStartIsMonotonic, 
frameEndExprEvals, frameEndValidationExprEvals,
+                frameExcludeExprEvals, 
winOp.getFrameExcludeNegationStartIdx(), frameExcludeComparatorFactories,
+                frameOffsetExprEval, winOp.getFrameMaxObjects(), 
context.getBinaryBooleanInspectorFactory(),
+                context.getBinaryIntegerInspectorFactory(), 
projectionColumnsExcludingSubplans, runningAggOutColumns,
+                runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, 
memSizeInFrames);
     }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
new file mode 100644
index 0000000..33b47ec
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.win.WindowAggregatorDescriptorFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.win.WindowStreamRuntimeFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public final class WindowStreamPOperator extends AbstractWindowPOperator {
+
+    public WindowStreamPOperator(List<LogicalVariable> partitionColumns, 
List<OrderColumn> orderColumns) {
+        super(partitionColumns, orderColumns);
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.WINDOW_STREAM;
+    }
+
+    @Override
+    protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator 
winOp, int[] partitionColumnsList,
+            IBinaryComparatorFactory[] partitionComparatorFactories,
+            IBinaryComparatorFactory[] orderComparatorFactories, 
IScalarEvaluatorFactory[] frameValueExprEvals,
+            IBinaryComparatorFactory[] frameValueComparatorFactories, 
IScalarEvaluatorFactory[] frameStartExprEvals,
+            IScalarEvaluatorFactory[] frameStartValidationExprEvals, 
IScalarEvaluatorFactory[] frameEndExprEvals,
+            IScalarEvaluatorFactory[] frameEndValidationExprEvals, 
IScalarEvaluatorFactory[] frameExcludeExprEvals,
+            IBinaryComparatorFactory[] frameExcludeComparatorFactories, 
IScalarEvaluatorFactory frameOffsetExprEval,
+            int[] projectionColumnsExcludingSubplans, int[] 
runningAggOutColumns,
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, int 
nestedAggOutSchemaSize,
+            WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext 
context) {
+        return new WindowStreamRuntimeFactory(partitionColumnsList, 
partitionComparatorFactories,
+                orderComparatorFactories, projectionColumnsExcludingSubplans, 
runningAggOutColumns,
+                runningAggFactories);
+    }
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index f127898..e6cdc28 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -79,6 +79,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.BulkloadPOperator;
@@ -468,8 +469,8 @@
             return createWindowPOperator(op);
         }

-        protected WindowPOperator createWindowPOperator(WindowOperator op) 
throws AlgebricksException {
-            return new WindowPOperator(op.getPartitionVarList(), true, 
op.getOrderColumnList(), false, false, false,
+        protected AbstractWindowPOperator createWindowPOperator(WindowOperator 
op) throws AlgebricksException {
+            return new WindowPOperator(op.getPartitionVarList(), 
op.getOrderColumnList(), false, false, false,
                     
context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
         }

diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
similarity index 94%
rename from 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
rename to 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
index f7f1a25..d23d4e7 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
@@ -30,9 +30,9 @@
 /**
  * Runtime for window operators that evaluates running aggregates without 
partition materialization.
  */
-class WindowSimplePushRuntime extends AbstractWindowPushRuntime {
+class WindowStreamPushRuntime extends AbstractWindowPushRuntime {

-    WindowSimplePushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] 
partitionComparatorFactories,
+    WindowStreamPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] 
partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] 
projectionColumns, int[] runningAggOutColumns,
             IRunningAggregateEvaluatorFactory[] runningAggFactories, 
IHyracksTaskContext ctx,
             SourceLocation sourceLoc) {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
similarity index 82%
rename from 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
rename to 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
index 2d1cdde..be368a9 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
@@ -27,13 +27,14 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;

 /**
- * Runtime factory for window operators that evaluates running aggregates 
without partition materialization.
+ * Runtime factory for window operators that evaluates running aggregates in a 
streaming fashion
+ * (without partition materialization).
  */
-public class WindowSimpleRuntimeFactory extends AbstractWindowRuntimeFactory {
+public class WindowStreamRuntimeFactory extends AbstractWindowRuntimeFactory {

     private static final long serialVersionUID = 1L;

-    public WindowSimpleRuntimeFactory(int[] partitionColumns, 
IBinaryComparatorFactory[] partitionComparatorFactories,
+    public WindowStreamRuntimeFactory(int[] partitionColumns, 
IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] 
projectionColumnsExcludingSubplans,
             int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] 
runningAggFactories) {
         super(partitionColumns, partitionComparatorFactories, 
orderComparatorFactories,
@@ -42,13 +43,13 @@

     @Override
     public AbstractOneInputOneOutputOneFramePushRuntime 
createOneOutputPushRuntime(IHyracksTaskContext ctx) {
-        return new WindowSimplePushRuntime(partitionColumns, 
partitionComparatorFactories, orderComparatorFactories,
+        return new WindowStreamPushRuntime(partitionColumns, 
partitionComparatorFactories, orderComparatorFactories,
                 projectionList, runningAggOutColumns, runningAggFactories, 
ctx, sourceLoc);
     }

     @Override
     public String toString() {
-        return "window (" + Arrays.toString(partitionColumns) + ") " + 
Arrays.toString(runningAggOutColumns) + " := "
-                + Arrays.toString(runningAggFactories);
+        return "window-stream (" + Arrays.toString(partitionColumns) + ") " + 
Arrays.toString(runningAggOutColumns)
+                + " := " + Arrays.toString(runningAggFactories);
     }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/3369
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: merged
Gerrit-Change-Id: I3863fa3d298aef53d4098be9fc17b0451eb2c23e
Gerrit-Change-Number: 3369
Gerrit-PatchSet: 3
Gerrit-Owner: Dmitry Lychagin <dmitry.lycha...@couchbase.com>
Gerrit-Reviewer: Ali Alsuliman <ali.al.solai...@gmail.com>
Gerrit-Reviewer: Anon. E. Moose (1000171)
Gerrit-Reviewer: Dmitry Lychagin <dmitry.lycha...@couchbase.com>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>

Reply via email to