This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/sonar
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/sonar by this push:
     new 3496ab12d22 
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process done 
except for fill, join and last
3496ab12d22 is described below

commit 3496ab12d224b7c97d667e876d60f4f236c9e874
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Jun 20 21:14:14 2023 +0800

    server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process 
done except for fill, join and last
---
 .../process/AbstractConsumeAllOperator.java        |   1 +
 .../operator/process/AbstractIntoOperator.java     |  10 +-
 .../operator/process/AggregationOperator.java      |   1 +
 .../operator/process/DeviceMergeOperator.java      | 316 ---------------------
 .../operator/process/DeviceViewIntoOperator.java   |   7 +-
 .../operator/process/DeviceViewOperator.java       |   1 +
 .../execution/operator/process/FillOperator.java   |   3 +-
 .../operator/process/FilterAndProjectOperator.java |  45 ++-
 .../execution/operator/process/IntoOperator.java   |   1 +
 .../execution/operator/process/LimitOperator.java  |   1 +
 .../operator/process/LinearFillOperator.java       |  16 +-
 .../operator/process/MergeSortOperator.java        |  11 +-
 .../execution/operator/process/OffsetOperator.java |   1 +
 .../operator/process/ProcessOperator.java          |   2 +-
 .../process/RawDataAggregationOperator.java        |  10 +-
 .../process/SingleInputAggregationOperator.java    |   1 +
 .../process/SlidingWindowAggregationOperator.java  |  14 +-
 .../execution/operator/process/SortOperator.java   |  47 +--
 .../operator/process/TagAggregationOperator.java   |  68 +++--
 .../operator/process/TransformOperator.java        |  45 +--
 .../operator/source/AlignedSeriesScanUtil.java     |  12 +-
 .../execution/operator/source/SeriesScanUtil.java  |  48 +++-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  36 ---
 .../mpp/execution/operator/OperatorMemoryTest.java |  49 ----
 24 files changed, 225 insertions(+), 521 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
index c13415d6d12..170c6d7035b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractConsumeAllOperator.java
@@ -81,6 +81,7 @@ public abstract class AbstractConsumeAllOperator extends 
AbstractOperator
    *
    * @return true if results of all children are ready. Return false if some 
children is blocked or
    *     return null.
