This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty-mpp by this push:
     new d05878b2 finish most FragmentInstanceManager
d05878b2 is described below

commit d05878b281087eed45a335b6ecf8ae77f404b54b
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Mar 31 20:57:55 2022 +0800

    finish most FragmentInstanceManager
---
 .../iotdb/db/mpp/execution/DriverContext.java      |  23 +++-
 .../db/mpp/execution/FragmentInstanceContext.java  |  10 ++
 .../mpp/execution/FragmentInstanceExecution.java   |  85 +++++++++++++
 .../db/mpp/execution/FragmentInstanceInfo.java     |  32 +++++
 .../db/mpp/execution/FragmentInstanceManager.java  |  93 ++++++++++++++
 .../db/mpp/operator/process/TimeJoinOperator.java  |   3 +-
 ...Manager.java => FragmentInstanceScheduler.java} |  10 +-
 ...anager.java => IFragmentInstanceScheduler.java} |   2 +-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  | 134 ++++++++++++++++++---
 .../db/mpp/sql/planner/plan/FragmentInstance.java  |   7 ++
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |   5 +
 .../planner/plan/node/process/TimeJoinNode.java    |   8 ++
 .../iotdb/db/mpp/operator/LimitOperatorTest.java   |   1 -
 .../db/mpp/operator/TimeJoinOperatorTest.java      |   1 -
 .../db/mpp/schedule/DefaultTaskSchedulerTest.java  |   2 +-
 .../mpp/schedule/FragmentInstanceManagerTest.java  |   2 +-
 16 files changed, 387 insertions(+), 31 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
