This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 69b429b8d7 [IOTDB-3333] Distribution planning for 
SlidingWindowAggregationNode (#6103)
69b429b8d7 is described below

commit 69b429b8d7c44f9b7bc914dd7be72cefec9e8fca
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Wed Jun 1 13:54:11 2022 +0800

    [IOTDB-3333] Distribution planning for SlidingWindowAggregationNode (#6103)
---
 .../distribution/DistributionPlanContext.java      |  11 +
 .../planner/distribution/ExchangeNodeAdder.java    |  18 ++
 .../plan/planner/distribution/SourceRewriter.java  | 182 ++++++++++----
 .../node/process/SlidingWindowAggregationNode.java |  10 +-
 .../distribution/AggregationDistributionTest.java  | 277 +++++++++++++++++++++
 5 files changed, 454 insertions(+), 44 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
index 6f9e16e6ee..88f13eddf4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
@@ -22,9 +22,20 @@ package org.apache.iotdb.db.mpp.plan.planner.distribution;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 
 public class DistributionPlanContext {
+  protected boolean isRoot;
   protected MPPQueryContext queryContext;
 
   protected DistributionPlanContext(MPPQueryContext queryContext) {
+    this.isRoot = true;
     this.queryContext = queryContext;
   }
+
+  protected DistributionPlanContext copy() {
+    return new DistributionPlanContext(queryContext);
+  }
+
+  protected DistributionPlanContext setRoot(boolean isRoot) {
+    this.isRoot = isRoot;
+    return this;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 4de9558101..3373722007 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
@@ -250,6 +251,23 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     return newNode;
   }
 
+  @Override
+  public PlanNode visitSlidingWindowAggregation(
+      SlidingWindowAggregationNode node, NodeGroupContext context) {
+    return processOneChildNode(node, context);
+  }
+
+  private PlanNode processOneChildNode(PlanNode node, NodeGroupContext 
context) {
+    PlanNode newNode = node.clone();
+    PlanNode child = visit(node.getChildren().get(0), context);
+    newNode.addChild(child);
+    TRegionReplicaSet dataRegion = 
context.getNodeDistribution(child.getPlanNodeId()).region;
+    context.putNodeDistribution(
+        newNode.getPlanNodeId(),
+        new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, 
dataRegion));
+    return newNode;
+  }
+
   private TRegionReplicaSet calculateDataRegionByChildren(
       List<PlanNode> children, NodeGroupContext context) {
     // Step 1: calculate the count of children group by DataRegion.
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 7cbc01469a..6c20206302 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
@@ -52,6 +53,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -452,6 +454,17 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     return false;
   }
 
+  // This method is only used to process the PlanNodeTree whose root is 
SlidingWindowAggregationNode
+  @Override
+  public PlanNode visitSlidingWindowAggregation(
+      SlidingWindowAggregationNode node, DistributionPlanContext context) {
+    DistributionPlanContext childContext = context.copy().setRoot(false);
+    PlanNode child = visit(node.getChild(), childContext);
+    PlanNode newRoot = node.clone();
+    newRoot.addChild(child);
+    return newRoot;
+  }
+
   private PlanNode planAggregationWithTimeJoin(TimeJoinNode root, 
DistributionPlanContext context) {
 
     List<SeriesAggregationSourceNode> sources = 
splitAggregationSourceByPartition(root, context);
@@ -469,7 +482,7 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
                 rootAggDescriptorList.add(
                     new AggregationDescriptor(
                         descriptor.getAggregationType(),
-                        AggregationStep.FINAL,
+                        context.isRoot ? AggregationStep.FINAL : 
AggregationStep.PARTIAL,
                         descriptor.getInputExpressions()));
               });
     }
@@ -512,6 +525,64 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
         
sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
 
