This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch pushdownlimit
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 649cb4e98e1d1c3cae73d1d3b1daf31fc9020046
Author: Alima777 <[email protected]>
AuthorDate: Tue Dec 7 16:59:16 2021 +0800

    push down limit to read task in rawdatasetwithoutValueFilter
---
 .../dataset/RawQueryDataSetWithoutValueFilter.java | 32 ++++++++++++++++++----
 1 file changed, 26 insertions(+), 6 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 5f83c74..78be9d2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -58,12 +58,24 @@ public class RawQueryDataSetWithoutValueFilter extends 
QueryDataSet
     private final ManagedSeriesReader reader;
     private final String pathName;
     private final BlockingQueue<BatchData> blockingQueue;
+    // If rowLimit/rowOffset exists, fetchLimit = rowLimit + rowOffset, 
Otherwise Integer.Max
+    // It is only used when the readTask is initialized first time, to avoid 
read too much batchData
+    // If fetchLimit is still not satisfied when blockingQueue is filled, we 
don't use it again when
+    // the readTask is initialized again in fillCache() because it's also 
controlled in queryDataSet
+    // and fetchLimit is not easy to calculated when batchData is returned but 
not merged among
+    // series
+    private int fetchLimit;
+    private int batchDataTotalLength = 0;
 
     public ReadTask(
-        ManagedSeriesReader reader, BlockingQueue<BatchData> blockingQueue, 
String pathName) {
+        ManagedSeriesReader reader,
+        BlockingQueue<BatchData> blockingQueue,
+        String pathName,
+        int fetchLimit) {
       this.reader = reader;
       this.blockingQueue = blockingQueue;
       this.pathName = pathName;
+      this.fetchLimit = fetchLimit;
     }
 
     @Override
@@ -87,8 +99,10 @@ public class RawQueryDataSetWithoutValueFilter extends 
QueryDataSet
               continue;
             }
             blockingQueue.put(batchData);
-            // if the queue also has free space, just submit another itself
-            if (blockingQueue.remainingCapacity() > 0) {
+            // if the queue also has free space and the size of batchData < 
fetchLimit, just submit
+            // another itself
+            batchDataTotalLength += batchData.length();
+            if (batchDataTotalLength < fetchLimit && 
blockingQueue.remainingCapacity() > 0) {
               TASK_POOL_MANAGER.submit(this);
             }
             // the queue has no more space
@@ -214,7 +228,8 @@ public class RawQueryDataSetWithoutValueFilter extends 
QueryDataSet
       reader.setHasRemaining(true);
       reader.setManagedByQueryManager(true);
       TASK_POOL_MANAGER.submit(
-          new ReadTask(reader, blockingQueueArray[i], 
paths.get(i).getFullPath()));
+          new ReadTask(
+              reader, blockingQueueArray[i], paths.get(i).getFullPath(), 
rowLimit + rowOffset));
     }
     for (int i = 0; i < seriesReaderList.size(); i++) {
       // check the interrupted status of query before taking next batch
@@ -538,7 +553,10 @@ public class RawQueryDataSetWithoutValueFilter extends 
QueryDataSet
             reader.setManagedByQueryManager(true);
             TASK_POOL_MANAGER.submit(
                 new ReadTask(
-                    reader, blockingQueueArray[seriesIndex], 
paths.get(seriesIndex).getFullPath()));
+                    reader,
+                    blockingQueueArray[seriesIndex],
+                    paths.get(seriesIndex).getFullPath(),
+                    Integer.MAX_VALUE));
           }
         }
       }
@@ -593,7 +611,9 @@ public class RawQueryDataSetWithoutValueFilter extends 
QueryDataSet
         } else {
           record.addField(cachedBatchDataArray[seriesIndex].currentValue(), 
dataType);
         }
-        cacheNext(seriesIndex);
+        if (alreadyReturnedRowNum < rowOffset + rowLimit) {
+          cacheNext(seriesIndex);
+        }
       }
     }
 

Reply via email to