This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this 
push:
     new 4e03a9cb651 temp save
4e03a9cb651 is described below

commit 4e03a9cb6518240f3ed2514aaf0e0ef2b3234a5e
Author: jt2594838 <[email protected]>
AuthorDate: Tue Jun 18 19:33:59 2024 +0800

    temp save
---
 .../db/queryengine/common/MPPQueryContext.java     |  10 +
 .../iotdb/db/queryengine/plan/Coordinator.java     |   2 +
 .../analyze/cache/schema/DataNodeTTLCache.java     |   4 +-
 .../plan/planner/plan/node/WritePlanNode.java      |  30 ++
 .../planner/plan/node/write/DeleteDataNode.java    |   2 +-
 .../planner/plan/node/write/InsertRowNode.java     |   2 +-
 .../planner/plan/node/write/InsertRowsNode.java    |   2 +-
 .../planner/plan/node/write/InsertTabletNode.java  |   5 +-
 .../db/storageengine/dataregion/DataRegion.java    | 392 ++++++++++++++++-----
 .../org/apache/iotdb/db/utils/CommonUtils.java     |  17 +
 10 files changed, 367 insertions(+), 99 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index ec536eb3612..07437404b5a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -94,6 +94,7 @@ public class MPPQueryContext {
   // index 0 represents metadataExpressions, index 1 represents 
expressionsCanPushDownToOperator,
   // index 2 represents expressionsCannotPushDownToOperator
   private List<List<Expression>> tableModelPredicateExpressions;
+  private boolean isTableQuery = false;
 
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
@@ -371,4 +372,13 @@ public class MPPQueryContext {
   }
 
   // endregion
+
+
+  public boolean isTableQuery() {
+    return isTableQuery;
+  }
+
+  public void setTableQuery(boolean tableQuery) {
+    isTableQuery = tableQuery;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 180edcea95d..50b1bc6e550 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -288,6 +288,7 @@ public class Coordinator {
       Metadata metadata,
       long timeOut,
       long startTime) {
+    queryContext.setTableQuery(true);
     queryContext.setTimeOut(timeOut);
     queryContext.setStartTime(startTime);
     RelationalModelPlanner relationalModelPlanner =
@@ -311,6 +312,7 @@ public class Coordinator {
       Metadata metadata,
       long timeOut,
       long startTime) {
+    queryContext.setTableQuery(true);
     queryContext.setTimeOut(timeOut);
     queryContext.setStartTime(startTime);
     if (statement instanceof DropDB
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
index 5dea6b9209d..0700aa5da51 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.schema.ttl.TTLCache;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 
+import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.tsfile.file.metadata.IDeviceID;
 
 import java.util.Map;
@@ -87,8 +88,7 @@ public class DataNodeTTLCache {
   public long getTTL(IDeviceID deviceID) {
     lock.readLock().lock();
     try {
-      // TODO Tien change this way
-      return 
ttlCache.getClosestTTL(deviceID.toString().split(PATH_SEPARATER_NO_REGEX));
+      return 
ttlCache.getClosestTTL(CommonUtils.deviceIdToStringArray(deviceID));
     } finally {
       lock.readLock().unlock();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
index dd383b52c95..664a1dc5bf5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
@@ -19,12 +19,17 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
 
 import java.util.List;
 
 public abstract class WritePlanNode extends PlanNode implements 
IPartitionRelatedNode {
 
+  protected boolean writeToTable;
+
   protected WritePlanNode(PlanNodeId id) {
     super(id);
   }
@@ -34,4 +39,29 @@ public abstract class WritePlanNode extends PlanNode 
implements IPartitionRelate
   public List<WritePlanNode> splitByTablePartition(IAnalysis analysis) {
     throw new UnsupportedOperationException("Not supported yet.");
   }
+
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    super.serialize(byteBuffer);
+    byteBuffer.put(writeToTable ? (byte) 1 : (byte) 0);
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    super.serialize(stream);
+    stream.writeBoolean(writeToTable);
+  }
+
+  public boolean isWriteToTable() {
+    return writeToTable;
+  }
+
+  public void setWriteToTable(boolean writeToTable) {
+    this.writeToTable = writeToTable;
+  }
+
+  public int serializedSize() {
+    // one byte for the writeToTable flag
+    return 1;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index 7fbf5e03814..555132e8dcc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -143,7 +143,7 @@ public class DeleteDataNode extends WritePlanNode 
implements WALEntryValue {
 
   @Override
   public int serializedSize() {
-    int size = FIXED_SERIALIZED_SIZE;
+    int size = super.serializedSize() + FIXED_SERIALIZED_SIZE;
     for (PartialPath path : pathList) {
       size += ReadWriteIOUtils.sizeToWrite(path.getFullPath());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index bc3dcf8a774..1afb26ccbd6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -504,7 +504,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
   /** Serialized size for wal. */
   @Override
   public int serializedSize() {
-    return Short.BYTES + Long.BYTES + subSerializeSize();
+    return super.serializedSize() + Short.BYTES + Long.BYTES + 
subSerializeSize();
   }
 
   protected int subSerializeSize() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 22a5483ec5b..08f67b8a4eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -284,7 +284,7 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
   /** Serialized size for wal. */
   @Override
   public int serializedSize() {
-    return Short.BYTES + Long.BYTES + subSerializeSize();
+    return super.serializedSize() + Short.BYTES + Long.BYTES + 
subSerializeSize();
   }
 
   private int subSerializeSize() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index cef96317f8c..f0b0725af6e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -715,6 +715,9 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
     insertNode.subDeserialize(byteBuffer);
     insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
+    if (byteBuffer.hasRemaining()) {
+      insertNode.writeToTable = byteBuffer.get() == 1;
+    }
     return insertNode;
   }
 
@@ -764,7 +767,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
   /** Serialized size for wal */
   @Override
   public int serializedSize() {
-    return serializedSize(0, rowCount);
+    return super.serializedSize() + serializedSize(0, rowCount);
   }
 
   /** Serialized size for wal */
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 2d9a7bed20c..9bab11f9503 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.storageengine.dataregion;
 
+import java.util.function.IntFunction;
+import java.util.function.IntToLongFunction;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -208,13 +210,19 @@ public class DataRegion implements IDataRegionForQuery {
    */
   private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
 
-  /** Condition to safely delete data region. */
+  /**
+   * Condition to safely delete data region.
+   */
   private final Condition deletedCondition = 
insertLock.writeLock().newCondition();
 
-  /** Data region has been deleted or not. */
+  /**
+   * Data region has been deleted or not.
+   */
   private volatile boolean deleted = false;
 
-  /** closeStorageGroupCondition is used to wait for all currently closing 
TsFiles to be done. */
+  /**
+   * closeStorageGroupCondition is used to wait for all currently closing 
TsFiles to be done.
+   */
   private final Object closeStorageGroupCondition = new Object();
 
   /**
@@ -222,32 +230,50 @@ public class DataRegion implements IDataRegionForQuery {
    */
   private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
 
-  /** time partition id in the database -> {@link TsFileProcessor} for this 
time partition. */
+  /**
+   * time partition id in the database -> {@link TsFileProcessor} for this 
time partition.
+   */
   private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = 
new TreeMap<>();
 
-  /** time partition id in the database -> {@link TsFileProcessor} for this 
time partition. */
+  /**
+   * time partition id in the database -> {@link TsFileProcessor} for this 
time partition.
+   */
   private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors 
= new TreeMap<>();
 
-  /** sequence {@link TsFileProcessor}s which are closing. */
+  /**
+   * sequence {@link TsFileProcessor}s which are closing.
+   */
   private final Set<TsFileProcessor> closingSequenceTsFileProcessor = 
ConcurrentHashMap.newKeySet();
 
-  /** unsequence {@link TsFileProcessor}s which are closing. */
+  /**
+   * unsequence {@link TsFileProcessor}s which are closing.
+   */
   private final Set<TsFileProcessor> closingUnSequenceTsFileProcessor =
       ConcurrentHashMap.newKeySet();
 
-  /** data region id. */
+  /**
+   * data region id.
+   */
   private final String dataRegionId;
 
-  /** database name. */
+  /**
+   * database name.
+   */
   private final String databaseName;
 
-  /** database system directory. */
+  /**
+   * database system directory.
+   */
   private File storageGroupSysDir;
 
-  /** manage seqFileList and unSeqFileList. */
+  /**
+   * manage seqFileList and unSeqFileList.
+   */
   private final TsFileManager tsFileManager;
 
-  /** manage tsFileResource degrade. */
+  /**
+   * manage tsFileResource degrade.
+   */
   private final TsFileResourceManager tsFileResourceManager = 
TsFileResourceManager.getInstance();
 
   /**
@@ -258,10 +284,14 @@ public class DataRegion implements IDataRegionForQuery {
   private final HashMap<Long, VersionController> 
timePartitionIdVersionControllerMap =
       new HashMap<>();
 
-  /** file system factory (local or hdfs). */
+  /**
+   * file system factory (local or hdfs).
+   */
   private final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
 
-  /** File flush policy. */
+  /**
+   * File flush policy.
+   */
   private TsFileFlushPolicy fileFlushPolicy;
 
   /**
@@ -272,16 +302,24 @@ public class DataRegion implements IDataRegionForQuery {
    */
   private Map<Long, Long> partitionMaxFileVersions = new ConcurrentHashMap<>();
 
-  /** database info for mem control. */
+  /**
+   * database info for mem control.
+   */
   private final DataRegionInfo dataRegionInfo = new DataRegionInfo(this);
 
-  /** whether it's ready from recovery. */
+  /**
+   * whether it's ready from recovery.
+   */
   private boolean isReady = false;
 
-  /** close file listeners. */
+  /**
+   * close file listeners.
+   */
   private List<CloseFileListener> customCloseFileListeners = 
Collections.emptyList();
 
-  /** flush listeners. */
+  /**
+   * flush listeners.
+   */
   private List<FlushListener> customFlushListeners = Collections.emptyList();
 
   private ILastFlushTimeMap lastFlushTimeMap;
@@ -391,18 +429,29 @@ public class DataRegion implements IDataRegionForQuery {
     return ret;
   }
 
-  /** this class is used to store recovering context. */
+  /**
+   * this class is used to store recovering context.
+   */
   private class DataRegionRecoveryContext {
-    /** number of files to be recovered. */
+
+    /**
+     * number of files to be recovered.
+     */
     private final long numOfFilesToRecover;
 
-    /** number of already recovered files. */
+    /**
+     * number of already recovered files.
+     */
     private long recoveredFilesNum;
 
-    /** last recovery log time. */
+    /**
+     * last recovery log time.
+     */
     private long lastLogTime;
 
-    /** recover performers of unsealed TsFiles. */
+    /**
+     * recover performers of unsealed TsFiles.
+     */
     private final List<UnsealedTsFileRecoverPerformer> recoverPerformers = new 
ArrayList<>();
 
     public DataRegionRecoveryContext(long numOfFilesToRecover) {
@@ -434,7 +483,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** recover from file */
+  /**
+   * recover from file
+   */
   @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive 
Complexity warning
   private void recover() throws DataRegionException {
     try {
@@ -704,7 +755,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** check if the tsfile's time is smaller than system current time. */
+  /**
+   * check if the tsfile's time is smaller than system current time.
+   */
   private void checkTsFileTime(File tsFile, long currentTime) throws 
DataRegionException {
     String[] items = tsFile.getName().replace(TSFILE_SUFFIX, 
"").split(FILE_NAME_SEPARATOR);
     long fileTime = Long.parseLong(items[0]);
@@ -717,7 +770,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** submit unsealed TsFile to WALRecoverManager. */
+  /**
+   * submit unsealed TsFile to WALRecoverManager.
+   */
   private WALRecoverListener recoverUnsealedTsFile(
       TsFileResource unsealedTsFile, DataRegionRecoveryContext context, 
boolean isSeq) {
     UnsealedTsFileRecoverPerformer recoverPerformer =
@@ -802,7 +857,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** recover sealed TsFile. */
+  /**
+   * recover sealed TsFile.
+   */
   private void recoverSealedTsFiles(
       TsFileResource sealedTsFile, DataRegionRecoveryContext context, boolean 
isSeq) {
     try (SealedTsFileRecoverPerformer recoverPerformer =
@@ -904,7 +961,7 @@ public class DataRegion implements IDataRegionForQuery {
       boolean isSequence =
           config.isEnableSeparateData()
               && insertRowNode.getTime()
-                  > lastFlushTimeMap.getFlushedTime(timePartitionId, 
insertRowNode.getDeviceID());
+              > lastFlushTimeMap.getFlushedTime(timePartitionId, 
insertRowNode.getDeviceID());
 
       // insert to sequence or unSequence file
       TsFileProcessor tsFileProcessor =
@@ -919,13 +976,28 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
+  public void insertTreeTablet(InsertTabletNode insertTabletNode,
+      IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction 
rowLastFlushTimeGetter)
+      throws BatchProcessException, WriteProcessException {
+    final IDeviceID deviceID = insertTabletNode.getDeviceID();
+
+    insertTablet(insertTabletNode, i -> deviceID, i ->
+        config.isEnableSeparateData()
+            ? lastFlushTimeMap.getFlushedTime(
+            
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[i]),
+            insertTabletNode.getDeviceID())
+            : Long.MAX_VALUE
+    );
+  }
+
   /**
    * Insert a tablet (rows belonging to the same devices) into this database.
    *
    * @throws BatchProcessException if some of the rows failed to be inserted
    */
   @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive 
Complexity warning
-  public void insertTablet(InsertTabletNode insertTabletNode)
+  public void insertTablet(InsertTabletNode insertTabletNode,
+      IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction 
rowLastFlushTimeGetter)
       throws BatchProcessException, WriteProcessException {
     StorageEngine.blockInsertionIfReject(null);
     long startTime = System.nanoTime();
@@ -937,38 +1009,130 @@ public class DataRegion implements IDataRegionForQuery {
       }
       TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
       Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
-      boolean noFailure = true;
-      long deviceTTL =
-          
DataNodeTTLCache.getInstance().getTTL(insertTabletNode.getDevicePath().getNodes());
+      boolean noFailure;
+
+      int loc = checkTTL(insertTabletNode, results, i -> 
DataNodeTTLCache.getInstance()
+          .getTTL(rowDeviceIdGetter.apply(i)));
+      noFailure = loc != 0;
+
+      // before is first start point
+      int before = loc;
+      long before
+      // before time partition
+      long beforeTimePartition =
+          
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[before]);
+      // init map
+
+      if (config.isEnableSeparateData()
+          && 
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) {
+        TimePartitionManager.getInstance()
+            .registerTimePartitionInfo(
+                new TimePartitionInfo(
+                    new DataRegionId(Integer.parseInt(dataRegionId)),
+                    beforeTimePartition,
+                    true,
+                    Long.MAX_VALUE,
+                    0));
+      }
 
-      /*
-       * assume that batch has been sorted by client
-       */
-      int loc = 0;
+      // if is sequence
+      boolean isSequence = false;
       while (loc < insertTabletNode.getRowCount()) {
-        long currTime = insertTabletNode.getTimes()[loc];
-        // skip points that do not satisfy TTL
-        if (!isAlive(currTime, deviceTTL)) {
-          results[loc] =
-              RpcUtils.getStatus(
-                  TSStatusCode.OUT_OF_TTL,
-                  String.format(
-                      "Insertion time [%s] is less than ttl time bound [%s]",
-                      DateTimeUtils.convertLongToDate(currTime),
-                      DateTimeUtils.convertLongToDate(
-                          CommonDateTimeUtils.currentTime() - deviceTTL)));
-          loc++;
-          noFailure = false;
-        } else {
-          break;
+        long lastFlushTime = rowLastFlushTimeGetter.applyAsLong(loc);
+        long time = insertTabletNode.getTimes()[loc];
+        final long timePartitionId = 
TimePartitionUtils.getTimePartitionId(time);
+        // always in some time partition
+        // judge if we should insert sequence
+        if (!isSequence && time > lastFlushTime) {
+          // insert into unsequence and then start sequence
+          noFailure =
+              insertTabletToTsFileProcessor(
+                  insertTabletNode, before, loc, false, results,
+                  timePartitionId)
+                  && noFailure;
+          before = loc;
+          isSequence = true;
         }
+        loc++;
+      }
+
+      // do not forget last part
+      if (before < loc) {
+        noFailure =
+            insertTabletToTsFileProcessor(
+                insertTabletNode, before, loc, isSequence, results, time)
+                && noFailure;
+      }
+      startTime = System.nanoTime();
+      tryToUpdateInsertTabletLastCache(insertTabletNode);
+      
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
 - startTime);
+
+      if (!noFailure) {
+        throw new BatchProcessException(results);
       }
-      // loc pointing at first legal position
-      if (loc == insertTabletNode.getRowCount()) {
-        throw new OutOfTTLException(
-            insertTabletNode.getTimes()[insertTabletNode.getTimes().length - 
1],
-            (CommonDateTimeUtils.currentTime() - deviceTTL));
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  private int checkTTL(InsertTabletNode insertTabletNode, TSStatus[] results,
+      IntToLongFunction rowTTLGetter)
+      throws OutOfTTLException {
+
+    /*
+     * assume that batch has been sorted by client
+     */
+    int loc = 0;
+    long ttl = 0;
+    while (loc < insertTabletNode.getRowCount()) {
+      ttl = rowTTLGetter.applyAsLong(loc);
+      long currTime = insertTabletNode.getTimes()[loc];
+      // skip points that do not satisfy TTL
+      if (!isAlive(currTime, ttl)) {
+        results[loc] =
+            RpcUtils.getStatus(
+                TSStatusCode.OUT_OF_TTL,
+                String.format(
+                    "Insertion time [%s] is less than ttl time bound [%s]",
+                    DateTimeUtils.convertLongToDate(currTime),
+                    DateTimeUtils.convertLongToDate(
+                        CommonDateTimeUtils.currentTime() - ttl)));
+        loc++;
+      } else {
+        break;
       }
+    }
+    // loc pointing at first legal position
+    if (loc == insertTabletNode.getRowCount()) {
+      throw new OutOfTTLException(
+          insertTabletNode.getTimes()[insertTabletNode.getTimes().length - 1],
+          (CommonDateTimeUtils.currentTime() - ttl));
+    }
+    return loc;
+  }
+
+  /**
+   * Insert a tablet whose rows may belong to different devices into this database.
+   *
+   * @throws BatchProcessException if some of the rows failed to be inserted
+   */
+  @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive 
Complexity warning
+  public void insertTableTablet(InsertTabletNode insertTabletNode)
+      throws BatchProcessException, WriteProcessException {
+    StorageEngine.blockInsertionIfReject(null);
+    long startTime = System.nanoTime();
+    writeLock("insertTablet");
+    PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - 
startTime);
+    try {
+      if (deleted) {
+        return;
+      }
+      TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
+      Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
+      boolean noFailure = true;
+      int loc = checkTTL(insertTabletNode, results,
+          i -> 
DataNodeTTLCache.getInstance().getTTL(insertTabletNode.getTableDeviceID(i)));
+
       // before is first start point
       int before = loc;
       // before time partition
@@ -988,14 +1152,15 @@ public class DataRegion implements IDataRegionForQuery {
                     0));
       }
 
-      long lastFlushTime =
-          config.isEnableSeparateData()
-              ? lastFlushTimeMap.getFlushedTime(beforeTimePartition, 
insertTabletNode.getDeviceID())
-              : Long.MAX_VALUE;
-
       // if is sequence
       boolean isSequence = false;
       while (loc < insertTabletNode.getRowCount()) {
+        long lastFlushTime =
+            config.isEnableSeparateData()
+                ? lastFlushTimeMap.getFlushedTime(beforeTimePartition,
+                insertTabletNode.getTableDeviceID(loc))
+                : Long.MAX_VALUE;
+
         long time = insertTabletNode.getTimes()[loc];
         // always in some time partition
         // judge if we should insert sequence
@@ -1003,7 +1168,7 @@ public class DataRegion implements IDataRegionForQuery {
           // insert into unsequence and then start sequence
           noFailure =
               insertTabletToTsFileProcessor(
-                      insertTabletNode, before, loc, false, results, 
beforeTimePartition)
+                  insertTabletNode, before, loc, false, results, 
beforeTimePartition)
                   && noFailure;
           before = loc;
           isSequence = true;
@@ -1015,7 +1180,7 @@ public class DataRegion implements IDataRegionForQuery {
       if (before < loc) {
         noFailure =
             insertTabletToTsFileProcessor(
-                    insertTabletNode, before, loc, isSequence, results, 
beforeTimePartition)
+                insertTabletNode, before, loc, isSequence, results, 
beforeTimePartition)
                 && noFailure;
       }
       startTime = System.nanoTime();
@@ -1030,6 +1195,17 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
+  /**
+   * Placeholder overload: the body is currently empty, so calling it inserts nothing.
+   *
+   * @throws BatchProcessException if some of the rows failed to be inserted
+   */
+  @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive 
Complexity warning
+  public void insertTablet(InsertTabletNode insertTabletNode)
+      throws BatchProcessException, WriteProcessException {
+
+  }
+
   /**
    * Check whether the time falls in TTL.
    *
@@ -1095,7 +1271,7 @@ public class DataRegion implements IDataRegionForQuery {
   private void tryToUpdateInsertTabletLastCache(InsertTabletNode node) {
     if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
         || 
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
-            && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
+        && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
       // disable updating last cache on follower
       return;
     }
@@ -1559,7 +1735,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** close all tsfile resource */
+  /**
+   * close all tsfile resource
+   */
   public void closeAllResources() {
     for (TsFileResource tsFileResource : tsFileManager.getTsFileList(false)) {
       try {
@@ -1577,7 +1755,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** delete tsfile */
+  /**
+   * delete tsfile
+   */
   public void syncDeleteDataFiles() throws TsFileProcessorException {
     logger.info(
         "{} will close all files for deleting data files", databaseName + "-" 
+ dataRegionId);
@@ -1687,7 +1867,9 @@ public class DataRegion implements IDataRegionForQuery {
     WritingMetrics.getInstance().recordTimedFlushMemTableCount(dataRegionId, 
count);
   }
 
-  /** This method will be blocked until all tsfile processors are closed. */
+  /**
+   * This method will be blocked until all tsfile processors are closed.
+   */
   public void syncCloseAllWorkingTsFileProcessors() {
     try {
       List<Future<?>> tsFileProcessorsClosingFutures = 
asyncCloseAllWorkingTsFileProcessors();
@@ -1726,7 +1908,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** close all working tsfile processors */
+  /**
+   * close all working tsfile processors
+   */
   List<Future<?>> asyncCloseAllWorkingTsFileProcessors() {
     writeLock("asyncCloseAllWorkingTsFileProcessors");
     List<Future<?>> futures = new ArrayList<>();
@@ -1748,7 +1932,9 @@ public class DataRegion implements IDataRegionForQuery {
     return futures;
   }
 
-  /** force close all working tsfile processors */
+  /**
+   * force close all working tsfile processors
+   */
   public void forceCloseAllWorkingTsFileProcessors() throws 
TsFileProcessorException {
     writeLock("forceCloseAllWorkingTsFileProcessors");
     try {
@@ -1769,7 +1955,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** used for queryengine */
+  /**
+   * used for queryengine
+   */
   @Override
   public QueryDataSource query(
       List<IFullPath> pathList,
@@ -1933,7 +2121,9 @@ public class DataRegion implements IDataRegionForQuery {
     return fileScanHandles;
   }
 
-  /** lock the read lock of the insert lock */
+  /**
+   * lock the read lock of the insert lock
+   */
   @Override
   public void readLock() {
     // apply read lock for SG insert lock to prevent inconsistent with 
concurrently writing memtable
@@ -1942,20 +2132,26 @@ public class DataRegion implements IDataRegionForQuery {
     tsFileManager.readLock();
   }
 
-  /** unlock the read lock of insert lock */
+  /**
+   * unlock the read lock of insert lock
+   */
   @Override
   public void readUnlock() {
     tsFileManager.readUnlock();
     insertLock.readLock().unlock();
   }
 
-  /** lock the write lock of the insert lock */
+  /**
+   * lock the write lock of the insert lock
+   */
   public void writeLock(String holder) {
     insertLock.writeLock().lock();
     insertWriteLockHolder = holder;
   }
 
-  /** unlock the write lock of the insert lock */
+  /**
+   * unlock the write lock of the insert lock
+   */
   public void writeUnlock() {
     insertWriteLockHolder = "";
     insertLock.writeLock().unlock();
@@ -2005,7 +2201,9 @@ public class DataRegion implements IDataRegionForQuery {
     return tsfileResourcesForQuery;
   }
 
-  /** Seperate tsfiles in TsFileManager to sealedList and unsealedList. */
+  /**
+   * Separate tsfiles in TsFileManager to sealedList and unsealedList.
+   */
   private void separateTsFile(
       List<TsFileResource> sealedResource,
       List<TsFileResource> unsealedResource,
@@ -2035,10 +2233,6 @@ public class DataRegion implements IDataRegionForQuery {
 
   /**
    * @param pattern Must be a pattern start with a precise device path
-   * @param startTime
-   * @param endTime
-   * @param searchIndex
-   * @throws IOException
    */
   public void deleteByDevice(PartialPath pattern, long startTime, long 
endTime, long searchIndex)
       throws IOException {
@@ -2439,7 +2633,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** Put the memtable back to the MemTablePool and make the metadata in 
writer visible */
+  /**
+   * Put the memtable back to the MemTablePool and make the metadata in writer 
visible
+   */
   // TODO please consider concurrency with read and insert method.
   private void closeUnsealedTsFileProcessorCallBack(TsFileProcessor 
tsFileProcessor)
       throws TsFileProcessorException {
@@ -2535,7 +2731,9 @@ public class DataRegion implements IDataRegionForQuery {
     return trySubmitCount;
   }
 
-  /** Schedule settle compaction for ttl check. */
+  /**
+   * Schedule settle compaction for ttl check.
+   */
   public int executeTTLCheck() throws InterruptedException {
     while (!isCompactionSelecting.compareAndSet(false, true)) {
       // wait until success
@@ -2664,7 +2862,9 @@ public class DataRegion implements IDataRegionForQuery {
     return getNonSystemDatabaseName(databaseName);
   }
 
-  /** Merge file under this database processor */
+  /**
+   * Merge file under this database processor
+   */
   public int compact() {
     writeLock("merge");
     CompactionScheduler.exclusiveLockCompactionSelection();
@@ -2870,8 +3070,8 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   /**
-   * Update latest time in latestTimeForEachDevice and
-   * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load 
external tsfile module.
+   * Update latest time in latestTimeForEachDevice and 
partitionLatestFlushedTimeForEachDevice.
+   * Used by: sync module, load external tsfile module.
    */
   protected void updateLastFlushTime(TsFileResource newTsFileResource) {
     for (IDeviceID device : newTsFileResource.getDevices()) {
@@ -3133,14 +3333,14 @@ public class DataRegion implements IDataRegionForQuery {
    * "tsFileResource" have the same plan indexes as the local one.
    *
    * @return true if any file contains plans with indexes no less than the max 
plan index of
-   *     "tsFileResource", otherwise false.
+   * "tsFileResource", otherwise false.
    */
   public boolean isFileAlreadyExist(TsFileResource tsFileResource, long 
partitionNum) {
     // examine working processor first as they have the largest plan index
     return isFileAlreadyExistInWorking(
-            tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
+        tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
         || isFileAlreadyExistInWorking(
-            tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
+        tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
         || isFileAlreadyExistInClosed(tsFileResource, partitionNum, 
getSequenceFileList())
         || isFileAlreadyExistInClosed(tsFileResource, partitionNum, 
getUnSequenceFileList());
   }
@@ -3264,7 +3464,7 @@ public class DataRegion implements IDataRegionForQuery {
         boolean isSequence =
             config.isEnableSeparateData()
                 && insertRowNode.getTime()
-                    > lastFlushTimeMap.getFlushedTime(timePartitionId, 
insertRowNode.getDeviceID());
+                > lastFlushTimeMap.getFlushedTime(timePartitionId, 
insertRowNode.getDeviceID());
         TsFileProcessor tsFileProcessor = 
getOrCreateTsFileProcessor(timePartitionId, isSequence);
         if (tsFileProcessor == null) {
           continue;
@@ -3373,8 +3573,8 @@ public class DataRegion implements IDataRegionForQuery {
         areSequence[i] =
             config.isEnableSeparateData()
                 && insertRowNode.getTime()
-                    > lastFlushTimeMap.getFlushedTime(
-                        timePartitionIds[i], insertRowNode.getDeviceID());
+                > lastFlushTimeMap.getFlushedTime(
+                timePartitionIds[i], insertRowNode.getDeviceID());
       }
       insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds);
       if (!insertRowsNode.getResults().isEmpty()) {
@@ -3548,7 +3748,9 @@ public class DataRegion implements IDataRegionForQuery {
     return insertWriteLockHolder;
   }
 
-  /** This method could only be used in iot consensus */
+  /**
+   * This method could only be used in iot consensus
+   */
   public IWALNode getWALNode() {
     if 
(!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
 {
       throw new UnsupportedOperationException();
@@ -3558,7 +3760,9 @@ public class DataRegion implements IDataRegionForQuery {
         .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId);
   }
 
-  /** Wait for this data region successfully deleted */
+  /**
+   * Wait for this data region successfully deleted
+   */
   public void waitForDeleted() {
     writeLock("waitForDeleted");
     try {
@@ -3574,7 +3778,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** Release all threads waiting for this data region successfully deleted */
+  /**
+   * Release all threads waiting for this data region successfully deleted
+   */
   public void markDeleted() {
     writeLock("markDeleted");
     try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index 9b902d0cf44..6a179cbaccb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -43,6 +43,7 @@ import io.airlift.airline.ParseOptionMissingValueException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.utils.Binary;
 
 import java.time.ZoneId;
@@ -397,4 +398,20 @@ public class CommonUtils {
     System.err.println("-- StackTrace --");
     System.err.println(Throwables.getStackTraceAsString(e));
   }
+
+  /**
+   * Converts each segment of a device ID to its string representation.
+   *
+   * @param deviceID the device ID whose segments are converted
+   * @return a new array with one entry per segment, in segment order; an entry is {@code null}
+   *     when the corresponding segment is {@code null}
+   */
+  public static String[] deviceIdToStringArray(IDeviceID deviceID) {
+    String[] ret = new String[deviceID.segmentNum()];
+    for (int i = 0; i < ret.length; i++) {
+      // NOTE(review): segment(i) is assumed to be nullable (e.g. an unset id column) - confirm.
+      // Keep a null segment as null instead of failing with a NullPointerException.
+      Object segment = deviceID.segment(i);
+      ret[i] = segment != null ? segment.toString() : null;
+    }
+    return ret;
+  }
+
+  /**
+   * Copies the segments of a device ID into a new object array.
+   *
+   * @param deviceID the device ID to read segments from
+   * @return a new array holding the segments in order, one entry per segment
+   */
+  public static Object[] deviceIdToObjArray(IDeviceID deviceID) {
+    final int segmentCount = deviceID.segmentNum();
+    final Object[] segments = new Object[segmentCount];
+    int index = 0;
+    while (index < segmentCount) {
+      segments[index] = deviceID.segment(index);
+      index++;
+    }
+    return segments;
+  }
 }


Reply via email to