This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a3646a3  [IOTDB-500] Let timeColumn and batchData store time in the 
same struct (#913)
a3646a3 is described below

commit a3646a34d70c1a91c80578a2c735a7cf4f488523
Author: Dawei Liu <[email protected]>
AuthorDate: Tue Mar 17 15:40:34 2020 +0800

    [IOTDB-500] Let timeColumn and batchData store time in the same struct 
(#913)
    
    * [IOTDB-500] Let timeColumn and batchData store time in the same struct
---
 .../iotdb/tsfile/common/conf/TSFileConfig.java     |   4 +-
 .../apache/iotdb/tsfile/read/common/BatchData.java | 139 +++++++++++++--------
 .../iotdb/tsfile/read/common/TimeColumn.java       |  89 ++++++++-----
 .../read/query/timegenerator/node/AndNode.java     |   2 +-
 .../read/query/timegenerator/node/OrNode.java      |   2 +-
 5 files changed, 149 insertions(+), 87 deletions(-)

diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index 8a690ad..3510782 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -64,9 +64,9 @@ public class TSFileConfig {
   public static final double MAX_BLOOM_FILTER_ERROR_RATE = 0.1;
 
   /**
-   * The default grow size of class BatchData.
+   * The primitive array capacity threshold.
    */
-  public static final int DYNAMIC_DATA_SIZE = 1000;
+  public static final int ARRAY_CAPACITY_THRESHOLD = 1000;
   /**
    * Memory size threshold for flushing to disk, default value is 128MB.
    */
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index a4c316f..1c4ae73 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.read.common;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.List;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -50,8 +51,8 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType.TsLong;
 public class BatchData implements Serializable {
 
   private static final long serialVersionUID = -4620310601188394839L;
+  private static final int capacityThreshold = 
TSFileConfig.ARRAY_CAPACITY_THRESHOLD;
   private int capacity = 16;
-  private int capacityThreshold = 1024;
 
   private TSDataType dataType;
 
@@ -69,13 +70,13 @@ public class BatchData implements Serializable {
   private int count;
 
 
-  private ArrayList<long[]> timeRet;
-  private ArrayList<boolean[]> booleanRet;
-  private ArrayList<int[]> intRet;
-  private ArrayList<long[]> longRet;
-  private ArrayList<float[]> floatRet;
-  private ArrayList<double[]> doubleRet;
-  private ArrayList<Binary[]> binaryRet;
+  private List<long[]> timeRet;
+  private List<boolean[]> booleanRet;
+  private List<int[]> intRet;
+  private List<long[]> longRet;
+  private List<float[]> floatRet;
+  private List<double[]> doubleRet;
+  private List<Binary[]> binaryRet;
 
   public BatchData() {
     dataType = null;
@@ -95,13 +96,11 @@ public class BatchData implements Serializable {
   }
 
   public boolean hasCurrent() {
-    if (readCurListIndex < writeCurListIndex) {
-      return readCurArrayIndex < capacity;
-    } else if (readCurListIndex == writeCurListIndex) {
+    if (readCurListIndex == writeCurListIndex) {
       return readCurArrayIndex < writeCurArrayIndex;
-    } else {
-      return false;
     }
+
+    return readCurListIndex < writeCurListIndex && readCurArrayIndex < 
capacity;
   }
 
   public void next() {
@@ -174,7 +173,6 @@ public class BatchData implements Serializable {
     this.readCurArrayIndex = 0;
     this.writeCurListIndex = 0;
     this.writeCurArrayIndex = 0;
-    capacityThreshold = TSFileConfig.DYNAMIC_DATA_SIZE;
 
     timeRet = new ArrayList<>();
     timeRet.add(new long[capacity]);
@@ -224,17 +222,23 @@ public class BatchData implements Serializable {
         writeCurListIndex++;
         writeCurArrayIndex = 0;
       } else {
-        long[] newTimeData = new long[capacity * 2];
+        int newCapacity = capacity << 1;
+
+        long[] newTimeData = new long[newCapacity];
+        boolean[] newValueData = new boolean[newCapacity];
+
         System.arraycopy(timeRet.get(0), 0, newTimeData, 0, capacity);
-        timeRet.set(0, newTimeData);
-        boolean[] newValueData = new boolean[capacity * 2];
         System.arraycopy(booleanRet.get(0), 0, newValueData, 0, capacity);
+
+        timeRet.set(0, newTimeData);
         booleanRet.set(0, newValueData);
-        capacity = capacity * 2;
+
+        capacity = newCapacity;
       }
     }
-    (timeRet.get(writeCurListIndex))[writeCurArrayIndex] = t;
-    (booleanRet.get(writeCurListIndex))[writeCurArrayIndex] = v;
+    timeRet.get(writeCurListIndex)[writeCurArrayIndex] = t;
+    booleanRet.get(writeCurListIndex)[writeCurArrayIndex] = v;
+
     writeCurArrayIndex++;
     count++;
   }
@@ -253,17 +257,23 @@ public class BatchData implements Serializable {
         writeCurListIndex++;
         writeCurArrayIndex = 0;
       } else {
-        long[] newTimeData = new long[capacity * 2];
+        int newCapacity = capacity << 1;
+
+        long[] newTimeData = new long[newCapacity];
+        int[] newValueData = new int[newCapacity];
+
         System.arraycopy(timeRet.get(0), 0, newTimeData, 0, capacity);
-        timeRet.set(0, newTimeData);
-        int[] newValueData = new int[capacity * 2];
         System.arraycopy(intRet.get(0), 0, newValueData, 0, capacity);
+
+        timeRet.set(0, newTimeData);
         intRet.set(0, newValueData);
-        capacity = capacity * 2;
+
+        capacity = newCapacity;
       }
     }
-    (timeRet.get(writeCurListIndex))[writeCurArrayIndex] = t;
-    (intRet.get(writeCurListIndex))[writeCurArrayIndex] = v;
+    timeRet.get(writeCurListIndex)[writeCurArrayIndex] = t;
+    intRet.get(writeCurListIndex)[writeCurArrayIndex] = v;
+
     writeCurArrayIndex++;
     count++;
   }
@@ -282,17 +292,23 @@ public class BatchData implements Serializable {
         writeCurListIndex++;
         writeCurArrayIndex = 0;
       } else {
-        long[] newTimeData = new long[capacity * 2];
+        int newCapacity = capacity << 1;
+
+        long[] newTimeData = new long[newCapacity];
+        long[] newValueData = new long[newCapacity];
+
         System.arraycopy(timeRet.get(0), 0, newTimeData, 0, capacity);
-        timeRet.set(0, newTimeData);
-        long[] newValueData = new long[capacity * 2];
         System.arraycopy(longRet.get(0), 0, newValueData, 0, capacity);
+
+        timeRet.set(0, newTimeData);
         longRet.set(0, newValueData);
-        capacity = capacity * 2;
+
+        capacity = newCapacity;
       }
     }
-    (timeRet.get(writeCurListIndex))[writeCurArrayIndex] = t;
-    (longRet.get(writeCurListIndex))[writeCurArrayIndex] = v;
+    timeRet.get(writeCurListIndex)[writeCurArrayIndex] = t;
+    longRet.get(writeCurListIndex)[writeCurArrayIndex] = v;
+
     writeCurArrayIndex++;
     count++;
   }
@@ -311,17 +327,23 @@ public class BatchData implements Serializable {
         writeCurListIndex++;
         writeCurArrayIndex = 0;
       } else {
-        long[] newTimeData = new long[capacity * 2];
+        int newCapacity = capacity << 1;
+
+        long[] newTimeData = new long[newCapacity];
+        float[] newValueData = new float[newCapacity];
+
         System.arraycopy(timeRet.get(0), 0, newTimeData, 0, capacity);
-        timeRet.set(0, newTimeData);
-        float[] newValueData = new float[capacity * 2];
         System.arraycopy(floatRet.get(0), 0, newValueData, 0, capacity);
+
+        timeRet.set(0, newTimeData);
         floatRet.set(0, newValueData);
-        capacity = capacity * 2;
+
+        capacity = newCapacity;
       }
     }
-    (timeRet.get(writeCurListIndex))[writeCurArrayIndex] = t;
-    (floatRet.get(writeCurListIndex))[writeCurArrayIndex] = v;
+    timeRet.get(writeCurListIndex)[writeCurArrayIndex] = t;
+    floatRet.get(writeCurListIndex)[writeCurArrayIndex] = v;
+
     writeCurArrayIndex++;
     count++;
   }
@@ -340,17 +362,22 @@ public class BatchData implements Serializable {
         writeCurListIndex++;
         writeCurArrayIndex = 0;
       } else {
-        long[] newTimeData = new long[capacity * 2];
+        int newCapacity = capacity << 1;
+
+        long[] newTimeData = new long[newCapacity];
+        double[] newValueData = new double[newCapacity];
+
         System.arraycopy(timeRet.get(0), 0, newTimeData, 0, capacity);
-        timeRet.set(0, newTimeData);
-        double[] newValueData = new double[capacity * 2];
         System.arraycopy(doubleRet.get(0), 0, newValueData, 0, capacity);
+
+        timeRet.set(0, newTimeData);
         doubleRet.set(0, newValueData);
-        capacity = capacity * 2;
+        capacity = newCapacity;
       }
     }
