This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a3646a3 [IOTDB-500] Let timeColumn and batchData store time in the same struct (#913)
a3646a3 is described below
commit a3646a34d70c1a91c80578a2c735a7cf4f488523
Author: Dawei Liu <[email protected]>
AuthorDate: Tue Mar 17 15:40:34 2020 +0800
[IOTDB-500] Let timeColumn and batchData store time in the same struct (#913)
* [IOTDB-500] Let timeColumn and batchData store time in the same struct
---
.../iotdb/tsfile/common/conf/TSFileConfig.java | 4 +-
.../apache/iotdb/tsfile/read/common/BatchData.java | 139 +++++++++++++--------
.../iotdb/tsfile/read/common/TimeColumn.java | 89 ++++++++-----
.../read/query/timegenerator/node/AndNode.java | 2 +-
.../read/query/timegenerator/node/OrNode.java | 2 +-
5 files changed, 149 insertions(+), 87 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index 8a690ad..3510782 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -64,9 +64,9 @@ public class TSFileConfig {
public static final double MAX_BLOOM_FILTER_ERROR_RATE = 0.1;
/**
- * The default grow size of class BatchData.
+ * The primitive array capacity threshold.
*/
- public static final int DYNAMIC_DATA_SIZE = 1000;
+ public static final int ARRAY_CAPACITY_THRESHOLD = 1000;
/**
* Memory size threshold for flushing to disk, default value is 128MB.
*/
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index a4c316f..1c4ae73 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.read.common;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.List;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -50,8 +51,8 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsLong;
public class BatchData implements Serializable {
private static final long serialVersionUID = -4620310601188394839L;
+ private static final int capacityThreshold = TSFileConfig.ARRAY_CAPACITY_THRESHOLD;
private int capacity = 16;
- private int capacityThreshold = 1024;
private TSDataType dataType;
@@ -69,13 +70,13 @@ public class BatchData implements Serializable {
private int count;
- private ArrayList<long[]> timeRet;
- private ArrayList<boolean[]> booleanRet;
- private ArrayList<int[]> intRet;
- private ArrayList<long[]> longRet;
- private ArrayList<float[]> floatRet;
- private ArrayList<double[]> doubleRet;
- private ArrayList<Binary[]> binaryRet;
+ private List<long[]> timeRet;
+ private List<boolean[]> booleanRet;
+ private List<int[]> intRet;
+ private List<long[]> longRet;
+ private List<float[]> floatRet;
+ private List<double[]> doubleRet;
+ private List<Binary[]> binaryRet;
public BatchData() {
dataType = null;
@@ -95,13 +96,11 @@ public class BatchData implements Serializable {
}
public boolean hasCurrent() {
- if (readCurListIndex < writeCurListIndex) {
- return readCurArrayIndex < capacity;
- } else if (readCurListIndex == writeCurListIndex) {
+ if (readCurListIndex == writeCurListIndex) {
return readCurArrayIndex < writeCurArrayIndex;
- } else {
- return false;
}
+
+ return readCurListIndex < writeCurListIndex && readCurArrayIndex < capacity;
}
public void next() {
@@ -174,7 +173,6 @@ public class BatchData implements Serializable {
this.readCurArrayIndex = 0;
this.writeCurListIndex = 0;
this.writeCurArrayIndex = 0;
- capacityThreshold = TSFileConfig.DYNAMIC_DATA_SIZE;
timeRet = new ArrayList<>();
timeRet.add(new long[capacity]);
@@ -224,17 +222,23 @@ public class BatchData implements Serializable {
writeCurListIndex++;
writeCurArrayIndex = 0;
} else {
- long[] newTimeData = new long[capacity * 2];
+ int newCapacity = capacity << 1;
+
+ long[] newTimeData = new long[newCapacity];
+ boolean[] newValueData = new boolean[newCapacity];
+
System.arraycopy(timeRet.get(0), 0, newTimeData, 0, capacity);
- timeRet.set(0, newTimeData);
- boolean[] newValueData = new boolean[capacity * 2];
System.arraycopy(booleanRet.get(0), 0, newValueData, 0, capacity);
+
+ timeRet.set(0, newTimeData);
booleanRet.set(0, newValueData);
- capacity = capacity * 2;
+
+ capacity = newCapacity;
}
}
- (timeRet.get(writeCurListIndex))[writeCurArrayIndex] = t;
- (booleanRet.get(writeCurListIndex))[writeCurArrayIndex] = v;
+ timeRet.get(writeCurListIndex)[writeCurArrayIndex] = t;
+ booleanRet.get(writeCurListIndex)[writeCurArrayIndex] = v;
+
writeCurArrayIndex++;
count++;
}
@@ -253,17 +257,23 @@ public class BatchData implements Serializable {
writeCurListIndex++;
writeCurArrayIndex = 0;
} else {
- long[] newTimeData = new long[capacity * 2];
+ int newCapacity = capacity << 1;
+
+ long[] newTimeData = new long[newCapacity];
+ int[] newValueData = new int[newCapacity];
+
System.arraycopy(timeRet.get(0), 0, newTimeData, 0, capacity);
- timeRet.set(0, newTimeData);
- int[] newValueData = new int[capacity * 2];
System.arraycopy(intRet.get(0), 0, newValueData, 0, capacity);
+
+ timeRet.set(0, newTimeData);
intRet.set(0, newValueData);
- capacity = capacity * 2;
+
+ capacity = newCapacity;
}
}
- (timeRet.get(writeCurListIndex))[writeCurArrayIndex] = t;
- (intRet.get(writeCurListIndex))[writeCurArrayIndex] = v;
+ timeRet.get(writeCurListIndex)[writeCurArrayIndex] = t;
+ intRet.get(writeCurListIndex)[writeCurArrayIndex] = v;
+
writeCurArrayIndex++;
count++;
}
@@ -282,17 +292,23 @@ public class BatchData implements Serializable {
writeCurListIndex++;
writeCurArrayIndex = 0;
} else {
- long[] newTimeData = new long[capacity * 2];
+ int newCapacity = capacity << 1;
+
+ long[] newTimeData = new long[newCapacity];
+ long[] newValueData = new long[newCapacity];
+
System.arraycopy(timeRet.get(0), 0, newTimeData, 0, capacity);
- timeRet.set(0, newTimeData);
- long[] newValueData = new long[capacity * 2];
System.arraycopy(longRet.get(0), 0, newValueData, 0, capacity);
+
+ timeRet.set(0, newTimeData);
longRet.set(0, newValueData);
- capacity = capacity * 2;
+
+ capacity = newCapacity;
}
}
- (timeRet.get(writeCurListIndex))[writeCurArrayIndex] = t;
- (longRet.get(writeCurListIndex))[writeCurArrayIndex] = v;
+ timeRet.get(writeCurListIndex)[writeCurArrayIndex] = t;
+ longRet.get(writeCurListIndex)[writeCurArrayIndex] = v;
+
writeCurArrayIndex++;
count++;
}
@@ -311,17 +327,23 @@ public class BatchData implements Serializable {
writeCurListIndex++;
writeCurArrayIndex = 0;
} else {
- long[] newTimeData = new long[capacity * 2];
+ int newCapacity = capacity << 1;
+
+ long[] newTimeData = new long[newCapacity];
+ float[] newValueData = new float[newCapacity];
+
System.arraycopy(timeRet.get(0), 0, newTimeData, 0, capacity);
- timeRet.set(0, newTimeData);
- float[] newValueData = new float[capacity * 2];
System.arraycopy(floatRet.get(0), 0, newValueData, 0, capacity);
+
+ timeRet.set(0, newTimeData);
floatRet.set(0, newValueData);
- capacity = capacity * 2;
+
+ capacity = newCapacity;
}
}
- (timeRet.get(writeCurListIndex))[writeCurArrayIndex] = t;
- (floatRet.get(writeCurListIndex))[writeCurArrayIndex] = v;
+ timeRet.get(writeCurListIndex)[writeCurArrayIndex] = t;
+ floatRet.get(writeCurListIndex)[writeCurArrayIndex] = v;
+
writeCurArrayIndex++;
count++;
}
@@ -340,17 +362,22 @@ public class BatchData implements Serializable {
writeCurListIndex++;
writeCurArrayIndex = 0;
} else {
- long[] newTimeData = new long[capacity * 2];
+ int newCapacity = capacity << 1;
+
+ long[] newTimeData = new long[newCapacity];
+ double[] newValueData = new double[newCapacity];
+
System.arraycopy(timeRet.get(0), 0, newTimeData, 0, capacity);
- timeRet.set(0, newTimeData);
- double[] newValueData = new double[capacity * 2];
System.arraycopy(doubleRet.get(0), 0, newValueData, 0, capacity);
+
+ timeRet.set(0, newTimeData);
doubleRet.set(0, newValueData);
- capacity = capacity * 2;
+ capacity = newCapacity;
}
}
- (timeRet.get(writeCurListIndex))[writeCurArrayIndex] = t;
- (doubleRet.get(writeCurListIndex))[writeCurArrayIndex] = v;
+ timeRet.get(writeCurListIndex)[writeCurArrayIndex] = t;
+ doubleRet.get(writeCurListIndex)[writeCurArrayIndex] = v;
+
writeCurArrayIndex++;
count++;
}
@@ -369,17 +396,23 @@ public class BatchData implements Serializable {
writeCurListIndex++;
writeCurArrayIndex = 0;
} else {
- long[] newTimeData = new long[capacity * 2];
+ int newCapacity = capacity << 1;
+
+ long[] newTimeData = new long[newCapacity];
+ Binary[] newValueData = new Binary[newCapacity];
+
System.arraycopy(timeRet.get(0), 0, newTimeData, 0, capacity);
- timeRet.set(0, newTimeData);
- Binary[] newValueData = new Binary[capacity * 2];
System.arraycopy(binaryRet.get(0), 0, newValueData, 0, capacity);
+
+ timeRet.set(0, newTimeData);
binaryRet.set(0, newValueData);
- capacity = capacity * 2;
+
+ capacity = newCapacity;
}
}
- (timeRet.get(writeCurListIndex))[writeCurArrayIndex] = t;
- (binaryRet.get(writeCurListIndex))[writeCurArrayIndex] = v;
+ timeRet.get(writeCurListIndex)[writeCurArrayIndex] = t;
+ binaryRet.get(writeCurListIndex)[writeCurArrayIndex] = v;
+
writeCurArrayIndex++;
count++;
}
@@ -520,11 +553,7 @@ public class BatchData implements Serializable {
}
public TimeColumn getTimeColumn() {
- TimeColumn timeSeries = new TimeColumn(length());
- for (int i = 0; i < length(); i++) {
- timeSeries.add(getTimeByIndex(i));
- }
- return timeSeries;
+ return new TimeColumn(timeRet, count, capacity);
}
public BatchDataIterator getBatchDataIterator() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java
index 3b106a0..7754b40 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java
@@ -19,60 +19,93 @@
package org.apache.iotdb.tsfile.read.common;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+
public class TimeColumn {
- private static final int DEFAULT_INIT_SIZE = 1000;
+ private static final int capacityThreshold = TSFileConfig.ARRAY_CAPACITY_THRESHOLD;
+ private int capacity = 16;
+ // outer list index for read
+ private int readCurListIndex;
+ // inner array index for read
+ private int readCurArrayIndex;
- private long[] times;
+ // outer list index for write
+ private int writeCurListIndex;
+ // inner array index for write
+ private int writeCurArrayIndex;
- private int size;
+ // the insert timestamp number of timeRet
+ private int count;
- private int cur;
+ private List<long[]> timeRet;
public TimeColumn() {
- this(DEFAULT_INIT_SIZE);
- }
-
- public TimeColumn(int initSize) {
- times = new long[initSize];
+ this.readCurListIndex = 0;
+ this.readCurArrayIndex = 0;
+ this.writeCurListIndex = 0;
+ this.writeCurArrayIndex = 0;
+ timeRet = new ArrayList<>();
+ timeRet.add(new long[capacity]);
+ count = 0;
}
+ public TimeColumn(List<long[]> timeRet, int count, int capacity) {
+ this.count = count;
+ this.readCurListIndex = 0;
+ this.readCurArrayIndex = 0;
+ this.capacity = capacity;
- public TimeColumn(long[] times) {
- this.times = times;
+ this.writeCurListIndex = count / capacity;
+ this.writeCurArrayIndex = count % capacity;
+ this.timeRet = timeRet;
}
public void add(long time) {
- if (size == times.length) {
- long[] newArray = new long[times.length * 2];
- System.arraycopy(times, 0, newArray, 0, times.length);
- times = newArray;
+ if (writeCurArrayIndex == capacity) {
+ if (capacity >= capacityThreshold) {
+ timeRet.add(new long[capacity]);
+ writeCurListIndex++;
+ writeCurArrayIndex = 0;
+ } else {
+ int newCapacity = capacity << 1;
+
+ long[] newTimeData = new long[newCapacity];
+ System.arraycopy(timeRet.get(0), 0, newTimeData, 0, capacity);
+ timeRet.set(0, newTimeData);
+
+ capacity = newCapacity;
+ }
}
- times[size++] = time;
- }
-
- public long[] getTimes() {
- return times;
+ timeRet.get(writeCurListIndex)[writeCurArrayIndex] = time;
+ writeCurArrayIndex++;
+ count++;
}
public boolean hasCurrent() {
- return size > 0 && cur < size;
+ if (readCurListIndex == writeCurListIndex) {
+ return readCurArrayIndex < writeCurArrayIndex;
+ }
+
+ return readCurListIndex < writeCurListIndex && readCurArrayIndex < capacity;
}
public long currentTime() {
- return times[cur];
+ return this.timeRet.get(readCurListIndex)[readCurArrayIndex];
}
public void next() {
- cur++;
- }
-
- public long getLastTime() {
- return times[size - 1];
+ readCurArrayIndex++;
+ if (readCurArrayIndex == capacity) {
+ readCurArrayIndex = 0;
+ readCurListIndex++;
+ }
}
public int size() {
- return size;
+ return this.count;
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
index 1b298ba..650a741 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
@@ -53,7 +53,7 @@ public class AndNode implements Node {
if (hasCachedValue) {
return true;
}
- cachedTimeColumn = new TimeColumn(fetchSize);
+ cachedTimeColumn = new TimeColumn();
//fill data
fillLeftCache();
fillRightCache();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
index aef0419..b9d2eb8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
@@ -64,7 +64,7 @@ public class OrNode implements Node {
return true;
}
- cachedTimeColumn = new TimeColumn(fetchSize);
+ cachedTimeColumn = new TimeColumn();
while (hasLeftValue() && hasRightValue()) {
long leftValue = leftTimeColumn.currentTime();