+    boolean containsSlidingWindow =
+        root.getChildren().size() == 1
+            && root.getChildren().get(0) instanceof 
SlidingWindowAggregationNode;
+
+    GroupByLevelNode newRoot =
+        containsSlidingWindow
+            ? groupSourcesForGroupByLevelWithSlidingWindow(
+                root,
+                (SlidingWindowAggregationNode) root.getChildren().get(0),
+                sourceGroup,
+                context)
+            : groupSourcesForGroupByLevel(root, sourceGroup, context);
+
+    // Then, we calculate the attributes for GroupByLevelNode in each level
+    calculateGroupByLevelNodeAttributes(newRoot, 0);
+    return newRoot;
+  }
+
+  private GroupByLevelNode groupSourcesForGroupByLevelWithSlidingWindow(
+      GroupByLevelNode root,
+      SlidingWindowAggregationNode slidingWindowNode,
+      Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup,
+      DistributionPlanContext context) {
+    GroupByLevelNode newRoot = (GroupByLevelNode) root.clone();
+    List<SlidingWindowAggregationNode> groups = new ArrayList<>();
+    sourceGroup.forEach(
+        (dataRegion, sourceNodes) -> {
+          SlidingWindowAggregationNode parentOfGroup =
+              (SlidingWindowAggregationNode) slidingWindowNode.clone();
+          
parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+          if (sourceNodes.size() == 1) {
+            parentOfGroup.addChild(sourceNodes.get(0));
+          } else {
+            TimeJoinNode timeJoinNode =
+                new TimeJoinNode(
+                    context.queryContext.getQueryId().genPlanNodeId(), 
root.getScanOrder());
+            sourceNodes.forEach(timeJoinNode::addChild);
+            parentOfGroup.addChild(timeJoinNode);
+          }
+          groups.add(parentOfGroup);
+        });
+    for (int i = 0; i < groups.size(); i++) {
+      if (i == 0) {
+        newRoot.addChild(groups.get(i));
+        continue;
+      }
+      GroupByLevelNode parent = (GroupByLevelNode) root.clone();
+      parent.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+      parent.addChild(groups.get(i));
+      newRoot.addChild(parent);
+    }
+    return newRoot;
+  }
+
+  private GroupByLevelNode groupSourcesForGroupByLevel(
+      GroupByLevelNode root,
+      Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup,
+      DistributionPlanContext context) {
     GroupByLevelNode newRoot = (GroupByLevelNode) root.clone();
     final boolean[] addParent = {false};
     sourceGroup.forEach(
@@ -523,8 +594,6 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
               sourceNodes.forEach(newRoot::addChild);
               addParent[0] = true;
             } else {
-              // We clone a TimeJoinNode from root to make the params to be 
consistent.
-              // But we need to assign a new ID to it
               GroupByLevelNode parentOfGroup = (GroupByLevelNode) root.clone();
               
parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
               sourceNodes.forEach(parentOfGroup::addChild);
@@ -532,55 +601,69 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
             }
           }
         });
-
-    // Then, we calculate the attributes for GroupByLevelNode in each level
-    calculateGroupByLevelNodeAttributes(newRoot, 0);
     return newRoot;
   }
 
