This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new 4e03a9cb651 temp save
4e03a9cb651 is described below
commit 4e03a9cb6518240f3ed2514aaf0e0ef2b3234a5e
Author: jt2594838 <[email protected]>
AuthorDate: Tue Jun 18 19:33:59 2024 +0800
temp save
---
.../db/queryengine/common/MPPQueryContext.java | 10 +
.../iotdb/db/queryengine/plan/Coordinator.java | 2 +
.../analyze/cache/schema/DataNodeTTLCache.java | 4 +-
.../plan/planner/plan/node/WritePlanNode.java | 30 ++
.../planner/plan/node/write/DeleteDataNode.java | 2 +-
.../planner/plan/node/write/InsertRowNode.java | 2 +-
.../planner/plan/node/write/InsertRowsNode.java | 2 +-
.../planner/plan/node/write/InsertTabletNode.java | 5 +-
.../db/storageengine/dataregion/DataRegion.java | 392 ++++++++++++++++-----
.../org/apache/iotdb/db/utils/CommonUtils.java | 17 +
10 files changed, 367 insertions(+), 99 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index ec536eb3612..07437404b5a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -94,6 +94,7 @@ public class MPPQueryContext {
// index 0 represents metadataExpressions, index 1 represents
expressionsCanPushDownToOperator,
// index 2 represents expressionsCannotPushDownToOperator
private List<List<Expression>> tableModelPredicateExpressions;
+ private boolean isTableQuery = false;
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
@@ -371,4 +372,13 @@ public class MPPQueryContext {
}
// endregion
+
+
+ public boolean isTableQuery() {
+ return isTableQuery;
+ }
+
+ public void setTableQuery(boolean tableQuery) {
+ isTableQuery = tableQuery;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 180edcea95d..50b1bc6e550 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -288,6 +288,7 @@ public class Coordinator {
Metadata metadata,
long timeOut,
long startTime) {
+ queryContext.setTableQuery(true);
queryContext.setTimeOut(timeOut);
queryContext.setStartTime(startTime);
RelationalModelPlanner relationalModelPlanner =
@@ -311,6 +312,7 @@ public class Coordinator {
Metadata metadata,
long timeOut,
long startTime) {
+ queryContext.setTableQuery(true);
queryContext.setTimeOut(timeOut);
queryContext.setStartTime(startTime);
if (statement instanceof DropDB
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
index 5dea6b9209d..0700aa5da51 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.schema.ttl.TTLCache;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.tsfile.file.metadata.IDeviceID;
import java.util.Map;
@@ -87,8 +88,7 @@ public class DataNodeTTLCache {
public long getTTL(IDeviceID deviceID) {
lock.readLock().lock();
try {
- // TODO Tien change this way
- return
ttlCache.getClosestTTL(deviceID.toString().split(PATH_SEPARATER_NO_REGEX));
+ return
ttlCache.getClosestTTL(CommonUtils.deviceIdToStringArray(deviceID));
} finally {
lock.readLock().unlock();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
index dd383b52c95..664a1dc5bf5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
@@ -19,12 +19,17 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
import java.util.List;
public abstract class WritePlanNode extends PlanNode implements
IPartitionRelatedNode {
+ protected boolean writeToTable;
+
protected WritePlanNode(PlanNodeId id) {
super(id);
}
@@ -34,4 +39,29 @@ public abstract class WritePlanNode extends PlanNode
implements IPartitionRelate
public List<WritePlanNode> splitByTablePartition(IAnalysis analysis) {
throw new UnsupportedOperationException("Not supported yet.");
}
+
+ @Override
+ public void serialize(ByteBuffer byteBuffer) {
+ super.serialize(byteBuffer);
+ byteBuffer.put(writeToTable ? (byte) 1 : (byte) 0);
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ super.serialize(stream);
+ stream.writeBoolean(writeToTable);
+ }
+
+ public boolean isWriteToTable() {
+ return writeToTable;
+ }
+
+ public void setWriteToTable(boolean writeToTable) {
+ this.writeToTable = writeToTable;
+ }
+
+ public int serializedSize() {
+ // write to table
+ return 1;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index 7fbf5e03814..555132e8dcc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -143,7 +143,7 @@ public class DeleteDataNode extends WritePlanNode
implements WALEntryValue {
@Override
public int serializedSize() {
- int size = FIXED_SERIALIZED_SIZE;
+ int size = super.serializedSize() + FIXED_SERIALIZED_SIZE;
for (PartialPath path : pathList) {
size += ReadWriteIOUtils.sizeToWrite(path.getFullPath());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index bc3dcf8a774..1afb26ccbd6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -504,7 +504,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
/** Serialized size for wal. */
@Override
public int serializedSize() {
- return Short.BYTES + Long.BYTES + subSerializeSize();
+ return super.serializedSize() + Short.BYTES + Long.BYTES +
subSerializeSize();
}
protected int subSerializeSize() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 22a5483ec5b..08f67b8a4eb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -284,7 +284,7 @@ public class InsertRowsNode extends InsertNode implements
WALEntryValue {
/** Serialized size for wal. */
@Override
public int serializedSize() {
- return Short.BYTES + Long.BYTES + subSerializeSize();
+ return super.serializedSize() + Short.BYTES + Long.BYTES +
subSerializeSize();
}
private int subSerializeSize() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index cef96317f8c..f0b0725af6e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -715,6 +715,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
insertNode.subDeserialize(byteBuffer);
insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
+ if (byteBuffer.hasRemaining()) {
+ insertNode.writeToTable = byteBuffer.get() == 1;
+ }
return insertNode;
}
@@ -764,7 +767,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
/** Serialized size for wal */
@Override
public int serializedSize() {
- return serializedSize(0, rowCount);
+ return super.serializedSize() + serializedSize(0, rowCount);
}
/** Serialized size for wal */
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 2d9a7bed20c..9bab11f9503 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.storageengine.dataregion;
+import java.util.function.IntFunction;
+import java.util.function.IntToLongFunction;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -208,13 +210,19 @@ public class DataRegion implements IDataRegionForQuery {
*/
private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
- /** Condition to safely delete data region. */
+ /**
+ * Condition to safely delete data region.
+ */
private final Condition deletedCondition =
insertLock.writeLock().newCondition();
- /** Data region has been deleted or not. */
+ /**
+ * Data region has been deleted or not.
+ */
private volatile boolean deleted = false;
- /** closeStorageGroupCondition is used to wait for all currently closing
TsFiles to be done. */
+ /**
+ * closeStorageGroupCondition is used to wait for all currently closing
TsFiles to be done.
+ */
private final Object closeStorageGroupCondition = new Object();
/**
@@ -222,32 +230,50 @@ public class DataRegion implements IDataRegionForQuery {
*/
private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
- /** time partition id in the database -> {@link TsFileProcessor} for this
time partition. */
+ /**
+ * time partition id in the database -> {@link TsFileProcessor} for this
time partition.
+ */
private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors =
new TreeMap<>();
- /** time partition id in the database -> {@link TsFileProcessor} for this
time partition. */
+ /**
+ * time partition id in the database -> {@link TsFileProcessor} for this
time partition.
+ */
private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors
= new TreeMap<>();
- /** sequence {@link TsFileProcessor}s which are closing. */
+ /**
+ * sequence {@link TsFileProcessor}s which are closing.
+ */
private final Set<TsFileProcessor> closingSequenceTsFileProcessor =
ConcurrentHashMap.newKeySet();
- /** unsequence {@link TsFileProcessor}s which are closing. */
+ /**
+ * unsequence {@link TsFileProcessor}s which are closing.
+ */
private final Set<TsFileProcessor> closingUnSequenceTsFileProcessor =
ConcurrentHashMap.newKeySet();
- /** data region id. */
+ /**
+ * data region id.
+ */
private final String dataRegionId;
- /** database name. */
+ /**
+ * database name.
+ */
private final String databaseName;
- /** database system directory. */
+ /**
+ * database system directory.
+ */
private File storageGroupSysDir;
- /** manage seqFileList and unSeqFileList. */
+ /**
+ * manage seqFileList and unSeqFileList.
+ */
private final TsFileManager tsFileManager;
- /** manage tsFileResource degrade. */
+ /**
+ * manage tsFileResource degrade.
+ */
private final TsFileResourceManager tsFileResourceManager =
TsFileResourceManager.getInstance();
/**
@@ -258,10 +284,14 @@ public class DataRegion implements IDataRegionForQuery {
private final HashMap<Long, VersionController>
timePartitionIdVersionControllerMap =
new HashMap<>();
- /** file system factory (local or hdfs). */
+ /**
+ * file system factory (local or hdfs).
+ */
private final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
- /** File flush policy. */
+ /**
+ * File flush policy.
+ */
private TsFileFlushPolicy fileFlushPolicy;
/**
@@ -272,16 +302,24 @@ public class DataRegion implements IDataRegionForQuery {
*/
private Map<Long, Long> partitionMaxFileVersions = new ConcurrentHashMap<>();
- /** database info for mem control. */
+ /**
+ * database info for mem control.
+ */
private final DataRegionInfo dataRegionInfo = new DataRegionInfo(this);
- /** whether it's ready from recovery. */
+ /**
+ * whether it's ready from recovery.
+ */
private boolean isReady = false;
- /** close file listeners. */
+ /**
+ * close file listeners.
+ */
private List<CloseFileListener> customCloseFileListeners =
Collections.emptyList();
- /** flush listeners. */
+ /**
+ * flush listeners.
+ */
private List<FlushListener> customFlushListeners = Collections.emptyList();
private ILastFlushTimeMap lastFlushTimeMap;
@@ -391,18 +429,29 @@ public class DataRegion implements IDataRegionForQuery {
return ret;
}
- /** this class is used to store recovering context. */
+ /**
+ * this class is used to store recovering context.
+ */
private class DataRegionRecoveryContext {
- /** number of files to be recovered. */
+
+ /**
+ * number of files to be recovered.
+ */
private final long numOfFilesToRecover;
- /** number of already recovered files. */
+ /**
+ * number of already recovered files.
+ */
private long recoveredFilesNum;
- /** last recovery log time. */
+ /**
+ * last recovery log time.
+ */
private long lastLogTime;
- /** recover performers of unsealed TsFiles. */
+ /**
+ * recover performers of unsealed TsFiles.
+ */
private final List<UnsealedTsFileRecoverPerformer> recoverPerformers = new
ArrayList<>();
public DataRegionRecoveryContext(long numOfFilesToRecover) {
@@ -434,7 +483,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** recover from file */
+ /**
+ * recover from file
+ */
@SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
private void recover() throws DataRegionException {
try {
@@ -704,7 +755,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** check if the tsfile's time is smaller than system current time. */
+ /**
+ * check if the tsfile's time is smaller than system current time.
+ */
private void checkTsFileTime(File tsFile, long currentTime) throws
DataRegionException {
String[] items = tsFile.getName().replace(TSFILE_SUFFIX,
"").split(FILE_NAME_SEPARATOR);
long fileTime = Long.parseLong(items[0]);
@@ -717,7 +770,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** submit unsealed TsFile to WALRecoverManager. */
+ /**
+ * submit unsealed TsFile to WALRecoverManager.
+ */
private WALRecoverListener recoverUnsealedTsFile(
TsFileResource unsealedTsFile, DataRegionRecoveryContext context,
boolean isSeq) {
UnsealedTsFileRecoverPerformer recoverPerformer =
@@ -802,7 +857,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** recover sealed TsFile. */
+ /**
+ * recover sealed TsFile.
+ */
private void recoverSealedTsFiles(
TsFileResource sealedTsFile, DataRegionRecoveryContext context, boolean
isSeq) {
try (SealedTsFileRecoverPerformer recoverPerformer =
@@ -904,7 +961,7 @@ public class DataRegion implements IDataRegionForQuery {
boolean isSequence =
config.isEnableSeparateData()
&& insertRowNode.getTime()
- > lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
+ > lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
// insert to sequence or unSequence file
TsFileProcessor tsFileProcessor =
@@ -919,13 +976,28 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ public void insertTreeTablet(InsertTabletNode insertTabletNode,
+ IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction
rowLastFlushTimeGetter)
+ throws BatchProcessException, WriteProcessException {
+ final IDeviceID deviceID = insertTabletNode.getDeviceID();
+
+ insertTablet(insertTabletNode, i -> deviceID, i ->
+ config.isEnableSeparateData()
+ ? lastFlushTimeMap.getFlushedTime(
+
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[i]),
+ insertTabletNode.getDeviceID())
+ : Long.MAX_VALUE
+ );
+ }
+
/**
* Insert a tablet (rows belonging to the same devices) into this database.
*
* @throws BatchProcessException if some of the rows failed to be inserted
*/
@SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
- public void insertTablet(InsertTabletNode insertTabletNode)
+ public void insertTablet(InsertTabletNode insertTabletNode,
+ IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction
rowLastFlushTimeGetter)
throws BatchProcessException, WriteProcessException {
StorageEngine.blockInsertionIfReject(null);
long startTime = System.nanoTime();
@@ -937,38 +1009,130 @@ public class DataRegion implements IDataRegionForQuery {
}
TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
- boolean noFailure = true;
- long deviceTTL =
-
DataNodeTTLCache.getInstance().getTTL(insertTabletNode.getDevicePath().getNodes());
+ boolean noFailure;
+
+ int loc = checkTTL(insertTabletNode, results, i ->
DataNodeTTLCache.getInstance()
+ .getTTL(rowDeviceIdGetter.apply(i)));
+ noFailure = loc != 0;
+
+ // before is first start point
+ int before = loc;
+ // before time partition
+ long beforeTimePartition =
+
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[before]);
+ // init map
+
+ if (config.isEnableSeparateData()
+ &&
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) {
+ TimePartitionManager.getInstance()
+ .registerTimePartitionInfo(
+ new TimePartitionInfo(
+ new DataRegionId(Integer.parseInt(dataRegionId)),
+ beforeTimePartition,
+ true,
+ Long.MAX_VALUE,
+ 0));
+ }
- /*
- * assume that batch has been sorted by client
- */
- int loc = 0;
+ // if is sequence
+ boolean isSequence = false;
while (loc < insertTabletNode.getRowCount()) {
- long currTime = insertTabletNode.getTimes()[loc];
- // skip points that do not satisfy TTL
- if (!isAlive(currTime, deviceTTL)) {
- results[loc] =
- RpcUtils.getStatus(
- TSStatusCode.OUT_OF_TTL,
- String.format(
- "Insertion time [%s] is less than ttl time bound [%s]",
- DateTimeUtils.convertLongToDate(currTime),
- DateTimeUtils.convertLongToDate(
- CommonDateTimeUtils.currentTime() - deviceTTL)));
- loc++;
- noFailure = false;
- } else {
- break;
+ long lastFlushTime = rowLastFlushTimeGetter.applyAsLong(loc);
+ long time = insertTabletNode.getTimes()[loc];
+ final long timePartitionId =
TimePartitionUtils.getTimePartitionId(time);
+ // always in some time partition
+ // judge if we should insert sequence
+ if (!isSequence && time > lastFlushTime) {
+ // insert into unsequence and then start sequence
+ noFailure =
+ insertTabletToTsFileProcessor(
+ insertTabletNode, before, loc, false, results,
+ timePartitionId)
+ && noFailure;
+ before = loc;
+ isSequence = true;
}
+ loc++;
+ }
+
+ // do not forget last part
+ if (before < loc) {
+ noFailure =
+ insertTabletToTsFileProcessor(
+ insertTabletNode, before, loc, isSequence, results, time)
+ && noFailure;
+ }
+ startTime = System.nanoTime();
+ tryToUpdateInsertTabletLastCache(insertTabletNode);
+
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
- startTime);
+
+ if (!noFailure) {
+ throw new BatchProcessException(results);
}
- // loc pointing at first legal position
- if (loc == insertTabletNode.getRowCount()) {
- throw new OutOfTTLException(
- insertTabletNode.getTimes()[insertTabletNode.getTimes().length -
1],
- (CommonDateTimeUtils.currentTime() - deviceTTL));
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ private int checkTTL(InsertTabletNode insertTabletNode, TSStatus[] results,
+ IntToLongFunction rowTTLGetter)
+ throws OutOfTTLException {
+
+ /*
+ * assume that batch has been sorted by client
+ */
+ int loc = 0;
+ long ttl = 0;
+ while (loc < insertTabletNode.getRowCount()) {
+ ttl = rowTTLGetter.applyAsLong(loc);
+ long currTime = insertTabletNode.getTimes()[loc];
+ // skip points that do not satisfy TTL
+ if (!isAlive(currTime, ttl)) {
+ results[loc] =
+ RpcUtils.getStatus(
+ TSStatusCode.OUT_OF_TTL,
+ String.format(
+ "Insertion time [%s] is less than ttl time bound [%s]",
+ DateTimeUtils.convertLongToDate(currTime),
+ DateTimeUtils.convertLongToDate(
+ CommonDateTimeUtils.currentTime() - ttl)));
+ loc++;
+ } else {
+ break;
}
+ }
+ // loc pointing at first legal position
+ if (loc == insertTabletNode.getRowCount()) {
+ throw new OutOfTTLException(
+ insertTabletNode.getTimes()[insertTabletNode.getTimes().length - 1],
+ (CommonDateTimeUtils.currentTime() - ttl));
+ }
+ return loc;
+ }
+
+ /**
+ * Insert a tablet (rows belonging to the same devices) into this database.
+ *
+ * @throws BatchProcessException if some of the rows failed to be inserted
+ */
+ @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
+ public void insertTableTablet(InsertTabletNode insertTabletNode)
+ throws BatchProcessException, WriteProcessException {
+ StorageEngine.blockInsertionIfReject(null);
+ long startTime = System.nanoTime();
+ writeLock("insertTablet");
+ PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() -
startTime);
+ try {
+ if (deleted) {
+ return;
+ }
+ TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
+ Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
+ boolean noFailure = true;
+ int loc = checkTTL(insertTabletNode, results,
+ i ->
DataNodeTTLCache.getInstance().getTTL(insertTabletNode.getTableDeviceID(i)));
+
// before is first start point
int before = loc;
// before time partition
@@ -988,14 +1152,15 @@ public class DataRegion implements IDataRegionForQuery {
0));
}
- long lastFlushTime =
- config.isEnableSeparateData()
- ? lastFlushTimeMap.getFlushedTime(beforeTimePartition,
insertTabletNode.getDeviceID())
- : Long.MAX_VALUE;
-
// if is sequence
boolean isSequence = false;
while (loc < insertTabletNode.getRowCount()) {
+ long lastFlushTime =
+ config.isEnableSeparateData()
+ ? lastFlushTimeMap.getFlushedTime(beforeTimePartition,
+ insertTabletNode.getTableDeviceID(loc))
+ : Long.MAX_VALUE;
+
long time = insertTabletNode.getTimes()[loc];
// always in some time partition
// judge if we should insert sequence
@@ -1003,7 +1168,7 @@ public class DataRegion implements IDataRegionForQuery {
// insert into unsequence and then start sequence
noFailure =
insertTabletToTsFileProcessor(
- insertTabletNode, before, loc, false, results,
beforeTimePartition)
+ insertTabletNode, before, loc, false, results,
beforeTimePartition)
&& noFailure;
before = loc;
isSequence = true;
@@ -1015,7 +1180,7 @@ public class DataRegion implements IDataRegionForQuery {
if (before < loc) {
noFailure =
insertTabletToTsFileProcessor(
- insertTabletNode, before, loc, isSequence, results,
beforeTimePartition)
+ insertTabletNode, before, loc, isSequence, results,
beforeTimePartition)
&& noFailure;
}
startTime = System.nanoTime();
@@ -1030,6 +1195,17 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ /**
+ * Insert a tablet (rows belonging to the same devices) into this database.
+ *
+ * @throws BatchProcessException if some of the rows failed to be inserted
+ */
+ @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
+ public void insertTablet(InsertTabletNode insertTabletNode)
+ throws BatchProcessException, WriteProcessException {
+
+ }
+
/**
* Check whether the time falls in TTL.
*
@@ -1095,7 +1271,7 @@ public class DataRegion implements IDataRegionForQuery {
private void tryToUpdateInsertTabletLastCache(InsertTabletNode node) {
if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
||
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
- && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
+ && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
// disable updating last cache on follower
return;
}
@@ -1559,7 +1735,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** close all tsfile resource */
+ /**
+ * close all tsfile resource
+ */
public void closeAllResources() {
for (TsFileResource tsFileResource : tsFileManager.getTsFileList(false)) {
try {
@@ -1577,7 +1755,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** delete tsfile */
+ /**
+ * delete tsfile
+ */
public void syncDeleteDataFiles() throws TsFileProcessorException {
logger.info(
"{} will close all files for deleting data files", databaseName + "-"
+ dataRegionId);
@@ -1687,7 +1867,9 @@ public class DataRegion implements IDataRegionForQuery {
WritingMetrics.getInstance().recordTimedFlushMemTableCount(dataRegionId,
count);
}
- /** This method will be blocked until all tsfile processors are closed. */
+ /**
+ * This method will be blocked until all tsfile processors are closed.
+ */
public void syncCloseAllWorkingTsFileProcessors() {
try {
List<Future<?>> tsFileProcessorsClosingFutures =
asyncCloseAllWorkingTsFileProcessors();
@@ -1726,7 +1908,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** close all working tsfile processors */
+ /**
+ * close all working tsfile processors
+ */
List<Future<?>> asyncCloseAllWorkingTsFileProcessors() {
writeLock("asyncCloseAllWorkingTsFileProcessors");
List<Future<?>> futures = new ArrayList<>();
@@ -1748,7 +1932,9 @@ public class DataRegion implements IDataRegionForQuery {
return futures;
}
- /** force close all working tsfile processors */
+ /**
+ * force close all working tsfile processors
+ */
public void forceCloseAllWorkingTsFileProcessors() throws
TsFileProcessorException {
writeLock("forceCloseAllWorkingTsFileProcessors");
try {
@@ -1769,7 +1955,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** used for queryengine */
+ /**
+ * used for queryengine
+ */
@Override
public QueryDataSource query(
List<IFullPath> pathList,
@@ -1933,7 +2121,9 @@ public class DataRegion implements IDataRegionForQuery {
return fileScanHandles;
}
- /** lock the read lock of the insert lock */
+ /**
+ * lock the read lock of the insert lock
+ */
@Override
public void readLock() {
// apply read lock for SG insert lock to prevent inconsistent with
concurrently writing memtable
@@ -1942,20 +2132,26 @@ public class DataRegion implements IDataRegionForQuery {
tsFileManager.readLock();
}
- /** unlock the read lock of insert lock */
+ /**
+ * unlock the read lock of insert lock
+ */
@Override
public void readUnlock() {
tsFileManager.readUnlock();
insertLock.readLock().unlock();
}
- /** lock the write lock of the insert lock */
+ /**
+ * lock the write lock of the insert lock
+ */
public void writeLock(String holder) {
insertLock.writeLock().lock();
insertWriteLockHolder = holder;
}
- /** unlock the write lock of the insert lock */
+ /**
+ * unlock the write lock of the insert lock
+ */
public void writeUnlock() {
insertWriteLockHolder = "";
insertLock.writeLock().unlock();
@@ -2005,7 +2201,9 @@ public class DataRegion implements IDataRegionForQuery {
return tsfileResourcesForQuery;
}
- /** Seperate tsfiles in TsFileManager to sealedList and unsealedList. */
+ /**
+ * Seperate tsfiles in TsFileManager to sealedList and unsealedList.
+ */
private void separateTsFile(
List<TsFileResource> sealedResource,
List<TsFileResource> unsealedResource,
@@ -2035,10 +2233,6 @@ public class DataRegion implements IDataRegionForQuery {
/**
* @param pattern Must be a pattern start with a precise device path
- * @param startTime
- * @param endTime
- * @param searchIndex
- * @throws IOException
*/
public void deleteByDevice(PartialPath pattern, long startTime, long
endTime, long searchIndex)
throws IOException {
@@ -2439,7 +2633,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** Put the memtable back to the MemTablePool and make the metadata in
writer visible */
+ /**
+ * Put the memtable back to the MemTablePool and make the metadata in writer
visible
+ */
// TODO please consider concurrency with read and insert method.
private void closeUnsealedTsFileProcessorCallBack(TsFileProcessor
tsFileProcessor)
throws TsFileProcessorException {
@@ -2535,7 +2731,9 @@ public class DataRegion implements IDataRegionForQuery {
return trySubmitCount;
}
- /** Schedule settle compaction for ttl check. */
+ /**
+ * Schedule settle compaction for ttl check.
+ */
public int executeTTLCheck() throws InterruptedException {
while (!isCompactionSelecting.compareAndSet(false, true)) {
// wait until success
@@ -2664,7 +2862,9 @@ public class DataRegion implements IDataRegionForQuery {
return getNonSystemDatabaseName(databaseName);
}
- /** Merge file under this database processor */
+ /**
+ * Merge file under this database processor
+ */
public int compact() {
writeLock("merge");
CompactionScheduler.exclusiveLockCompactionSelection();
@@ -2870,8 +3070,8 @@ public class DataRegion implements IDataRegionForQuery {
}
/**
- * Update latest time in latestTimeForEachDevice and
- * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load
external tsfile module.
+ * Update latest time in latestTimeForEachDevice and
partitionLatestFlushedTimeForEachDevice.
+ * @UsedBy sync module, load external tsfile module.
*/
protected void updateLastFlushTime(TsFileResource newTsFileResource) {
for (IDeviceID device : newTsFileResource.getDevices()) {
@@ -3133,14 +3333,14 @@ public class DataRegion implements IDataRegionForQuery {
* "tsFileResource" have the same plan indexes as the local one.
*
* @return true if any file contains plans with indexes no less than the max
plan index of
- * "tsFileResource", otherwise false.
+ * "tsFileResource", otherwise false.
*/
public boolean isFileAlreadyExist(TsFileResource tsFileResource, long
partitionNum) {
// examine working processor first as they have the largest plan index
return isFileAlreadyExistInWorking(
- tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
+ tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
|| isFileAlreadyExistInWorking(
- tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
+ tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
|| isFileAlreadyExistInClosed(tsFileResource, partitionNum,
getSequenceFileList())
|| isFileAlreadyExistInClosed(tsFileResource, partitionNum,
getUnSequenceFileList());
}
@@ -3264,7 +3464,7 @@ public class DataRegion implements IDataRegionForQuery {
boolean isSequence =
config.isEnableSeparateData()
&& insertRowNode.getTime()
- > lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
+ > lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
TsFileProcessor tsFileProcessor =
getOrCreateTsFileProcessor(timePartitionId, isSequence);
if (tsFileProcessor == null) {
continue;
@@ -3373,8 +3573,8 @@ public class DataRegion implements IDataRegionForQuery {
areSequence[i] =
config.isEnableSeparateData()
&& insertRowNode.getTime()
- > lastFlushTimeMap.getFlushedTime(
- timePartitionIds[i], insertRowNode.getDeviceID());
+ > lastFlushTimeMap.getFlushedTime(
+ timePartitionIds[i], insertRowNode.getDeviceID());
}
insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds);
if (!insertRowsNode.getResults().isEmpty()) {
@@ -3548,7 +3748,9 @@ public class DataRegion implements IDataRegionForQuery {
return insertWriteLockHolder;
}
- /** This method could only be used in iot consensus */
+ /**
+ * This method could only be used in iot consensus
+ */
public IWALNode getWALNode() {
if
(!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
{
throw new UnsupportedOperationException();
@@ -3558,7 +3760,9 @@ public class DataRegion implements IDataRegionForQuery {
.applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId);
}
- /** Wait for this data region successfully deleted */
+ /**
+ * Wait for this data region successfully deleted
+ */
public void waitForDeleted() {
writeLock("waitForDeleted");
try {
@@ -3574,7 +3778,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** Release all threads waiting for this data region successfully deleted */
+ /**
+ * Release all threads waiting for this data region successfully deleted
+ */
public void markDeleted() {
writeLock("markDeleted");
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index 9b902d0cf44..6a179cbaccb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -43,6 +43,7 @@ import io.airlift.airline.ParseOptionMissingValueException;
import org.apache.commons.lang3.StringUtils;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Binary;
import java.time.ZoneId;
@@ -397,4 +398,20 @@ public class CommonUtils {
System.err.println("-- StackTrace --");
System.err.println(Throwables.getStackTraceAsString(e));
}
+
+ public static String[] deviceIdToStringArray(IDeviceID deviceID) {
+ String[] ret = new String[deviceID.segmentNum()];
+ for (int i = 0; i < ret.length; i++) {
+ ret[i] = deviceID.segment(i).toString();
+ }
+ return ret;
+ }
+
+ public static Object[] deviceIdToObjArray(IDeviceID deviceID) {
+ Object[] ret = new Object[deviceID.segmentNum()];
+ for (int i = 0; i < ret.length; i++) {
+ ret[i] = deviceID.segment(i);
+ }
+ return ret;
+ }
}