-    (timeRet.get(writeCurListIndex))[writeCurArrayIndex] = t;
-    (doubleRet.get(writeCurListIndex))[writeCurArrayIndex] = v;
+    timeRet.get(writeCurListIndex)[writeCurArrayIndex] = t;
+    doubleRet.get(writeCurListIndex)[writeCurArrayIndex] = v;
+
     writeCurArrayIndex++;
     count++;
   }
@@ -369,17 +396,23 @@ public class BatchData implements Serializable {
         writeCurListIndex++;
         writeCurArrayIndex = 0;
       } else {
-        long[] newTimeData = new long[capacity * 2];
+        int newCapacity = capacity << 1;
+
+        long[] newTimeData = new long[newCapacity];
+        Binary[] newValueData = new Binary[newCapacity];
+
         System.arraycopy(timeRet.get(0), 0, newTimeData, 0, capacity);
-        timeRet.set(0, newTimeData);
-        Binary[] newValueData = new Binary[capacity * 2];
         System.arraycopy(binaryRet.get(0), 0, newValueData, 0, capacity);
+
+        timeRet.set(0, newTimeData);
         binaryRet.set(0, newValueData);
-        capacity = capacity * 2;
+
+        capacity = newCapacity;
       }
     }
-    (timeRet.get(writeCurListIndex))[writeCurArrayIndex] = t;
-    (binaryRet.get(writeCurListIndex))[writeCurArrayIndex] = v;
+    timeRet.get(writeCurListIndex)[writeCurArrayIndex] = t;
+    binaryRet.get(writeCurListIndex)[writeCurArrayIndex] = v;
+
     writeCurArrayIndex++;
     count++;
   }