+   * @throws Exception errors happened while getting tsblock from children
    */
   protected boolean prepareInput() throws Exception {
     boolean allReady = true;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 8816776d88f..ed618dcc173 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -109,13 +109,12 @@ public abstract class AbstractIntoOperator implements 
ProcessOperator {
     if (memAllowedMaxRowNumber > Integer.MAX_VALUE) {
       memAllowedMaxRowNumber = Integer.MAX_VALUE;
     }
-    int maxRowNumberInStatement =
+    this.maxRowNumberInStatement =
         Math.min(
             (int) memAllowedMaxRowNumber,
             
IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit());
     long maxStatementSize = maxRowNumberInStatement * statementSizePerLine;
 
-    this.maxRowNumberInStatement = maxRowNumberInStatement;
     this.maxRetainedSize = child.calculateMaxReturnSize() + maxStatementSize;
     this.maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
   }
@@ -175,6 +174,9 @@ public abstract class AbstractIntoOperator implements 
ProcessOperator {
   /**
    * Check whether the last write operation was executed successfully, and 
throw an exception if the
    * execution failed, otherwise continue to execute the operator.
+   *
+   * @throws IntoProcessException wrap InterruptedException with 
IntoProcessException while
+   *     Interruption happened
    */
   private void checkLastWriteOperation() {
     if (writeOperationFuture == null) {
@@ -203,8 +205,6 @@ public abstract class AbstractIntoOperator implements 
ProcessOperator {
 
       writeOperationFuture = null;
     } catch (InterruptedException e) {
-      LOGGER.warn(
-          "{}: interrupted when processing write operation future with 
exception {}", this, e);
       Thread.currentThread().interrupt();
       throw new IntoProcessException(e.getMessage());
     } catch (ExecutionException e) {
@@ -249,7 +249,7 @@ public abstract class AbstractIntoOperator implements 
ProcessOperator {
 
   /** Return true if write task is submitted successfully. */
   protected boolean insertMultiTabletsInternally(boolean needCheck) {
-    InsertMultiTabletsStatement insertMultiTabletsStatement =
+    final InsertMultiTabletsStatement insertMultiTabletsStatement =
         constructInsertMultiTabletsStatement(needCheck);
     if (insertMultiTabletsStatement == null) {
       return false;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 0102c0be073..0b50b6c8ebb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
deleted file mode 100644
index cb7630dea2f..00000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.execution.operator.process;
-
-import org.apache.iotdb.db.mpp.execution.operator.Operator;
-import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import 
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
-import org.apache.iotdb.db.utils.datastructure.TimeSelector;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import 
org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import static com.google.common.util.concurrent.Futures.successfulAsList;
-
-/**
- * DeviceMergeOperator is responsible for merging tsBlock coming from 
DeviceViewOperators.
- *
- * <p>If the devices in different dataNodes are different, we need to output 
tsBlocks of each node
- * in order of device. If the same device exists in different nodes, the 
tsBlocks need to be merged
- * by time within the device.
- *
- * <p>The form of tsBlocks from input operators should be the same strictly, 
which is transferred by
- * DeviceViewOperator.
- */
-@Deprecated
-public class DeviceMergeOperator implements ProcessOperator {
-
-  private final OperatorContext operatorContext;
-  private final List<String> devices;
-  private final List<Operator> deviceOperators;
-  private final List<TSDataType> dataTypes;
-  private final TsBlockBuilder tsBlockBuilder;
-
-  private final int inputOperatorsCount;
-  private final TsBlock[] inputTsBlocks;
-  // device name of inputTsBlocks[], e.g. d1 in tsBlock1, d2 in tsBlock2
-  private final String[] deviceOfInputTsBlocks;
-  private final boolean[] noMoreTsBlocks;
-
-  private int curDeviceIndex;
-  // the index of curDevice in inputTsBlocks
-  private LinkedList<Integer> curDeviceTsBlockIndexList = new LinkedList<>();
-
-  private boolean finished;
-
-  private final TimeSelector timeSelector;
-  private final TimeComparator comparator;
-
-  public DeviceMergeOperator(
-      OperatorContext operatorContext,
-      List<String> devices,
-      List<Operator> deviceOperators,
-      List<TSDataType> dataTypes,
-      TimeSelector selector,
-      TimeComparator comparator) {
-    this.operatorContext = operatorContext;
-    this.devices = devices;
-    this.deviceOperators = deviceOperators;
-    this.inputOperatorsCount = deviceOperators.size();
-    this.inputTsBlocks = new TsBlock[inputOperatorsCount];
-    this.deviceOfInputTsBlocks = new String[inputOperatorsCount];
-    this.noMoreTsBlocks = new boolean[inputOperatorsCount];
-    this.dataTypes = dataTypes;
-    this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
-    this.timeSelector = selector;
-    this.comparator = comparator;
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public ListenableFuture<?> isBlocked() {
-    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i)) {
-        ListenableFuture<?> blocked = deviceOperators.get(i).isBlocked();
-        if (!blocked.isDone()) {
-          listenableFutures.add(blocked);
-        }
-      }
-    }
-    return listenableFutures.isEmpty() ? NOT_BLOCKED : 
successfulAsList(listenableFutures);
-  }
-
-  @Override
-  public TsBlock next() throws Exception {
-    // get new input TsBlock
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && 
deviceOperators.get(i).hasNextWithTimer()) {
-        inputTsBlocks[i] = deviceOperators.get(i).nextWithTimer();
-        if (inputTsBlocks[i] == null || inputTsBlocks[i].isEmpty()) {
-          return null;
-        }
-        deviceOfInputTsBlocks[i] = getDeviceNameFromTsBlock(inputTsBlocks[i]);
-        tryToAddCurDeviceTsBlockList(i);
-      }
-    }
-    // move to next device
-    while (curDeviceTsBlockIndexList.isEmpty() && curDeviceIndex + 1 < 
devices.size()) {
-      getNextDeviceTsBlocks();
-    }
-    // process the curDeviceTsBlocks
-    if (curDeviceTsBlockIndexList.size() == 1) {
-      TsBlock resultTsBlock = inputTsBlocks[curDeviceTsBlockIndexList.get(0)];
-      inputTsBlocks[curDeviceTsBlockIndexList.get(0)] = null;
-      curDeviceTsBlockIndexList.clear();
-      return resultTsBlock;
-    } else {
-      tsBlockBuilder.reset();
-      int tsBlockSizeOfCurDevice = curDeviceTsBlockIndexList.size();
-      TsBlock[] deviceTsBlocks = new TsBlock[tsBlockSizeOfCurDevice];
-      TsBlockSingleColumnIterator[] tsBlockIterators =
-          new TsBlockSingleColumnIterator[tsBlockSizeOfCurDevice];
-      for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
-        deviceTsBlocks[i] = inputTsBlocks[curDeviceTsBlockIndexList.get(i)];
-        tsBlockIterators[i] = 
deviceTsBlocks[i].getTsBlockSingleColumnIterator();
-      }
-      // Use the min end time of all tsBlocks as the end time of result tsBlock
-      // i.e. only one tsBlock will be consumed totally
-      long currentEndTime = deviceTsBlocks[0].getEndTime();
-      for (int i = 1; i < tsBlockSizeOfCurDevice; i++) {
-        currentEndTime =
-            comparator.getCurrentEndTime(currentEndTime, 
deviceTsBlocks[i].getEndTime());
-      }
-
-      TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
-      ColumnBuilder[] valueColumnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
-      while (!timeSelector.isEmpty()
-          && comparator.satisfyCurEndTime(timeSelector.first(), 
currentEndTime)) {
-        long timestamp = timeSelector.pollFirst();
-        timeBuilder.writeLong(timestamp);
-        // TODO process by column
-        // Try to find the tsBlock that timestamp belongs to
-        for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
-          // TODO the same timestamp in different data region
-          if (tsBlockIterators[i].hasNext() && 
tsBlockIterators[i].currentTime() == timestamp) {
-            int rowIndex = tsBlockIterators[i].getRowIndex();
-            for (int j = 0; j < valueColumnBuilders.length; j++) {
-              // the jth column of rowIndex of ith tsBlock
-              if (deviceTsBlocks[i].getColumn(j).isNull(rowIndex)) {
-                valueColumnBuilders[j].appendNull();
-                continue;
-              }
-              valueColumnBuilders[j].write(deviceTsBlocks[i].getColumn(j), 
rowIndex);
-            }
-            tsBlockIterators[i].next();
-            break;
-          }
-        }
-        tsBlockBuilder.declarePosition();
-      }
-      // update tsBlock after consuming
-      int consumedTsBlockIndex = 0;
-      for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
-        if (tsBlockIterators[i].hasNext()) {
-          int rowIndex = tsBlockIterators[i].getRowIndex();
-          inputTsBlocks[curDeviceTsBlockIndexList.get(i)] =
-              
inputTsBlocks[curDeviceTsBlockIndexList.get(i)].subTsBlock(rowIndex);
-        } else {
-          inputTsBlocks[curDeviceTsBlockIndexList.get(i)] = null;
-          consumedTsBlockIndex = i;
-        }
-      }
-      curDeviceTsBlockIndexList.remove(consumedTsBlockIndex);
-      return tsBlockBuilder.build();
-    }
-  }
-
-  @Override
-  public boolean hasNext() throws Exception {
-    if (finished) {
-      return false;
-    }
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!isTsBlockEmpty(i)) {
-        return true;
-      } else if (!noMoreTsBlocks[i]) {
-        if (deviceOperators.get(i).hasNextWithTimer()) {
-          return true;
-        } else {
-          noMoreTsBlocks[i] = true;
-          inputTsBlocks[i] = null;
-        }
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public void close() throws Exception {
-    for (Operator deviceOperator : deviceOperators) {
-      deviceOperator.close();
-    }
-  }
-
-  @Override
-  public boolean isFinished() throws Exception {
-    if (finished) {
-      return true;
-    }
-    finished = true;
-
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      // has more tsBlock output from children[i] or has cached tsBlock in 
inputTsBlocks[i]
-      if (!noMoreTsBlocks[i] || !isTsBlockEmpty(i)) {
-        finished = false;
-        break;
-      }
-    }
-    return finished;
-  }
-
-  /** DeviceColumn must be the first value column of tsBlock transferred by 
DeviceViewOperator. */
-  private String getDeviceNameFromTsBlock(TsBlock tsBlock) {
-    if (tsBlock == null || tsBlock.getPositionCount() == 0 || 
tsBlock.getColumn(0).isNull(0)) {
-      return null;
-    }
-    return tsBlock.getColumn(0).getBinary(0).toString();
-  }
-
-  private String getCurDeviceName() {
-    return devices.get(curDeviceIndex);
-  }
-
-  private void getNextDeviceTsBlocks() {
-    curDeviceIndex++;
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      tryToAddCurDeviceTsBlockList(i);
-    }
-  }
-
-  private void tryToAddCurDeviceTsBlockList(int tsBlockIndex) {
-    if (deviceOfInputTsBlocks[tsBlockIndex] != null
-        && deviceOfInputTsBlocks[tsBlockIndex].equals(getCurDeviceName())) {
-      // add tsBlock of curDevice to a list
-      curDeviceTsBlockIndexList.add(tsBlockIndex);
-      // add all timestamp of curDevice to timeSelector
-      int rowSize = inputTsBlocks[tsBlockIndex].getPositionCount();
-      for (int row = 0; row < rowSize; row++) {
-        timeSelector.add(inputTsBlocks[tsBlockIndex].getTimeByIndex(row));
-      }
-    }
-  }
-
-  /**
-   * If the tsBlock of tsBlockIndex is null or has no more data in the 
tsBlock, return true; else
-   * return false;
-   */
-  private boolean isTsBlockEmpty(int tsBlockIndex) {
-    return inputTsBlocks[tsBlockIndex] == null
-        || inputTsBlocks[tsBlockIndex].getPositionCount() == 0;
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    // timeSelector will cache time, we use a single time column to represent 
max memory cost
-    long maxPeekMemory = 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
-    // inputTsBlocks will cache all TsBlocks returned by deviceOperators
-    for (Operator operator : deviceOperators) {
-      maxPeekMemory += operator.calculateMaxReturnSize();
-      maxPeekMemory += operator.calculateRetainedSizeAfterCallingNext();
-    }
-    for (Operator operator : deviceOperators) {
-      maxPeekMemory = Math.max(maxPeekMemory, 
operator.calculateMaxPeekMemory());
-    }
-    return Math.max(maxPeekMemory, calculateMaxReturnSize());
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    // time + all value columns
-    return (1L + dataTypes.size()) * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
-    for (Operator child : deviceOperators) {
-      long maxReturnSize = child.calculateMaxReturnSize();
-      currentRetainedSize += (maxReturnSize + 
child.calculateRetainedSizeAfterCallingNext());
-      minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
-    }
-    // max cached TsBlock
-    return currentRetainedSize - minChildReturnSize;
-  }
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 32a0959d80f..95f7c4ce5fe 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -54,6 +54,7 @@ public class DeviceViewIntoOperator extends 
AbstractIntoOperator {
 
   private final TsBlockBuilder resultTsBlockBuilder;
 
+  @SuppressWarnings("squid:S107")
   public DeviceViewIntoOperator(
       OperatorContext operatorContext,
       Operator child,
@@ -96,7 +97,7 @@ public class DeviceViewIntoOperator extends 
AbstractIntoOperator {
 
     String device = 
String.valueOf(inputTsBlock.getValueColumns()[deviceColumnIndex].getBinary(0));
     if (!Objects.equals(device, currentDevice)) {
-      InsertMultiTabletsStatement insertMultiTabletsStatement =
+      final InsertMultiTabletsStatement insertMultiTabletsStatement =
           constructInsertMultiTabletsStatement(false);
       updateResultTsBlock();
 
@@ -113,7 +114,7 @@ public class DeviceViewIntoOperator extends 
AbstractIntoOperator {
     int readIndex = 0;
     while (readIndex < inputTsBlock.getPositionCount()) {
       int lastReadIndex = readIndex;
-      for (IntoOperator.InsertTabletStatementGenerator generator :
+      for (AbstractIntoOperator.InsertTabletStatementGenerator generator :
           insertTabletStatementGenerators) {
         lastReadIndex = Math.max(lastReadIndex, 
generator.processTsBlock(inputTsBlock, readIndex));
       }
@@ -142,7 +143,7 @@ public class DeviceViewIntoOperator extends 
AbstractIntoOperator {
     return resultTsBlockBuilder.build();
   }
 
-  private List<IntoOperator.InsertTabletStatementGenerator>
+  private List<AbstractIntoOperator.InsertTabletStatementGenerator>
       constructInsertTabletStatementGeneratorsByDevice(String currentDevice) {
     Map<PartialPath, Map<String, InputLocation>> 
targetPathToSourceInputLocationMap =
         deviceToTargetPathSourceInputLocationMap.get(currentDevice);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index 384163d1395..50052c798dc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index c88ab34483f..db9f6d6c55d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -29,7 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
-/** Used for previous and constant value fill */
+/** Used for previous and constant value fill. */
 public class FillOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
index 232b6c08cc6..6e8cc0fe096 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterAndProjectOperator.java
@@ -48,15 +48,15 @@ public class FilterAndProjectOperator implements 
ProcessOperator {
 
   private final Operator inputOperator;
 
-  private List<LeafColumnTransformer> filterLeafColumnTransformerList;
+  private final List<LeafColumnTransformer> filterLeafColumnTransformerList;
 
-  private ColumnTransformer filterOutputTransformer;
+  private final ColumnTransformer filterOutputTransformer;
 
-  private List<ColumnTransformer> commonTransformerList;
+  private final List<ColumnTransformer> commonTransformerList;
 
-  private List<LeafColumnTransformer> projectLeafColumnTransformerList;
+  private final List<LeafColumnTransformer> projectLeafColumnTransformerList;
 
-  private List<ColumnTransformer> projectOutputTransformerList;
+  private final List<ColumnTransformer> projectOutputTransformerList;
 
   private final TsBlockBuilder filterTsBlockBuilder;
 
@@ -67,6 +67,7 @@ public class FilterAndProjectOperator implements 
ProcessOperator {
   // false when we only need to do projection
   private final boolean hasFilter;
 
+  @SuppressWarnings("squid:S107")
   public FilterAndProjectOperator(
       OperatorContext operatorContext,
       Operator inputOperator,
@@ -117,10 +118,7 @@ public class FilterAndProjectOperator implements 
ProcessOperator {
 
   /**
    * Return the TsBlock that contains both initial input columns and columns 
of common
-   * subexpressions after filtering
-   *
-   * @param input
-   * @return
+   * subexpressions after filtering.
    */
   private TsBlock getFilterTsBlock(TsBlock input) {
     final TimeColumn originTimeColumn = input.getTimeColumn();
@@ -145,7 +143,6 @@ public class FilterAndProjectOperator implements 
ProcessOperator {
       resultColumns.add(input.getColumn(i));
     }
 
-    // todo: remove this if, add calculated common sub expressions anyway
     if (!hasNonMappableUDF) {
       // get result of calculated common sub expressions
       for (ColumnTransformer columnTransformer : commonTransformerList) {
@@ -153,12 +150,32 @@ public class FilterAndProjectOperator implements 
ProcessOperator {
       }
     }
 
+    int rowCount =
+        constructFilteredTsBlock(
+            resultColumns,
+            timeBuilder,
+            filterColumn,
+            originTimeColumn,
+            columnBuilders,
+            positionCount);
+
+    filterTsBlockBuilder.declarePositions(rowCount);
+    return filterTsBlockBuilder.build();
+  }
+
+  private int constructFilteredTsBlock(
+      List<Column> resultColumns,
+      TimeColumnBuilder timeBuilder,
+      Column filterColumn,
+      TimeColumn originTimeColumn,
+      ColumnBuilder[] columnBuilders,
+      int positionCount) {
     // construct result TsBlock of filter
     int rowCount = 0;
     for (int i = 0, n = resultColumns.size(); i < n; i++) {
       Column curColumn = resultColumns.get(i);
       for (int j = 0; j < positionCount; j++) {
-        if (!filterColumn.isNull(j) && filterColumn.getBoolean(j)) {
+        if (satisfy(filterColumn, j)) {
           if (i == 0) {
             rowCount++;
             timeBuilder.writeLong(originTimeColumn.getLong(j));
@@ -171,9 +188,11 @@ public class FilterAndProjectOperator implements 
ProcessOperator {
         }
       }
     }
+    return rowCount;
+  }
 
-    filterTsBlockBuilder.declarePositions(rowCount);
-    return filterTsBlockBuilder.build();
+  private boolean satisfy(Column filterColumn, int rowIndex) {
+    return !filterColumn.isNull(rowIndex) && filterColumn.getBoolean(rowIndex);
   }
 
   private TsBlock getTransformedTsBlock(TsBlock input) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index d5e65d262ed..c58f3f9a1ae 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -42,6 +42,7 @@ public class IntoOperator extends AbstractIntoOperator {
 
   private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
 
+  @SuppressWarnings("squid:S107")
   public IntoOperator(
       OperatorContext operatorContext,
       Operator child,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index b208e551b1e..8407f51fce4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index 2d785311a6e..8ffc0489c5e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -33,14 +34,13 @@ import java.util.List;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
-/** Used for linear fill */
+/** Used for linear fill. */
 public class LinearFillOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
   private final ILinearFill[] fillArray;
   private final Operator child;
   private final int outputColumnCount;
-  // TODO need to spill it to disk if it consumes too much memory
   private final List<TsBlock> cachedTsBlock;
 
   private final List<Long> cachedRowIndex;
@@ -49,8 +49,10 @@ public class LinearFillOperator implements ProcessOperator {
   // next TsBlock Index for each Column
   private final int[] nextTsBlockIndex;
 
-  // indicate whether we can call child.next()
-  // it's used to make sure that child.next() will only be called once in 
LinearFillOperator.next();
+  /**
+   * indicate whether we can call child.next(). it's used to make sure that 
child.next() will only
+   * be called once in LinearFillOperator.next().
+   */
   private boolean canCallNext;
   // indicate whether there is more TsBlock for child operator
   private boolean noMoreTsBlock;
@@ -81,6 +83,7 @@ public class LinearFillOperator implements ProcessOperator {
     return child.isBlocked();
   }
 
+  @SuppressWarnings("squid:S3776")
   @Override
   public TsBlock next() throws Exception {
 
@@ -181,7 +184,7 @@ public class LinearFillOperator implements ProcessOperator {
   }
 
   /**
-   * Judge whether we can use current cached TsBlock to fill Column
+   * Judge whether we can use current cached TsBlock to fill Column.
    *
    * @param columnIndex index for column which need to be filled
    * @param currentEndRowIndex row index for endTime of column which need to 
be filled
@@ -206,7 +209,10 @@ public class LinearFillOperator implements ProcessOperator 
{
   }
 
   /**
+   * Try to get next TsBlock
+   *
    * @return true if we succeed to get next TsBlock and add it into 
cachedTsBlock, otherwise false
+   * @throws Exception errors happened while getting next batch data
    */
   private boolean tryToGetNextTsBlock() throws Exception {
     if (canCallNext) { // if we can call child.next(), we call that and cache 
it in
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
index d13798a5176..a11df165ed2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -83,6 +84,7 @@ public class MergeSortOperator extends 
AbstractConsumeAllOperator {
         : successfulAsList(listenableFutures);
   }
 
+  @SuppressWarnings({"squid:S3776", "squid:S135"})
   @Override
   public TsBlock next() throws Exception {
     // start stopwatch
@@ -204,7 +206,8 @@ public class MergeSortOperator extends 
AbstractConsumeAllOperator {
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+    long currentRetainedSize = 0;
+    long minChildReturnSize = Long.MAX_VALUE;
     for (Operator child : children) {
       long maxReturnSize = child.calculateMaxReturnSize();
       minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
@@ -223,7 +226,7 @@ public class MergeSortOperator extends 
AbstractConsumeAllOperator {
   protected boolean prepareInput() throws Exception {
     boolean allReady = true;
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null) {
+      if (needCallNext(i)) {
         continue;
       }
       if (canCallNext[i]) {
@@ -245,4 +248,8 @@ public class MergeSortOperator extends 
AbstractConsumeAllOperator {
     }
     return allReady;
   }
+
+  private boolean needCallNext(int i) {
+    return noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 22429b431e2..af9acc47e33 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java
index 6205cb766d0..d6be0d4d179 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 
-// TODO should think about what interfaces should this ProcessOperator have
 public interface ProcessOperator extends Operator {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index ed765d9b478..7c6dc64dd63 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -84,6 +84,7 @@ public class RawDataAggregationOperator extends 
SingleInputAggregationOperator {
     return windowManager.hasNext(hasMoreData());
   }
 
+  @SuppressWarnings({"squid:S3776", "squid:S135"})
   @Override
   protected boolean calculateNextAggregationResult() throws Exception {
 
@@ -110,7 +111,9 @@ public class RawDataAggregationOperator extends 
SingleInputAggregationOperator {
         // If the window is not initialized, it just returns to avoid invoking
         // updateResultTsBlock()
         // but if it's skipping the last window, just break and keep skipping.
-        if (needSkip || windowManager.isCurWindowInit()) break;
+        if (needSkip || windowManager.isCurWindowInit()) {
+          break;
+        }
         return false;
       }
     }
@@ -138,6 +141,7 @@ public class RawDataAggregationOperator extends 
SingleInputAggregationOperator {
     return true;
   }
 
+  @SuppressWarnings({"squid:S3776", "squid:S135"})
   private boolean calculateFromRawData() {
     // if window is not initialized, we should init window status and reset 
aggregators
     if (!windowManager.isCurWindowInit() && 
!skipPreviousWindowAndInitCurWindow()) {
@@ -179,7 +183,9 @@ public class RawDataAggregationOperator extends 
SingleInputAggregationOperator {
       }
 
       // if no row needs to skip, just send a null parameter.
-      if (!hasSkip) needProcess = null;
+      if (!hasSkip) {
+        needProcess = null;
+      }
 
       for (Aggregator aggregator : aggregators) {
         // Current agg method has been calculated
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index 40938e335ed..302e6ff34aa 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -106,6 +106,7 @@ public abstract class SingleInputAggregationOperator 
implements ProcessOperator
     child.close();
   }
 
+  @SuppressWarnings("squid:S112")
   protected abstract boolean calculateNextAggregationResult() throws Exception;
 
   protected abstract void updateResultTsBlock();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 3e8871da00d..e417073d8c1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -75,6 +75,7 @@ public class SlidingWindowAggregationOperator extends 
SingleInputAggregationOper
     return curTimeRange != null || timeRangeIterator.hasNextTimeRange();
   }
 
+  @SuppressWarnings("squid:S112")
   @Override
   protected boolean calculateNextAggregationResult() throws Exception {
     if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) {
@@ -110,15 +111,16 @@ public class SlidingWindowAggregationOperator extends 
SingleInputAggregationOper
     return true;
   }
 
-  /** @return if already get the result */
+  /** return if already get the result. */
   private boolean isCalculationDone() {
-    if (curSubTimeRange == null && !subTimeRangeIterator.hasNextTimeRange()) {
-      return true;
+    if (curSubTimeRange == null) {
+      if (!subTimeRangeIterator.hasNextTimeRange()) {
+        return true;
+      } else {
+        curSubTimeRange = subTimeRangeIterator.nextTimeRange();
+      }
     }
 
-    if (curSubTimeRange == null && subTimeRangeIterator.hasNextTimeRange()) {
-      curSubTimeRange = subTimeRangeIterator.nextTimeRange();
-    }
     return ascending
         ? curSubTimeRange.getMin() > curTimeRange.getMax()
         : curSubTimeRange.getMax() < curTimeRange.getMin();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
index 20dcac02f7e..328356029ef 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SortOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
@@ -135,7 +136,9 @@ public class SortOperator implements ProcessOperator {
   }
 
   private void prepareSortReaders() throws IoTDBException {
-    if (sortReaders != null) return;
+    if (sortReaders != null) {
+      return;
+    }
 
     sortReaders = new ArrayList<>();
     if (cachedBytes != 0) {
@@ -208,21 +211,8 @@ public class SortOperator implements ProcessOperator {
 
   private TsBlock mergeSort() throws IoTDBException {
 
-    if (mergeSortHeap == null) {
-      mergeSortHeap = new MergeSortHeap(sortReaders.size(), comparator);
-      // 1. fill the input from each reader
-      for (int i = 0; i < sortReaders.size(); i++) {
-        SortReader sortReader = sortReaders.get(i);
-        if (sortReader.hasNext()) {
-          MergeSortKey mergeSortKey = sortReader.next();
-          mergeSortKey.inputChannelIndex = i;
-          mergeSortHeap.push(mergeSortKey);
-        } else {
-          noMoreData[i] = true;
-          sortBufferManager.releaseOneSortBranch();
-        }
-      }
-    }
+    // 1. fill the input from each reader
+    initMergeSortHeap();
 
     long startTime = System.nanoTime();
     long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
@@ -262,6 +252,23 @@ public class SortOperator implements ProcessOperator {
     return tsBlockBuilder.build();
   }
 
+  private void initMergeSortHeap() throws IoTDBException {
+    if (mergeSortHeap == null) {
+      mergeSortHeap = new MergeSortHeap(sortReaders.size(), comparator);
+      for (int i = 0; i < sortReaders.size(); i++) {
+        SortReader sortReader = sortReaders.get(i);
+        if (sortReader.hasNext()) {
+          MergeSortKey mergeSortKey = sortReader.next();
+          mergeSortKey.inputChannelIndex = i;
+          mergeSortHeap.push(mergeSortKey);
+        } else {
+          noMoreData[i] = true;
+          sortBufferManager.releaseOneSortBranch();
+        }
+      }
+    }
+  }
+
   private MergeSortKey readNextMergeSortKey(int readerIndex) throws 
IoTDBException {
     SortReader sortReader = sortReaders.get(readerIndex);
     if (sortReader.hasNext()) {
@@ -273,7 +280,9 @@ public class SortOperator implements ProcessOperator {
   }
 
   private boolean hasMoreData() {
-    if (noMoreData == null) return true;
+    if (noMoreData == null) {
+      return true;
+    }
     for (boolean noMore : noMoreData) {
       if (!noMore) {
         return true;
@@ -283,7 +292,9 @@ public class SortOperator implements ProcessOperator {
   }
 
   public void clear() {
-    if (!diskSpiller.hasSpilledData()) return;
+    if (!diskSpiller.hasSpilledData()) {
+      return;
+    }
     try {
       if (sortReaders != null) {
         for (SortReader sortReader : sortReaders) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
index 0360028c2bb..299f637eb9e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
@@ -57,7 +57,7 @@ public class TagAggregationOperator extends 
AbstractConsumeAllOperator {
     this.groups = Validate.notNull(groups);
     this.groupedAggregators = Validate.notNull(groupedAggregators);
     List<TSDataType> actualOutputColumnTypes = new ArrayList<>();
-    for (String ignored : groups.get(0)) {
+    for (int i = 0; i < groups.get(0).size(); i++) {
       actualOutputColumnTypes.add(TSDataType.TEXT);
     }
     for (int outputColumnIdx = 0;
@@ -104,44 +104,50 @@ public class TagAggregationOperator extends 
AbstractConsumeAllOperator {
       rowBlocks[i] = inputTsBlocks[i].getRegion(consumedIndices[i], 1);
     }
     for (int groupIdx = 0; groupIdx < groups.size(); groupIdx++) {
-      List<String> group = groups.get(groupIdx);
       List<Aggregator> aggregators = groupedAggregators.get(groupIdx);
+      aggregate(aggregators, rowBlocks);
+      List<String> group = groups.get(groupIdx);
+      appendOneRow(rowBlocks, group, aggregators);
+    }
 
-      for (Aggregator aggregator : aggregators) {
-        if (aggregator == null) {
-          continue;
-        }
-        aggregator.reset();
-        aggregator.processTsBlocks(rowBlocks);
+    // Reset dataReady for next iteration
+    for (int i = 0; i < children.size(); i++) {
+      consumedIndices[i]++;
+    }
+  }
+
+  private void aggregate(List<Aggregator> aggregators, TsBlock[] rowBlocks) {
+    for (Aggregator aggregator : aggregators) {
+      if (aggregator == null) {
+        continue;
       }
+      aggregator.reset();
+      aggregator.processTsBlocks(rowBlocks);
+    }
+  }
 
-      TimeColumnBuilder timeColumnBuilder = 
tsBlockBuilder.getTimeColumnBuilder();
-      timeColumnBuilder.writeLong(rowBlocks[0].getStartTime());
-      ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+  private void appendOneRow(TsBlock[] rowBlocks, List<String> group, 
List<Aggregator> aggregators) {
+    TimeColumnBuilder timeColumnBuilder = 
tsBlockBuilder.getTimeColumnBuilder();
+    timeColumnBuilder.writeLong(rowBlocks[0].getStartTime());
+    ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
 
-      for (int i = 0; i < group.size(); i++) {
-        if (group.get(i) == null) {
-          columnBuilders[i].writeBinary(new Binary("NULL"));
-        } else {
-          columnBuilders[i].writeBinary(new Binary(group.get(i)));
-        }
-      }
-      for (int i = 0; i < aggregators.size(); i++) {
-        Aggregator aggregator = aggregators.get(i);
-        ColumnBuilder columnBuilder = columnBuilders[i + group.size()];
-        if (aggregator == null) {
-          columnBuilder.appendNull();
-        } else {
-          aggregator.outputResult(new ColumnBuilder[] {columnBuilder});
-        }
+    for (int i = 0; i < group.size(); i++) {
+      if (group.get(i) == null) {
+        columnBuilders[i].writeBinary(new Binary("NULL"));
+      } else {
+        columnBuilders[i].writeBinary(new Binary(group.get(i)));
       }
-      tsBlockBuilder.declarePosition();
     }
-
-    // Reset dataReady for next iteration
-    for (int i = 0; i < children.size(); i++) {
-      consumedIndices[i]++;
+    for (int i = 0; i < aggregators.size(); i++) {
+      Aggregator aggregator = aggregators.get(i);
+      ColumnBuilder columnBuilder = columnBuilders[i + group.size()];
+      if (aggregator == null) {
+        columnBuilder.appendNull();
+      } else {
+        aggregator.outputResult(new ColumnBuilder[] {columnBuilder});
+      }
     }
+    tsBlockBuilder.declarePosition();
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index f5c056a3e4f..2331f1e7daf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -44,10 +44,7 @@ import 
org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -56,8 +53,6 @@ import java.util.Map;
 
 public class TransformOperator implements ProcessOperator {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(TransformOperator.class);
-
   protected final float udfReaderMemoryBudgetInMB =
       IoTDBDescriptor.getInstance().getConfig().getUdfReaderMemoryBudgetInMB();
   protected final float udfTransformerMemoryBudgetInMB =
@@ -79,6 +74,7 @@ public class TransformOperator implements ProcessOperator {
 
   private final String udtfQueryId;
 
+  @SuppressWarnings("squid:S107")
   public TransformOperator(
       OperatorContext operatorContext,
       Operator inputOperator,
@@ -89,7 +85,7 @@ public class TransformOperator implements ProcessOperator {
       ZoneId zoneId,
       Map<NodeRef<Expression>, TSDataType> expressionTypes,
       boolean isAscending)
-      throws QueryProcessException, IOException {
+      throws QueryProcessException {
     this.operatorContext = operatorContext;
     this.inputOperator = inputOperator;
     this.keepNull = keepNull;
@@ -158,6 +154,7 @@ public class TransformOperator implements ProcessOperator {
     return YieldableState.YIELDABLE;
   }
 
+  @SuppressWarnings("squid:S135")
   protected YieldableState iterateReaderToNextValid(LayerPointReader reader) 
throws Exception {
     // Since a constant operand is not allowed to be a result column, the 
reader will not be
     // a ConstantLayerPointReader.
@@ -174,6 +171,7 @@ public class TransformOperator implements ProcessOperator {
     return yieldableState;
   }
 
+  @SuppressWarnings("squid:S112")
   @Override
   public final boolean hasNext() throws Exception {
     if (!timeHeap.isEmpty()) {
@@ -184,12 +182,12 @@ public class TransformOperator implements ProcessOperator 
{
         return true;
       }
     } catch (Exception e) {
-      LOGGER.error("TransformOperator#hasNext()", e);
       throw new RuntimeException(e);
     }
     return !timeHeap.isEmpty();
   }
 
+  @SuppressWarnings("squid:S112")
   @Override
   public TsBlock next() throws Exception {
 
@@ -200,13 +198,7 @@ public class TransformOperator implements ProcessOperator {
       }
 
       final TsBlockBuilder tsBlockBuilder = 
TsBlockBuilder.createWithOnlyTimeColumn();
-      if (outputDataTypes == null) {
-        outputDataTypes = new ArrayList<>();
-        for (LayerPointReader reader : transformers) {
-          outputDataTypes.add(reader.getDataType());
-        }
-      }
-      tsBlockBuilder.buildValueColumnBuilders(outputDataTypes);
+      prepareTsBlockBuilder(tsBlockBuilder);
       final TimeColumnBuilder timeBuilder = 
tsBlockBuilder.getTimeColumnBuilder();
       final ColumnBuilder[] columnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
       final int columnCount = columnBuilders.length;
@@ -232,11 +224,7 @@ public class TransformOperator implements ProcessOperator {
           }
         }
 
-        for (int i = 0; i < columnCount; ++i) {
-          if (shouldIterateReadersToNextValid[i]) {
-            transformers[i].readyForNext();
-          }
-        }
+        prepareEachColumn(columnCount);
 
         ++rowCount;
 
@@ -252,11 +240,28 @@ public class TransformOperator implements ProcessOperator 
{
       tsBlockBuilder.declarePositions(rowCount);
       return tsBlockBuilder.build();
     } catch (Exception e) {
-      LOGGER.error("TransformOperator#next()", e);
       throw new RuntimeException(e);
     }
   }
 
+  protected void prepareTsBlockBuilder(TsBlockBuilder tsBlockBuilder) {
+    if (outputDataTypes == null) {
+      outputDataTypes = new ArrayList<>();
+      for (LayerPointReader reader : transformers) {
+        outputDataTypes.add(reader.getDataType());
+      }
+    }
+    tsBlockBuilder.buildValueColumnBuilders(outputDataTypes);
+  }
+
+  private void prepareEachColumn(int columnCount) {
+    for (int i = 0; i < columnCount; ++i) {
+      if (shouldIterateReadersToNextValid[i]) {
+        transformers[i].readyForNext();
+      }
+    }
+  }
+
   protected boolean collectReaderAppendIsNull(LayerPointReader reader, long 
currentTime)
       throws Exception {
     final YieldableState yieldableState = reader.yield();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
index 883dadfd5f4..0458572f4d8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java
@@ -61,26 +61,31 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
     isAligned = true;
   }
 
+  @SuppressWarnings("squid:S3740")
   @Override
-  protected Statistics currentFileStatistics(int index) throws IOException {
+  protected Statistics currentFileStatistics(int index) {
     return ((AlignedTimeSeriesMetadata) 
firstTimeSeriesMetadata).getStatistics(index);
   }
 
+  @SuppressWarnings("squid:S3740")
   @Override
-  protected Statistics currentFileTimeStatistics() throws IOException {
+  protected Statistics currentFileTimeStatistics() {
     return ((AlignedTimeSeriesMetadata) 
firstTimeSeriesMetadata).getTimeStatistics();
   }
 
+  @SuppressWarnings("squid:S3740")
   @Override
-  protected Statistics currentChunkStatistics(int index) throws IOException {
+  protected Statistics currentChunkStatistics(int index) {
     return ((AlignedChunkMetadata) firstChunkMetadata).getStatistics(index);
   }
 
+  @SuppressWarnings("squid:S3740")
   @Override
   protected Statistics currentChunkTimeStatistics() {
     return ((AlignedChunkMetadata) firstChunkMetadata).getTimeStatistics();
   }
 
+  @SuppressWarnings("squid:S3740")
   @Override
   protected Statistics currentPageStatistics(int index) throws IOException {
     if (firstPageReader == null) {
@@ -89,6 +94,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
     return firstPageReader.getStatistics(index);
   }
 
+  @SuppressWarnings("squid:S3740")
   @Override
   protected Statistics currentPageTimeStatistics() throws IOException {
     if (firstPageReader == null) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 728d88ee23a..5e3089c7623 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -70,6 +70,9 @@ import static 
org.apache.iotdb.db.mpp.metric.SeriesScanCostMetricSet.BUILD_TSBLO
 
 public class SeriesScanUtil {
 
+  private static final String ONE_SENSOR_ERROR_MSG =
+      "Only one sensor in non-aligned SeriesScanUtil.";
+
   private final QueryContext context;
 
   // The path of the target series which will be scanned.
@@ -214,6 +217,7 @@ public class SeriesScanUtil {
     return firstTimeSeriesMetadata != null;
   }
 
+  @SuppressWarnings("squid:S3740")
   boolean isFileOverlapped() throws IOException {
     if (firstTimeSeriesMetadata == null) {
       throw new IOException("no first file");
@@ -227,16 +231,19 @@ public class SeriesScanUtil {
                 fileStatistics, 
unSeqTimeSeriesMetadata.peek().getStatistics());
   }
 
+  @SuppressWarnings("squid:S3740")
   Statistics currentFileStatistics() {
     return firstTimeSeriesMetadata.getStatistics();
   }
 
-  protected Statistics currentFileStatistics(int index) throws IOException {
-    checkArgument(index == 0, "Only one sensor in non-aligned 
SeriesScanUtil.");
+  @SuppressWarnings("squid:S3740")
+  protected Statistics currentFileStatistics(int index) {
+    checkArgument(index == 0, ONE_SENSOR_ERROR_MSG);
     return currentFileStatistics();
   }
 
-  protected Statistics currentFileTimeStatistics() throws IOException {
+  @SuppressWarnings("squid:S3740")
+  protected Statistics currentFileTimeStatistics() {
     return currentFileStatistics();
   }
 
@@ -287,6 +294,7 @@ public class SeriesScanUtil {
     return firstChunkMetadata != null;
   }
 
+  @SuppressWarnings("squid:S3740")
   protected void filterFirstChunkMetadata() throws IOException {
     if (firstChunkMetadata != null && !isChunkOverlapped() && 
!firstChunkMetadata.isModified()) {
       Filter queryFilter = scanOptions.getQueryFilter();
@@ -353,8 +361,7 @@ public class SeriesScanUtil {
     }
   }
 
-  protected void unpackOneTimeSeriesMetadata(ITimeSeriesMetadata 
timeSeriesMetadata)
-      throws IOException {
+  protected void unpackOneTimeSeriesMetadata(ITimeSeriesMetadata 
timeSeriesMetadata) {
     List<IChunkMetadata> chunkMetadataList =
         FileLoaderUtils.loadChunkMetadataList(timeSeriesMetadata);
     chunkMetadataList.forEach(chunkMetadata -> 
chunkMetadata.setSeq(timeSeriesMetadata.isSeq()));
@@ -362,6 +369,7 @@ public class SeriesScanUtil {
     cachedChunkMetadata.addAll(chunkMetadataList);
   }
 
+  @SuppressWarnings("squid:S3740")
   boolean isChunkOverlapped() throws IOException {
     if (firstChunkMetadata == null) {
       throw new IOException("no first chunk");
@@ -372,16 +380,19 @@ public class SeriesScanUtil {
         && orderUtils.isOverlapped(chunkStatistics, 
cachedChunkMetadata.peek().getStatistics());
   }
 
+  @SuppressWarnings("squid:S3740")
   Statistics currentChunkStatistics() {
     return firstChunkMetadata.getStatistics();
   }
 
-  protected Statistics currentChunkStatistics(int index) throws IOException {
-    checkArgument(index == 0, "Only one sensor in non-aligned 
SeriesScanUtil.");
+  @SuppressWarnings("squid:S3740")
+  protected Statistics currentChunkStatistics(int index) {
+    checkArgument(index == 0, ONE_SENSOR_ERROR_MSG);
     return currentChunkStatistics();
   }
 
-  protected Statistics currentChunkTimeStatistics() throws IOException {
+  @SuppressWarnings("squid:S3740")
+  protected Statistics currentChunkTimeStatistics() {
     return currentChunkStatistics();
   }
 
@@ -567,6 +578,7 @@ public class SeriesScanUtil {
    * <p>hasNextPage may cache firstPageReader if it is not overlapped or 
cached a BatchData if the
    * first page is overlapped
    */
+  @SuppressWarnings("squid:S3740")
   boolean isPageOverlapped() throws IOException {
 
     /*
@@ -595,6 +607,7 @@ public class SeriesScanUtil {
         && orderUtils.isOverlapped(firstPageStatistics, 
unSeqPageReaders.peek().getStatistics());
   }
 
+  @SuppressWarnings("squid:S3740")
   Statistics currentPageStatistics() {
     if (firstPageReader == null) {
       return null;
@@ -602,11 +615,13 @@ public class SeriesScanUtil {
     return firstPageReader.getStatistics();
   }
 
+  @SuppressWarnings("squid:S3740")
   protected Statistics currentPageStatistics(int index) throws IOException {
-    checkArgument(index == 0, "Only one sensor in non-aligned 
SeriesScanUtil.");
+    checkArgument(index == 0, ONE_SENSOR_ERROR_MSG);
     return currentPageStatistics();
   }
 
+  @SuppressWarnings("squid:S3740")
   protected Statistics currentPageTimeStatistics() throws IOException {
     return currentPageStatistics();
   }
@@ -671,7 +686,6 @@ public class SeriesScanUtil {
         // may has overlapped data
         if (mergeReader.hasNextTimeValuePair()) {
 
-          // TODO we still need to consider data type, ascending and 
descending here
           TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList());
           TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
           long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
@@ -1033,6 +1047,7 @@ public class SeriesScanUtil {
     }
   }
 
+  @SuppressWarnings("squid:S3740")
   protected void filterFirstTimeSeriesMetadata() throws IOException {
     if (firstTimeSeriesMetadata != null
         && !isFileOverlapped()
@@ -1132,10 +1147,12 @@ public class SeriesScanUtil {
       this.isMem = data instanceof MemPageReader || data instanceof 
MemAlignedPageReader;
     }
 
+    @SuppressWarnings("squid:S3740")
     Statistics getStatistics() {
       return data.getStatistics();
     }
 
+    @SuppressWarnings("squid:S3740")
     Statistics getStatistics(int index) throws IOException {
       if (!(data instanceof IAlignedPageReader)) {
         throw new IOException("Can only get statistics by index from 
AlignedPageReader");
@@ -1143,6 +1160,7 @@ public class SeriesScanUtil {
       return ((IAlignedPageReader) data).getStatistics(index);
     }
 
+    @SuppressWarnings("squid:S3740")
     Statistics getTimeStatistics() throws IOException {
       if (!(data instanceof IAlignedPageReader)) {
         throw new IOException("Can only get statistics of time column from 
AlignedPageReader");
@@ -1230,26 +1248,31 @@ public class SeriesScanUtil {
 
   class DescTimeOrderUtils implements TimeOrderUtils {
 
+    @SuppressWarnings("squid:S3740")
     @Override
     public long getOrderTime(Statistics statistics) {
       return statistics.getEndTime();
     }
 
+    @SuppressWarnings("squid:S3740")
     @Override
     public long getOrderTime(TsFileResource fileResource) {
       return fileResource.getEndTime(seriesPath.getDevice());
     }
 
+    @SuppressWarnings("squid:S3740")
     @Override
     public long getOverlapCheckTime(Statistics range) {
       return range.getStartTime();
     }
 
+    @SuppressWarnings("squid:S3740")
     @Override
     public boolean isOverlapped(Statistics left, Statistics right) {
       return left.getStartTime() <= right.getEndTime();
     }
 
+    @SuppressWarnings("squid:S3740")
     @Override
     public boolean isOverlapped(long time, Statistics right) {
       return time <= right.getEndTime();
@@ -1348,26 +1371,31 @@ public class SeriesScanUtil {
 
   class AscTimeOrderUtils implements TimeOrderUtils {
 
+    @SuppressWarnings("squid:S3740")
     @Override
     public long getOrderTime(Statistics statistics) {
       return statistics.getStartTime();
     }
 
+    @SuppressWarnings("squid:S3740")
     @Override
     public long getOrderTime(TsFileResource fileResource) {
       return fileResource.getStartTime(seriesPath.getDevice());
     }
 
+    @SuppressWarnings("squid:S3740")
     @Override
     public long getOverlapCheckTime(Statistics range) {
       return range.getEndTime();
     }
 
+    @SuppressWarnings("squid:S3740")
     @Override
     public boolean isOverlapped(Statistics left, Statistics right) {
       return left.getEndTime() >= right.getStartTime();
     }
 
+    @SuppressWarnings("squid:S3740")
     @Override
     public boolean isOverlapped(long time, Statistics right) {
       return time >= right.getStartTime();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 0c2d75b35f3..0ebf65484be 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -46,7 +46,6 @@ import 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewIntoOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
@@ -158,7 +157,6 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryS
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewIntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
@@ -212,7 +210,6 @@ import org.apache.iotdb.db.mpp.statistics.StatisticsManager;
 import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer;
 import 
org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer;
 import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext;
-import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -778,39 +775,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
         operatorContext, node.getDevices(), children, deviceColumnIndex, 
outputColumnTypes);
   }
 
-  @Deprecated
-  @Override
-  public Operator visitDeviceMerge(DeviceMergeNode node, 
LocalExecutionPlanContext context) {
-    OperatorContext operatorContext =
-        context
-            .getDriverContext()
-            .addOperatorContext(
-                context.getNextOperatorId(),
-                node.getPlanNodeId(),
-                DeviceMergeOperator.class.getSimpleName());
-    List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, 
context);
-    List<TSDataType> dataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
-    TimeSelector selector = null;
-    TimeComparator timeComparator = null;
-    for (SortItem sortItem : node.getMergeOrderParameter().getSortItemList()) {
-      if (Objects.equals(sortItem.getSortKey(), OrderByKey.TIME)) {
-        Ordering ordering = sortItem.getOrdering();
-        if (ordering == Ordering.ASC) {
-          selector = new TimeSelector(node.getChildren().size() << 1, true);
-          timeComparator = ASC_TIME_COMPARATOR;
-        } else {
-          selector = new TimeSelector(node.getChildren().size() << 1, false);
-          timeComparator = DESC_TIME_COMPARATOR;
-        }
-        break;
-      }
-    }
-
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-    return new DeviceMergeOperator(
-        operatorContext, node.getDevices(), children, dataTypes, selector, 
timeComparator);
-  }
-
   @Override
   public Operator visitMergeSort(MergeSortNode node, LocalExecutionPlanContext 
context) {
     OperatorContext operatorContext =
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index bea10d48463..2ca08105c1d 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.AggregationOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
@@ -49,7 +48,6 @@ import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.join.HorizontallyConcatOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
-import 
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.last.AbstractUpdateLastCacheOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator;
@@ -88,7 +86,6 @@ import 
org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticAdditi
 import 
org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareLessEqualColumnTransformer;
 import 
org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer;
 import 
org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer;
-import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -557,52 +554,6 @@ public class OperatorMemoryTest {
     assertEquals(512, 
linearFillOperator.calculateRetainedSizeAfterCallingNext());
   }
 
-  @Test
-  public void deviceMergeOperatorTest() {
-    List<Operator> children = new ArrayList<>(4);
-    List<TSDataType> dataTypeList = new ArrayList<>(2);
-    dataTypeList.add(TSDataType.INT32);
-    dataTypeList.add(TSDataType.INT32);
-    List<String> devices = new ArrayList<>(4);
-    devices.add("device1");
-    devices.add("device2");
-    devices.add("device3");
-    devices.add("device4");
-    long expectedMaxReturnSize =
-        3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
-    long expectedMaxPeekMemory = 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
-    long expectedRetainedSizeAfterCallingNext = 0;
-    long childrenMaxPeekMemory = 0;
-
-    for (int i = 0; i < 4; i++) {
-      Operator child = Mockito.mock(Operator.class);
-      Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L);
-      Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L);
-      
Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(64 * 
1024L);
-      expectedMaxPeekMemory += 128 * 1024L;
-      childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, 
child.calculateMaxPeekMemory());
-      expectedRetainedSizeAfterCallingNext += 128 * 1024L;
-      children.add(child);
-    }
-
-    expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, 
childrenMaxPeekMemory);
-
-    DeviceMergeOperator deviceMergeOperator =
-        new DeviceMergeOperator(
-            Mockito.mock(OperatorContext.class),
-            devices,
-            children,
-            dataTypeList,
-            Mockito.mock(TimeSelector.class),
-            Mockito.mock(TimeComparator.class));
-
-    assertEquals(expectedMaxPeekMemory, 
deviceMergeOperator.calculateMaxPeekMemory());
-    assertEquals(expectedMaxReturnSize, 
deviceMergeOperator.calculateMaxReturnSize());
-    assertEquals(
-        expectedRetainedSizeAfterCallingNext - 64 * 1024L,
-        deviceMergeOperator.calculateRetainedSizeAfterCallingNext());
-  }
-
   @Test
   public void deviceViewOperatorTest() {
     List<Operator> children = new ArrayList<>(4);

Reply via email to