+  // TODO: (xingtanzjr) consider to implement the descriptor construction in 
every class
   private void calculateGroupByLevelNodeAttributes(PlanNode node, int level) {
     if (node == null) {
       return;
     }
     node.getChildren().forEach(child -> 
calculateGroupByLevelNodeAttributes(child, level + 1));
-    if (!(node instanceof GroupByLevelNode)) {
-      return;
-    }
-    GroupByLevelNode handle = (GroupByLevelNode) node;
 
     // Construct all outputColumns from children. Using Set here to avoid 
duplication
     Set<String> childrenOutputColumns = new HashSet<>();
-    handle
-        .getChildren()
-        .forEach(child -> 
childrenOutputColumns.addAll(child.getOutputColumnNames()));
-
-    // Check every OutputColumn of GroupByLevelNode and set the Expression of 
corresponding
-    // AggregationDescriptor
-    List<GroupByLevelDescriptor> descriptorList = new ArrayList<>();
-    for (GroupByLevelDescriptor originalDescriptor : 
handle.getGroupByLevelDescriptors()) {
-      List<Expression> descriptorExpression = new ArrayList<>();
-      for (String childColumn : childrenOutputColumns) {
-        // If this condition matched, the childColumn should come from 
GroupByLevelNode
-        if (isAggColumnMatchExpression(childColumn, 
originalDescriptor.getOutputExpression())) {
-          descriptorExpression.add(originalDescriptor.getOutputExpression());
-          continue;
-        }
-        for (Expression exp : originalDescriptor.getInputExpressions()) {
-          if (isAggColumnMatchExpression(childColumn, exp)) {
-            descriptorExpression.add(exp);
+    node.getChildren().forEach(child -> 
childrenOutputColumns.addAll(child.getOutputColumnNames()));
+
+    if (node instanceof SlidingWindowAggregationNode) {
+      SlidingWindowAggregationNode handle = (SlidingWindowAggregationNode) 
node;
+      List<AggregationDescriptor> descriptorList = new ArrayList<>();
+      for (AggregationDescriptor originalDescriptor : 
handle.getAggregationDescriptorList()) {
+        boolean keep = false;
+        for (String childColumn : childrenOutputColumns) {
+          for (Expression exp : originalDescriptor.getInputExpressions()) {
+            if (isAggColumnMatchExpression(childColumn, exp)) {
+              keep = true;
+            }
           }
         }
+        if (keep) {
+          descriptorList.add(originalDescriptor);
+        }
       }
-      if (descriptorExpression.size() == 0) {
-        continue;
-      }
-      GroupByLevelDescriptor descriptor = originalDescriptor.deepClone();
-      descriptor.setStep(level == 0 ? AggregationStep.FINAL : 
AggregationStep.PARTIAL);
-      descriptor.setInputExpressions(descriptorExpression);
+      handle.setAggregationDescriptorList(descriptorList);
+    }
 
-      descriptorList.add(descriptor);
+    if (node instanceof GroupByLevelNode) {
+      GroupByLevelNode handle = (GroupByLevelNode) node;
+      // Check every OutputColumn of GroupByLevelNode and set the Expression 
of corresponding
+      // AggregationDescriptor
+      List<GroupByLevelDescriptor> descriptorList = new ArrayList<>();
+      for (GroupByLevelDescriptor originalDescriptor : 
handle.getGroupByLevelDescriptors()) {
+        List<Expression> descriptorExpression = new ArrayList<>();
+        for (String childColumn : childrenOutputColumns) {
+          // If this condition matched, the childColumn should come from 
GroupByLevelNode
+          if (isAggColumnMatchExpression(childColumn, 
originalDescriptor.getOutputExpression())) {
+            descriptorExpression.add(originalDescriptor.getOutputExpression());
+            continue;
+          }
+          for (Expression exp : originalDescriptor.getInputExpressions()) {
+            if (isAggColumnMatchExpression(childColumn, exp)) {
+              descriptorExpression.add(exp);
+            }
+          }
+        }
+        if (descriptorExpression.size() == 0) {
+          continue;
+        }
+        GroupByLevelDescriptor descriptor = originalDescriptor.deepClone();
+        descriptor.setStep(level == 0 ? AggregationStep.FINAL : 
AggregationStep.PARTIAL);
+        descriptor.setInputExpressions(descriptorExpression);
+
+        descriptorList.add(descriptor);
+      }
+      handle.setGroupByLevelDescriptors(descriptorList);
     }
-    handle.setGroupByLevelDescriptors(descriptorList);
   }
 
   // TODO: (xingtanzjr) need to confirm the logic when processing UDF
@@ -592,26 +675,27 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
   }
 
   private List<SeriesAggregationSourceNode> splitAggregationSourceByPartition(
-      MultiChildNode root, DistributionPlanContext context) {
+      PlanNode root, DistributionPlanContext context) {
+    // Step 0: get all SeriesAggregationSourceNode in PlanNodeTree
+    List<SeriesAggregationSourceNode> rawSources = 
findAggregationSourceNode(root);
     // Step 1: split SeriesAggregationSourceNode according to data partition
     List<SeriesAggregationSourceNode> sources = new ArrayList<>();
     Map<PartialPath, Integer> regionCountPerSeries = new HashMap<>();
-    for (PlanNode child : root.getChildren()) {
-      SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
+    for (SeriesAggregationSourceNode child : rawSources) {
       List<TRegionReplicaSet> dataDistribution =
-          analysis.getPartitionInfo(handle.getPartitionPath(), 
handle.getPartitionTimeFilter());
+          analysis.getPartitionInfo(child.getPartitionPath(), 
child.getPartitionTimeFilter());
       for (TRegionReplicaSet dataRegion : dataDistribution) {
-        SeriesAggregationSourceNode split = (SeriesAggregationSourceNode) 
handle.clone();
+        SeriesAggregationSourceNode split = (SeriesAggregationSourceNode) 
child.clone();
         split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
         split.setRegionReplicaSet(dataRegion);
         // Let each split reference different object of 
AggregationDescriptorList
         split.setAggregationDescriptorList(
-            handle.getAggregationDescriptorList().stream()
+            child.getAggregationDescriptorList().stream()
                 .map(AggregationDescriptor::deepClone)
                 .collect(Collectors.toList()));
         sources.add(split);
       }
-      regionCountPerSeries.put(handle.getPartitionPath(), 
dataDistribution.size());
+      regionCountPerSeries.put(child.getPartitionPath(), 
dataDistribution.size());
     }
 
     // Step 2: change the step for each SeriesAggregationSourceNode according 
to its split count
@@ -626,6 +710,18 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     return sources;
   }
 
+  private List<SeriesAggregationSourceNode> findAggregationSourceNode(PlanNode 
node) {
+    if (node == null) {
+      return new ArrayList<>();
+    }
+    if (node instanceof SeriesAggregationSourceNode) {
+      return Collections.singletonList((SeriesAggregationSourceNode) node);
+    }
+    List<SeriesAggregationSourceNode> ret = new ArrayList<>();
+    node.getChildren().forEach(child -> 
ret.addAll(findAggregationSourceNode(child)));
+    return ret;
+  }
+
   public PlanNode visit(PlanNode node, DistributionPlanContext context) {
     return node.accept(this, context);
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
index d0df526b93..9362e40227 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
@@ -40,7 +40,7 @@ public class SlidingWindowAggregationNode extends ProcessNode 
{
 
   // The list of aggregate functions, each AggregateDescriptor will be output 
as one column of
   // result TsBlock
-  private final List<AggregationDescriptor> aggregationDescriptorList;
+  private List<AggregationDescriptor> aggregationDescriptorList;
 
   // The parameter of `group by time`.
   private final GroupByTimeParameter groupByTimeParameter;
@@ -74,6 +74,10 @@ public class SlidingWindowAggregationNode extends 
ProcessNode {
     return aggregationDescriptorList;
   }
 
+  public void setAggregationDescriptorList(List<AggregationDescriptor> 
aggregationDescriptorList) {
+    this.aggregationDescriptorList = aggregationDescriptorList;
+  }
+
   public GroupByTimeParameter getGroupByTimeParameter() {
     return groupByTimeParameter;
   }
@@ -175,4 +179,8 @@ public class SlidingWindowAggregationNode extends 
ProcessNode {
   public int hashCode() {
     return Objects.hash(super.hashCode(), aggregationDescriptorList, 
groupByTimeParameter, child);
   }
+
+  public String toString() {
+    return String.format("SlidingWindowAggregationNode-%s", getPlanNodeId());
+  }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index cfa79db920..e47068a7ff 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -35,8 +35,10 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
@@ -45,10 +47,12 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -57,6 +61,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -101,6 +106,54 @@ public class AggregationDistributionTest {
     root.getChildren().forEach(child -> verifyAggregationStep(expected, 
child));
   }
 
+  @Test
+  public void testTimeJoinAggregationWithSlidingWindow() throws 
IllegalPathException {
+    QueryId queryId = new QueryId("test_query_time_join_agg_with_sliding");
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), 
OrderBy.TIMESTAMP_ASC);
+    String d1s1Path = "root.sg.d1.s1";
+    timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s1Path, 
AggregationType.COUNT));
+
+    String d3s1Path = "root.sg.d333.s1";
+    timeJoinNode.addChild(genAggregationSourceNode(queryId, d3s1Path, 
AggregationType.COUNT));
+
+    SlidingWindowAggregationNode slidingWindowAggregationNode =
+        genSlidingWindowAggregationNode(
+            queryId,
+            Arrays.asList(new PartialPath(d1s1Path), new 
PartialPath(d3s1Path)),
+            AggregationType.COUNT,
+            AggregationStep.PARTIAL,
+            null);
+
+    slidingWindowAggregationNode.addChild(timeJoinNode);
+
+    Analysis analysis = Util.constructAnalysis();
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new 
TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(
+            analysis, new LogicalQueryPlan(context, 
slidingWindowAggregationNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, 
f.getFragment().getRoot()));
+    AggregationNode aggregationNode =
+        (AggregationNode)
+            fragmentInstances
+                .get(0)
+                .getFragment()
+                .getRoot()
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0);
+    aggregationNode
+        .getAggregationDescriptorList()
+        .forEach(d -> Assert.assertEquals(AggregationStep.PARTIAL, 
d.getStep()));
+  }
+
   @Test
   public void testTimeJoinAggregationMultiPerRegion() throws 
IllegalPathException {
     QueryId queryId = new QueryId("test_query_time_join_aggregation");
@@ -234,6 +287,89 @@ public class AggregationDistributionTest {
         (GroupByLevelNode) 
fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
   }
 
+  @Test
+  public void testGroupByLevelNodeWithSlidingWindow() throws 
IllegalPathException {
+    QueryId queryId = new QueryId("test_group_by_level_with_sliding_window");
+    String d3s1Path = "root.sg.d333.s1";
+    String d4s1Path = "root.sg.d4444.s1";
+    String groupedPath = "root.sg.*.s1";
+
+    SlidingWindowAggregationNode slidingWindowAggregationNode =
+        genSlidingWindowAggregationNode(
+            queryId,
+            Arrays.asList(new PartialPath(d3s1Path), new 
PartialPath(d4s1Path)),
+            AggregationType.COUNT,
+            AggregationStep.PARTIAL,
+            null);
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), 
OrderBy.TIMESTAMP_ASC);
+    timeJoinNode.addChild(genAggregationSourceNode(queryId, d3s1Path, 
AggregationType.COUNT));
+    timeJoinNode.addChild(genAggregationSourceNode(queryId, d4s1Path, 
AggregationType.COUNT));
+    slidingWindowAggregationNode.addChild(timeJoinNode);
+
+    GroupByLevelNode groupByLevelNode =
+        new GroupByLevelNode(
+            new PlanNodeId("TestGroupByLevelNode"),
+            Collections.singletonList(slidingWindowAggregationNode),
+            Collections.singletonList(
+                new GroupByLevelDescriptor(
+                    AggregationType.COUNT,
+                    AggregationStep.FINAL,
+                    Arrays.asList(
+                        new TimeSeriesOperand(new PartialPath(d3s1Path)),
+                        new TimeSeriesOperand(new PartialPath(d4s1Path))),
+                    new TimeSeriesOperand(new PartialPath(groupedPath)))),
+            null,
+            OrderBy.TIMESTAMP_ASC);
+
+    Analysis analysis = Util.constructAnalysis();
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new 
TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
groupByLevelNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
+    expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, 
f.getFragment().getRoot()));
+
+    Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
+    expectedDescriptorValue.put(groupedPath, Arrays.asList(groupedPath, 
d3s1Path, d4s1Path));
+    verifyGroupByLevelDescriptor(
+        expectedDescriptorValue,
+        (GroupByLevelNode) 
fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+
+    Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
+    expectedDescriptorValue2.put(groupedPath, Arrays.asList(d3s1Path, 
d4s1Path));
+    verifyGroupByLevelDescriptor(
+        expectedDescriptorValue2,
+        (GroupByLevelNode) 
fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+
+    verifySlidingWindowDescriptor(
+        Arrays.asList(d3s1Path, d4s1Path),
+        (SlidingWindowAggregationNode)
+            fragmentInstances
+                .get(0)
+                .getFragment()
+                .getRoot()
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0));
+    verifySlidingWindowDescriptor(
+        Arrays.asList(d3s1Path, d4s1Path),
+        (SlidingWindowAggregationNode)
+            fragmentInstances
+                .get(1)
+                .getFragment()
+                .getRoot()
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0));
+  }
+
   @Test
   public void testGroupByLevelTwoSeries() throws IllegalPathException {
     QueryId queryId = new QueryId("test_group_by_level_two_series");
@@ -349,6 +485,118 @@ public class AggregationDistributionTest {
         (GroupByLevelNode) 
fragmentInstances.get(2).getFragment().getRoot().getChildren().get(0));
   }
 
