This is an automated email from the ASF dual-hosted git repository.

liudw pushed a commit to branch reverse_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/reverse_reader by this push:
     new 9706465  add descBatchData and descPriorityMergeReader
9706465 is described below

commit 9706465331dd099420b75bff9beeca3d86bf84bb
Author: liudw <[email protected]>
AuthorDate: Thu Aug 13 17:33:34 2020 +0800

    add descBatchData and descPriorityMergeReader
---
 .../iotdb/db/query/reader/chunk/MemPageReader.java |  8 +++--
 .../iotdb/db/query/reader/series/SeriesReader.java | 33 ++++++++++++++-----
 .../reader/universal/DescPriorityMergeReader.java  | 25 +++++++++++++++
 .../reader/universal/PriorityMergeReader.java      | 20 ++++++------
 .../db/query/reader/series/SeriesReaderTest.java   | 34 ++++++++++++++++++++
 .../apache/iotdb/tsfile/read/common/BatchData.java | 14 +++++---
 .../iotdb/tsfile/read/common/BatchDataFactory.java | 18 +++++++++++
 .../iotdb/tsfile/read/common/DescBatchData.java    | 37 ++++++++++++++++++++++
 .../iotdb/tsfile/read/reader/IPageReader.java      |  7 +++-
 .../iotdb/tsfile/read/reader/page/PageReader.java  |  7 ++--
 10 files changed, 174 insertions(+), 29 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
 
b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
index a9106c5..3d1d20a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.reader.chunk;
 
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
 
@@ -35,18 +36,19 @@ public class MemPageReader implements IPageReader {
   }
 
   @Override
