This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/scanOpBatchProcess1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cf713db5ccd500175578650a1d1a46fd923fc1ab
Author: Minghui Liu <[email protected]>
AuthorDate: Wed Nov 30 23:13:20 2022 +0800

    try to unify SeriesScanUtil.hasNextPage()/nextPage() with tryToFetchDataFromPage()
---
 .../source/AbstractSeriesScanOperator.java         |   2 +-
 .../execution/operator/source/SeriesScanUtil.java  | 336 ++-------------------
 2 files changed, 33 insertions(+), 305 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
index 98c3dc7d3f..fabb66ebd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java
@@ -137,7 +137,7 @@ public abstract class AbstractSeriesScanOperator implements DataSourceOperator {
   }
 
   private boolean readPageData() throws IOException {
-    return seriesScanUtil.tryToFetchDataFromPage();
+    return seriesScanUtil.tryToFetchDataFromPage(true);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 7d15555852..a238c24442 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -55,7 +55,6 @@ import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.function.ToLongFunction;
-import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -118,9 +117,7 @@ public class SeriesScanUtil {
   /*
    * result cache
    */
-  protected boolean hasCachedNextOverlappedPage;
-  protected TsBlock cachedTsBlock;
-
+  protected boolean lastPageOverlapped;
   protected TsBlockBuilder cachedTsBlockBuilder;
 
   public SeriesScanUtil(
@@ -400,64 +397,13 @@ public class SeriesScanUtil {
    * This method should be called after hasNextChunk() until no next page, 
make sure that all
    * overlapped pages are consumed
    */
-  @SuppressWarnings("squid:S3776")
-  // Suppress high Cognitive Complexity warning
   public boolean hasNextPage() throws IOException {
-
-    /*
-     * has overlapped data before
-     */
-    if (hasCachedNextOverlappedPage) {
-      return true;
-    } else if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
-      if (hasNextOverlappedPage()) {
-        cachedTsBlock = nextOverlappedPage();
-        if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) {
-          hasCachedNextOverlappedPage = true;
-          return true;
-        }
-      }
-    }
-
-    if (firstPageReader != null) {
-      return true;
-    }
-
-    /*
-     * construct first page reader
-     */
-    if (firstChunkMetadata != null) {
-      /*
-       * try to unpack all overlapped ChunkMetadata to cachedPageReaders
-       */
-      unpackAllOverlappedChunkMetadataToPageReaders(
-          orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), 
true);
-    } else {
-      /*
-       * first chunk metadata is already unpacked, consume cached pages
-       */
-      initFirstPageReader();
-    }
-
-    if (isExistOverlappedPage()) {
-      return true;
-    }
-
-    // make sure firstPageReader won't be null while the unSeqPageReaders has 
more cached page
-    // readers
-    while (firstPageReader == null && (!seqPageReaders.isEmpty() || 
!unSeqPageReaders.isEmpty())) {
-
-      initFirstPageReader();
-
-      if (isExistOverlappedPage()) {
-        return true;
-      }
-    }
-    return firstPageReader != null;
+    return tryToFetchDataFromPage(false);
   }
 
-  /** Return true if there is new data written to the builder */
-  public boolean tryToFetchDataFromPage() throws IOException {
+  /** @return true if there is new data written to the builder */
+  public boolean tryToFetchDataFromPage(boolean consumePageReader) throws 
IOException {
+    lastPageOverlapped = false;
 
     // has overlapped data
     if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) {
@@ -467,7 +413,9 @@ public class SeriesScanUtil {
     }
 
     if (firstPageReader != null) {
-      appendDataFromPageReader();
+      if (consumePageReader) {
+        appendDataFromPageReader();
+      }
       return true;
     }
 
@@ -497,13 +445,15 @@ public class SeriesScanUtil {
     }
 
     if (firstPageReader != null) {
-      appendDataFromPageReader();
+      if (consumePageReader) {
+        appendDataFromPageReader();
+      }
       return true;
     }
     return false;
   }
 
-  /** Return true if there is new data written to the builder */
+  /** @return true if there is new data written to the builder */
   private boolean tryToFetchDataFromOverlappedPage() throws IOException {
     if (firstPageOverlapped()) {
       // next page is overlapped, read overlapped data and cache it
@@ -523,7 +473,12 @@ public class SeriesScanUtil {
     firstPageReader = null;
   }
 
-  /** Return true if there is new data written to the builder */
+  /**
+   * read overlapped data till currentLargestEndTime in mergeReader, if 
current batch does not
+   * contain data, read till next currentLargestEndTime again.
+   *
+   * @return true if there is new data written to the builder
+   */
   private boolean appendDataFromMergeReader() throws IOException {
     int initialPositionCount = cachedTsBlockBuilder.getPositionCount();
 
@@ -582,7 +537,8 @@ public class SeriesScanUtil {
                 || (!orderUtils.getAscending()
                     && timeValuePair.getTimestamp()
                         < firstPageReader.getStatistics().getStartTime())) {
-              return cachedTsBlockBuilder.getPositionCount() > 
initialPositionCount;
+              lastPageOverlapped = cachedTsBlockBuilder.getPositionCount() > 
initialPositionCount;
+              return lastPageOverlapped;
             } else if (orderUtils.isOverlapped(
                 timeValuePair.getTimestamp(), 
firstPageReader.getStatistics())) {
               // current timeValuePair is overlapped with firstPageReader, add 
it to merged reader
@@ -607,7 +563,8 @@ public class SeriesScanUtil {
                 || (!orderUtils.getAscending()
                     && timeValuePair.getTimestamp()
                         < 
seqPageReaders.get(0).getStatistics().getStartTime())) {
-              return cachedTsBlockBuilder.getPositionCount() > 
initialPositionCount;
+              lastPageOverlapped = cachedTsBlockBuilder.getPositionCount() > 
initialPositionCount;
+              return lastPageOverlapped;
             } else if (orderUtils.isOverlapped(
                 timeValuePair.getTimestamp(), 
seqPageReaders.get(0).getStatistics())) {
               VersionPageReader pageReader = seqPageReaders.remove(0);
@@ -692,7 +649,8 @@ public class SeriesScanUtil {
         /*
          * if current overlapped page has valid data, return, otherwise read 
next overlapped page
          */
-        if (cachedTsBlockBuilder.getPositionCount() > initialPositionCount) {
+        lastPageOverlapped = cachedTsBlockBuilder.getPositionCount() > 
initialPositionCount;
+        if (lastPageOverlapped) {
           return true;
         } else if (mergeReader.hasNextTimeValuePair()) {
           // condition: seqPage.endTime < mergeReader.currentTime
@@ -704,22 +662,6 @@ public class SeriesScanUtil {
     }
   }
 
-  private boolean isExistOverlappedPage() throws IOException {
-    if (firstPageOverlapped()) {
-      /*
-       * next page is overlapped, read overlapped data and cache it
-       */
-      if (hasNextOverlappedPage()) {
-        cachedTsBlock = nextOverlappedPage();
-        if (cachedTsBlock != null && !cachedTsBlock.isEmpty()) {
-          hasCachedNextOverlappedPage = true;
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
   private boolean firstPageOverlapped() throws IOException {
     if (firstPageReader == null) {
       return false;
@@ -813,17 +755,12 @@ public class SeriesScanUtil {
    * first page is overlapped
    */
   boolean isPageOverlapped() throws IOException {
-
-    /*
-     * has an overlapped page
-     */
-    if (hasCachedNextOverlappedPage) {
+    // has an overlapped page
+    if (lastPageOverlapped) {
       return true;
     }
 
-    /*
-     * has a non-overlapped page in firstPageReader
-     */
+    // has a non-overlapped page in firstPageReader
     if (mergeReader.hasNextTimeValuePair()
         && ((orderUtils.getAscending()
                 && mergeReader.currentTimeValuePair().getTimestamp()
@@ -835,7 +772,6 @@ public class SeriesScanUtil {
     }
 
     Statistics firstPageStatistics = firstPageReader.getStatistics();
-
     return !unSeqPageReaders.isEmpty()
         && orderUtils.isOverlapped(firstPageStatistics, 
unSeqPageReaders.peek().getStatistics());
   }
@@ -867,207 +803,13 @@ public class SeriesScanUtil {
     firstPageReader = null;
   }
 
-  /** This method should only be used when the method isPageOverlapped() 
return true. */
   public TsBlock nextPage() throws IOException {
-
-    if (hasCachedNextOverlappedPage) {
-      hasCachedNextOverlappedPage = false;
-      TsBlock res = cachedTsBlock;
-      cachedTsBlock = null;
-      return res;
-    } else {
-
-      /*
-       * next page is not overlapped, push down value filter if it exists
-       */
-      if (valueFilter != null) {
-        firstPageReader.setFilter(valueFilter);
-      }
-      TsBlock tsBlock = 
firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending());
-      firstPageReader = null;
-
-      return tsBlock;
-    }
-  }
-
-  /**
-   * read overlapped data till currentLargestEndTime in mergeReader, if 
current batch does not
-   * contain data, read till next currentLargestEndTime again
-   */
-  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
-  private boolean hasNextOverlappedPage() throws IOException {
-
-    if (hasCachedNextOverlappedPage) {
-      return true;
-    }
-
-    tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
-
-    while (true) {
-
-      // may has overlapped data
-      if (mergeReader.hasNextTimeValuePair()) {
-
-        // TODO we still need to consider data type, ascending and descending 
here
-        TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList());
-        TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
-        long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
-        while (mergeReader.hasNextTimeValuePair()) {
-
-          /*
-           * get current first point in mergeReader, this maybe overlapped 
later
-           */
-          TimeValuePair timeValuePair = mergeReader.currentTimeValuePair();
-
-          if (orderUtils.isExcessEndpoint(timeValuePair.getTimestamp(), 
currentPageEndPointTime)) {
-            /*
-             * when the merged point excesses the currentPageEndPointTime, we 
have read all overlapped data before currentPageEndPointTime
-             * 1. has cached batch data, we don't need to read more data, just 
use the cached data later
-             * 2. has first page reader, which means first page reader last 
endTime < currentTimeValuePair.getTimestamp(),
-             * we could just use the first page reader later
-             * 3. sequence page reader is not empty, which means first page 
reader last endTime < currentTimeValuePair.getTimestamp(),
-             * we could use the first sequence page reader later
-             */
-            if (!builder.isEmpty() || firstPageReader != null || 
!seqPageReaders.isEmpty()) {
-              break;
-            }
-            // so, we don't have other data except mergeReader
-            currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
-          }
-
-          // unpack all overlapped data for the first timeValuePair
-          
unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp());
-          unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
-              timeValuePair.getTimestamp(), false);
-          
unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), 
false);
-          
unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
-
-          // update if there are unpacked unSeqPageReaders
-          timeValuePair = mergeReader.currentTimeValuePair();
-
-          // from now, the unsequence reader is all unpacked, so we don't need 
to consider it
-          // we has first page reader now
-          if (firstPageReader != null) {
-            // if current timeValuePair excesses the first page reader's end 
time, we just use the
-            // cached data
-            if ((orderUtils.getAscending()
-                    && timeValuePair.getTimestamp() > 
firstPageReader.getStatistics().getEndTime())
-                || (!orderUtils.getAscending()
-                    && timeValuePair.getTimestamp()
-                        < firstPageReader.getStatistics().getStartTime())) {
-              hasCachedNextOverlappedPage = !builder.isEmpty();
-              cachedTsBlock = builder.build();
-              return hasCachedNextOverlappedPage;
-            } else if (orderUtils.isOverlapped(
-                timeValuePair.getTimestamp(), 
firstPageReader.getStatistics())) {
-              // current timeValuePair is overlapped with firstPageReader, add 
it to merged reader
-              // and update endTime to the max end time
-              mergeReader.addReader(
-                  getPointReader(
-                      
firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
-                  firstPageReader.version,
-                  
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
-                  context);
-              currentPageEndPointTime =
-                  updateEndPointTime(currentPageEndPointTime, firstPageReader);
-              firstPageReader = null;
-            }
-          }
-
-          // the seq page readers is not empty, just like first page reader
-          if (!seqPageReaders.isEmpty()) {
-            if ((orderUtils.getAscending()
-                    && timeValuePair.getTimestamp()
-                        > seqPageReaders.get(0).getStatistics().getEndTime())
-                || (!orderUtils.getAscending()
-                    && timeValuePair.getTimestamp()
-                        < 
seqPageReaders.get(0).getStatistics().getStartTime())) {
-              hasCachedNextOverlappedPage = !builder.isEmpty();
-              cachedTsBlock = builder.build();
-              return hasCachedNextOverlappedPage;
-            } else if (orderUtils.isOverlapped(
-                timeValuePair.getTimestamp(), 
seqPageReaders.get(0).getStatistics())) {
-              VersionPageReader pageReader = seqPageReaders.remove(0);
-              mergeReader.addReader(
-                  
getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
-                  pageReader.version,
-                  orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
-                  context);
-              currentPageEndPointTime = 
updateEndPointTime(currentPageEndPointTime, pageReader);
-            }
-          }
-
-          /*
-           * get the latest first point in mergeReader
-           */
-          timeValuePair = mergeReader.nextTimeValuePair();
-
-          Object valueForFilter = timeValuePair.getValue().getValue();
-
-          // TODO fix value filter firstNotNullObject, currently, if it's a 
value filter, it will
-          // only accept AlignedPath with only one sub sensor
-          if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) {
-            for (TsPrimitiveType tsPrimitiveType : 
timeValuePair.getValue().getVector()) {
-              if (tsPrimitiveType != null) {
-                valueForFilter = tsPrimitiveType.getValue();
-                break;
-              }
-            }
-          }
-
-          if (valueFilter == null
-              || valueFilter.satisfy(timeValuePair.getTimestamp(), 
valueForFilter)) {
-            timeBuilder.writeLong(timeValuePair.getTimestamp());
-            switch (dataType) {
-              case BOOLEAN:
-                
builder.getColumnBuilder(0).writeBoolean(timeValuePair.getValue().getBoolean());
-                break;
-              case INT32:
-                
builder.getColumnBuilder(0).writeInt(timeValuePair.getValue().getInt());
-                break;
-              case INT64:
-                
builder.getColumnBuilder(0).writeLong(timeValuePair.getValue().getLong());
-                break;
-              case FLOAT:
-                
builder.getColumnBuilder(0).writeFloat(timeValuePair.getValue().getFloat());
-                break;
-              case DOUBLE:
-                
builder.getColumnBuilder(0).writeDouble(timeValuePair.getValue().getDouble());
-                break;
-              case TEXT:
-                
builder.getColumnBuilder(0).writeBinary(timeValuePair.getValue().getBinary());
-                break;
-              case VECTOR:
-                TsPrimitiveType[] values = 
timeValuePair.getValue().getVector();
-                for (int i = 0; i < values.length; i++) {
-                  if (values[i] == null) {
-                    builder.getColumnBuilder(i).appendNull();
-                  } else {
-                    
builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
-                  }
-                }
-                break;
-              default:
-                throw new 
UnSupportedDataTypeException(String.valueOf(dataType));
-            }
-            builder.declarePosition();
-          }
-        }
-        hasCachedNextOverlappedPage = !builder.isEmpty();
-        cachedTsBlock = builder.build();
-        /*
-         * if current overlapped page has valid data, return, otherwise read 
next overlapped page
-         */
-        if (hasCachedNextOverlappedPage) {
-          return true;
-        } else if (mergeReader.hasNextTimeValuePair()) {
-          // condition: seqPage.endTime < mergeReader.currentTime
-          return false;
-        }
-      } else {
-        return false;
-      }
+    if (!lastPageOverlapped) {
+      appendDataFromPageReader();
     }
+    TsBlock cachedTsBlock = cachedTsBlockBuilder.build();
+    cachedTsBlockBuilder.reset();
+    return cachedTsBlock;
   }
 
   private long updateEndPointTime(long currentPageEndPointTime, 
VersionPageReader pageReader) {
@@ -1176,20 +918,6 @@ public class SeriesScanUtil {
         context);
   }
 
-  private TsBlock nextOverlappedPage() throws IOException {
-    if (hasCachedNextOverlappedPage || hasNextOverlappedPage()) {
-      hasCachedNextOverlappedPage = false;
-      return cachedTsBlock;
-    }
-    throw new IOException("No more batch data");
-  }
-
-  private LinkedList<TsFileResource> 
sortUnSeqFileResources(List<TsFileResource> tsFileResources) {
-    return tsFileResources.stream()
-        .sorted(orderUtils.comparingLong(tsFileResource -> 
orderUtils.getOrderTime(tsFileResource)))
-        .collect(Collectors.toCollection(LinkedList::new));
-  }
-
   /**
    * unpack all overlapped seq/unseq files and find the first 
TimeSeriesMetadata
    *

Reply via email to