+  @Test
+  public void testGroupByLevelWithSliding2Series2Devices3Regions() throws 
IllegalPathException {
+    QueryId queryId = new QueryId("test_group_by_level_two_series");
+    String d1s1Path = "root.sg.d1.s1";
+    String d1s2Path = "root.sg.d1.s2";
+    String d2s1Path = "root.sg.d22.s1";
+    String groupedPathS1 = "root.sg.*.s1";
+    String groupedPathS2 = "root.sg.*.s2";
+
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), 
OrderBy.TIMESTAMP_ASC);
+    timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s1Path, 
AggregationType.COUNT));
+    timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s2Path, 
AggregationType.COUNT));
+    timeJoinNode.addChild(genAggregationSourceNode(queryId, d2s1Path, 
AggregationType.COUNT));
+
+    SlidingWindowAggregationNode slidingWindowAggregationNode =
+        genSlidingWindowAggregationNode(
+            queryId,
+            Arrays.asList(
+                new PartialPath(d1s1Path), new PartialPath(d1s2Path), new 
PartialPath(d2s1Path)),
+            AggregationType.COUNT,
+            AggregationStep.PARTIAL,
+            null);
+    slidingWindowAggregationNode.addChild(timeJoinNode);
+
+    GroupByLevelNode groupByLevelNode =
+        new GroupByLevelNode(
+            new PlanNodeId("TestGroupByLevelNode"),
+            Collections.singletonList(slidingWindowAggregationNode),
+            Arrays.asList(
+                new GroupByLevelDescriptor(
+                    AggregationType.COUNT,
+                    AggregationStep.FINAL,
+                    Arrays.asList(
+                        new TimeSeriesOperand(new PartialPath(d1s1Path)),
+                        new TimeSeriesOperand(new PartialPath(d2s1Path))),
+                    new TimeSeriesOperand(new PartialPath(groupedPathS1))),
+                new GroupByLevelDescriptor(
+                    AggregationType.COUNT,
+                    AggregationStep.FINAL,
+                    Collections.singletonList(new TimeSeriesOperand(new 
PartialPath(d1s2Path))),
+                    new TimeSeriesOperand(new PartialPath(groupedPathS2)))),
+            null,
+            OrderBy.TIMESTAMP_ASC);
+    Analysis analysis = Util.constructAnalysis();
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new 
TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
groupByLevelNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
+    expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, 
f.getFragment().getRoot()));
+
+    Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
+    expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, 
d1s1Path));
+    expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, 
d1s2Path));
+    verifyGroupByLevelDescriptor(
+        expectedDescriptorValue,
+        (GroupByLevelNode) 
fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+
+    Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
+    expectedDescriptorValue2.put(groupedPathS1, 
Collections.singletonList(d2s1Path));
+    verifyGroupByLevelDescriptor(
+        expectedDescriptorValue2,
+        (GroupByLevelNode) 
fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+
+    Map<String, List<String>> expectedDescriptorValue3 = new HashMap<>();
+    expectedDescriptorValue3.put(groupedPathS1, 
Collections.singletonList(d1s1Path));
+    expectedDescriptorValue3.put(groupedPathS2, 
Collections.singletonList(d1s2Path));
+    verifyGroupByLevelDescriptor(
+        expectedDescriptorValue3,
+        (GroupByLevelNode) 
fragmentInstances.get(2).getFragment().getRoot().getChildren().get(0));
+
+    verifySlidingWindowDescriptor(
+        Arrays.asList(d1s1Path, d1s2Path),
+        (SlidingWindowAggregationNode)
+            fragmentInstances
+                .get(0)
+                .getFragment()
+                .getRoot()
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0));
+    verifySlidingWindowDescriptor(
+        Collections.singletonList(d2s1Path),
+        (SlidingWindowAggregationNode)
+            fragmentInstances
+                .get(1)
+                .getFragment()
+                .getRoot()
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0));
+    verifySlidingWindowDescriptor(
+        Arrays.asList(d1s1Path, d1s2Path),
+        (SlidingWindowAggregationNode)
+            fragmentInstances
+                .get(2)
+                .getFragment()
+                .getRoot()
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0));
+  }
+
   @Test
   public void testAggregation1Series1Region() throws IllegalPathException {
     QueryId queryId = new QueryId("test_aggregation_1_series_1_region");
@@ -378,6 +626,35 @@ public class AggregationDistributionTest {
     }
   }
 