-  public BatchData getAllSatisfiedPageData() {
+  public BatchData getAllSatisfiedPageData(boolean ascending) {
     if (valueFilter == null) {
       return batchData;
     }
-    BatchData filteredBatchData = new BatchData(batchData.getDataType());
+    BatchData filteredBatchData = BatchDataFactory
+        .createBatchData(batchData.getDataType(), ascending);
     while (batchData.hasCurrent()) {
       if (valueFilter.satisfy(batchData.currentTime(), 
batchData.currentValue())) {
         filteredBatchData.putAnObject(batchData.currentTime(), 
batchData.currentValue());
       }
       batchData.next();
     }
-    return filteredBatchData;
+    return filteredBatchData.flip();
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
 
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 1508024..87ca635 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
 import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
 import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.db.utils.QueryUtils;
@@ -51,7 +52,7 @@ import org.apache.iotdb.tsfile.read.reader.IPageReader;
 
 public class SeriesReader {
 
-  interface TimeOrderUtils {
+  public interface TimeOrderUtils {
 
     long getOrderTime(Statistics statistics);
 
@@ -70,6 +71,8 @@ public class SeriesReader {
     <T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor);
 
     boolean isExcessEndpoint(long time, long endpointTime);
+
+    boolean getAscending();
   }
 
 
@@ -126,6 +129,11 @@ public class SeriesReader {
     public boolean isExcessEndpoint(long time, long endpointTime) {
       return time < endpointTime;
     }
+
+    @Override
+    public boolean getAscending() {
+      return false;
+    }
   }
 
 
@@ -181,6 +189,11 @@ public class SeriesReader {
     public boolean isExcessEndpoint(long time, long endpointTime) {
       return time > endpointTime;
     }
+
+    @Override
+    public boolean getAscending() {
+      return true;
+    }
   }
 
 
@@ -230,7 +243,7 @@ public class SeriesReader {
   /*
    * point cache
    */
-  private PriorityMergeReader mergeReader = new PriorityMergeReader();
+  private PriorityMergeReader mergeReader;
 
   /*
    * result cache
@@ -251,8 +264,10 @@ public class SeriesReader {
     this.valueFilter = valueFilter;
     if (ascending) {
       this.orderUtils = new AscTimeOrderUtils();
+      mergeReader = new PriorityMergeReader();
     } else {
       this.orderUtils = new DescTimeOrderUtils();
+      mergeReader = new DescPriorityMergeReader();
     }
 
     this.seqFileResource = new LinkedList<>(dataSource.getSeqResources());
@@ -277,8 +292,10 @@ public class SeriesReader {
     this.valueFilter = valueFilter;
     if (ascending) {
       this.orderUtils = new AscTimeOrderUtils();
+      mergeReader = new PriorityMergeReader();
     } else {
       this.orderUtils = new DescTimeOrderUtils();
+      mergeReader = new DescPriorityMergeReader();
     }
 
     this.seqFileResource = new LinkedList<>(seqFileResource);
@@ -639,7 +656,7 @@ public class SeriesReader {
       if (valueFilter != null) {
         firstPageReader.setFilter(valueFilter);
       }
-      BatchData batchData = firstPageReader.getAllSatisfiedPageData();
+      BatchData batchData = 
firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending());
       firstPageReader = null;
 
       return batchData;
@@ -663,7 +680,7 @@ public class SeriesReader {
       if (mergeReader.hasNextTimeValuePair()) {
 
         cachedBatchData = new BatchData(dataType);
-        long currentPageEndPointTime = mergeReader.getCurrentLargestEndTime();
+        long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
 
         while (mergeReader.hasNextTimeValuePair()) {
 
@@ -725,7 +742,7 @@ public class SeriesReader {
 
     long currentPageEndpointTime;
     if (mergeReader.hasNextTimeValuePair()) {
-      currentPageEndpointTime = mergeReader.getCurrentLargestEndTime();
+      currentPageEndpointTime = mergeReader.getCurrentReadStopTime();
     } else {
       currentPageEndpointTime = 
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
     }
@@ -751,7 +768,7 @@ public class SeriesReader {
 
   private void putPageReaderToMergeReader(VersionPageReader pageReader) throws 
IOException {
     mergeReader.addReader(
-        pageReader.getAllSatisfiedPageData().getBatchDataIterator(),
+        
pageReader.getAllSatisfiedPageData(orderUtils.getAscending()).getBatchDataIterator(),
         pageReader.version,
         orderUtils.getOverlapCheckTime(pageReader.getStatistics()));
   }
@@ -904,8 +921,8 @@ public class SeriesReader {
       return data.getStatistics();
     }
 
-    BatchData getAllSatisfiedPageData() throws IOException {
-      return data.getAllSatisfiedPageData();
+    BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
+      return data.getAllSatisfiedPageData(ascending);
     }
 
     void setFilter(Filter filter) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java
 
b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java
new file mode 100644
index 0000000..3f9dbb4
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java
@@ -0,0 +1,25 @@
+package org.apache.iotdb.db.query.reader.universal;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+
+public class DescPriorityMergeReader extends PriorityMergeReader {
+
+  public DescPriorityMergeReader() {
+    super.heap = new PriorityQueue<>((o1, o2) -> {
+      int timeCompare = Long.compare(o2.timeValuePair.getTimestamp(),
+          o1.timeValuePair.getTimestamp());
+      return timeCompare != 0 ? timeCompare : Long.compare(o2.priority, 
o1.priority);
+    });
+  }
+
+  public void addReader(IPointReader reader, long priority, long endTime) 
throws IOException {
+    if (reader.hasNextTimeValuePair()) {
+      heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
+      super.currentReadStopTime = Math.min(currentReadStopTime, endTime);
+    } else {
+      reader.close();
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
 
b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index 4ebf7d1..53b477e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.query.reader.universal;
 
+import java.util.Comparator;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import java.io.IOException;
@@ -30,15 +31,16 @@ import java.util.PriorityQueue;
 public class PriorityMergeReader implements IPointReader {
 
   // largest end time of all added readers
-  private long currentLargestEndTime;
+  protected long currentReadStopTime;
 
-  PriorityQueue<Element> heap = new PriorityQueue<>((o1, o2) -> {
-    int timeCompare = Long.compare(o1.timeValuePair.getTimestamp(),
-        o2.timeValuePair.getTimestamp());
-    return timeCompare != 0 ? timeCompare : Long.compare(o2.priority, 
o1.priority);
-  });
+  protected PriorityQueue<Element> heap;
 
   public PriorityMergeReader() {
+    heap = new PriorityQueue<>((o1, o2) -> {
+      int timeCompare = Long.compare(o1.timeValuePair.getTimestamp(),
+          o2.timeValuePair.getTimestamp());
+      return timeCompare != 0 ? timeCompare : Long.compare(o2.priority, 
o1.priority);
+    });
   }
 
   public PriorityMergeReader(List<IPointReader> prioritySeriesReaders, int 
startPriority)
@@ -59,14 +61,14 @@ public class PriorityMergeReader implements IPointReader {
   public void addReader(IPointReader reader, long priority, long endTime) 
throws IOException {
     if (reader.hasNextTimeValuePair()) {
       heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
-      currentLargestEndTime = Math.max(currentLargestEndTime, endTime);
+      currentReadStopTime = Math.max(currentReadStopTime, endTime);
     } else {
       reader.close();
     }
   }
 
-  public long getCurrentLargestEndTime() {
-    return currentLargestEndTime;
+  public long getCurrentReadStopTime() {
+    return currentReadStopTime;
   }
 
   @Override
diff --git 
a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
 
b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
index 024f87d..b6e4b7d 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
@@ -19,10 +19,12 @@
 
 package org.apache.iotdb.db.query.reader.series;
 
+import javax.swing.plaf.PanelUI;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -130,4 +132,36 @@ public class SeriesReaderTest {
     }
 
   }
+
+  @Test
+  public void descOrderTest() {
+    try {
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add("sensor0");
+      SeriesReader seriesReader = new SeriesReader(
+          new Path(SERIES_READER_TEST_SG + PATH_SEPARATOR + "device0", 
"sensor0"), allSensors,
+          TSDataType.INT32, new QueryContext(), seqResources, unseqResources, 
null, null, false);
+      IPointReader pointReader = new SeriesRawDataPointReader(seriesReader);
+      long expectedTime = 499;
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
+        System.out.println(timeValuePair);
+        assertEquals(expectedTime, timeValuePair.getTimestamp());
+        int value = timeValuePair.getValue().getInt();
+        if (expectedTime < 200) {
+          assertEquals(20000 + expectedTime, value);
+        } else if (expectedTime < 260 || (expectedTime >= 300 && expectedTime 
< 380)
+            || expectedTime >= 400) {
+          assertEquals(10000 + expectedTime, value);
+        } else {
+          assertEquals(expectedTime, value);
+        }
+        expectedTime--;
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail();
+    }
+
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index b4d6053..4e6049b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -53,19 +53,19 @@ public class BatchData implements Serializable {
 
   private static final long serialVersionUID = -4620310601188394839L;
   private static final int capacityThreshold = 
TSFileConfig.ARRAY_CAPACITY_THRESHOLD;
-  private int capacity = 16;
+  protected int capacity = 16;
 
   private TSDataType dataType;
 
   // outer list index for read
-  private int readCurListIndex;
+  protected int readCurListIndex;
   // inner array index for read
-  private int readCurArrayIndex;
+  protected int readCurArrayIndex;
 
   // outer list index for write
-  private int writeCurListIndex;
+  protected int writeCurListIndex;
   // inner array index for write
-  private int writeCurArrayIndex;
+  protected int writeCurArrayIndex;
 
   // the insert timestamp number of timeRet
   private int count;
@@ -593,4 +593,8 @@ public class BatchData implements Serializable {
   public int getReadCurArrayIndex() {
     return readCurArrayIndex;
   }
+
+  public BatchData flip() {
+    return this;
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchDataFactory.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchDataFactory.java
new file mode 100644
index 0000000..443036b
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchDataFactory.java
@@ -0,0 +1,18 @@
+package org.apache.iotdb.tsfile.read.common;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class BatchDataFactory {
+
+  public static BatchData createBatchData(TSDataType dataType, boolean 
ascending) {
+    if (ascending) {
+      return new BatchData(dataType);
+    }
+    return new DescBatchData(dataType);
+  }
+
+  public static BatchData createBatchData(TSDataType dataType) {
+    return new BatchData(dataType);
+  }
+
+}
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescBatchData.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescBatchData.java
new file mode 100644
index 0000000..68cb226
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescBatchData.java
@@ -0,0 +1,37 @@
+package org.apache.iotdb.tsfile.read.common;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class DescBatchData extends BatchData {
+
+  public DescBatchData(TSDataType dataType) {
+    super(dataType);
+  }
+
+  @Override
+  public boolean hasCurrent() {
+    return super.readCurListIndex >= 0 && super.readCurArrayIndex >= 0;
+  }
+
+  @Override
+  public void next() {
+    super.readCurArrayIndex--;
+    if (super.readCurArrayIndex == -1) {
+      super.readCurArrayIndex = capacity;
+      super.readCurListIndex--;
+    }
+  }
+
+  @Override
+  public void resetBatchData() {
+    super.readCurArrayIndex = writeCurArrayIndex - 1;
+    super.readCurListIndex = writeCurListIndex;
+  }
+
+  @Override
+  public BatchData flip() {
+    super.readCurArrayIndex = writeCurArrayIndex - 1;
+    super.readCurListIndex = writeCurListIndex;
+    return this;
+  }
+}
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
index 03a7dba..50a36b5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
@@ -26,7 +26,12 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 public interface IPageReader {
 
-  BatchData getAllSatisfiedPageData() throws IOException;
+
+  default BatchData getAllSatisfiedPageData() throws IOException {
+    return getAllSatisfiedPageData(true);
+  }
+
+  BatchData getAllSatisfiedPageData(boolean ascending) throws IOException;
 
   Statistics getStatistics();
 
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 73497b4..d43e61e 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -103,9 +104,9 @@ public class PageReader implements IPageReader {
    * @return the returned BatchData may be empty, but never be null
    */
   @Override
-  public BatchData getAllSatisfiedPageData() throws IOException {
+  public BatchData getAllSatisfiedPageData(boolean ascending) throws 
IOException {
 
-    BatchData pageData = new BatchData(dataType);
+    BatchData pageData = BatchDataFactory.createBatchData(dataType, ascending);
 
     while (timeDecoder.hasNext(timeBuffer)) {
       long timestamp = timeDecoder.readLong(timeBuffer);
@@ -150,7 +151,7 @@ public class PageReader implements IPageReader {
           throw new UnSupportedDataTypeException(String.valueOf(dataType));
       }
     }
-    return pageData;
+    return pageData.flip();
   }
 
   @Override

Reply via email to