This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch clusterQueryOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/clusterQueryOpt by this push:
     new d7d9e24  fix single node bug
d7d9e24 is described below

commit d7d9e24ea76cfe6f357c4896e0458560bbae33fb
Author: Alima777 <[email protected]>
AuthorDate: Wed Mar 10 20:55:27 2021 +0800

    fix single node bug
---
 .../cluster/query/reader/ClusterReaderFactory.java |  1 -
 .../iotdb/cluster/query/reader/EmptyReader.java    |  2 +-
 .../reader/RemoteSeriesReaderByTimestamp.java      |  2 +-
 .../db/query/aggregation/impl/AvgAggrResult.java   |  6 +--
 .../db/query/aggregation/impl/CountAggrResult.java |  4 +-
 .../aggregation/impl/FirstValueAggrResult.java     |  6 +--
 .../aggregation/impl/FirstValueDescAggrResult.java |  6 +--
 .../aggregation/impl/LastValueAggrResult.java      |  6 +--
 .../aggregation/impl/LastValueDescAggrResult.java  |  6 +--
 .../query/aggregation/impl/MaxTimeAggrResult.java  |  4 +-
 .../aggregation/impl/MaxTimeDescAggrResult.java    |  4 +-
 .../query/aggregation/impl/MaxValueAggrResult.java |  8 ++--
 .../query/aggregation/impl/MinTimeAggrResult.java  |  4 +-
 .../aggregation/impl/MinTimeDescAggrResult.java    |  4 +-
 .../query/aggregation/impl/MinValueAggrResult.java |  8 ++--
 .../db/query/aggregation/impl/SumAggrResult.java   |  6 +--
 .../dataset/RawQueryDataSetWithValueFilter.java    | 26 +++++++---
 .../adapter/ByTimestampReaderAdapter.java          | 51 ++++++++++----------
 .../reader/chunk/DiskChunkReaderByTimestamp.java   | 56 +++++++++++-----------
 .../db/query/reader/series/IReaderByTimestamp.java |  8 +---
 .../reader/series/SeriesReaderByTimestamp.java     | 18 ++-----
 .../reader/series/SeriesReaderByTimestampTest.java | 12 +++--
 22 files changed, 123 insertions(+), 125 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index e77d7cd..9b78175 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -217,7 +217,6 @@ public class ClusterReaderFactory {
       boolean ascending)
       throws StorageEngineException, EmptyIntervalException {
     // make sure the partition table is new
-    // TODO: don't need to sync metadata for every reader
     try {
       metaGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
index 6db6445..8ead5bd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/EmptyReader.java
@@ -160,7 +160,7 @@ public class EmptyReader extends BaseManagedSeriesReader
   }
 
   @Override
-  public Object[] getValueInTimestamps(long[] timestamps) {
+  public Object[] getValuesInTimestamps(long[] timestamps) {
     return null;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
index 4a32acc..f2d8729 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
@@ -109,7 +109,7 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
   }
 
   @Override
-  public Object[] getValueInTimestamps(long[] timestamps) throws IOException {
+  public Object[] getValuesInTimestamps(long[] timestamps) throws IOException {
     return null;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index 237fa08..6e068b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -102,10 +102,10 @@ public class AvgAggrResult extends AggregateResult {
   @Override
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
-        updateAvg(seriesDataType, value);
+      if (values[i] != null) {
+        updateAvg(seriesDataType, values[i]);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index adf0069..30b4659 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -74,9 +74,9 @@ public class CountAggrResult extends AggregateResult {
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
     int cnt = 0;
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
+      if (values[i] != null) {
         cnt++;
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 0f51dc3..3aacbf9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -95,10 +95,10 @@ public class FirstValueAggrResult extends AggregateResult {
       return;
     }
 
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
-        setValue(value);
+      if (values[i] != null) {
+        setValue(values[i]);
         timestamp = timestamps[i];
         break;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
index daf5a56..ad682d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java
@@ -53,10 +53,10 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult {
   @Override
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
-        setValue(value);
+      if (values[i] != null) {
+        setValue(values[i]);
         timestamp = timestamps[i];
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 0726d7a..bf21f7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -87,11 +87,11 @@ public class LastValueAggrResult extends AggregateResult {
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
     long time = Long.MIN_VALUE;
     Object lastVal = null;
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
+      if (values[i] != null) {
         time = timestamps[i];
-        lastVal = value;
+        lastVal = values[i];
       }
     }
     if (time != Long.MIN_VALUE) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
index 15af763..354a360 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java
@@ -70,11 +70,11 @@ public class LastValueDescAggrResult extends LastValueAggrResult {
     }
     long time = Long.MIN_VALUE;
     Object lastVal = null;
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
+      if (values[i] != null) {
         time = timestamps[i];
-        lastVal = value;
+        lastVal = values[i];
         break;
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
index 548b249..df19df6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java
@@ -67,9 +67,9 @@ public class MaxTimeAggrResult extends AggregateResult {
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
     long time = -1;
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
+      if (values[i] != null) {
         time = timestamps[i];
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
index e29a211..3e1bc91 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java
@@ -53,9 +53,9 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult {
       return;
     }
     long time = -1;
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
+      if (values[i] != null) {
         time = timestamps[i];
         break;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
index cd00df2..8f59994 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java
@@ -72,13 +72,13 @@ public class MaxValueAggrResult extends AggregateResult {
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
     Comparable<Object> maxVal = null;
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value == null) {
+      if (values[i] == null) {
         continue;
       }
-      if (maxVal == null || maxVal.compareTo(value) < 0) {
-        maxVal = (Comparable<Object>) value;
+      if (maxVal == null || maxVal.compareTo(values[i]) < 0) {
+        maxVal = (Comparable<Object>) values[i];
       }
     }
     updateResult(maxVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
index a0fbabd..bd96043 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java
@@ -74,9 +74,9 @@ public class MinTimeAggrResult extends AggregateResult {
     if (hasFinalResult()) {
       return;
     }
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
+      if (values[i] != null) {
         setLongValue(timestamps[i]);
         return;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
index 1bafc56..3cb1fbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java
@@ -43,9 +43,9 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult {
   @Override
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
+      if (values[i] != null) {
         setLongValue(timestamps[i]);
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
index 8b17d75..761fbac 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java
@@ -67,13 +67,13 @@ public class MinValueAggrResult extends AggregateResult {
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
     Comparable<Object> minVal = null;
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value == null) {
+      if (values[i] == null) {
         continue;
       }
-      if (minVal == null || minVal.compareTo(value) > 0) {
-        minVal = (Comparable<Object>) value;
+      if (minVal == null || minVal.compareTo(values[i]) > 0) {
+        minVal = (Comparable<Object>) values[i];
       }
     }
     updateResult(minVal);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
index 475e953..d441d76 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java
@@ -80,10 +80,10 @@ public class SumAggrResult extends AggregateResult {
   @Override
   public void updateResultUsingTimestamps(
       long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
     for (int i = 0; i < length; i++) {
-      Object value = dataReader.getValueInTimestamp(timestamps[i]);
-      if (value != null) {
-        updateSum(value);
+      if (values[i] != null) {
+        updateSum(values[i]);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
index 3fb14f0..a0e8a01 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
@@ -110,7 +110,10 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI
       if (cached.get(i)) {
         results = timeGenerator.getValues(paths.get(i));
       } else {
-        results = seriesReaderByTimestampList.get(i).getValueInTimestamps(cachedTimeArray);
+        results =
+            seriesReaderByTimestampList
+                .get(i)
+                .getValuesInTimestamps(cachedTimeArray, cachedTimeCnt);
       }
 
       // 3. use values in results to fill row record
@@ -136,7 +139,7 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI
 
   @Override
   public boolean hasNextRowInObjects() throws IOException {
-    if (!cachedRowRecords.isEmpty()) {
+    if (!cachedRowInObjects.isEmpty()) {
       return true;
     }
     return cacheRowInObjects();
@@ -144,7 +147,7 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI
 
   @Override
   public Object[] nextRowInObjects() throws IOException {
-    if (cachedRowRecords.isEmpty() && !cacheRowInObjects()) {
+    if (cachedRowInObjects.isEmpty() && !cacheRowInObjects()) {
       // values + timestamp
       return new Object[seriesReaderByTimestampList.size() + 1];
     }
@@ -161,6 +164,10 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI
     while (timeGenerator.hasNext() && cachedTimeCnt < fetchSize) {
       cachedTimeArray[cachedTimeCnt++] = timeGenerator.next();
     }
+    if (cachedTimeCnt == 0) {
+      return false;
+    }
+
     Object[][] rowsInObject = new Object[cachedTimeCnt][seriesReaderByTimestampList.size() + 1];
     for (int i = 0; i < cachedTimeCnt; i++) {
       rowsInObject[i][seriesReaderByTimestampList.size()] = cachedTimeArray[i];
@@ -174,13 +181,18 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI
       if (cached.get(i)) {
         results = timeGenerator.getValues(paths.get(i));
       } else {
-        results = seriesReaderByTimestampList.get(i).getValueInTimestamps(cachedTimeArray);
+        results =
+            seriesReaderByTimestampList
+                .get(i)
+                .getValuesInTimestamps(cachedTimeArray, cachedTimeCnt);
       }
 
       // 3. use values in results to fill row record
       for (int j = 0; j < cachedTimeCnt; j++) {
-        if (results[j] != null) hasField[i] = true;
-        rowsInObject[j][i] = results[j];
+        if (results[j] != null) {
+          hasField[j] = true;
+          rowsInObject[j][i] = results[j];
+        }
       }
     }
     // 4. remove rowRecord if all values in one timestamp are null
@@ -191,6 +203,6 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI
     }
 
     // 5. check whether there is next row record
-    return !cachedRowRecords.isEmpty();
+    return !cachedRowInObjects.isEmpty();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
index befcc08..e9fa985 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
@@ -32,39 +32,40 @@ public class ByTimestampReaderAdapter implements IReaderByTimestamp {
   // only cache the first point that >= timestamp
   private boolean hasCached;
   private TimeValuePair pair;
+  private long currentTime = Long.MIN_VALUE;
 
   public ByTimestampReaderAdapter(IPointReader pointReader) {
     this.pointReader = pointReader;
   }
 
   @Override
-  public Object getValueInTimestamp(long timestamp) throws IOException {
-    if (hasCached) {
-      if (pair.getTimestamp() < timestamp) {
-        hasCached = false;
-      } else if (pair.getTimestamp() == timestamp) {
-        hasCached = false;
-        return pair.getValue().getValue();
-      } else {
-        return null;
-      }
-    }
+  public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
+    Object[] result = new Object[length];
 
-    while (pointReader.hasNextTimeValuePair()) {
-      pair = pointReader.nextTimeValuePair();
-      if (pair.getTimestamp() == timestamp) {
-        return pair.getValue().getValue();
-      } else if (pair.getTimestamp() > timestamp) {
-        hasCached = true;
-        return null;
+    for (int i = 0; i < length; i++) {
+      if (timestamps[i] < currentTime) {
+        throw new IOException("time must be increasing when use ReaderByTimestamp");
+      }
+      currentTime = timestamps[i];
+      // search cache
+      if (hasCached && pair.getTimestamp() >= currentTime) {
+        if (pair.getTimestamp() == currentTime) {
+          hasCached = false;
+          result[i] = pair.getValue().getValue();
+        }
+        continue;
+      }
+      // search reader
+      while (pointReader.hasNextTimeValuePair()) {
+        pair = pointReader.nextTimeValuePair();
+        if (pair.getTimestamp() == currentTime) {
+          result[i] = pair.getValue().getValue();
+        } else if (pair.getTimestamp() > currentTime) {
+          hasCached = true;
+          result[i] = null;
+        }
       }
     }
-
-    return null;
-  }
-
-  @Override
-  public Object[] getValueInTimestamps(long[] timestamps) throws IOException {
-    return null;
+    return result;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
index 5095ccd..0372b7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkReaderByTimestamp.java
@@ -34,51 +34,49 @@ public class DiskChunkReaderByTimestamp implements IReaderByTimestamp {
 
   private ChunkReaderByTimestamp chunkReaderByTimestamp;
   private BatchData data;
+  private long currentTime = Long.MIN_VALUE;
 
   public DiskChunkReaderByTimestamp(ChunkReaderByTimestamp chunkReaderByTimestamp) {
     this.chunkReaderByTimestamp = chunkReaderByTimestamp;
   }
 
   @Override
-  public Object getValueInTimestamp(long timestamp) throws IOException {
+  public Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException {
+    Object[] result = new Object[length];
 
-    if (!hasNext()) {
-      return null;
-    }
-
-    while (data != null) {
-      Object value = data.getValueInTimestamp(timestamp);
-      if (value != null) {
-        return value;
+    for (int i = 0; i < length; i++) {
+      if (timestamps[i] < currentTime) {
+        throw new IOException("time must be increasing when use ReaderByTimestamp");
       }
-      if (data.hasCurrent()) {
-        return null;
-      } else {
-        chunkReaderByTimestamp.setCurrentTimestamp(timestamp);
-        if (chunkReaderByTimestamp.hasNextSatisfiedPage()) {
-          data = chunkReaderByTimestamp.nextPageData();
-        } else {
-          return null;
+      currentTime = timestamps[i];
+      while (hasNext()) {
+        data = next();
+        if (data.getMaxTimestamp() > currentTime) {
+          result[i] = null;
+          break;
+        }
+        result[i] = data.getValueInTimestamp(currentTime);
+        // fill cache
+        if (!data.hasCurrent() && chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+          data = next();
         }
       }
     }
+    return result;
+  }
 
-    return null;
+  private boolean hasCacheData() {
+    return data != null && data.hasCurrent();
   }
 
-  @Override
-  public Object[] getValueInTimestamps(long[] timestamps) throws IOException {
-    return null;
+  private boolean hasNext() {
+    return hasCacheData() || chunkReaderByTimestamp.hasNextSatisfiedPage();
   }
 
-  private boolean hasNext() throws IOException {
-    if (data != null && data.hasCurrent()) {
-      return true;
-    }
-    if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextSatisfiedPage()) {
-      data = chunkReaderByTimestamp.nextPageData();
-      return true;
+  private BatchData next() throws IOException {
+    if (hasCacheData()) {
+      return data;
     }
-    return false;
+    return chunkReaderByTimestamp.nextPageData();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
index 28a5ad1..0db7bab 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
@@ -34,13 +34,7 @@ public interface IReaderByTimestamp {
   * guarantee of correctness with any other way of calling. For example, DO NOT call this method
    * twice with the same timestamp.
    */
-  Object getValueInTimestamp(long timestamp) throws IOException;
-
-  /**
-   * Returns all the corresponding values under the array of timestamp. Returns null if no value
-   * under one timestamp.
-   */
-  Object[] getValueInTimestamps(long[] timestamps) throws IOException;
+  Object[] getValuesInTimestamps(long[] timestamps, int length) throws IOException;
 
   /**
    * Returns whether there is no more data in reader.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
index 551c890..00b6df3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
@@ -67,20 +67,10 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp {
   }
 
   @Override
-  public Object getValueInTimestamp(long timestamp) throws IOException {
-    seriesReader.setTimeFilter(timestamp);
-    if ((batchData == null || !hasAvailableData(batchData, timestamp)) && !hasNext(timestamp)) {
-      return null;
-    }
-
-    return batchData.getValueInTimestamp(timestamp);
-  }
-
-  @Override
-  public Object[] getValueInTimestamps(long[] timestamp) throws IOException {
-    seriesReader.setTimeFilter(timestamp[0]);
-    Object[] results = new Object[timestamp.length];
-    for (int i = 0; i < timestamp.length; i++) {
+  public Object[] getValuesInTimestamps(long[] timestamp, int length) throws IOException {
+    Object[] results = new Object[length];
+    for (int i = 0; i < length; i++) {
+      seriesReader.setTimeFilter(timestamp[0]);
       if ((batchData == null || !hasAvailableData(batchData, timestamp[i]))
           && !hasNext(timestamp[i])) {
         // there is no more data
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
index 4e71afa..d78d6d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
@@ -80,14 +80,18 @@ public class SeriesReaderByTimestampTest {
             null,
             true);
 
+    long timestamps[] = new long[500];
+    for (int i = 0; i < 500; i++) {
+      timestamps[i] = i;
+    }
+    Object[] values = seriesReader.getValuesInTimestamps(timestamps, timestamps.length);
     for (int time = 0; time < 500; time++) {
-      Integer value = (Integer) seriesReader.getValueInTimestamp(time);
       if (time < 200) {
-        Assert.assertEquals(time + 20000, value.intValue());
+        Assert.assertEquals(time + 20000, values[time]);
       } else if (time < 260 || (time >= 300 && time < 380) || (time >= 400)) {
-        Assert.assertEquals(time + 10000, value.intValue());
+        Assert.assertEquals(time + 10000, values[time]);
       } else {
-        Assert.assertEquals(time, value.intValue());
+        Assert.assertEquals(time, values[time]);
       }
     }
   }

Reply via email to