This is an automated email from the ASF dual-hosted git repository.
liudw pushed a commit to branch reverse_reader
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/reverse_reader by this push:
new 9706465 add descBatchData and descPriorityMergeReader
9706465 is described below
commit 9706465331dd099420b75bff9beeca3d86bf84bb
Author: liudw <[email protected]>
AuthorDate: Thu Aug 13 17:33:34 2020 +0800
add descBatchData and descPriorityMergeReader
---
.../iotdb/db/query/reader/chunk/MemPageReader.java | 8 +++--
.../iotdb/db/query/reader/series/SeriesReader.java | 33 ++++++++++++++-----
.../reader/universal/DescPriorityMergeReader.java | 25 +++++++++++++++
.../reader/universal/PriorityMergeReader.java | 20 ++++++------
.../db/query/reader/series/SeriesReaderTest.java | 34 ++++++++++++++++++++
.../apache/iotdb/tsfile/read/common/BatchData.java | 14 +++++---
.../iotdb/tsfile/read/common/BatchDataFactory.java | 18 +++++++++++
.../iotdb/tsfile/read/common/DescBatchData.java | 37 ++++++++++++++++++++++
.../iotdb/tsfile/read/reader/IPageReader.java | 7 +++-
.../iotdb/tsfile/read/reader/page/PageReader.java | 7 ++--
10 files changed, 174 insertions(+), 29 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
index a9106c5..3d1d20a 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.reader.chunk;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
@@ -35,18 +36,19 @@ public class MemPageReader implements IPageReader {
}
@Override
- public BatchData getAllSatisfiedPageData() {
+ public BatchData getAllSatisfiedPageData(boolean ascending) {
if (valueFilter == null) {
return batchData;
}
- BatchData filteredBatchData = new BatchData(batchData.getDataType());
+ BatchData filteredBatchData = BatchDataFactory
+ .createBatchData(batchData.getDataType(), ascending);
while (batchData.hasCurrent()) {
if (valueFilter.satisfy(batchData.currentTime(),
batchData.currentValue())) {
filteredBatchData.putAnObject(batchData.currentTime(),
batchData.currentValue());
}
batchData.next();
}
- return filteredBatchData;
+ return filteredBatchData.flip();
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 1508024..87ca635 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.QueryUtils;
@@ -51,7 +52,7 @@ import org.apache.iotdb.tsfile.read.reader.IPageReader;
public class SeriesReader {
- interface TimeOrderUtils {
+ public interface TimeOrderUtils {
long getOrderTime(Statistics statistics);
@@ -70,6 +71,8 @@ public class SeriesReader {
<T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor);
boolean isExcessEndpoint(long time, long endpointTime);
+
+ boolean getAscending();
}
@@ -126,6 +129,11 @@ public class SeriesReader {
public boolean isExcessEndpoint(long time, long endpointTime) {
return time < endpointTime;
}
+
+ @Override
+ public boolean getAscending() {
+ return false;
+ }
}
@@ -181,6 +189,11 @@ public class SeriesReader {
public boolean isExcessEndpoint(long time, long endpointTime) {
return time > endpointTime;
}
+
+ @Override
+ public boolean getAscending() {
+ return true;
+ }
}
@@ -230,7 +243,7 @@ public class SeriesReader {
/*
* point cache
*/
- private PriorityMergeReader mergeReader = new PriorityMergeReader();
+ private PriorityMergeReader mergeReader;
/*
* result cache
@@ -251,8 +264,10 @@ public class SeriesReader {
this.valueFilter = valueFilter;
if (ascending) {
this.orderUtils = new AscTimeOrderUtils();
+ mergeReader = new PriorityMergeReader();
} else {
this.orderUtils = new DescTimeOrderUtils();
+ mergeReader = new DescPriorityMergeReader();
}
this.seqFileResource = new LinkedList<>(dataSource.getSeqResources());
@@ -277,8 +292,10 @@ public class SeriesReader {
this.valueFilter = valueFilter;
if (ascending) {
this.orderUtils = new AscTimeOrderUtils();
+ mergeReader = new PriorityMergeReader();
} else {
this.orderUtils = new DescTimeOrderUtils();
+ mergeReader = new DescPriorityMergeReader();
}
this.seqFileResource = new LinkedList<>(seqFileResource);
@@ -639,7 +656,7 @@ public class SeriesReader {
if (valueFilter != null) {
firstPageReader.setFilter(valueFilter);
}
- BatchData batchData = firstPageReader.getAllSatisfiedPageData();
+ BatchData batchData =
firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending());
firstPageReader = null;
return batchData;
@@ -663,7 +680,7 @@ public class SeriesReader {
if (mergeReader.hasNextTimeValuePair()) {
cachedBatchData = new BatchData(dataType);
- long currentPageEndPointTime = mergeReader.getCurrentLargestEndTime();
+ long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
while (mergeReader.hasNextTimeValuePair()) {
@@ -725,7 +742,7 @@ public class SeriesReader {
long currentPageEndpointTime;
if (mergeReader.hasNextTimeValuePair()) {
- currentPageEndpointTime = mergeReader.getCurrentLargestEndTime();
+ currentPageEndpointTime = mergeReader.getCurrentReadStopTime();
} else {
currentPageEndpointTime =
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
}
@@ -751,7 +768,7 @@ public class SeriesReader {
private void putPageReaderToMergeReader(VersionPageReader pageReader) throws
IOException {
mergeReader.addReader(
- pageReader.getAllSatisfiedPageData().getBatchDataIterator(),
+
pageReader.getAllSatisfiedPageData(orderUtils.getAscending()).getBatchDataIterator(),
pageReader.version,
orderUtils.getOverlapCheckTime(pageReader.getStatistics()));
}
@@ -904,8 +921,8 @@ public class SeriesReader {
return data.getStatistics();
}
- BatchData getAllSatisfiedPageData() throws IOException {
- return data.getAllSatisfiedPageData();
+ BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
+ return data.getAllSatisfiedPageData(ascending);
}
void setFilter(Filter filter) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java
new file mode 100644
index 0000000..3f9dbb4
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java
@@ -0,0 +1,25 @@
+package org.apache.iotdb.db.query.reader.universal;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+
+/**
+ * A {@link PriorityMergeReader} whose heap yields the element with the largest timestamp
+ * first; among elements with equal timestamps the one with the larger priority comes first.
+ * The read stop time tracks the smallest bound passed to {@link #addReader}.
+ */
+public class DescPriorityMergeReader extends PriorityMergeReader {
+
+  /** Replaces the parent's heap with one ordered by descending timestamp. */
+  public DescPriorityMergeReader() {
+    super.heap = new PriorityQueue<>((o1, o2) -> {
+      int timeCompare = Long.compare(o2.timeValuePair.getTimestamp(),
+          o1.timeValuePair.getTimestamp());
+      return timeCompare != 0 ? timeCompare : Long.compare(o2.priority, o1.priority);
+    });
+    // The field defaults to 0; since addReader folds bounds with Math.min, it must start
+    // above every real bound, otherwise the stop time would stay at 0 for non-negative bounds.
+    super.currentReadStopTime = Long.MAX_VALUE;
+  }
+
+  /**
+   * Registers a reader if it still has data; otherwise closes it.
+   *
+   * @param reader source of points; its first point is consumed immediately and put on the heap
+   * @param priority tie-breaker for points with equal timestamps (larger wins)
+   * @param endTime bound folded into the read stop time with {@code Math.min}
+   * @throws IOException if the reader fails while being read or closed
+   */
+  @Override
+  public void addReader(IPointReader reader, long priority, long endTime) throws IOException {
+    if (reader.hasNextTimeValuePair()) {
+      heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
+      super.currentReadStopTime = Math.min(currentReadStopTime, endTime);
+    } else {
+      reader.close();
+    }
+  }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index 4ebf7d1..53b477e 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.query.reader.universal;
+import java.util.Comparator;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import java.io.IOException;
@@ -30,15 +31,16 @@ import java.util.PriorityQueue;
public class PriorityMergeReader implements IPointReader {
// largest end time of all added readers
- private long currentLargestEndTime;
+ protected long currentReadStopTime;
- PriorityQueue<Element> heap = new PriorityQueue<>((o1, o2) -> {
- int timeCompare = Long.compare(o1.timeValuePair.getTimestamp(),
- o2.timeValuePair.getTimestamp());
- return timeCompare != 0 ? timeCompare : Long.compare(o2.priority,
o1.priority);
- });
+ protected PriorityQueue<Element> heap;
public PriorityMergeReader() {
+ heap = new PriorityQueue<>((o1, o2) -> {
+ int timeCompare = Long.compare(o1.timeValuePair.getTimestamp(),
+ o2.timeValuePair.getTimestamp());
+ return timeCompare != 0 ? timeCompare : Long.compare(o2.priority,
o1.priority);
+ });
}
public PriorityMergeReader(List<IPointReader> prioritySeriesReaders, int
startPriority)
@@ -59,14 +61,14 @@ public class PriorityMergeReader implements IPointReader {
public void addReader(IPointReader reader, long priority, long endTime)
throws IOException {
if (reader.hasNextTimeValuePair()) {
heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
- currentLargestEndTime = Math.max(currentLargestEndTime, endTime);
+ currentReadStopTime = Math.max(currentReadStopTime, endTime);
} else {
reader.close();
}
}
- public long getCurrentLargestEndTime() {
- return currentLargestEndTime;
+ public long getCurrentReadStopTime() {
+ return currentReadStopTime;
}
@Override
diff --git
a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
index 024f87d..b6e4b7d 100644
---
a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
@@ -19,10 +19,12 @@
package org.apache.iotdb.db.query.reader.series;
+import javax.swing.plaf.PanelUI;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -130,4 +132,36 @@ public class SeriesReaderTest {
}
}
+
+  /**
+   * Reads device0.sensor0 through a SeriesReader constructed with its last argument set to
+   * {@code false} (presumably the ascending flag - confirm against the constructor) and checks
+   * that timestamps arrive as 499, 498, ... with the value offset expected for each time range.
+   */
+  @Test
+  public void descOrderTest() {
+    try {
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add("sensor0");
+      SeriesReader seriesReader = new SeriesReader(
+          new Path(SERIES_READER_TEST_SG + PATH_SEPARATOR + "device0", "sensor0"), allSensors,
+          TSDataType.INT32, new QueryContext(), seqResources, unseqResources, null, null, false);
+      IPointReader pointReader = new SeriesRawDataPointReader(seriesReader);
+      final long firstExpectedTime = 499;
+      long expectedTime = firstExpectedTime;
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
+        assertEquals(expectedTime, timeValuePair.getTimestamp());
+        int value = timeValuePair.getValue().getInt();
+        if (expectedTime < 200) {
+          assertEquals(20000 + expectedTime, value);
+        } else if (expectedTime < 260 || (expectedTime >= 300 && expectedTime < 380)
+            || expectedTime >= 400) {
+          assertEquals(10000 + expectedTime, value);
+        } else {
+          assertEquals(expectedTime, value);
+        }
+        expectedTime--;
+      }
+      // Without this guard the test would pass even if the reader produced nothing at all.
+      if (expectedTime == firstExpectedTime) {
+        fail("descending reader returned no points");
+      }
+    } catch (IOException e) {
+      fail(e.getMessage());
+    }
+  }
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index b4d6053..4e6049b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -53,19 +53,19 @@ public class BatchData implements Serializable {
private static final long serialVersionUID = -4620310601188394839L;
private static final int capacityThreshold =
TSFileConfig.ARRAY_CAPACITY_THRESHOLD;
- private int capacity = 16;
+ protected int capacity = 16;
private TSDataType dataType;
// outer list index for read
- private int readCurListIndex;
+ protected int readCurListIndex;
// inner array index for read
- private int readCurArrayIndex;
+ protected int readCurArrayIndex;
// outer list index for write
- private int writeCurListIndex;
+ protected int writeCurListIndex;
// inner array index for write
- private int writeCurArrayIndex;
+ protected int writeCurArrayIndex;
// the insert timestamp number of timeRet
private int count;
@@ -593,4 +593,8 @@ public class BatchData implements Serializable {
public int getReadCurArrayIndex() {
return readCurArrayIndex;
}
+
+ public BatchData flip() {
+ return this;
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchDataFactory.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchDataFactory.java
new file mode 100644
index 0000000..443036b
--- /dev/null
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchDataFactory.java
@@ -0,0 +1,18 @@
+package org.apache.iotdb.tsfile.read.common;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/**
+ * Static factory that picks the {@link BatchData} implementation matching a read direction.
+ */
+public class BatchDataFactory {
+
+  /**
+   * Creates an empty batch for the given data type.
+   *
+   * @param dataType type of the values the batch will hold
+   * @param ascending {@code true} for a plain {@link BatchData}, {@code false} for a
+   *     {@link DescBatchData}
+   * @return a new, empty batch
+   */
+  public static BatchData createBatchData(TSDataType dataType, boolean ascending) {
+    return ascending ? new BatchData(dataType) : new DescBatchData(dataType);
+  }
+
+  /**
+   * Creates an empty plain {@link BatchData} for the given data type.
+   *
+   * @param dataType type of the values the batch will hold
+   * @return a new, empty batch
+   */
+  public static BatchData createBatchData(TSDataType dataType) {
+    return createBatchData(dataType, true);
+  }
+
+}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescBatchData.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescBatchData.java
new file mode 100644
index 0000000..68cb226
--- /dev/null
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/DescBatchData.java
@@ -0,0 +1,37 @@
+package org.apache.iotdb.tsfile.read.common;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/**
+ * A {@link BatchData} whose read cursor walks from the last written element back to the first.
+ * Writing is inherited unchanged; call {@link #flip()} (or {@link #resetBatchData()}) after
+ * writing to place the read cursor on the last written element.
+ */
+public class DescBatchData extends BatchData {
+
+  public DescBatchData(TSDataType dataType) {
+    super(dataType);
+  }
+
+  /** Returns {@code true} while the read cursor has not moved before the first element. */
+  @Override
+  public boolean hasCurrent() {
+    return super.readCurListIndex >= 0 && super.readCurArrayIndex >= 0;
+  }
+
+  /** Moves the read cursor one element backwards, stepping into the previous inner array. */
+  @Override
+  public void next() {
+    super.readCurArrayIndex--;
+    if (super.readCurArrayIndex == -1) {
+      // Last valid slot of the previous inner array is capacity - 1; using capacity itself
+      // would point one past the end.
+      // NOTE(review): assumes every earlier inner array holds exactly `capacity` elements -
+      // confirm against BatchData's write path.
+      super.readCurArrayIndex = capacity - 1;
+      super.readCurListIndex--;
+    }
+  }
+
+  /** Puts the read cursor on the most recently written element. */
+  @Override
+  public void resetBatchData() {
+    super.readCurArrayIndex = writeCurArrayIndex - 1;
+    super.readCurListIndex = writeCurListIndex;
+  }
+
+  /**
+   * Switches from writing to reading by positioning the read cursor on the last written
+   * element.
+   *
+   * @return this batch
+   */
+  @Override
+  public BatchData flip() {
+    resetBatchData();
+    return this;
+  }
+}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
index 03a7dba..50a36b5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java
@@ -26,7 +26,12 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
public interface IPageReader {
- BatchData getAllSatisfiedPageData() throws IOException;
+
+ default BatchData getAllSatisfiedPageData() throws IOException {
+ return getAllSatisfiedPageData(true);
+ }
+
+ BatchData getAllSatisfiedPageData(boolean ascending) throws IOException;
Statistics getStatistics();
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 73497b4..d43e61e 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -103,9 +104,9 @@ public class PageReader implements IPageReader {
* @return the returned BatchData may be empty, but never be null
*/
@Override
- public BatchData getAllSatisfiedPageData() throws IOException {
+ public BatchData getAllSatisfiedPageData(boolean ascending) throws
IOException {
- BatchData pageData = new BatchData(dataType);
+ BatchData pageData = BatchDataFactory.createBatchData(dataType, ascending);
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
@@ -150,7 +151,7 @@ public class PageReader implements IPageReader {
throw new UnSupportedDataTypeException(String.valueOf(dataType));
}
}
- return pageData;
+ return pageData.flip();
}
@Override