This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch proceeding_vldb
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/proceeding_vldb by this push:
     new 5bcdbb2  finish sliding windows
5bcdbb2 is described below

commit 5bcdbb2883c543cfb89420d2686b3711c705dbca
Author: EJTTianyu <[email protected]>
AuthorDate: Mon Feb 22 10:52:05 2021 +0800

    finish sliding windows
---
 .../resources/conf/iotdb-engine.properties         | 10 +++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 24 ++++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  8 +++
 .../engine/storagegroup/StorageGroupProcessor.java | 72 ++++++++++++++++++++--
 .../db/engine/storagegroup/TsFileProcessor.java    | 19 +++++-
 5 files changed, 126 insertions(+), 7 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties 
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 1b9f9de..931d7d5 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -283,6 +283,16 @@ encoding_task_queue_size_for_flushing=2147483647
 io_task_queue_size_for_flushing=2147483647
 
 ####################
+### Sliding Memory Table Configurations
+####################
+
+# Whether to enable sliding memory table
+enable_sliding_mem_table=true
+
+# Sliding window threshold in bytes (default: 16 MB)
+sliding_window_threshold=16777216
+
+####################
 ### Upgrade Configurations
 ####################
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c4be6f7..232a50b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -70,10 +70,34 @@ public class IoTDBConfig {
     this.overlapSplit = overlapSplit;
   }
 
+  public boolean isEnableSlidingMemTable() {
+    return enableSlidingMemTable;
+  }
+
+  public void setEnableSlidingMemTable(boolean enableSlidingMemTable) {
+    this.enableSlidingMemTable = enableSlidingMemTable;
+  }
+
+  public int getSlidingWindowThreshold() {
+    return slidingWindowThreshold;
+  }
+
+  public void setSlidingWindowThreshold(int slidingWindowThreshold) {
+    this.slidingWindowThreshold = slidingWindowThreshold;
+  }
+
   /**
    * storage engine configurations
    */
   private boolean overlapSplit = true;
+  /**
+   * Whether to enable sliding memory table
+   */
+  private boolean enableSlidingMemTable = true;
+  /**
+   * Sliding window threshold in bytes (default: 16 MB)
+   */
+  private int slidingWindowThreshold = 16 * 1024 * 1024;
 
   /**
    * Port which the metrics service listens to.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 3c6e703..36457f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -322,6 +322,14 @@ public class IoTDBDescriptor {
           properties.getProperty("enable_unseq_compaction",
               Boolean.toString(conf.isEnableUnseqCompaction()))));
 
+      conf.setEnableSlidingMemTable(Boolean.parseBoolean(properties
+          .getProperty("enable_sliding_mem_table",
+              Boolean.toString(conf.isEnableSlidingMemTable()))));
+
+      conf.setSlidingWindowThreshold(Integer.parseInt(properties
+          .getProperty("sliding_window_threshold",
+              Integer.toString(conf.getSlidingWindowThreshold()))));
+
       
conf.setFirstLevelFileNum(Integer.parseInt(properties.getProperty("first_level_file_num",
           Integer.toString(conf.getFirstLevelFileNum()))));
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 7d3519e..5dceb6a 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -164,6 +164,10 @@ public class StorageGroupProcessor {
    */
   protected final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors 
= new TreeMap<>();
   /**
+   * time partition id in the storage group -> flushingTsFileProcessor for 
this time partition
+   */
+  protected final TreeMap<Long, TsFileProcessor> 
flushingSequenceTsFileProcessors = new TreeMap<>();
+  /**
    * time partition id in the storage group -> tsFileProcessor for this time 
partition
    */
   protected final TreeMap<Long, TsFileProcessor> 
workUnsequenceTsFileProcessors = new TreeMap<>();
@@ -201,6 +205,12 @@ public class StorageGroupProcessor {
    */
   protected Map<Long, Map<String, Long>> 
partitionLatestFlushedTimeForEachDevice = new HashMap<>();
 
+  /*
+   * time partition id -> map. When a memory table is marked to be flushed, 
latestTimeForEachDevice
+   * is copied into flushingLatestTimeForEachDevice, which is then used to update 
partitionLatestFlushedTimeForEachDevice
+   * when the flush is actually issued.
+   */
+  protected Map<Long, Map<String, Long>> flushingLatestTimeForEachDevice = new 
HashMap<>();
   /**
    * used to record the latest flush time while upgrading and inserting
    */
@@ -692,6 +702,10 @@ public class StorageGroupProcessor {
       partitionLatestFlushedTimeForEachDevice
           .computeIfAbsent(timePartitionId, id -> new HashMap<>());
 
+      if (config.isEnableSlidingMemTable()) {
+        flushingLatestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> 
new HashMap<>());
+      }
+
       boolean isSequence =
           insertRowPlan.getTime() > 
partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
               .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), 
Long.MIN_VALUE);
@@ -913,7 +927,18 @@ public class StorageGroupProcessor {
       return;
     }
 