index aa3a265..ea11b12 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -27,11 +27,24 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import java.util.List;
 
 public class DriverContext {
-  private FragmentInstanceContext fragmentInstanceContext;
-  private List<PartialPath> paths;
-  private Filter timeFilter;
-  private VirtualStorageGroupProcessor dataRegion;
-  private List<SourceOperator> sourceOperators;
+  private final FragmentInstanceContext fragmentInstanceContext;
+  private final List<PartialPath> paths;
+  private final Filter timeFilter;
+  private final VirtualStorageGroupProcessor dataRegion;
+  private final List<SourceOperator> sourceOperators;
+
+  public DriverContext(
+      FragmentInstanceContext fragmentInstanceContext,
+      List<PartialPath> paths,
+      Filter timeFilter,
+      VirtualStorageGroupProcessor dataRegion,
+      List<SourceOperator> sourceOperators) {
+    this.fragmentInstanceContext = fragmentInstanceContext;
+    this.paths = paths;
+    this.timeFilter = timeFilter;
+    this.dataRegion = dataRegion;
+    this.sourceOperators = sourceOperators;
+  }
 
   public FragmentInstanceId getId() {
     return fragmentInstanceContext.getId();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index f9333da..fc3c82d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -37,6 +37,8 @@ public class FragmentInstanceContext extends QueryContext {
   private final List<OperatorContext> operatorContexts = new ArrayList<>();
   private final long createNanos = System.nanoTime();
 
+  private DriverContext driverContext;
+
   //    private final GcMonitor gcMonitor;
   //    private final AtomicLong startNanos = new AtomicLong();
   //    private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -73,4 +75,12 @@ public class FragmentInstanceContext extends QueryContext {
   public FragmentInstanceId getId() {
     return id;
   }
+
+  public DriverContext getDriverContext() {
+    return driverContext;
+  }
+
+  public void setDriverContext(DriverContext driverContext) {
+    this.driverContext = driverContext;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
new file mode 100644
index 0000000..2a46ae8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceExecution.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
+
+import com.google.common.collect.ImmutableList;
+
+import static java.util.Objects.requireNonNull;
+
+public class FragmentInstanceExecution {
+
+  private final IFragmentInstanceScheduler scheduler;
+
+  private final FragmentInstanceId instanceId;
+  private final FragmentInstanceContext context;
+
+  private final Driver driver;
+
+  private FragmentInstanceState state;
+
+  private long lastHeartbeat;
+
+  public FragmentInstanceExecution(
+      IFragmentInstanceScheduler scheduler,
+      FragmentInstanceId instanceId,
+      FragmentInstanceContext context,
+      Driver driver) {
+    this.scheduler = scheduler;
+    this.instanceId = instanceId;
+    this.context = context;
+    this.driver = driver;
+    scheduler.submitFragmentInstances(instanceId.getQueryId(), 
ImmutableList.of(driver));
+  }
+
+  public void recordHeartbeat() {
+    lastHeartbeat = System.currentTimeMillis();
+  }
+
+  public void setLastHeartbeat(long lastHeartbeat) {
+    this.lastHeartbeat = lastHeartbeat;
+  }
+
+  public FragmentInstanceState getInstanceState() {
+    return state;
+  }
+
+  public void setState(FragmentInstanceState state) {
+    this.state = state;
+  }
+
+  public FragmentInstanceInfo getInstanceInfo() {
+    return new FragmentInstanceInfo(state);
+  }
+
+  public void failed(Throwable cause) {
+    requireNonNull(cause, "cause is null");
+    // TODO
+  }
+
+  public void cancel() {
+    // TODO
+  }
+
+  public void abort() {
+    // TODO
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceInfo.java
new file mode 100644
index 0000000..e32087b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceInfo.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+public class FragmentInstanceInfo {
+
+  private final FragmentInstanceState state;
+
+  public FragmentInstanceInfo(FragmentInstanceState state) {
+    this.state = state;
+  }
+
+  public FragmentInstanceState getState() {
+    return state;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
new file mode 100644
index 0000000..f8471ab
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler;
+import org.apache.iotdb.db.mpp.schedule.IFragmentInstanceScheduler;
+import org.apache.iotdb.db.mpp.sql.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.util.Objects.requireNonNull;
+
+public class FragmentInstanceManager {
+
+  private final Map<FragmentInstanceId, FragmentInstanceContext> 
instanceContext;
+  private final Map<FragmentInstanceId, FragmentInstanceExecution> 
instanceExecution;
+  private final LocalExecutionPlanner planner = 
LocalExecutionPlanner.getInstance();
+  private final IFragmentInstanceScheduler scheduler = 
FragmentInstanceScheduler.getInstance();
+
+  public static FragmentInstanceManager getInstance() {
+    return FragmentInstanceManager.InstanceHolder.INSTANCE;
+  }
+
+  private FragmentInstanceManager() {
+    this.instanceContext = new ConcurrentHashMap<>();
+    this.instanceExecution = new ConcurrentHashMap<>();
+  }
+
+  public FragmentInstanceInfo execDataQueryFragmentInstance(
+      FragmentInstance instance, VirtualStorageGroupProcessor dataRegion) {
+    FragmentInstanceId instanceId = instance.getId();
+
+    FragmentInstanceExecution execution =
+        instanceExecution.computeIfAbsent(
+            instanceId,
+            id -> {
+              FragmentInstanceContext context =
+                  instanceContext.computeIfAbsent(instanceId, 
FragmentInstanceContext::new);
+
+              Driver driver =
+                  planner.plan(
+                      instance.getFragment().getRoot(),
+                      context,
+                      instance.getTimeFilter(),
+                      dataRegion);
+              return new FragmentInstanceExecution(scheduler, instanceId, 
context, driver);
+            });
+
+    return execution.getInstanceInfo();
+  }
+
+  /**
+   * Gets the info for the specified fragment instance.
+   *
+   * <p>NOTE: this design assumes that only fragment instances that will eventually exist are
+   * queried.
+   */
+  public FragmentInstanceInfo getInstanceInfo(FragmentInstanceId instanceId) {
+    requireNonNull(instanceId, "instanceId is null");
+    FragmentInstanceExecution execution = instanceExecution.get(instanceId);
+    if (execution == null) {
+      return null;
+    }
+    return execution.getInstanceInfo();
+  }
+
+  private static class InstanceHolder {
+
+    private InstanceHolder() {}
+
+    private static final FragmentInstanceManager INSTANCE = new 
FragmentInstanceManager();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index 0d0feb6..e38a150 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -65,7 +65,6 @@ public class TimeJoinOperator implements ProcessOperator {
       OperatorContext operatorContext,
       List<Operator> children,
       OrderBy mergeOrder,
-      int columnCount,
       List<TSDataType> dataTypes) {
     this.operatorContext = operatorContext;
     this.children = children;
@@ -74,7 +73,7 @@ public class TimeJoinOperator implements ProcessOperator {
     this.inputIndex = new int[this.inputCount];
     this.noMoreTsBlocks = new boolean[this.inputCount];
     this.timeSelector = new TimeSelector(this.inputCount << 1, 
OrderBy.TIMESTAMP_ASC == mergeOrder);
-    this.columnCount = columnCount;
+    this.columnCount = dataTypes.size();
     this.dataTypes = dataTypes;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index c75bfc5..dcc80bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -51,11 +51,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /** the manager of fragment instances scheduling */
-public class FragmentInstanceManager implements IFragmentInstanceManager, 
IService {
+public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, 
IService {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(FragmentInstanceManager.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(FragmentInstanceScheduler.class);
 
-  public static FragmentInstanceManager getInstance() {
+  public static FragmentInstanceScheduler getInstance() {
     return InstanceHolder.instance;
   }
 
@@ -73,7 +73,7 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
   private InternalService.Client mppServiceClient; // TODO: use from client 
pool
   private final List<AbstractExecutor> threads;
 
-  public FragmentInstanceManager() {
+  private FragmentInstanceScheduler() {
     this.readyQueue =
         new L2PriorityQueue<>(
             MAX_CAPACITY,
@@ -238,7 +238,7 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
 
     private InstanceHolder() {}
 
-    private static final FragmentInstanceManager instance = new 
FragmentInstanceManager();
+    private static final FragmentInstanceScheduler instance = new 
FragmentInstanceScheduler();
   }
   /** the default scheduler implementation */
   private class Scheduler implements ITaskScheduler {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
index 4fb4326..46ee0ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.mpp.execution.ExecFragmentInstance;
 import java.util.List;
 
 /** the interface of fragment instance scheduling */
-public interface IFragmentInstanceManager {
+public interface IFragmentInstanceScheduler {
 
   /**
    * Submit one or more {@link ExecFragmentInstance} in one query for later scheduling.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index b3c60d7..da4f158 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -18,17 +18,24 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner;
 
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
 import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
+import org.apache.iotdb.db.mpp.execution.Driver;
+import org.apache.iotdb.db.mpp.execution.DriverContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
+import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.operator.source.SourceOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
@@ -37,11 +44,18 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
 import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
 
 /**
  * used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
@@ -50,6 +64,30 @@ import java.util.List;
  */
 public class LocalExecutionPlanner {
 
+  public static LocalExecutionPlanner getInstance() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  public Driver plan(
+      PlanNode plan,
+      FragmentInstanceContext instanceContext,
+      Filter timeFilter,
+      VirtualStorageGroupProcessor dataRegion) {
+    LocalExecutionPlanContext context = new 
LocalExecutionPlanContext(instanceContext);
+
+    Operator root = plan.accept(new Visitor(), context);
+
+    DriverContext driverContext =
+        new DriverContext(
+            instanceContext,
+            context.getPaths(),
+            timeFilter,
+            dataRegion,
+            context.getSourceOperators());
+    instanceContext.setDriverContext(driverContext);
+    return new Driver(root, context.getSinkHandle(), driverContext);
+  }
+
   /** This Visitor is responsible for transferring PlanNode Tree to Operator Tree */
   private class Visitor extends PlanVisitor<Operator, 
LocalExecutionPlanContext> {
 
@@ -63,16 +101,23 @@ public class LocalExecutionPlanner {
       PartialPath seriesPath = node.getSeriesPath();
       boolean ascending = node.getScanOrder() == OrderBy.TIMESTAMP_ASC;
       OperatorContext operatorContext =
-          context.taskContext.addOperatorContext(
+          context.instanceContext.addOperatorContext(
               context.getNextOperatorId(), node.getId(), 
SeriesScanOperator.class.getSimpleName());
-      return new SeriesScanOperator(
-          seriesPath,
-          node.getAllSensors(),
-          seriesPath.getSeriesType(),
-          operatorContext,
-          node.getTimeFilter(),
-          node.getValueFilter(),
-          ascending);
+
+      SeriesScanOperator seriesScanOperator =
+          new SeriesScanOperator(
+              seriesPath,
+              node.getAllSensors(),
+              seriesPath.getSeriesType(),
+              operatorContext,
+              node.getTimeFilter(),
+              node.getValueFilter(),
+              ascending);
+
+      context.addSourceOperator(seriesScanOperator);
+      context.addPath(seriesPath);
+
+      return seriesScanOperator;
     }
 
     @Override
@@ -114,7 +159,7 @@ public class LocalExecutionPlanner {
     public Operator visitLimit(LimitNode node, LocalExecutionPlanContext 
context) {
       Operator child = node.getChild().accept(this, context);
       return new LimitOperator(
-          context.taskContext.addOperatorContext(
+          context.instanceContext.addOperatorContext(
               context.getNextOperatorId(), node.getId(), 
LimitOperator.class.getSimpleName()),
           node.getLimit(),
           child);
@@ -138,20 +183,81 @@ public class LocalExecutionPlanner {
 
     @Override
     public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext 
context) {
-      return super.visitTimeJoin(node, context);
+      List<Operator> children =
+          node.getChildren().stream()
+              .map(child -> child.accept(this, context))
+              .collect(Collectors.toList());
+      OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(), node.getId(), 
TimeJoinOperator.class.getSimpleName());
+      return new TimeJoinOperator(operatorContext, children, 
node.getMergeOrder(), node.getTypes());
+    }
+
+    @Override
+    public Operator visitExchange(ExchangeNode node, LocalExecutionPlanContext 
context) {
+      return super.visitExchange(node, context);
+    }
+
+    @Override
+    public Operator visitFragmentSink(FragmentSinkNode node, 
LocalExecutionPlanContext context) {
+      Operator child = node.getChild().accept(this, context);
+      // TODO(jackie tien) create SinkHandle here
+      ISinkHandle sinkHandle = null;
+      context.setSinkHandle(sinkHandle);
+      return child;
     }
   }
 
+  private static class InstanceHolder {
+
+    private InstanceHolder() {}
+
+    private static final LocalExecutionPlanner INSTANCE = new 
LocalExecutionPlanner();
+  }
+
   private static class LocalExecutionPlanContext {
-    private final FragmentInstanceContext taskContext;
+    private final FragmentInstanceContext instanceContext;
+    private final List<PartialPath> paths;
+    private final List<SourceOperator> sourceOperators;
+    private ISinkHandle sinkHandle;
+
     private int nextOperatorId = 0;
 
-    public LocalExecutionPlanContext(FragmentInstanceContext taskContext) {
-      this.taskContext = taskContext;
+    public LocalExecutionPlanContext(FragmentInstanceContext instanceContext) {
+      this.instanceContext = instanceContext;
+      this.paths = new ArrayList<>();
+      this.sourceOperators = new ArrayList<>();
     }
 
     private int getNextOperatorId() {
       return nextOperatorId++;
     }
+
+    public List<PartialPath> getPaths() {
+      return paths;
+    }
+
+    public List<SourceOperator> getSourceOperators() {
+      return sourceOperators;
+    }
+
+    public void addPath(PartialPath path) {
+      paths.add(path);
+    }
+
+    public void addSourceOperator(SourceOperator sourceOperator) {
+      sourceOperators.add(sourceOperator);
+    }
+
+    public ISinkHandle getSinkHandle() {
+      return sinkHandle;
+    }
+
+    public void setSinkHandle(ISinkHandle sinkHandle) {
+      requireNonNull(sinkHandle, "sinkHandle is null");
+      checkArgument(this.sinkHandle == null, "There must be at most one 
SinkNode");
+
+      this.sinkHandle = sinkHandle;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 61f9292..d4c31e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.nio.ByteBuffer;
 
@@ -40,6 +41,8 @@ public class FragmentInstance implements IConsensusRequest {
   private DataRegionReplicaSet dataRegion;
   private EndPoint hostEndpoint;
 
+  private Filter timeFilter;
+
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
 
@@ -87,6 +90,10 @@ public class FragmentInstance implements IConsensusRequest {
     return "<No downstream>";
   }
 
+  public Filter getTimeFilter() {
+    return timeFilter;
+  }
+
   public String toString() {
     StringBuilder ret = new StringBuilder();
     ret.append(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
index 916f516..c035a01 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
 import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 
@@ -91,4 +92,8 @@ public abstract class PlanVisitor<R, C> {
   public R visitExchange(ExchangeNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitFragmentSink(FragmentSinkNode node, C context) {
+    return visitPlan(node, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index 67e874e..f2c0d84 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.nio.ByteBuffer;
@@ -51,6 +52,9 @@ public class TimeJoinNode extends ProcessNode {
 
   private List<PlanNode> children;
 
+  // output columns' data type
+  private List<TSDataType> types;
+
   public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder, FilterNullPolicy 
filterNullPolicy) {
     super(id);
     this.mergeOrder = mergeOrder;
@@ -131,6 +135,10 @@ public class TimeJoinNode extends ProcessNode {
     return "TimeJoinNode-" + this.getId();
   }
 
+  public List<TSDataType> getTypes() {
+    return types;
+  }
+
   @TestOnly
   public Pair<String, List<String>> print() {
     String title = String.format("[TimeJoinNode (%s)]", this.getId());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index b9b439c..e901ab6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -123,7 +123,6 @@ public class LimitOperatorTest {
               fragmentInstanceContext.getOperatorContexts().get(2),
               Arrays.asList(seriesScanOperator1, seriesScanOperator2),
               OrderBy.TIMESTAMP_ASC,
-              2,
               Arrays.asList(TSDataType.INT32, TSDataType.INT32));
 
       LimitOperator limitOperator =
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index 5548b79..a22554f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -117,7 +117,6 @@ public class TimeJoinOperatorTest {
               fragmentInstanceContext.getOperatorContexts().get(2),
               Arrays.asList(seriesScanOperator1, seriesScanOperator2),
               OrderBy.TIMESTAMP_ASC,
-              2,
               Arrays.asList(TSDataType.INT32, TSDataType.INT32));
       int count = 0;
       while (timeJoinOperator.hasNext()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
index 5d385ae..bade6dd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
@@ -41,7 +41,7 @@ import java.util.concurrent.TimeUnit;
 
 public class DefaultTaskSchedulerTest {
 
-  private final FragmentInstanceManager manager = 
FragmentInstanceManager.getInstance();
+  private final FragmentInstanceScheduler manager = 
FragmentInstanceScheduler.getInstance();
 
   @After
   public void tearDown() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManagerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManagerTest.java
index 0233e4c..c79d376 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManagerTest.java
@@ -38,7 +38,7 @@ import java.util.List;
 
 public class FragmentInstanceManagerTest {
 
-  private final FragmentInstanceManager manager = 
FragmentInstanceManager.getInstance();
+  private final FragmentInstanceScheduler manager = 
FragmentInstanceScheduler.getInstance();
 
   @After
   public void tearDown() {

Reply via email to