This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/sonar
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/sonar by this push:
     new b6a87055f11 
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last 
done
b6a87055f11 is described below

commit b6a87055f118707f3bbcdfec9299505d84977182
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Jun 21 08:58:13 2023 +0800

    
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last 
done
---
 .../last/AlignedUpdateLastCacheOperator.java       |   2 +-
 .../process/last/LastQueryCollectOperator.java     |   1 +
 .../process/last/LastQueryMergeOperator.java       | 123 +++++++++++++--------
 .../operator/process/last/LastQueryOperator.java   |   1 +
 .../process/last/LastQuerySortOperator.java        |  52 ++++++---
 .../operator/process/last/LastQueryUtil.java       |   5 +
 .../process/last/UpdateLastCacheOperator.java      |   3 +-
 7 files changed, 120 insertions(+), 67 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
index 32c4260d1ea..0ac054023d0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/AlignedUpdateLastCacheOperator.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import static org.weakref.jmx.internal.guava.base.Preconditions.checkArgument;
 
-/** update last cache for aligned series */
+/** update last cache for aligned series. */
 public class AlignedUpdateLastCacheOperator extends 
AbstractUpdateLastCacheOperator {
 
   private final AlignedPath seriesPath;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
index 7e196646acd..53cfffee3ab 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryCollectOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.last;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index cb21ba62b07..557eac5bc5e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.last;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -55,7 +56,7 @@ public class LastQueryMergeOperator implements 
ProcessOperator {
   /** TsBlock from child operator. Only one cache now. */
   private final TsBlock[] inputTsBlocks;
 
-  /** start index for each input TsBlocks and size of it is equal to 
inputTsBlocks */
+  /** start index for each input TsBlocks and size of it is equal to 
inputTsBlocks. */
   private final int[] inputIndex;
 
   /**
@@ -117,51 +118,22 @@ public class LastQueryMergeOperator implements 
ProcessOperator {
     // among all the input TsBlock as the current output TsBlock's 
endTimeSeries.
     for (int i = 0; i < inputOperatorsCount; i++) {
       Operator currentChild = children.get(i);
-      if (!noMoreTsBlocks[i] && empty(i) && currentChild != null) {
-        if (currentChild.hasNextWithTimer()) {
-          inputIndex[i] = 0;
-          inputTsBlocks[i] = currentChild.nextWithTimer();
-          if (!empty(i)) {
-            int rowSize = inputTsBlocks[i].getPositionCount();
-            for (int row = 0; row < rowSize; row++) {
-              Binary key = getTimeSeries(inputTsBlocks[i], row);
-              Location location = timeSeriesSelector.get(key);
-              if (location == null
-                  || inputTsBlocks[i].getTimeByIndex(row)
-                      > 
inputTsBlocks[location.tsBlockIndex].getTimeByIndex(location.rowIndex)) {
-                timeSeriesSelector.put(key, new Location(i, row));
-              }
-            }
-          } else {
-            // child operator has next but return an empty TsBlock which means 
that it may not
-            // finish calculation in given time slice.
-            // In such case, LastQueryMergeOperator can't go on calculating, 
so we just return
-            // null.
-            // We can also use the while loop here to continuously call the 
hasNext() and next()
-            // methods of the child operator until its hasNext() returns false 
or the next() gets
-            // the data that is not empty, but this will cause the execution 
time of the while
-            // loop
-            // to be uncontrollable and may exceed all allocated time slice
-            return null;
-          }
-        } else { // no more tsBlock
-          noMoreTsBlocks[i] = true;
-          inputTsBlocks[i] = null;
-          currentChild.close();
-          children.set(i, null);
-        }
+      if (needCallNext(i, currentChild) && prepareNext(i, currentChild)) {
+        return null;
       }
       // update the currentEndTimeSeries if the TsBlock is not empty
       if (!empty(i)) {
         Binary endTimeSeries =
             getTimeSeries(inputTsBlocks[i], 
inputTsBlocks[i].getPositionCount() - 1);
-        currentEndTimeSeries =
-            init
-                ? (comparator.compare(currentEndTimeSeries, endTimeSeries) < 0
-                    ? currentEndTimeSeries
-                    : endTimeSeries)
-                : endTimeSeries;
-        init = true;
+        if (init) {
+          currentEndTimeSeries =
+              comparator.compare(currentEndTimeSeries, endTimeSeries) < 0
+                  ? currentEndTimeSeries
+                  : endTimeSeries;
+        } else {
+          currentEndTimeSeries = endTimeSeries;
+          init = true;
+        }
       }
     }
 
@@ -171,11 +143,7 @@ public class LastQueryMergeOperator implements 
ProcessOperator {
       return res;
     }
 
-    while (!timeSeriesSelector.isEmpty()
-        && (comparator.compare(timeSeriesSelector.firstKey(), 
currentEndTimeSeries) <= 0)) {
-      Location location = timeSeriesSelector.pollFirstEntry().getValue();
-      appendLastValue(tsBlockBuilder, inputTsBlocks[location.tsBlockIndex], 
location.rowIndex);
-    }
+    calcCurrentBatch(currentEndTimeSeries);
 
     clearTsBlockCache(currentEndTimeSeries);
 
@@ -184,6 +152,64 @@ public class LastQueryMergeOperator implements 
ProcessOperator {
     return res;
   }
 
+  private boolean needCallNext(int i, Operator currentChild) {
+    return !noMoreTsBlocks[i] && empty(i) && currentChild != null;
+  }
+
+  /**
+   * prepare next batch
+   *
+   * @return true if need to break current loop, otherwise false
+   * @throws Exception errors happened while fetching next batch data
+   */
+  private boolean prepareNext(int i, Operator currentChild) throws Exception {
+    if (currentChild.hasNextWithTimer()) {
+      inputIndex[i] = 0;
+      inputTsBlocks[i] = currentChild.nextWithTimer();
+      if (!empty(i)) {
+        collectTimeSeries(i);
+      } else {
+        // child operator has next but return an empty TsBlock which means 
that it may not
+        // finish calculation in given time slice.
+        // In such case, LastQueryMergeOperator can't go on calculating, so we 
just return
+        // null.
+        // We can also use the while loop here to continuously call the 
hasNext() and next()
+        // methods of the child operator until its hasNext() returns false or 
the next() gets
+        // the data that is not empty, but this will cause the execution time 
of the while
+        // loop
+        // to be uncontrollable and may exceed all allocated time slice
+        return true;
+      }
+    } else { // no more tsBlock
+      noMoreTsBlocks[i] = true;
+      inputTsBlocks[i] = null;
+      currentChild.close();
+      children.set(i, null);
+    }
+    return false;
+  }
+
+  private void collectTimeSeries(int i) {
+    int rowSize = inputTsBlocks[i].getPositionCount();
+    for (int row = 0; row < rowSize; row++) {
+      Binary key = getTimeSeries(inputTsBlocks[i], row);
+      Location location = timeSeriesSelector.get(key);
+      if (location == null
+          || inputTsBlocks[i].getTimeByIndex(row)
+              > 
inputTsBlocks[location.tsBlockIndex].getTimeByIndex(location.rowIndex)) {
+        timeSeriesSelector.put(key, new Location(i, row));
+      }
+    }
+  }
+
+  private void calcCurrentBatch(Binary currentEndTimeSeries) {
+    while (!timeSeriesSelector.isEmpty()
+        && (comparator.compare(timeSeriesSelector.firstKey(), 
currentEndTimeSeries) <= 0)) {
+      Location location = timeSeriesSelector.pollFirstEntry().getValue();
+      appendLastValue(tsBlockBuilder, inputTsBlocks[location.tsBlockIndex], 
location.rowIndex);
+    }
+  }
+
   @Override
   public boolean hasNext() throws Exception {
     if (finished) {
@@ -265,7 +291,8 @@ public class LastQueryMergeOperator implements 
ProcessOperator {
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    long childrenSum = 0, minChildReturnSize = Long.MAX_VALUE;
+    long childrenSum = 0;
+    long minChildReturnSize = Long.MAX_VALUE;
     for (Operator child : children) {
       long maxReturnSize = child.calculateMaxReturnSize();
       childrenSum += (maxReturnSize + 
child.calculateRetainedSizeAfterCallingNext());
@@ -279,7 +306,7 @@ public class LastQueryMergeOperator implements 
ProcessOperator {
 
   /**
    * If the tsBlock of columnIndex is null or has no more data in the tsBlock, 
return true; else
-   * return false;
+   * return false.
    */
   private boolean empty(int columnIndex) {
     return inputTsBlocks[columnIndex] == null
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
index 830ef97f1e6..9673f9b36ec 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.last;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
index 9c32e9cd3b8..fe002a43c0a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.last;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -133,30 +134,22 @@ public class LastQuerySortOperator implements 
ProcessOperator {
       return res;
     }
 
+    return buildResult();
+  }
+
+  private TsBlock buildResult() throws Exception {
     // start stopwatch
     long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
     long start = System.nanoTime();
 
     int endIndex = getEndIndex();
 
-    while ((System.nanoTime() - start < maxRuntime)
-        && (currentIndex < endIndex
-            || (previousTsBlock != null
-                && previousTsBlockIndex < previousTsBlock.getPositionCount()))
-        && !tsBlockBuilder.isFull()) {
-      if (previousTsBlock == null || previousTsBlock.getPositionCount() <= 
previousTsBlockIndex) {
-        if (children.get(currentIndex).hasNextWithTimer()) {
-          previousTsBlock = children.get(currentIndex).nextWithTimer();
-          previousTsBlockIndex = 0;
-          if (previousTsBlock == null) {
-            return null;
-          }
-        } else {
-          children.get(currentIndex).close();
-          children.set(currentIndex, null);
-        }
-        currentIndex++;
+    while (keepGoing(start, maxRuntime, endIndex)) {
+
+      if (prepareData()) {
+        return null;
       }
+
       if (previousTsBlockIndex < previousTsBlock.getPositionCount()) {
         if (canUseDataFromCachedTsBlock(previousTsBlock, 
previousTsBlockIndex)) {
           LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, 
cachedTsBlockRowIndex++);
@@ -171,6 +164,31 @@ public class LastQuerySortOperator implements 
ProcessOperator {
     return res;
   }
 
+  private boolean keepGoing(long start, long maxRuntime, int endIndex) {
+    return (System.nanoTime() - start < maxRuntime)
+        && (currentIndex < endIndex
+            || (previousTsBlock != null
+                && previousTsBlockIndex < previousTsBlock.getPositionCount()))
+        && !tsBlockBuilder.isFull();
+  }
+
+  private boolean prepareData() throws Exception {
+    if (previousTsBlock == null || previousTsBlock.getPositionCount() <= 
previousTsBlockIndex) {
+      if (children.get(currentIndex).hasNextWithTimer()) {
+        previousTsBlock = children.get(currentIndex).nextWithTimer();
+        previousTsBlockIndex = 0;
+        if (previousTsBlock == null) {
+          return true;
+        }
+      } else {
+        children.get(currentIndex).close();
+        children.set(currentIndex, null);
+      }
+      currentIndex++;
+    }
+    return false;
+  }
+
   @Override
   public boolean hasNext() throws Exception {
     return currentIndex < inputOperatorsCount
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java
index c674fc44f86..2e7e23b5bb7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryUtil.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.last;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -45,6 +46,10 @@ public class LastQueryUtil {
   private static final boolean CACHE_ENABLED =
       CommonDescriptor.getInstance().getConfig().isLastCacheEnable();
 
+  private LastQueryUtil() {
+    // util class doesn't need constructor
+  }
+
   public static TsBlockBuilder createTsBlockBuilder() {
     return new TsBlockBuilder(ImmutableList.of(TSDataType.TEXT, 
TSDataType.TEXT, TSDataType.TEXT));
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 44fb538b64b..11d36719d63 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.last;
 
 import org.apache.iotdb.commons.path.MeasurementPath;
@@ -36,7 +37,7 @@ public class UpdateLastCacheOperator extends 
AbstractUpdateLastCacheOperator {
   // accept PartialPath
   private final MeasurementPath fullPath;
 
-  // dataType for queried time series;
+  // type for queried time series
   protected final String dataType;
 
   public UpdateLastCacheOperator(

Reply via email to