This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch clusterQueryOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/clusterQueryOpt by this push:
new d7d9e24 fix single node bug
d7d9e24 is described below
commit d7d9e24ea76cfe6f357c4896e0458560bbae33fb
Author: Alima777 <[email protected]>
AuthorDate: Wed Mar 10 20:55:27 2021 +0800
fix single node bug
---
.../cluster/query/reader/ClusterReaderFactory.java | 1 -
.../iotdb/cluster/query/reader/EmptyReader.java | 2 +-
.../reader/RemoteSeriesReaderByTimestamp.java | 2 +-
.../db/query/aggregation/impl/AvgAggrResult.java | 6 +--
.../db/query/aggregation/impl/CountAggrResult.java | 4 +-
.../aggregation/impl/FirstValueAggrResult.java | 6 +--
.../aggregation/impl/FirstValueDescAggrResult.java | 6 +--
.../aggregation/impl/LastValueAggrResult.java | 6 +--
.../aggregation/impl/LastValueDescAggrResult.java | 6 +--
.../query/aggregation/impl/MaxTimeAggrResult.java | 4 +-
.../aggregation/impl/MaxTimeDescAggrResult.java | 4 +-
.../query/aggregation/impl/MaxValueAggrResult.java | 8 ++--
.../query/aggregation/impl/MinTimeAggrResult.java | 4 +-
.../aggregation/impl/MinTimeDescAggrResult.java | 4 +-
.../query/aggregation/impl/MinValueAggrResult.java | 8 ++--
.../db/query/aggregation/impl/SumAggrResult.java | 6 +--
.../dataset/RawQueryDataSetWithValueFilter.java | 26 +++++++---
.../adapter/ByTimestampReaderAdapter.java | 51 ++++++++++----------
.../reader/chunk/DiskChunkReaderByTimestamp.java | 56 +++++++++++-----------
.../db/query/reader/series/IReaderByTimestamp.java | 8 +---
.../reader/series/SeriesReaderByTimestamp.java | 18 ++-----
.../reader/series/SeriesReaderByTimestampTest.java | 12 +++--
22 files changed, 123 insertions(+), 125 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index e77d7cd..9b78175 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -217,7 +217,6 @@ public class ClusterReaderFactory {
boolean ascending)
throws StorageEngineException, EmptyIntervalException {
// make sure the partition table is new
- // TODO: don't need to sync metadata for every reader
try {
metaGroupMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
index 6db6445..8ead5bd 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
@@ -160,7 +160,7 @@ public class EmptyReader extends BaseManagedSeriesReader
}
@Override
- public Object[] getValueInTimestamps(long[] timestamps) {
+ public Object[] getValuesInTimestamps(long[] timestamps) {
return null;
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
index 4a32acc..f2d8729 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
@@ -109,7 +109,7 @@ public class RemoteSeriesReaderByTimestamp implements
IReaderByTimestamp {
}
@Override
- public Object[] getValueInTimestamps(long[] timestamps) throws IOException {
+ public Object[] getValuesInTimestamps(long[] timestamps) throws IOException {
return null;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index 237fa08..6e068b8 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -102,10 +102,10 @@ public class AvgAggrResult extends AggregateResult {
@Override
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws
IOException {
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
- updateAvg(seriesDataType, value);
+ if (values[i] != null) {
+ updateAvg(seriesDataType, values[i]);
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index adf0069..30b4659 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -74,9 +74,9 @@ public class CountAggrResult extends AggregateResult {
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws
IOException {
int cnt = 0;
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
+ if (values[i] != null) {
cnt++;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 0f51dc3..3aacbf9 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -95,10 +95,10 @@ public class FirstValueAggrResult extends AggregateResult {
return;
}
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
- setValue(value);
+ if (values[i] != null) {
+ setValue(values[i]);
timestamp = timestamps[i];
break;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
index daf5a56..ad682d4 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
@@ -53,10 +53,10 @@ public class FirstValueDescAggrResult extends
FirstValueAggrResult {
@Override
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws
IOException {
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
- setValue(value);
+ if (values[i] != null) {
+ setValue(values[i]);
timestamp = timestamps[i];
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 0726d7a..bf21f7b 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -87,11 +87,11 @@ public class LastValueAggrResult extends AggregateResult {
long[] timestamps, int length, IReaderByTimestamp dataReader) throws
IOException {
long time = Long.MIN_VALUE;
Object lastVal = null;
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
+ if (values[i] != null) {
time = timestamps[i];
- lastVal = value;
+ lastVal = values[i];
}
}
if (time != Long.MIN_VALUE) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
index 15af763..354a360 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
@@ -70,11 +70,11 @@ public class LastValueDescAggrResult extends
LastValueAggrResult {
}
long time = Long.MIN_VALUE;
Object lastVal = null;
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
+ if (values[i] != null) {
time = timestamps[i];
- lastVal = value;
+ lastVal = values[i];
break;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index 548b249..df19df6 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -67,9 +67,9 @@ public class MaxTimeAggrResult extends AggregateResult {
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws
IOException {
long time = -1;
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
+ if (values[i] != null) {
time = timestamps[i];
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
index e29a211..3e1bc91 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
@@ -53,9 +53,9 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
return;
}
long time = -1;
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
+ if (values[i] != null) {
time = timestamps[i];
break;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index cd00df2..8f59994 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -72,13 +72,13 @@ public class MaxValueAggrResult extends AggregateResult {
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws
IOException {
Comparable<Object> maxVal = null;
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value == null) {
+ if (values[i] == null) {
continue;
}
- if (maxVal == null || maxVal.compareTo(value) < 0) {
- maxVal = (Comparable<Object>) value;
+ if (maxVal == null || maxVal.compareTo(values[i]) < 0) {
+ maxVal = (Comparable<Object>) values[i];
}
}
updateResult(maxVal);
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index a0fbabd..bd96043 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -74,9 +74,9 @@ public class MinTimeAggrResult extends AggregateResult {
if (hasFinalResult()) {
return;
}
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
+ if (values[i] != null) {
setLongValue(timestamps[i]);
return;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 1bafc56..3cb1fbb 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -43,9 +43,9 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
@Override
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws
IOException {
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
+ if (values[i] != null) {
setLongValue(timestamps[i]);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index 8b17d75..761fbac 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -67,13 +67,13 @@ public class MinValueAggrResult extends AggregateResult {
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws
IOException {
Comparable<Object> minVal = null;
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value == null) {
+ if (values[i] == null) {
continue;
}
- if (minVal == null || minVal.compareTo(value) > 0) {
- minVal = (Comparable<Object>) value;
+ if (minVal == null || minVal.compareTo(values[i]) > 0) {
+ minVal = (Comparable<Object>) values[i];
}
}
updateResult(minVal);
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index 475e953..d441d76 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -80,10 +80,10 @@ public class SumAggrResult extends AggregateResult {
@Override
public void updateResultUsingTimestamps(
long[] timestamps, int length, IReaderByTimestamp dataReader) throws
IOException {
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
for (int i = 0; i < length; i++) {
- Object value = dataReader.getValueInTimestamp(timestamps[i]);
- if (value != null) {
- updateSum(value);
+ if (values[i] != null) {
+ updateSum(values[i]);
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
index 3fb14f0..a0e8a01 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
@@ -110,7 +110,10 @@ public class RawQueryDataSetWithValueFilter extends
QueryDataSet implements UDFI
if (cached.get(i)) {
results = timeGenerator.getValues(paths.get(i));
} else {
- results =
seriesReaderByTimestampList.get(i).getValueInTimestamps(cachedTimeArray);
+ results =
+ seriesReaderByTimestampList
+ .get(i)
+ .getValuesInTimestamps(cachedTimeArray, cachedTimeCnt);
}
// 3. use values in results to fill row record
@@ -136,7 +139,7 @@ public class RawQueryDataSetWithValueFilter extends
QueryDataSet implements UDFI
@Override
public boolean hasNextRowInObjects() throws IOException {
- if (!cachedRowRecords.isEmpty()) {
+ if (!cachedRowInObjects.isEmpty()) {
return true;
}
return cacheRowInObjects();
@@ -144,7 +147,7 @@ public class RawQueryDataSetWithValueFilter extends
QueryDataSet implements UDFI
@Override
public Object[] nextRowInObjects() throws IOException {
- if (cachedRowRecords.isEmpty() && !cacheRowInObjects()) {
+ if (cachedRowInObjects.isEmpty() && !cacheRowInObjects()) {
// values + timestamp
return new Object[seriesReaderByTimestampList.size() + 1];
}
@@ -161,6 +164,10 @@ public class RawQueryDataSetWithValueFilter extends
QueryDataSet implements UDFI
while (timeGenerator.hasNext() && cachedTimeCnt < fetchSize) {
cachedTimeArray[cachedTimeCnt++] = timeGenerator.next();
}
+ if (cachedTimeCnt == 0) {
+ return false;
+ }
+
Object[][] rowsInObject = new
Object[cachedTimeCnt][seriesReaderByTimestampList.size() + 1];
for (int i = 0; i < cachedTimeCnt; i++) {
rowsInObject[i][seriesReaderByTimestampList.size()] = cachedTimeArray[i];
@@ -174,13 +181,18 @@ public class RawQueryDataSetWithValueFilter extends
QueryDataSet implements UDFI
if (cached.get(i)) {
results = timeGenerator.getValues(paths.get(i));
} else {
- results =
seriesReaderByTimestampList.get(i).getValueInTimestamps(cachedTimeArray);
+ results =
+ seriesReaderByTimestampList
+ .get(i)
+ .getValuesInTimestamps(cachedTimeArray, cachedTimeCnt);
}
// 3. use values in results to fill row record
for (int j = 0; j < cachedTimeCnt; j++) {
- if (results[j] != null) hasField[i] = true;
- rowsInObject[j][i] = results[j];
+ if (results[j] != null) {
+ hasField[j] = true;
+ rowsInObject[j][i] = results[j];
+ }
}
}
// 4. remove rowRecord if all values in one timestamp are null
@@ -191,6 +203,6 @@ public class RawQueryDataSetWithValueFilter extends
QueryDataSet implements UDFI
}
// 5. check whether there is next row record
- return !cachedRowRecords.isEmpty();
+ return !cachedRowInObjects.isEmpty();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
b/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
index befcc08..e9fa985 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
@@ -32,39 +32,40 @@ public class ByTimestampReaderAdapter implements
IReaderByTimestamp {
// only cache the first point that >= timestamp
private boolean hasCached;
private TimeValuePair pair;
+ private long currentTime = Long.MIN_VALUE;
public ByTimestampReaderAdapter(IPointReader pointReader) {
this.pointReader = pointReader;
}
@Override
- public Object getValueInTimestamp(long timestamp) throws IOException {
- if (hasCached) {
- if (pair.getTimestamp() < timestamp) {
- hasCached = false;
- } else if (pair.getTimestamp() == timestamp) {
- hasCached = false;
- return pair.getValue().getValue();
- } else {
- return null;
- }
- }
+ public Object[] getValuesInTimestamps(long[] timestamps, int length) throws
IOException {
+ Object[] result = new Object[length];
- while (pointReader.hasNextTimeValuePair()) {
- pair = pointReader.nextTimeValuePair();
- if (pair.getTimestamp() == timestamp) {
- return pair.getValue().getValue();
- } else if (pair.getTimestamp() > timestamp) {
- hasCached = true;
- return null;
+ for (int i = 0; i < length; i++) {
+ if (timestamps[i] < currentTime) {
+ throw new IOException("time must be increasing when use
ReaderByTimestamp");
+ }
+ currentTime = timestamps[i];
+ // search cache
+ if (hasCached && pair.getTimestamp() >= currentTime) {
+ if (pair.getTimestamp() == currentTime) {
+ hasCached = false;
+ result[i] = pair.getValue().getValue();
+ }
+ continue;
+ }
+ // search reader
+ while (pointReader.hasNextTimeValuePair()) {
+ pair = pointReader.nextTimeValuePair();
+ if (pair.getTimestamp() == currentTime) {
+ result[i] = pair.getValue().getValue();
+ } else if (pair.getTimestamp() > currentTime) {
+ hasCached = true;
+ result[i] = null;
+ }
}
}
-
- return null;
- }
-
- @Override
- public Object[] getValueInTimestamps(long[] timestamps) throws IOException {
- return null;
+ return result;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
index 5095ccd..0372b7e 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
@@ -34,51 +34,49 @@ public class DiskChunkReaderByTimestamp implements
IReaderByTimestamp {
private ChunkReaderByTimestamp chunkReaderByTimestamp;
private BatchData data;
+ private long currentTime = Long.MIN_VALUE;
public DiskChunkReaderByTimestamp(ChunkReaderByTimestamp
chunkReaderByTimestamp) {
this.chunkReaderByTimestamp = chunkReaderByTimestamp;
}
@Override
- public Object getValueInTimestamp(long timestamp) throws IOException {
+ public Object[] getValuesInTimestamps(long[] timestamps, int length) throws
IOException {
+ Object[] result = new Object[length];
- if (!hasNext()) {
- return null;
- }
-
- while (data != null) {
- Object value = data.getValueInTimestamp(timestamp);
- if (value != null) {
- return value;
+ for (int i = 0; i < length; i++) {
+ if (timestamps[i] < currentTime) {
+ throw new IOException("time must be increasing when use
ReaderByTimestamp");
}
- if (data.hasCurrent()) {
- return null;
- } else {
- chunkReaderByTimestamp.setCurrentTimestamp(timestamp);
- if (chunkReaderByTimestamp.hasNextSatisfiedPage()) {
- data = chunkReaderByTimestamp.nextPageData();
- } else {
- return null;
+ currentTime = timestamps[i];
+ while (hasNext()) {
+ data = next();
+ if (data.getMaxTimestamp() > currentTime) {
+ result[i] = null;
+ break;
+ }
+ result[i] = data.getValueInTimestamp(currentTime);
+ // fill cache
+ if (!data.hasCurrent() &&
chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+ data = next();
}
}
}
+ return result;
+ }
- return null;
+ private boolean hasCacheData() {
+ return data != null && data.hasCurrent();
}
- @Override
- public Object[] getValueInTimestamps(long[] timestamps) throws IOException {
- return null;
+ private boolean hasNext() {
+ return hasCacheData() || chunkReaderByTimestamp.hasNextSatisfiedPage();
}
- private boolean hasNext() throws IOException {
- if (data != null && data.hasCurrent()) {
- return true;
- }
- if (chunkReaderByTimestamp != null &&
chunkReaderByTimestamp.hasNextSatisfiedPage()) {
- data = chunkReaderByTimestamp.nextPageData();
- return true;
+ private BatchData next() throws IOException {
+ if (hasCacheData()) {
+ return data;
}
- return false;
+ return chunkReaderByTimestamp.nextPageData();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
index 28a5ad1..0db7bab 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
@@ -34,13 +34,7 @@ public interface IReaderByTimestamp {
* guarantee of correctness with any other way of calling. For example, DO
NOT call this method
* twice with the same timestamp.
*/
- Object getValueInTimestamp(long timestamp) throws IOException;
-
- /**
- * Returns all the corresponding values under the array of timestamp.
Returns null if no value
- * under one timestamp.
- */
- Object[] getValueInTimestamps(long[] timestamps) throws IOException;
+ Object[] getValuesInTimestamps(long[] timestamps, int length) throws
IOException;
/**
* Returns whether there is no more data in reader.
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
index 551c890..00b6df3 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
@@ -67,20 +67,10 @@ public class SeriesReaderByTimestamp implements
IReaderByTimestamp {
}
@Override
- public Object getValueInTimestamp(long timestamp) throws IOException {
- seriesReader.setTimeFilter(timestamp);
- if ((batchData == null || !hasAvailableData(batchData, timestamp)) &&
!hasNext(timestamp)) {
- return null;
- }
-
- return batchData.getValueInTimestamp(timestamp);
- }
-
- @Override
- public Object[] getValueInTimestamps(long[] timestamp) throws IOException {
- seriesReader.setTimeFilter(timestamp[0]);
- Object[] results = new Object[timestamp.length];
- for (int i = 0; i < timestamp.length; i++) {
+ public Object[] getValuesInTimestamps(long[] timestamp, int length) throws
IOException {
+ Object[] results = new Object[length];
+ for (int i = 0; i < length; i++) {
+ seriesReader.setTimeFilter(timestamp[0]);
if ((batchData == null || !hasAvailableData(batchData, timestamp[i]))
&& !hasNext(timestamp[i])) {
// there is no more data
diff --git
a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
index 4e71afa..d78d6d4 100644
---
a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
@@ -80,14 +80,18 @@ public class SeriesReaderByTimestampTest {
null,
true);
+ long timestamps[] = new long[500];
+ for (int i = 0; i < 500; i++) {
+ timestamps[i] = i;
+ }
+ Object[] values = seriesReader.getValuesInTimestamps(timestamps,
timestamps.length);
for (int time = 0; time < 500; time++) {
- Integer value = (Integer) seriesReader.getValueInTimestamp(time);
if (time < 200) {
- Assert.assertEquals(time + 20000, value.intValue());
+ Assert.assertEquals(time + 20000, values[time]);
} else if (time < 260 || (time >= 300 && time < 380) || (time >= 400)) {
- Assert.assertEquals(time + 10000, value.intValue());
+ Assert.assertEquals(time + 10000, values[time]);
} else {
- Assert.assertEquals(time, value.intValue());
+ Assert.assertEquals(time, values[time]);
}
}
}