+  private void verifySlidingWindowDescriptor(
+      List<String> expected, SlidingWindowAggregationNode node) {
+    List<AggregationDescriptor> descriptorList = 
node.getAggregationDescriptorList();
+    assertEquals(expected.size(), descriptorList.size());
+    Map<String, Integer> verification = new HashMap<>();
+    descriptorList.forEach(
+        d -> 
verification.put(d.getInputExpressions().get(0).getExpressionString(), 1));
+    assertEquals(expected.size(), verification.size());
+    expected.forEach(v -> assertEquals(1, (int) verification.get(v)));
+  }
+
+  private SlidingWindowAggregationNode genSlidingWindowAggregationNode(
+      QueryId queryId,
+      List<PartialPath> paths,
+      AggregationType type,
+      AggregationStep step,
+      GroupByTimeParameter groupByTimeParameter) {
+    return new SlidingWindowAggregationNode(
+        queryId.genPlanNodeId(),
+        paths.stream()
+            .map(
+                path ->
+                    new AggregationDescriptor(
+                        type, step, Collections.singletonList(new 
TimeSeriesOperand(path))))
+            .collect(Collectors.toList()),
+        groupByTimeParameter,
+        OrderBy.TIMESTAMP_ASC);
+  }
+
   private SeriesAggregationSourceNode genAggregationSourceNode(
       QueryId queryId, String path, AggregationType type) throws 
IllegalPathException {
     List<AggregationDescriptor> descriptors = new ArrayList<>();

Reply via email to