-    tsFileProcessor.insert(insertRowPlan);
+    TsFileProcessor flushingProcessor = 
flushingSequenceTsFileProcessors.get(timePartitionId);
+    boolean toFlushingProcessor = false;
+
+    if (config.isEnableSlidingMemTable() && sequence && 
isInsertToFlushingMemTable(timePartitionId,
+        insertRowPlan)) {
+      toFlushingProcessor = true;
+    }
+    if (toFlushingProcessor) {
+      flushingProcessor.insert(insertRowPlan);
+    } else {
+      tsFileProcessor.insert(insertRowPlan);
+    }
 
     // try to update the latest time of the device of this tsRecord
     if (latestTimeForEachDevice.get(timePartitionId)
@@ -928,12 +953,52 @@ public class StorageGroupProcessor {
 
     tryToUpdateInsertLastCache(insertRowPlan, globalLatestFlushTime);
 
+    if (config.isEnableSlidingMemTable() && sequence) {
+      if (flushingProcessor == null && tsFileProcessor.shouldFlush()) {
+        flushingSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
+        Map<String, Long> curPartitionDeviceLatestTime = 
latestTimeForEachDevice
+            .get(timePartitionId);
+        if (curPartitionDeviceLatestTime != null) {
+          for (Entry<String, Long> entry : 
curPartitionDeviceLatestTime.entrySet()) {
+            flushingLatestTimeForEachDevice
+                .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+                .put(entry.getKey(), entry.getValue());
+          }
+        }
+        workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
+        // if unsequence files don't contain this time range id, we should 
remove its version controller
+        if 
(!workUnsequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) 
{
+          
timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
+        }
+      } else if (flushingProcessor != null && 
tsFileProcessor.shouldCloseFlushing()) {
+        fileFlushPolicy.apply(this, flushingProcessor, sequence);
+        flushingSequenceTsFileProcessors.put(timePartitionId, null);
+        Map<String, Long> curPartitionDeviceLatestTime = 
flushingLatestTimeForEachDevice
+            .get(timePartitionId);
+        if (curPartitionDeviceLatestTime != null) {
+          for (Entry<String, Long> entry : 
curPartitionDeviceLatestTime.entrySet()) {
+            partitionLatestFlushedTimeForEachDevice
+                .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+                .put(entry.getKey(), entry.getValue());
+          }
+        }
+      }
+      return;
+    }
     // check memtable size and may asyncTryToFlush the work memtable
     if (tsFileProcessor.shouldFlush()) {
       fileFlushPolicy.apply(this, tsFileProcessor, sequence);
     }
   }
 
+  /**
+   * Judges whether an insert plan should be inserted into the flushing memtable.
+   */
+  private boolean isInsertToFlushingMemTable(long timePartitionId, 
InsertRowPlan insertRowPlan){
+    return flushingLatestTimeForEachDevice.get(timePartitionId).
+        getOrDefault(insertRowPlan.getDeviceId().getFullPath(), 
Long.MIN_VALUE) >= insertRowPlan.getTime();
+  }
+
   protected void tryToUpdateInsertLastCache(InsertRowPlan plan, Long 
latestFlushedTime) {
     if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
       return;
@@ -1148,11 +1213,6 @@ public class StorageGroupProcessor {
       updateEndTimeMap(tsFileProcessor);
       tsFileProcessor.asyncClose();
 
-      workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
-      // if unsequence files don't contain this time range id, we should 
remove it's version controller
-      if 
(!workUnsequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) 
{
-        
timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
-      }
       logger.info("close a sequence tsfile processor {}", storageGroupName);
     } else {
       closingUnSequenceTsFileProcessor.add(tsFileProcessor);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index c3ba768..4735996 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -421,7 +421,24 @@ public class TsFileProcessor {
           workMemTable.memSize(), 
tsFileResource.getTsFile().getAbsolutePath());
       return true;
     }
-    if (!enableMemControl && workMemTable.memSize() >= 
getMemtableSizeThresholdBasedOnSeriesNum()) {
+    if (workMemTable.memSize() >= getMemtableSizeThresholdBasedOnSeriesNum()) {
+      logger.info("The memtable size {} of tsfile {} reaches the threshold",
+          workMemTable.memSize(), 
tsFileResource.getTsFile().getAbsolutePath());
+      return true;
+    }
+    return false;
+  }
+
+  public boolean shouldCloseFlushing() {
+    if (workMemTable == null) {
+      return false;
+    }
+    if (shouldFlush) {
+      logger.info("The memtable size {} of tsfile {} reaches the mem control 
threshold",
+          workMemTable.memSize(), 
tsFileResource.getTsFile().getAbsolutePath());
+      return true;
+    }
+    if (workMemTable.memSize() >= config.getSlidingWindowThreshold()) {
       logger.info("The memtable size {} of tsfile {} reaches the threshold",
           workMemTable.memSize(), 
tsFileResource.getTsFile().getAbsolutePath());
       return true;

Reply via email to