@@ -520,11 +553,7 @@ public class BatchData implements Serializable {
   }
 
   public TimeColumn getTimeColumn() {
-    TimeColumn timeSeries = new TimeColumn(length());
-    for (int i = 0; i < length(); i++) {
-      timeSeries.add(getTimeByIndex(i));
-    }
-    return timeSeries;
+    return new TimeColumn(timeRet, count, capacity);
   }
 
   public BatchDataIterator getBatchDataIterator() {
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java
index 3b106a0..7754b40 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeColumn.java
@@ -19,60 +19,93 @@
 package org.apache.iotdb.tsfile.read.common;
 
 
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+
 public class TimeColumn {
 
-  private static final int DEFAULT_INIT_SIZE = 1000;
+  private static final int capacityThreshold = 
TSFileConfig.ARRAY_CAPACITY_THRESHOLD;
+  private int capacity = 16;
 
+  // outer list index for read
+  private int readCurListIndex;
+  // inner array index for read
+  private int readCurArrayIndex;
 
-  private long[] times;
+  // outer list index for write
+  private int writeCurListIndex;
+  // inner array index for write
+  private int writeCurArrayIndex;
 
-  private int size;
+  // the insert timestamp number of timeRet
+  private int count;
 
-  private int cur;
+  private List<long[]> timeRet;
 
   public TimeColumn() {
-    this(DEFAULT_INIT_SIZE);
-  }
-
-  public TimeColumn(int initSize) {
-    times = new long[initSize];
+    this.readCurListIndex = 0;
+    this.readCurArrayIndex = 0;
+    this.writeCurListIndex = 0;
+    this.writeCurArrayIndex = 0;
+    timeRet = new ArrayList<>();
+    timeRet.add(new long[capacity]);
+    count = 0;
   }
 
+  public TimeColumn(List<long[]> timeRet, int count, int capacity) {
+    this.count = count;
+    this.readCurListIndex = 0;
+    this.readCurArrayIndex = 0;
+    this.capacity = capacity;
 
-  public TimeColumn(long[] times) {
-    this.times = times;
+    this.writeCurListIndex = count / capacity;
+    this.writeCurArrayIndex = count % capacity;
+    this.timeRet = timeRet;
   }
 
   public void add(long time) {
-    if (size == times.length) {
-      long[] newArray = new long[times.length * 2];
-      System.arraycopy(times, 0, newArray, 0, times.length);
-      times = newArray;
+    if (writeCurArrayIndex == capacity) {
+      if (capacity >= capacityThreshold) {
+        timeRet.add(new long[capacity]);
+        writeCurListIndex++;
+        writeCurArrayIndex = 0;
+      } else {
+        int newCapacity = capacity << 1;
+
+        long[] newTimeData = new long[newCapacity];
+        System.arraycopy(timeRet.get(0), 0, newTimeData, 0, capacity);
+        timeRet.set(0, newTimeData);
+
+        capacity = newCapacity;
+      }
     }
-    times[size++] = time;
-  }
-
-  public long[] getTimes() {
-    return times;
+    timeRet.get(writeCurListIndex)[writeCurArrayIndex] = time;
+    writeCurArrayIndex++;
+    count++;
   }
 
   public boolean hasCurrent() {
-    return size > 0 && cur < size;
+    if (readCurListIndex == writeCurListIndex) {
+      return readCurArrayIndex < writeCurArrayIndex;
+    }
+
+    return readCurListIndex < writeCurListIndex && readCurArrayIndex < 
capacity;
   }
 
   public long currentTime() {
-    return times[cur];
+    return this.timeRet.get(readCurListIndex)[readCurArrayIndex];
   }
 
   public void next() {
-    cur++;
-  }
-
-  public long getLastTime() {
-    return times[size - 1];
+    readCurArrayIndex++;
+    if (readCurArrayIndex == capacity) {
+      readCurArrayIndex = 0;
+      readCurListIndex++;
+    }
   }
 
   public int size() {
-    return size;
+    return this.count;
   }
 }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
index 1b298ba..650a741 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/AndNode.java
@@ -53,7 +53,7 @@ public class AndNode implements Node {
     if (hasCachedValue) {
       return true;
     }
-    cachedTimeColumn = new TimeColumn(fetchSize);
+    cachedTimeColumn = new TimeColumn();
     //fill data
     fillLeftCache();
     fillRightCache();
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
index aef0419..b9d2eb8 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/node/OrNode.java
@@ -64,7 +64,7 @@ public class OrNode implements Node {
       return true;
     }
 
-    cachedTimeColumn = new TimeColumn(fetchSize);
+    cachedTimeColumn = new TimeColumn();
 
     while (hasLeftValue() && hasRightValue()) {
       long leftValue = leftTimeColumn.currentTime();

Reply via email to