This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1c2451af9b5 Table ttl & Fix ttl bugs (#13668)
1c2451af9b5 is described below
commit 1c2451af9b515635c9ff57935e7af51baa6692e2
Author: shuwenwei <[email protected]>
AuthorDate: Wed Oct 9 15:04:37 2024 +0800
Table ttl & Fix ttl bugs (#13668)
* modify insert
* fix bug
* add it
* add compaction ut
* fix TTLTest
* fix ut
* add it
* remove d2 in it
* modify it & add comments
* fix it
---
.../relational/it/db/it/IoTDBInsertTableIT.java | 79 +++++++++
.../iotdb/session/it/IoTDBSessionSimpleIT.java | 37 ++++
.../operator/schema/source/DeviceSchemaSource.java | 2 +-
.../source/ActiveDeviceRegionScanOperator.java | 2 +-
.../execution/operator/source/SeriesScanUtil.java | 16 +-
.../analyze/cache/schema/DataNodeTTLCache.java | 62 +++----
.../plan/planner/OperatorTreeGenerator.java | 8 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 2 +-
.../iotdb/db/storageengine/StorageEngine.java | 8 +-
.../db/storageengine/dataregion/DataRegion.java | 47 +++--
.../execute/performer/ICompactionPerformer.java | 2 -
.../performer/impl/FastCompactionPerformer.java | 49 ++++--
.../impl/ReadChunkCompactionPerformer.java | 16 +-
.../impl/ReadPointCompactionPerformer.java | 10 +-
.../RepairUnsortedFileCompactionPerformer.java | 3 +-
.../task/RepairUnsortedFileCompactionTask.java | 10 +-
.../execute/utils/MultiTsFileDeviceIterator.java | 42 +++--
.../ReadChunkAlignedSeriesCompactionExecutor.java | 8 +-
.../repair/RepairTimePartitionScanTask.java | 6 +-
.../schedule/CompactionScheduleContext.java | 25 +--
.../selector/impl/SettleSelectorImpl.java | 16 +-
.../impl/SizeTieredCompactionSelector.java | 3 +-
.../dataregion/memtable/TsFileProcessor.java | 13 +-
.../dataregion/read/QueryDataSource.java | 5 +
.../db/storageengine/dataregion/TTLQueryTest.java | 14 +-
.../iotdb/db/storageengine/dataregion/TTLTest.java | 46 ++---
.../compaction/AbstractCompactionTest.java | 3 +-
.../repair/RepairUnsortedFileCompactionTest.java | 32 ++--
.../settle/SettleCompactionRecoverTest.java | 2 +-
.../settle/SettleCompactionSelectorTest.java | 14 +-
.../settle/SettleCompactionTaskTest.java | 22 +--
.../tablemodel/CompactionWithAllNullRowsTest.java | 9 +-
.../TableModelCompactionWithTTLTest.java | 191 +++++++++++++++++++++
.../compaction/utils/CompactionCheckerUtils.java | 3 +-
.../utils/MultiTsFileDeviceIteratorTest.java | 38 ++--
.../apache/iotdb/commons/schema/table/TsTable.java | 15 ++
36 files changed, 601 insertions(+), 259 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBInsertTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBInsertTableIT.java
index b87dbe79ec3..6264fc26651 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBInsertTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBInsertTableIT.java
@@ -879,6 +879,85 @@ public class IoTDBInsertTableIT {
}
}
+ @Test
+ public void testInsertWithTTL() {
+ try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ statement.execute("use \"test\"");
+ statement.execute("create table sg22 (id1 string id, s1 int64 measurement)");
+ statement.execute("alter table sg22 set properties TTL=1");
+ statement.execute(
+ String.format(
+ "insert into sg22(id1,time,s1) values('d1',%s,2)",
+ System.currentTimeMillis() - 10000));
+ fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("less than ttl time bound"));
+ }
+ }
+
+ @Test
+ public void testInsertTabletWithTTL()
+ throws IoTDBConnectionException, StatementExecutionException {
+ long ttl = 1;
+ try (ISession session = EnvFactory.getEnv().getSessionConnection(BaseEnv.TABLE_SQL_DIALECT)) {
+ session.executeNonQueryStatement("use \"test\"");
+ session.executeNonQueryStatement("create table sg23 (id1 string id, s1 int64 measurement)");
+ session.executeNonQueryStatement("alter table sg23 set properties TTL=" + ttl);
+
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("id1", TSDataType.STRING));
+ schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+ final List<ColumnType> columnTypes = Arrays.asList(ColumnType.ID, ColumnType.MEASUREMENT);
+
+ // all expired
+ long timestamp = 0;
+ Tablet tablet = new Tablet("sg23", schemaList, columnTypes, 15);
+
+ for (long row = 0; row < 3; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp + row);
+ tablet.addValue("id1", rowIndex, "id:" + row);
+ tablet.addValue("s1", rowIndex, row);
+ }
+ try {
+ session.insertRelationalTablet(tablet, true);
+ fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("less than ttl time bound"));
+ }
+
+ // partial expired
+ tablet.reset();
+ timestamp = System.currentTimeMillis() - 10000;
+ for (long row = 0; row < 4; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp);
+ tablet.addValue("id1", rowIndex, "id:" + row);
+ tablet.addValue("s1", rowIndex, row);
+ timestamp += 10000;
+ }
+
+ try {
+ session.insertRelationalTablet(tablet, true);
+ fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("less than ttl time bound"));
+ }
+
+ // part of data is indeed inserted
+ long timeLowerBound = System.currentTimeMillis() - ttl;
+ SessionDataSet dataSet = session.executeQueryStatement("select time, s1 from sg23");
+ int count = 0;
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ Assert.assertTrue(record.getFields().get(0).getLongV() > timeLowerBound);
+ count++;
+ }
+ Assert.assertTrue(count > 0 && count < 4);
+ }
+ }
+
private List<Integer> checkHeader(
ResultSetMetaData resultSetMetaData, String expectedHeaderStrings, int[]
expectedTypes)
throws SQLException {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
index c277435bb84..afc6646ef6f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
@@ -1132,6 +1132,43 @@ public class IoTDBSessionSimpleIT {
}
}
+ @Test
+ @Category({LocalStandaloneIT.class, ClusterIT.class})
+ public void insertRecordsWithExpiredDataTest()
+ throws IoTDBConnectionException, StatementExecutionException {
+ try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ List<Long> times = new ArrayList<>();
+ List<List<String>> measurements = new ArrayList<>();
+ List<List<TSDataType>> datatypes = new ArrayList<>();
+ List<List<Object>> values = new ArrayList<>();
+ List<String> devices = new ArrayList<>();
+
+ devices.add("root.sg.d1");
+ addLine(
+ times,
+ measurements,
+ datatypes,
+ values,
+ 3L,
+ "s1",
+ "s2",
+ TSDataType.INT32,
+ TSDataType.INT32,
+ 1,
+ 2);
+ session.executeNonQueryStatement("set ttl to root.sg.d1 1");
+ try {
+ session.insertRecords(devices, times, measurements, datatypes, values);
+ fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("less than ttl time bound"));
+ }
+ session.executeNonQueryStatement("unset ttl to root.sg.d1");
+ SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg.d1");
+ Assert.assertFalse(dataSet.hasNext());
+ }
+ }
+
@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void insertStringRecordsOfOneDeviceWithOrderTest() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/DeviceSchemaSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/DeviceSchemaSource.java
index 7abd3232b3d..c803388b054 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/DeviceSchemaSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/source/DeviceSchemaSource.java
@@ -101,7 +101,7 @@ public class DeviceSchemaSource implements
ISchemaSource<IDeviceSchemaInfo> {
.getColumnBuilder(0)
.writeBinary(new Binary(device.getFullPath(),
TSFileConfig.STRING_CHARSET));
int templateId = device.getTemplateId();
- long ttl =
DataNodeTTLCache.getInstance().getTTLInMS(device.getPartialPath().getNodes());
+ long ttl =
DataNodeTTLCache.getInstance().getTTLInMSForTree(device.getPartialPath().getNodes());
// TODO: make it more readable, like "30 days" or "10 hours"
String ttlStr = ttl == Long.MAX_VALUE ? IoTDBConstant.TTL_INFINITE :
String.valueOf(ttl);
if (hasSgCol) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java
index 2e06ec4b32a..777d2bf854d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java
@@ -91,7 +91,7 @@ public class ActiveDeviceRegionScanOperator extends
AbstractRegionScanDataSource
DeviceContext deviceContext = deviceContextMap.get(deviceID);
int templateId = deviceContext.getTemplateId();
// TODO: use IDeviceID interface to get ttl
- long ttl = DataNodeTTLCache.getInstance().getTTL(deviceID);
+ long ttl = DataNodeTTLCache.getInstance().getTTLForTree(deviceID);
// TODO: make it more readable, like "30 days" or "10 hours"
String ttlStr = ttl == Long.MAX_VALUE ? IoTDBConstant.TTL_INFINITE :
String.valueOf(ttl);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index bed98b8d61d..1389666f0b2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -177,7 +177,21 @@ public class SeriesScanUtil implements Accountable {
this.dataSource = dataSource;
// updated filter concerning TTL
-
scanOptions.setTTL(DataNodeTTLCache.getInstance().getTTL(seriesPath.getDeviceId()));
+ long ttl;
+ // Only the data in the table model needs to retain rows where all value
+ // columns are null values, so we can use isIgnoreAllNullRows to
+ // differentiate the data of tree model and table model.
+ if (context.isIgnoreAllNullRows()) {
+ ttl = DataNodeTTLCache.getInstance().getTTLForTree(deviceID);
+ } else {
+ String databaseName = dataSource.getDatabaseName();
+ ttl =
+ databaseName == null
+ ? Long.MAX_VALUE
+ : DataNodeTTLCache.getInstance()
+ .getTTLForTable(databaseName, deviceID.getTableName());
+ }
+ scanOptions.setTTL(ttl);
// init file index
orderUtils.setCurSeqFileIndex(dataSource);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
index 0ab29eacea7..3a8cb0f5426 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeTTLCache.java
@@ -20,23 +20,24 @@ package
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.ttl.TTLCache;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.tsfile.file.metadata.IDeviceID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class DataNodeTTLCache {
- private final TTLCache ttlCache;
+ private final TTLCache treeModelTTLCache;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private DataNodeTTLCache() {
- ttlCache = new TTLCache();
+ treeModelTTLCache = new TTLCache();
}
public static DataNodeTTLCache getInstance() {
@@ -48,61 +49,60 @@ public class DataNodeTTLCache {
}
@TestOnly
- public void setTTL(String path, long ttl) throws IllegalPathException {
+ public void setTTLForTree(String path, long ttl) throws IllegalPathException
{
lock.writeLock().lock();
try {
- ttlCache.setTTL(PathUtils.splitPathToDetachedNodes(path), ttl);
+ treeModelTTLCache.setTTL(PathUtils.splitPathToDetachedNodes(path), ttl);
} finally {
lock.writeLock().unlock();
}
}
- public void setTTL(String[] path, long ttl) {
+ public void setTTLForTree(String[] path, long ttl) {
lock.writeLock().lock();
try {
- ttlCache.setTTL(path, ttl);
+ treeModelTTLCache.setTTL(path, ttl);
} finally {
lock.writeLock().unlock();
}
}
- public void unsetTTL(String[] path) {
+ public void unsetTTLForTree(String[] path) {
lock.writeLock().lock();
try {
- ttlCache.unsetTTL(path);
+ treeModelTTLCache.unsetTTL(path);
} finally {
lock.writeLock().unlock();
}
}
- public long getTTL(IDeviceID deviceID) {
- lock.readLock().lock();
+ public long getTTLForTree(IDeviceID deviceID) {
try {
- return
ttlCache.getClosestTTL(CommonUtils.deviceIdToStringArray(deviceID));
- } finally {
- lock.readLock().unlock();
+ return
getTTLForTree(PathUtils.splitPathToDetachedNodes(deviceID.toString()));
+ } catch (IllegalPathException e) {
+ return Long.MAX_VALUE;
}
}
+ public long getTTLForTable(String database, String table) {
+ TsTable tsTable = DataNodeTableCache.getInstance().getTable(database,
table);
+ return tsTable == null ? Long.MAX_VALUE : tsTable.getTableTTL();
+ }
+
/** Get ttl with time precision conversion. */
- public long getTTL(String[] path) {
- lock.readLock().lock();
- try {
- long ttl = ttlCache.getClosestTTL(path);
- return ttl == Long.MAX_VALUE
- ? ttl
- : CommonDateTimeUtils.convertMilliTimeWithPrecision(
- ttl,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
- } finally {
- lock.readLock().unlock();
- }
+ public long getTTLForTree(String[] path) {
+ long ttl = getTTLInMSForTree(path);
+ return ttl == Long.MAX_VALUE
+ ? ttl
+ : CommonDateTimeUtils.convertMilliTimeWithPrecision(
+ ttl,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
}
/** Get ttl without time precision conversion. */
- public long getTTLInMS(String[] path) {
+ public long getTTLInMSForTree(String[] path) {
lock.readLock().lock();
try {
- return ttlCache.getClosestTTL(path);
+ return treeModelTTLCache.getClosestTTL(path);
} finally {
lock.readLock().unlock();
}
@@ -112,10 +112,10 @@ public class DataNodeTTLCache {
* Get ttl of one specific path node without time precision conversion. If
this node does not set
* ttl, then return -1.
*/
- public long getNodeTTLInMS(String path) throws IllegalPathException {
+ public long getNodeTTLInMSForTree(String path) throws IllegalPathException {
lock.readLock().lock();
try {
- return ttlCache.getLastNodeTTL(PathUtils.splitPathToDetachedNodes(path));
+ return
treeModelTTLCache.getLastNodeTTL(PathUtils.splitPathToDetachedNodes(path));
} finally {
lock.readLock().unlock();
}
@@ -123,10 +123,10 @@ public class DataNodeTTLCache {
/** Clear all ttl of cache. */
@TestOnly
- public void clearAllTTL() {
+ public void clearAllTTLForTree() {
lock.writeLock().lock();
try {
- ttlCache.clear();
+ treeModelTTLCache.clear();
} finally {
lock.writeLock().unlock();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index cf0036ffbad..3d6bb46952c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -2812,7 +2812,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
} else if (!LastQueryUtil.satisfyFilter(
updateFilterUsingTTL(
context.getGlobalTimeFilter(),
-
DataNodeTTLCache.getInstance().getTTL(seriesPath.getDevicePath().getNodes())),
+
DataNodeTTLCache.getInstance().getTTLForTree(seriesPath.getDevicePath().getNodes())),
timeValuePair)) { // cached last value is not satisfied
if (!isFilterGtOrGe(context.getGlobalTimeFilter())) {
@@ -3027,7 +3027,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
} else if (!LastQueryUtil.satisfyFilter(
updateFilterUsingTTL(
context.getGlobalTimeFilter(),
- DataNodeTTLCache.getInstance().getTTL(devicePath.getNodes())),
+
DataNodeTTLCache.getInstance().getTTLForTree(devicePath.getNodes())),
timeValuePair)) { // cached last value is not satisfied
if (!isFilterGtOrGe(context.getGlobalTimeFilter())) {
@@ -3723,7 +3723,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
PartialPath devicePath = entry.getKey();
IDeviceID deviceID = devicePath.getIDeviceID();
deviceIDToContext.put(deviceID, entry.getValue());
- ttlCache.put(deviceID, DataNodeTTLCache.getInstance().getTTL(deviceID));
+ ttlCache.put(deviceID,
DataNodeTTLCache.getInstance().getTTLForTree(deviceID));
}
ActiveDeviceRegionScanOperator regionScanOperator =
new ActiveDeviceRegionScanOperator(
@@ -3764,7 +3764,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
Map<String, TimeseriesContext> timeseriesSchemaInfoMap =
getTimeseriesSchemaInfoMap(entryMap, dataDriverContext);
timeseriesToSchemaInfo.put(deviceID, timeseriesSchemaInfoMap);
- ttlCache.put(deviceID, DataNodeTTLCache.getInstance().getTTL(deviceID));
+ ttlCache.put(deviceID,
DataNodeTTLCache.getInstance().getTTLForTree(deviceID));
}
ActiveTimeSeriesRegionScanOperator regionScanOperator =
new ActiveTimeSeriesRegionScanOperator(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 17e2bf8b5e8..d85ac18989e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -1140,7 +1140,7 @@ public class DataNode extends ServerCommandLine
implements DataNodeMBean {
for (int i = 0; i < mapSize; i++) {
try {
DataNodeTTLCache.getInstance()
- .setTTL(
+ .setTTLForTree(
PathUtils.splitPathToDetachedNodes(
Objects.requireNonNull(ReadWriteIOUtils.readString(buffer))),
ReadWriteIOUtils.readLong(buffer));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 9c1ef5c4f09..b75d8be26ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -862,20 +862,20 @@ public class StorageEngine implements IService {
long ttl = req.getTTL();
boolean isDataBase = req.isDataBase;
if (ttl == TTLCache.NULL_TTL) {
- DataNodeTTLCache.getInstance().unsetTTL(path);
+ DataNodeTTLCache.getInstance().unsetTTLForTree(path);
if (isDataBase) {
// unset ttl to path.**
String[] pathWithWildcard = Arrays.copyOf(path, path.length + 1);
pathWithWildcard[pathWithWildcard.length - 1] =
IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
- DataNodeTTLCache.getInstance().unsetTTL(pathWithWildcard);
+ DataNodeTTLCache.getInstance().unsetTTLForTree(pathWithWildcard);
}
} else {
- DataNodeTTLCache.getInstance().setTTL(path, ttl);
+ DataNodeTTLCache.getInstance().setTTLForTree(path, ttl);
if (isDataBase) {
// set ttl to path.**
String[] pathWithWildcard = Arrays.copyOf(path, path.length + 1);
pathWithWildcard[pathWithWildcard.length - 1] =
IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
- DataNodeTTLCache.getInstance().setTTL(pathWithWildcard, ttl);
+ DataNodeTTLCache.getInstance().setTTLForTree(pathWithWildcard, ttl);
}
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index c6b55484b8f..1a98874f5fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -313,8 +313,6 @@ public class DataRegion implements IDataRegionForQuery {
private final AtomicBoolean isCompactionSelecting = new AtomicBoolean(false);
- private boolean isTreeModel;
-
private static final QueryResourceMetricSet QUERY_RESOURCE_METRIC_SET =
QueryResourceMetricSet.getInstance();
@@ -335,7 +333,6 @@ public class DataRegion implements IDataRegionForQuery {
this.dataRegionId = dataRegionId;
this.databaseName = databaseName;
this.fileFlushPolicy = fileFlushPolicy;
- this.isTreeModel = this.databaseName.startsWith("root.");
acquireDirectBufferMemory();
dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir,
dataRegionId);
@@ -386,7 +383,6 @@ public class DataRegion implements IDataRegionForQuery {
this.dataRegionId = id;
this.tsFileManager = new TsFileManager(databaseName, id, "");
this.partitionMaxFileVersions = new HashMap<>();
- this.isTreeModel = this.databaseName.startsWith("root.");
partitionMaxFileVersions.put(0L, 0L);
}
@@ -1027,11 +1023,10 @@ public class DataRegion implements IDataRegionForQuery {
*/
public void insert(InsertRowNode insertRowNode) throws WriteProcessException
{
// reject insertions that are out of ttl
- long deviceTTL =
-
DataNodeTTLCache.getInstance().getTTL(insertRowNode.getTargetPath().getNodes());
- if (!CommonUtils.isAlive(insertRowNode.getTime(), deviceTTL)) {
+ long ttl = getTTL(insertRowNode);
+ if (!CommonUtils.isAlive(insertRowNode.getTime(), ttl)) {
throw new OutOfTTLException(
- insertRowNode.getTime(), (CommonDateTimeUtils.currentTime() -
deviceTTL));
+ insertRowNode.getTime(), (CommonDateTimeUtils.currentTime() - ttl));
}
StorageEngine.blockInsertionIfReject();
long startTime = System.nanoTime();
@@ -1202,9 +1197,7 @@ public class DataRegion implements IDataRegionForQuery {
TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
boolean noFailure;
- int loc =
- insertTabletNode.checkTTL(
- results, i ->
DataNodeTTLCache.getInstance().getTTL(insertTabletNode.getDeviceID(i)));
+ int loc = insertTabletNode.checkTTL(results, i ->
getTTL(insertTabletNode));
noFailure = loc == 0;
noFailure = noFailure && splitAndInsert(insertTabletNode, loc, results);
@@ -2775,7 +2768,7 @@ public class DataRegion implements IDataRegionForQuery {
// Sort the time partition from largest to smallest
timePartitions.sort(Comparator.reverseOrder());
- CompactionScheduleContext context = new
CompactionScheduleContext(this.isTreeModel);
+ CompactionScheduleContext context = new CompactionScheduleContext();
// schedule insert compaction
trySubmitCount += executeInsertionCompaction(timePartitions, context);
@@ -2818,7 +2811,7 @@ public class DataRegion implements IDataRegionForQuery {
logger.info("[TTL] {}-{} Start ttl checking.", databaseName, dataRegionId);
int trySubmitCount = 0;
try {
- CompactionScheduleContext context = new
CompactionScheduleContext(this.isTreeModel);
+ CompactionScheduleContext context = new CompactionScheduleContext();
List<Long> timePartitions = new
ArrayList<>(tsFileManager.getTimePartitions());
// Sort the time partition from smallest to largest
Collections.sort(timePartitions);
@@ -3499,14 +3492,12 @@ public class DataRegion implements IDataRegionForQuery {
if (deleted) {
return;
}
- long deviceTTL =
- DataNodeTTLCache.getInstance()
- .getTTL(insertRowsOfOneDeviceNode.getTargetPath().getNodes());
+ long ttl = getTTL(insertRowsOfOneDeviceNode);
long[] costsForMetrics = new long[4];
Map<TsFileProcessor, InsertRowsNode> tsFileProcessorMap = new
HashMap<>();
for (int i = 0; i <
insertRowsOfOneDeviceNode.getInsertRowNodeList().size(); i++) {
InsertRowNode insertRowNode =
insertRowsOfOneDeviceNode.getInsertRowNodeList().get(i);
- if (!CommonUtils.isAlive(insertRowNode.getTime(), deviceTTL)) {
+ if (!CommonUtils.isAlive(insertRowNode.getTime(), ttl)) {
// we do not need to write these part of data, as they can not be
queried
// or the sub-plan has already been executed, we are retrying other
sub-plans
insertRowsOfOneDeviceNode
@@ -3519,7 +3510,7 @@ public class DataRegion implements IDataRegionForQuery {
"Insertion time [%s] is less than ttl time bound [%s]",
DateTimeUtils.convertLongToDate(insertRowNode.getTime()),
DateTimeUtils.convertLongToDate(
- CommonDateTimeUtils.currentTime() - deviceTTL))));
+ CommonDateTimeUtils.currentTime() - ttl))));
continue;
}
// init map
@@ -3621,13 +3612,8 @@ public class DataRegion implements IDataRegionForQuery {
long[] timePartitionIds = new
long[insertRowsNode.getInsertRowNodeList().size()];
for (int i = 0; i < insertRowsNode.getInsertRowNodeList().size(); i++) {
InsertRowNode insertRowNode =
insertRowsNode.getInsertRowNodeList().get(i);
- long deviceTTL =
- DataNodeTTLCache.getInstance()
- .getTTL(
- Arrays.stream(insertRowNode.getDeviceID().getSegments())
- .map(seg -> seg != null ? seg.toString() : null)
- .toArray(String[]::new));
- if (!CommonUtils.isAlive(insertRowNode.getTime(), deviceTTL)) {
+ long ttl = getTTL(insertRowNode);
+ if (!CommonUtils.isAlive(insertRowNode.getTime(), ttl)) {
insertRowsNode
.getResults()
.put(
@@ -3638,7 +3624,8 @@ public class DataRegion implements IDataRegionForQuery {
"Insertion time [%s] is less than ttl time bound [%s]",
DateTimeUtils.convertLongToDate(insertRowNode.getTime()),
DateTimeUtils.convertLongToDate(
- CommonDateTimeUtils.currentTime() - deviceTTL))));
+ CommonDateTimeUtils.currentTime() - ttl))));
+ insertRowNode.setFailedMeasurementNumber(insertRowNode.getMeasurements().length);
continue;
}
// init map
@@ -3947,4 +3934,12 @@ public class DataRegion implements IDataRegionForQuery {
public TsFileManager getTsFileManager() {
return tsFileManager;
}
+
+ private long getTTL(InsertNode insertNode) {
+ if (insertNode.getTableName() == null) {
+ return DataNodeTTLCache.getInstance().getTTLForTree(insertNode.getTargetPath().getNodes());
+ } else {
+ return DataNodeTTLCache.getInstance().getTTLForTable(databaseName, insertNode.getTableName());
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICompactionPerformer.java
index 091a8ed1e5a..1dc24d12f70 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICompactionPerformer.java
@@ -40,8 +40,6 @@ public interface ICompactionPerformer {
void setSummary(CompactionTaskSummary summary);
- void setIgnoreAllNullRows(boolean ignoreAllNullRows);
-
default void setSourceFiles(List<TsFileResource> files) {
throw new IllegalSourceFileTypeException(
"Cannot set single type of source files to this kind of performer");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
index 9708b4d80e8..a9db8b33798 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
@@ -21,10 +21,10 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performe
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.WriteProcessException;
-import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.IllegalCompactionTaskSummaryException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
@@ -40,6 +40,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.wri
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.FastCrossCompactionWriter;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.FastInnerCompactionWriter;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
@@ -91,7 +92,6 @@ public class FastCompactionPerformer
modificationCache = new ConcurrentHashMap<>();
private final boolean isCrossCompaction;
- private boolean ignoreAllNullRows = true;
public FastCompactionPerformer(
List<TsFileResource> seqFiles,
@@ -116,7 +116,7 @@ public class FastCompactionPerformer
public void perform() throws Exception {
this.subTaskSummary.setTemporalFileNum(targetFiles.size());
try (MultiTsFileDeviceIterator deviceIterator =
- new MultiTsFileDeviceIterator(seqFiles, unseqFiles,
readerCacheMap, ignoreAllNullRows);
+ new MultiTsFileDeviceIterator(seqFiles, unseqFiles,
readerCacheMap);
AbstractCompactionWriter compactionWriter =
isCrossCompaction
? new FastCrossCompactionWriter(targetFiles, seqFiles,
readerCacheMap)
@@ -130,32 +130,43 @@ public class FastCompactionPerformer
checkThreadInterrupted();
Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.nextDevice();
IDeviceID device = deviceInfo.left;
+ boolean isAligned = deviceInfo.right;
// sort the resources by the start time of current device from old to
new, and remove
// resource that does not contain the current device. Notice: when the
level of time index
// is file, there will be a false positive judgment problem, that is,
the device does not
// actually exist but the judgment return device being existed.
sortedSourceFiles.addAll(seqFiles);
sortedSourceFiles.addAll(unseqFiles);
+ boolean isTreeModel = !isAligned ||
device.getTableName().startsWith("root.");
+ long ttl = deviceIterator.getTTLForCurrentDevice();
sortedSourceFiles.removeIf(
- x ->
- x.definitelyNotContains(device)
- || !x.isDeviceAlive(
- device,
- DataNodeTTLCache.getInstance()
- // TODO: remove deviceId conversion
- .getTTL(device)));
+ x -> x.definitelyNotContains(device) || !x.isDeviceAlive(device,
ttl));
sortedSourceFiles.sort(Comparator.comparingLong(x ->
x.getStartTime(device)));
+ if (ttl != Long.MAX_VALUE) {
+ Deletion ttlDeletion =
+ new Deletion(
+ new MeasurementPath(device,
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD),
+ Long.MAX_VALUE,
+ Long.MIN_VALUE,
+ deviceIterator.getTimeLowerBoundForCurrentDevice());
+ for (TsFileResource sourceFile : sortedSourceFiles) {
+ modificationCache
+ .computeIfAbsent(
+ sourceFile.getTsFile().getName(),
+ k -> PatternTreeMapFactory.getModsPatternTreeMap())
+ .append(ttlDeletion.getPath(), ttlDeletion);
+ }
+ }
if (sortedSourceFiles.isEmpty()) {
// device is out of dated in all source files
continue;
}
- boolean isAligned = deviceInfo.right;
compactionWriter.startChunkGroup(device, isAligned);
if (isAligned) {
- compactAlignedSeries(device, deviceIterator, compactionWriter);
+ compactAlignedSeries(device, deviceIterator, compactionWriter,
isTreeModel);
} else {
compactNonAlignedSeries(device, deviceIterator, compactionWriter);
}
@@ -181,7 +192,8 @@ public class FastCompactionPerformer
private void compactAlignedSeries(
IDeviceID deviceId,
MultiTsFileDeviceIterator deviceIterator,
- AbstractCompactionWriter fastCrossCompactionWriter)
+ AbstractCompactionWriter fastCrossCompactionWriter,
+ boolean ignoreAllNullRows)
throws PageException, IOException, WriteProcessException,
IllegalPathException {
// measurement -> tsfile resource -> timeseries metadata <startOffset,
endOffset>, including
// empty value chunk metadata
@@ -298,11 +310,6 @@ public class FastCompactionPerformer
this.subTaskSummary = (FastCompactionTaskSummary) summary;
}
- @Override
- public void setIgnoreAllNullRows(boolean ignoreAllNullRows) {
- this.ignoreAllNullRows = ignoreAllNullRows;
- }
-
@Override
public void setSourceFiles(List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles) {
this.seqFiles = seqFiles;
@@ -348,4 +355,10 @@ public class FastCompactionPerformer
modificationCache.put(resource.getTsFile().getName(), modifications);
}
}
+
+ public String getDatabaseName() {
+ return !seqFiles.isEmpty()
+ ? seqFiles.get(0).getDatabaseName()
+ : unseqFiles.get(0).getDatabaseName();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index d000d58b4c2..ca592939976 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -55,7 +55,6 @@ public class ReadChunkCompactionPerformer implements
ISeqCompactionPerformer {
private CompactionTsFileWriter currentWriter;
private long endedFileSize = 0;
private int currentTargetFileIndex = 0;
- private boolean ignoreAllNullRows = true;
// memory budget for file writer is 5% of per compaction task memory budget
private final long memoryBudgetForFileWriter =
(long)
@@ -87,8 +86,7 @@ public class ReadChunkCompactionPerformer implements
ISeqCompactionPerformer {
InterruptedException,
StorageEngineException,
PageException {
- try (MultiTsFileDeviceIterator deviceIterator =
- new MultiTsFileDeviceIterator(seqFiles, ignoreAllNullRows)) {
+ try (MultiTsFileDeviceIterator deviceIterator = new
MultiTsFileDeviceIterator(seqFiles)) {
schema =
CompactionTableSchemaCollector.collectSchema(seqFiles,
deviceIterator.getReaderMap());
while (deviceIterator.hasNextDevice()) {
@@ -174,11 +172,6 @@ public class ReadChunkCompactionPerformer implements
ISeqCompactionPerformer {
this.summary = summary;
}
- @Override
- public void setIgnoreAllNullRows(boolean ignoreAllNullRows) {
- this.ignoreAllNullRows = ignoreAllNullRows;
- }
-
private void compactAlignedSeries(
IDeviceID device,
TsFileResource targetResource,
@@ -194,7 +187,12 @@ public class ReadChunkCompactionPerformer implements
ISeqCompactionPerformer {
writer.startChunkGroup(device);
BatchedReadChunkAlignedSeriesCompactionExecutor compactionExecutor =
new BatchedReadChunkAlignedSeriesCompactionExecutor(
- device, targetResource, readerAndChunkMetadataList, writer,
summary, ignoreAllNullRows);
+ device,
+ targetResource,
+ readerAndChunkMetadataList,
+ writer,
+ summary,
+ device.getTableName().startsWith("root."));
compactionExecutor.execute();
for (ChunkMetadata chunkMetadata :
writer.getChunkMetadataListOfCurrentDeviceInMemory()) {
if (chunkMetadata.getMeasurementUid().isEmpty()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
index f890cb78835..c5f07933605 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
@@ -79,7 +79,6 @@ public class ReadPointCompactionPerformer
private CompactionTaskSummary summary;
protected List<TsFileResource> targetFiles = Collections.emptyList();
- protected boolean ignoreAllNullRows = true;
public ReadPointCompactionPerformer(
List<TsFileResource> seqFiles,
@@ -113,7 +112,7 @@ public class ReadPointCompactionPerformer
getCompactionWriter(seqFiles, unseqFiles, targetFiles)) {
// Do not close device iterator, because tsfile reader is managed by
FileReaderManager.
MultiTsFileDeviceIterator deviceIterator =
- new MultiTsFileDeviceIterator(seqFiles, unseqFiles,
ignoreAllNullRows);
+ new MultiTsFileDeviceIterator(seqFiles, unseqFiles);
List<Schema> schemas =
CompactionTableSchemaCollector.collectSchema(
seqFiles, unseqFiles, deviceIterator.getReaderMap());
@@ -153,11 +152,6 @@ public class ReadPointCompactionPerformer
this.summary = summary;
}
- @Override
- public void setIgnoreAllNullRows(boolean ignoreAllNullRows) {
- this.ignoreAllNullRows = ignoreAllNullRows;
- }
-
private void compactAlignedSeries(
IDeviceID device,
MultiTsFileDeviceIterator deviceIterator,
@@ -176,7 +170,7 @@ public class ReadPointCompactionPerformer
.map(IMeasurementSchema::getMeasurementId)
.collect(Collectors.toList());
- fragmentInstanceContext.setIgnoreAllNullRows(ignoreAllNullRows);
+
fragmentInstanceContext.setIgnoreAllNullRows(device.getTableName().startsWith("root."));
IDataBlockReader dataBlockReader =
constructReader(
device,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
index 880ce4ab357..832b2286aac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
@@ -35,9 +35,8 @@ import java.util.List;
/** Used for fixing files which contains internal unsorted data */
public class RepairUnsortedFileCompactionPerformer extends
ReadPointCompactionPerformer {
- public RepairUnsortedFileCompactionPerformer(boolean ignoreAllNullRows) {
+ public RepairUnsortedFileCompactionPerformer() {
super();
- this.ignoreAllNullRows = ignoreAllNullRows;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
index fe6ae62fd21..52bee85c50d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
@@ -67,14 +67,13 @@ public class RepairUnsortedFileCompactionTask extends
InnerSpaceCompactionTask {
TsFileManager tsFileManager,
TsFileResource sourceFile,
boolean sequence,
- long serialId,
- boolean ignoreAllNullRows) {
+ long serialId) {
super(
timePartition,
tsFileManager,
Collections.singletonList(sourceFile),
sequence,
- new RepairUnsortedFileCompactionPerformer(ignoreAllNullRows),
+ new RepairUnsortedFileCompactionPerformer(),
serialId);
this.sourceFile = sourceFile;
if (this.sourceFile.getTsFileRepairStatus() !=
TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE) {
@@ -89,14 +88,13 @@ public class RepairUnsortedFileCompactionTask extends
InnerSpaceCompactionTask {
TsFileResource sourceFile,
CountDownLatch latch,
boolean sequence,
- long serialId,
- boolean ignoreAllNullRows) {
+ long serialId) {
super(
timePartition,
tsFileManager,
Collections.singletonList(sourceFile),
sequence,
- new RepairUnsortedFileCompactionPerformer(ignoreAllNullRows),
+ new RepairUnsortedFileCompactionPerformer(),
serialId);
this.sourceFile = sourceFile;
if (this.sourceFile.getTsFileRepairStatus() !=
TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index 6b84d5ca7a1..832be1086ce 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -71,16 +71,18 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
private final Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap =
new HashMap<>();
private final Map<TsFileResource, List<Modification>> modificationCache =
new HashMap<>();
private Pair<IDeviceID, Boolean> currentDevice = null;
+ private boolean ignoreAllNullRows;
+ private long ttlForCurrentDevice;
private long timeLowerBoundForCurrentDevice;
- private boolean ignoreAllNullRows = true;
+ private final String databaseName;
/**
* Used for compaction with read chunk performer.
*
* @throws IOException if io error occurred
*/
- public MultiTsFileDeviceIterator(List<TsFileResource> tsFileResources,
boolean ignoreAllNullRows)
- throws IOException {
+ public MultiTsFileDeviceIterator(List<TsFileResource> tsFileResources)
throws IOException {
+ this.databaseName = tsFileResources.get(0).getDatabaseName();
this.tsFileResourcesSortedByDesc = new ArrayList<>(tsFileResources);
this.tsFileResourcesSortedByAsc = new ArrayList<>(tsFileResources);
// sort the files from the oldest to the newest
@@ -104,7 +106,6 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
}
throw e;
}
- this.ignoreAllNullRows = ignoreAllNullRows;
}
/**
@@ -113,12 +114,10 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
* @throws IOException if io errors occurred
*/
public MultiTsFileDeviceIterator(
- List<TsFileResource> seqResources,
- List<TsFileResource> unseqResources,
- boolean ignoreAllNullRows)
- throws IOException {
+ List<TsFileResource> seqResources, List<TsFileResource> unseqResources)
throws IOException {
this.tsFileResourcesSortedByDesc = new ArrayList<>(seqResources);
tsFileResourcesSortedByDesc.addAll(unseqResources);
+ this.databaseName = tsFileResourcesSortedByDesc.get(0).getDatabaseName();
// sort the files from the newest to the oldest
Collections.sort(
this.tsFileResourcesSortedByDesc,
TsFileResource::compareFileCreationOrderByDesc);
@@ -128,7 +127,6 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
readerMap.put(tsFileResource, reader);
deviceIteratorMap.put(tsFileResource,
reader.getAllDevicesIteratorWithIsAligned());
}
- this.ignoreAllNullRows = ignoreAllNullRows;
}
/**
@@ -139,16 +137,15 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
public MultiTsFileDeviceIterator(
List<TsFileResource> seqResources,
List<TsFileResource> unseqResources,
- Map<TsFileResource, TsFileSequenceReader> readerMap,
- boolean ignoreAllNullRows)
+ Map<TsFileResource, TsFileSequenceReader> readerMap)
throws IOException {
this.tsFileResourcesSortedByDesc = new ArrayList<>(seqResources);
tsFileResourcesSortedByDesc.addAll(unseqResources);
+ this.databaseName = tsFileResourcesSortedByDesc.get(0).getDatabaseName();
// sort tsfiles from the newest to the oldest
Collections.sort(
this.tsFileResourcesSortedByDesc,
TsFileResource::compareFileCreationOrderByDesc);
this.readerMap = readerMap;
- this.ignoreAllNullRows = ignoreAllNullRows;
CompactionType type = null;
if (!seqResources.isEmpty() && !unseqResources.isEmpty()) {
@@ -217,12 +214,27 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
deviceIteratorMap.remove(resource);
}
- timeLowerBoundForCurrentDevice =
- CommonDateTimeUtils.currentTime()
- - DataNodeTTLCache.getInstance().getTTL(currentDevice.getLeft());
+ IDeviceID deviceID = currentDevice.left;
+ boolean isAligned = currentDevice.right;
+ ignoreAllNullRows = !isAligned ||
deviceID.getTableName().startsWith("root.");
+ if (!ignoreAllNullRows) {
+ ttlForCurrentDevice =
+ DataNodeTTLCache.getInstance().getTTLForTable(databaseName,
deviceID.getTableName());
+ } else {
+ ttlForCurrentDevice =
DataNodeTTLCache.getInstance().getTTLForTree(deviceID);
+ }
+ timeLowerBoundForCurrentDevice = CommonDateTimeUtils.currentTime() -
ttlForCurrentDevice;
return currentDevice;
}
+ public long getTTLForCurrentDevice() {
+ return ttlForCurrentDevice;
+ }
+
+ public long getTimeLowerBoundForCurrentDevice() {
+ return timeLowerBoundForCurrentDevice;
+ }
+
/**
* Get all measurements and schemas of the current device from source files.
Traverse all the
* files from the newest to the oldest in turn and start traversing the
index tree from the
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
index 373dbe53619..787e67d4f5d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/ReadChunkAlignedSeriesCompactionExecutor.java
@@ -366,10 +366,10 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
PageHeader timePageHeader = timePage.getHeader();
ByteBuffer uncompressedTimePageData = timePage.getUnCompressedData();
Decoder timeDecoder = Decoder.getDecoderByType(timePage.getEncoding(),
TSDataType.INT64);
- timePage.clear();
TimePageReader timePageReader =
new TimePageReader(timePageHeader, uncompressedTimePageData,
timeDecoder);
timePageReader.setDeleteIntervalList(timePage.getDeleteIntervalList());
+ timePage.clear();
List<ValuePageReader> valuePageReaders = new
ArrayList<>(valuePages.size());
int nonEmptyPageNum = 1;
@@ -452,6 +452,9 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
private boolean canFlushChunk(ChunkLoader timeChunk, List<ChunkLoader>
valueChunks)
throws IOException {
+ if (timeChunk.getModifiedStatus() == ModifiedStatus.PARTIAL_DELETED) {
+ return false;
+ }
boolean largeEnough =
timeChunk.getHeader().getDataSize() >= targetChunkSize
|| timeChunk.getChunkMetadata().getNumOfPoints() >=
targetChunkPointNum;
@@ -494,6 +497,9 @@ public class ReadChunkAlignedSeriesCompactionExecutor {
}
private boolean canFlushPage(PageLoader timePage, List<PageLoader>
valuePages) {
+ if (timePage.getModifiedStatus() == ModifiedStatus.PARTIAL_DELETED) {
+ return false;
+ }
long count = timePage.getHeader().getStatistics().getCount();
boolean largeEnough =
count >= targetPagePointNum
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
index c5f2120abe8..d044824a3f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
@@ -129,8 +129,7 @@ public class RepairTimePartitionScanTask implements
Callable<Void> {
sourceFile,
latch,
sourceFile.isSeq(),
- tsFileManager.getNextCompactionTaskId(),
- timePartition.getDatabaseName().startsWith("root."));
+ tsFileManager.getNextCompactionTaskId());
if (!submitRepairFileTaskSafely(task)) {
latch.countDown();
}
@@ -162,8 +161,7 @@ public class RepairTimePartitionScanTask implements
Callable<Void> {
overlapFile,
latch,
true,
- tsFileManager.getNextCompactionTaskId(),
- true);
+ tsFileManager.getNextCompactionTaskId());
LOGGER.info(
"[RepairScheduler] {} need to repair because it is overlapped with
other files",
overlapFile);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
index ec08df4fdf5..7b9d7269184 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
@@ -45,8 +45,6 @@ public class CompactionScheduleContext {
private int fullyDirtyFileNum = 0;
private int partiallyDirtyFileNum = 0;
-
- private boolean isTreeModel = true;
// end region
private final Map<TsFileResource, Map<IDeviceID, DeviceInfo>>
partitionFileDeviceInfoCache;
@@ -55,11 +53,6 @@ public class CompactionScheduleContext {
this.partitionFileDeviceInfoCache = new HashMap<>();
}
- public CompactionScheduleContext(boolean isTreeModel) {
- this();
- this.isTreeModel = isTreeModel;
- }
-
public void addResourceDeviceTimeIndex(
TsFileResource tsFileResource, Map<IDeviceID, DeviceInfo> deviceInfoMap)
{
partitionFileDeviceInfoCache.put(tsFileResource, deviceInfoMap);
@@ -153,10 +146,10 @@ public class CompactionScheduleContext {
}
public ISeqCompactionPerformer getSeqCompactionPerformer() {
- ISeqCompactionPerformer seqCompactionPerformer =
-
IoTDBDescriptor.getInstance().getConfig().getInnerSeqCompactionPerformer().createInstance();
- seqCompactionPerformer.setIgnoreAllNullRows(isTreeModel);
- return seqCompactionPerformer;
+ return IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getInnerSeqCompactionPerformer()
+ .createInstance();
}
public IUnseqCompactionPerformer getUnseqCompactionPerformer() {
@@ -165,18 +158,10 @@ public class CompactionScheduleContext {
.getConfig()
.getInnerUnseqCompactionPerformer()
.createInstance();
- unseqCompactionPerformer.setIgnoreAllNullRows(isTreeModel);
return unseqCompactionPerformer;
}
public ICrossCompactionPerformer getCrossCompactionPerformer() {
- ICrossCompactionPerformer crossCompactionPerformer =
-
IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer().createInstance();
- crossCompactionPerformer.setIgnoreAllNullRows(isTreeModel);
- return crossCompactionPerformer;
- }
-
- public boolean isIgnoreAllNullRows() {
- return isTreeModel;
+ return
IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer().createInstance();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java
index cf60c55108e..9ab7b1052c2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java
@@ -216,10 +216,17 @@ public class SettleSelectorImpl implements
ISettleSelector {
for (IDeviceID device : ((ArrayDeviceTimeIndex) timeIndex).getDevices()) {
// check expired device by ttl
// TODO: remove deviceId conversion
- long deviceTTL = DataNodeTTLCache.getInstance().getTTL(device);
- boolean hasSetTTL = deviceTTL != Long.MAX_VALUE;
+
+ long ttl;
+ String tableName = device.getTableName();
+ if (tableName.startsWith("root.")) {
+ ttl = DataNodeTTLCache.getInstance().getTTLForTree(device);
+ } else {
+ ttl = DataNodeTTLCache.getInstance().getTTLForTable(storageGroupName,
tableName);
+ }
+ boolean hasSetTTL = ttl != Long.MAX_VALUE;
boolean isDeleted =
- !timeIndex.isDeviceAlive(device, deviceTTL)
+ !timeIndex.isDeviceAlive(device, ttl)
|| isDeviceDeletedByMods(
modifications,
device,
@@ -234,8 +241,7 @@ public class SettleSelectorImpl implements ISettleSelector {
}
long outdatedTimeDiff = currentTime - timeIndex.getEndTime(device);
hasExpiredTooLong =
- hasExpiredTooLong
- || outdatedTimeDiff > Math.min(config.getMaxExpiredTime(), 3 *
deviceTTL);
+ hasExpiredTooLong || outdatedTimeDiff >
Math.min(config.getMaxExpiredTime(), 3 * ttl);
}
if (isDeleted) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java
index 9c5eeebdaf9..314ae8349e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java
@@ -225,8 +225,7 @@ public class SizeTieredCompactionSelector
tsFileManager,
resource,
sequence,
- tsFileManager.getNextCompactionTaskId(),
- context.isIgnoreAllNullRows()));
+ tsFileManager.getNextCompactionTaskId()));
}
}
return taskList;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 7ae1186fe14..5adcb14bae9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -2212,10 +2212,15 @@ public class TsFileProcessor {
}
private long getQueryTimeLowerBound(IDeviceID deviceID) {
- long deviceTTL = DataNodeTTLCache.getInstance().getTTL(deviceID);
- return deviceTTL != Long.MAX_VALUE
- ? CommonDateTimeUtils.currentTime() - deviceTTL
- : Long.MIN_VALUE;
+ long ttl;
+ if (deviceID.getTableName().startsWith("root.")) {
+ ttl = DataNodeTTLCache.getInstance().getTTLForTree(deviceID);
+ } else {
+ ttl =
+ DataNodeTTLCache.getInstance()
+ .getTTLForTable(this.storageGroupName, deviceID.getTableName());
+ }
+ return ttl != Long.MAX_VALUE ? CommonDateTimeUtils.currentTime() - ttl :
Long.MIN_VALUE;
}
public long getTimeRangeId() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
index dcc74699987..dc56ad11a12 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
@@ -224,4 +224,9 @@ public class QueryDataSource implements IQueryDataSource {
curUnSeqOrderTime = 0;
curUnSeqSatisfied = null;
}
+
+ public String getDatabaseName() {
+ List<TsFileResource> resources = !seqResources.isEmpty() ? seqResources :
unseqResources;
+ return resources.isEmpty() ? null : resources.get(0).getDatabaseName();
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TTLQueryTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TTLQueryTest.java
index d2cea9fd1ca..10869cf1b87 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TTLQueryTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TTLQueryTest.java
@@ -59,7 +59,7 @@ public class TTLQueryTest extends AbstractCompactionTest {
@After
public void tearDown() throws IOException, StorageEngineException {
super.tearDown();
- DataNodeTTLCache.getInstance().clearAllTTL();
+ DataNodeTTLCache.getInstance().clearAllTTLForTree();
}
/** Device d1, d3 and d5 is deleted by TTL. */
@@ -108,11 +108,11 @@ public class TTLQueryTest extends AbstractCompactionTest {
// set ttl
DataNodeTTLCache.getInstance()
- .setTTL(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR + "d1",
315360000000L);
+ .setTTLForTree(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR +
"d1", 315360000000L);
DataNodeTTLCache.getInstance()
- .setTTL(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR + "d3",
315360000000L);
+ .setTTLForTree(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR +
"d3", 315360000000L);
DataNodeTTLCache.getInstance()
- .setTTL(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR + "d5",
315360000000L);
+ .setTTLForTree(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR +
"d5", 315360000000L);
queryDataSource =
dataRegion.query(pathList, null, EnvironmentUtils.TEST_QUERY_CONTEXT,
null, null);
@@ -172,11 +172,11 @@ public class TTLQueryTest extends AbstractCompactionTest {
new MeasurementSchema("s2", getDataType(2))));
DataNodeTTLCache.getInstance()
- .setTTL(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR + "d1", 1L);
+ .setTTLForTree(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR +
"d1", 1L);
DataNodeTTLCache.getInstance()
- .setTTL(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR + "d3", 1);
+ .setTTLForTree(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR +
"d3", 1);
DataNodeTTLCache.getInstance()
- .setTTL(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR + "d5", 1);
+ .setTTLForTree(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR +
"d5", 1);
queryDataSource =
dataRegion.query(pathList, null, EnvironmentUtils.TEST_QUERY_CONTEXT,
null, null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TTLTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TTLTest.java
index 04c45845eef..9bc0d73311b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TTLTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TTLTest.java
@@ -78,14 +78,14 @@ import static org.junit.Assert.fail;
public class TTLTest {
- private String sg1 = "root.TTL_SG1";
+ private String sg1Device = "root.TTL_SG1.d1";
private DataRegionId dataRegionId1 = new DataRegionId(1);
- private String sg2 = "root.TTL_SG2";
+ private String sg2Device = "root.TTL_SG2.d1";
private DataRegionId dataRegionId2 = new DataRegionId(1);
private long ttl = 12345;
private DataRegion dataRegion;
private String s1 = "s1";
- private String g1s1 = sg1 + IoTDBConstant.PATH_SEPARATOR + s1;
+ private String g1s1 = sg1Device + IoTDBConstant.PATH_SEPARATOR + s1;
private long prevPartitionInterval;
@Before
@@ -98,7 +98,7 @@ public class TTLTest {
IoTDBDescriptor.getInstance().getConfig().getSystemDir(),
String.valueOf(dataRegionId1.getId()),
new DirectFlushPolicy(),
- sg1);
+ sg1Device);
// createSchemas();
}
@@ -108,7 +108,7 @@ public class TTLTest {
dataRegion.abortCompaction();
EnvironmentUtils.cleanEnv();
CommonDescriptor.getInstance().getConfig().setTimePartitionInterval(prevPartitionInterval);
- DataNodeTTLCache.getInstance().clearAllTTL();
+ DataNodeTTLCache.getInstance().clearAllTTLForTree();
}
@Test
@@ -117,7 +117,7 @@ public class TTLTest {
InsertRowNode node =
new InsertRowNode(
new PlanNodeId("0"),
- new PartialPath(sg1),
+ new PartialPath(sg1Device),
false,
new String[] {"s1"},
new TSDataType[] {TSDataType.INT64},
@@ -129,7 +129,7 @@ public class TTLTest {
// ok without ttl
dataRegion.insert(node);
- DataNodeTTLCache.getInstance().setTTL(sg1, 1000);
+ DataNodeTTLCache.getInstance().setTTLForTree(sg1Device, 1000);
// with ttl
node.setTime(System.currentTimeMillis() - 1001);
boolean caught = false;
@@ -147,7 +147,7 @@ public class TTLTest {
InsertRowNode node =
new InsertRowNode(
new PlanNodeId("0"),
- new PartialPath(sg1),
+ new PartialPath(sg1Device),
false,
new String[] {"s1"},
new TSDataType[] {TSDataType.INT64},
@@ -185,7 +185,7 @@ public class TTLTest {
QueryDataSource dataSource =
dataRegion.query(
Collections.singletonList(mockMeasurementPath()),
- IDeviceID.Factory.DEFAULT_FACTORY.create(sg1),
+ IDeviceID.Factory.DEFAULT_FACTORY.create(sg1Device),
EnvironmentUtils.TEST_QUERY_CONTEXT,
null,
null);
@@ -194,13 +194,13 @@ public class TTLTest {
assertEquals(4, seqResource.size());
assertEquals(4, unseqResource.size());
- DataNodeTTLCache.getInstance().setTTL(sg1, 500);
+ DataNodeTTLCache.getInstance().setTTLForTree(sg1Device, 500);
// files after ttl
dataSource =
dataRegion.query(
Collections.singletonList(mockMeasurementPath()),
- IDeviceID.Factory.DEFAULT_FACTORY.create(sg1),
+ IDeviceID.Factory.DEFAULT_FACTORY.create(sg1Device),
EnvironmentUtils.TEST_QUERY_CONTEXT,
null,
null);
@@ -228,11 +228,11 @@ public class TTLTest {
// we cannot offer the exact number since when exactly ttl will be checked
is unknown
assertTrue(cnt <= 1000);
- DataNodeTTLCache.getInstance().setTTL(sg1, 1);
+ DataNodeTTLCache.getInstance().setTTLForTree(sg1Device, 1);
dataSource =
dataRegion.query(
Collections.singletonList(mockMeasurementPath()),
- IDeviceID.Factory.DEFAULT_FACTORY.create(sg1),
+ IDeviceID.Factory.DEFAULT_FACTORY.create(sg1Device),
EnvironmentUtils.TEST_QUERY_CONTEXT,
null,
null);
@@ -261,7 +261,7 @@ public class TTLTest {
private NonAlignedFullPath mockMeasurementPath() {
return new NonAlignedFullPath(
- IDeviceID.Factory.DEFAULT_FACTORY.create(sg1),
+ IDeviceID.Factory.DEFAULT_FACTORY.create(sg1Device),
new MeasurementSchema(
s1,
TSDataType.INT64,
@@ -284,8 +284,8 @@ public class TTLTest {
dataRegion.syncCloseAllWorkingTsFileProcessors();
// files before ttl
- File seqDir = new File(TierManager.getInstance().getNextFolderForTsFile(0,
true), sg1);
- File unseqDir = new
File(TierManager.getInstance().getNextFolderForTsFile(0, false), sg1);
+ File seqDir = new File(TierManager.getInstance().getNextFolderForTsFile(0,
true), sg1Device);
+ File unseqDir = new
File(TierManager.getInstance().getNextFolderForTsFile(0, false), sg1Device);
List<File> seqFiles = new ArrayList<>();
for (File directory : seqDir.listFiles()) {
@@ -325,7 +325,7 @@ public class TTLTest {
} catch (InterruptedException e) {
e.printStackTrace();
}
- DataNodeTTLCache.getInstance().setTTL(sg1, 500);
+ DataNodeTTLCache.getInstance().setTTLForTree(sg1Device, 500);
for (long timePartition : dataRegion.getTimePartitions()) {
CompactionScheduler.tryToSubmitSettleCompactionTask(
dataRegion.getTsFileManager(), timePartition, new
CompactionScheduleContext(), true);
@@ -384,14 +384,14 @@ public class TTLTest {
SetTTLStatement statement1 =
(SetTTLStatement)
StatementGenerator.createStatement(
- "SET TTL TO " + sg1 + " 10000", ZoneId.systemDefault());
- assertEquals(sg1, statement1.getPath().getFullPath());
+ "SET TTL TO " + sg1Device + " 10000", ZoneId.systemDefault());
+ assertEquals(sg1Device, statement1.getPath().getFullPath());
assertEquals(10000, statement1.getTTL());
UnSetTTLStatement statement2 =
(UnSetTTLStatement)
- StatementGenerator.createStatement("UNSET TTL TO " + sg2,
ZoneId.systemDefault());
- assertEquals(sg2, statement2.getPath().getFullPath());
+ StatementGenerator.createStatement("UNSET TTL TO " + sg2Device,
ZoneId.systemDefault());
+ assertEquals(sg2Device, statement2.getPath().getFullPath());
assertEquals(TTLCache.NULL_TTL, statement2.getTTL());
}
@@ -416,7 +416,7 @@ public class TTLTest {
assertEquals(4, dataRegion.getSequenceFileList().size());
assertEquals(4, dataRegion.getUnSequenceFileList().size());
- DataNodeTTLCache.getInstance().setTTL(sg1, 1);
+ DataNodeTTLCache.getInstance().setTTLForTree(sg1Device, 1);
for (long timePartition : dataRegion.getTimePartitions()) {
CompactionScheduler.tryToSubmitSettleCompactionTask(
dataRegion.getTsFileManager(), timePartition, new
CompactionScheduleContext(), true);
@@ -427,7 +427,7 @@ public class TTLTest {
!= 0) {
sleep(200);
totalWaitingTime += 200;
- if (totalWaitingTime >= 5000) {
+ if (totalWaitingTime >= 50000) {
fail();
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
index 48f6e77a335..f35c766ea58 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
@@ -842,8 +842,7 @@ public class AbstractCompactionTest {
protected List<IFullPath> getPaths(List<TsFileResource> resources)
throws IOException, IllegalPathException {
Set<IFullPath> paths = new HashSet<>();
- try (MultiTsFileDeviceIterator deviceIterator =
- new MultiTsFileDeviceIterator(resources, false)) {
+ try (MultiTsFileDeviceIterator deviceIterator = new
MultiTsFileDeviceIterator(resources)) {
while (deviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> iDeviceIDBooleanPair =
deviceIterator.nextDevice();
IDeviceID deviceID = iDeviceIDBooleanPair.getLeft();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
index 18c3f9bd5c1..82c8036c539 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
@@ -114,7 +114,7 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource));
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
RepairUnsortedFileCompactionTask task =
- new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
resource.isSeq(), 0, true);
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
resource.isSeq(), 0);
task.start();
Assert.assertEquals(0, tsFileManager.getTsFileList(true).size());
Assert.assertEquals(1, tsFileManager.getTsFileList(false).size());
@@ -139,7 +139,7 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource));
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
RepairUnsortedFileCompactionTask task =
- new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
resource.isSeq(), 0, true);
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
resource.isSeq(), 0);
task.start();
Assert.assertEquals(0, tsFileManager.getTsFileList(true).size());
Assert.assertEquals(1, tsFileManager.getTsFileList(false).size());
@@ -168,7 +168,7 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource));
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
RepairUnsortedFileCompactionTask task =
- new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
resource.isSeq(), 0, true);
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
resource.isSeq(), 0);
task.start();
Assert.assertEquals(0, tsFileManager.getTsFileList(true).size());
Assert.assertEquals(1, tsFileManager.getTsFileList(false).size());
@@ -201,7 +201,7 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource));
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
RepairUnsortedFileCompactionTask task =
- new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
resource.isSeq(), 0, true);
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
resource.isSeq(), 0);
task.start();
Assert.assertEquals(0, tsFileManager.getTsFileList(true).size());
Assert.assertEquals(1, tsFileManager.getTsFileList(false).size());
@@ -230,7 +230,7 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource));
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
RepairUnsortedFileCompactionTask task =
- new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
resource.isSeq(), 0, true);
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
resource.isSeq(), 0);
task.start();
Assert.assertEquals(0, tsFileManager.getTsFileList(true).size());
Assert.assertEquals(1, tsFileManager.getTsFileList(false).size());
@@ -259,7 +259,7 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
Assert.assertFalse(TsFileResourceUtils.validateTsFileDataCorrectness(resource));
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
RepairUnsortedFileCompactionTask task =
- new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
resource.isSeq(), 0, true);
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
resource.isSeq(), 0);
task.start();
Assert.assertEquals(0, tsFileManager.getTsFileList(true).size());
Assert.assertEquals(1, tsFileManager.getTsFileList(false).size());
@@ -529,7 +529,7 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
seqResource2.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE);
RepairUnsortedFileCompactionTask task =
- new RepairUnsortedFileCompactionTask(0, tsFileManager, seqResource2,
true, 0, true);
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, seqResource2,
true, 0);
Assert.assertTrue(task.start());
Assert.assertEquals(1, tsFileManager.getTsFileList(true).size());
Assert.assertEquals(1, tsFileManager.getTsFileList(false).size());
@@ -580,7 +580,7 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
seqResource2.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE);
RepairUnsortedFileCompactionTask task =
- new RepairUnsortedFileCompactionTask(0, tsFileManager, seqResource2,
true, 0, true);
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, seqResource2,
true, 0);
Assert.assertTrue(task.start());
Assert.assertEquals(1, tsFileManager.getTsFileList(true).size());
Assert.assertEquals(1, tsFileManager.getTsFileList(false).size());
@@ -640,15 +640,15 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
}
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
RepairUnsortedFileCompactionTask task =
- new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true,
0, true);
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true,
0);
Assert.assertTrue(task.getEstimatedMemoryCost() > 0);
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK);
- task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
false, 0, true);
+ task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
false, 0);
Assert.assertTrue(task.getEstimatedMemoryCost() > 0);
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE);
- task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
false, 0, true);
+ task = new RepairUnsortedFileCompactionTask(0, tsFileManager, resource,
false, 0);
Assert.assertEquals(0, task.getEstimatedMemoryCost());
}
@@ -674,7 +674,7 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
}
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
RepairUnsortedFileCompactionTask task =
- new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true,
0, true);
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true,
0);
Assert.assertTrue(task.start());
TsFileResource target = tsFileManager.getTsFileList(false).get(0);
try (TsFileSequenceReader reader = new
TsFileSequenceReader(target.getTsFilePath())) {
@@ -723,7 +723,7 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
}
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK);
RepairUnsortedFileCompactionTask task =
- new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true,
0, true);
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true,
0);
Assert.assertTrue(task.start());
Assert.assertEquals(
resource.getTsFileRepairStatus(),
TsFileRepairStatus.NEED_TO_REPAIR_BY_MOVE);
@@ -745,7 +745,7 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
}
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK);
RepairUnsortedFileCompactionTask task =
- new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true,
0, true);
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true,
0);
Assert.assertTrue(task.start());
Assert.assertEquals(
resource.getTsFileRepairStatus(),
TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
@@ -767,7 +767,7 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
}
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK);
RepairUnsortedFileCompactionTask task =
- new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true,
0, true);
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true,
0);
Assert.assertTrue(task.start());
Assert.assertEquals(resource.getTsFileRepairStatus(),
TsFileRepairStatus.NORMAL);
}
@@ -788,7 +788,7 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
}
resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
RepairUnsortedFileCompactionTask task =
- new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true,
0, true);
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true,
0);
Assert.assertTrue(task.start());
TsFileResource target = tsFileManager.getTsFileList(false).get(0);
try (TsFileSequenceReader reader = new
TsFileSequenceReader(target.getTsFilePath())) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionRecoverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionRecoverTest.java
index a8b92828bc9..7eddcc09c3d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionRecoverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionRecoverTest.java
@@ -68,7 +68,7 @@ public class SettleCompactionRecoverTest extends
AbstractCompactionTest {
@After
public void tearDown() throws IOException, StorageEngineException {
super.tearDown();
- DataNodeTTLCache.getInstance().clearAllTTL();
+ DataNodeTTLCache.getInstance().clearAllTTLForTree();
}
// region Handle exception
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionSelectorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionSelectorTest.java
index 58d8d5ed7f1..799002716c2 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionSelectorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionSelectorTest.java
@@ -59,7 +59,7 @@ public class SettleCompactionSelectorTest extends
AbstractCompactionTest {
@After
public void tearDown() throws IOException, StorageEngineException {
super.tearDown();
- DataNodeTTLCache.getInstance().clearAllTTL();
+ DataNodeTTLCache.getInstance().clearAllTTLForTree();
TsFileGeneratorUtils.useMultiType = originUseMultiType;
}
@@ -273,7 +273,7 @@ public class SettleCompactionSelectorTest extends
AbstractCompactionTest {
tsFileManager.addAll(unseqResources, false);
DataNodeTTLCache.getInstance()
- .setTTL(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR + "d1", 100);
+ .setTTLForTree(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR +
"d1", 100);
generateModsFile(1, 10, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(1, 10, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
@@ -342,7 +342,7 @@ public class SettleCompactionSelectorTest extends
AbstractCompactionTest {
tsFileManager.addAll(unseqResources, false);
DataNodeTTLCache.getInstance()
- .setTTL(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR + "d1", 100);
+ .setTTLForTree(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR +
"d1", 100);
generateModsFile(1, 10, seqResources, Long.MIN_VALUE, Long.MAX_VALUE);
generateModsFile(1, 10, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE);
@@ -402,7 +402,7 @@ public class SettleCompactionSelectorTest extends
AbstractCompactionTest {
// add one device ttl
DataNodeTTLCache.getInstance()
- .setTTL(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR + "d0", 100);
+ .setTTLForTree(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR +
"d0", 100);
// select first time, none partial_deleted and all_deleted files
// add device mods with already outdated device
@@ -821,7 +821,7 @@ public class SettleCompactionSelectorTest extends
AbstractCompactionTest {
tsFileManager.addAll(unseqResources, false);
DataNodeTTLCache.getInstance()
- .setTTL(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR + "d10001",
100);
+ .setTTLForTree(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR +
"d10001", 100);
generateModsFile(1, 10, seqResources, Long.MIN_VALUE, Long.MAX_VALUE,
true);
generateModsFile(1, 10, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE,
true);
@@ -890,7 +890,7 @@ public class SettleCompactionSelectorTest extends
AbstractCompactionTest {
tsFileManager.addAll(unseqResources, false);
DataNodeTTLCache.getInstance()
- .setTTL(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR + "d10001",
100);
+ .setTTLForTree(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR +
"d10001", 100);
generateModsFile(1, 10, seqResources, Long.MIN_VALUE, Long.MAX_VALUE,
true);
generateModsFile(1, 10, unseqResources, Long.MIN_VALUE, Long.MAX_VALUE,
true);
@@ -950,7 +950,7 @@ public class SettleCompactionSelectorTest extends
AbstractCompactionTest {
// add one device ttl
DataNodeTTLCache.getInstance()
- .setTTL(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR + "d10000",
100);
+ .setTTLForTree(COMPACTION_TEST_SG + IoTDBConstant.PATH_SEPARATOR +
"d10000", 100);
// select first time, none partial_deleted and all_deleted files
// add device mods with already outdated device
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionTaskTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionTaskTest.java
index 9d040b00bed..fe2b7954af1 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionTaskTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionTaskTest.java
@@ -137,7 +137,7 @@ public class SettleCompactionTaskTest extends
AbstractCompactionTest {
@After
public void tearDown() throws IOException, StorageEngineException {
super.tearDown();
- DataNodeTTLCache.getInstance().clearAllTTL();
+ DataNodeTTLCache.getInstance().clearAllTTLForTree();
TsFileGeneratorUtils.useMultiType = originUseMultiType;
IoTDBDescriptor.getInstance().getConfig().setInnerSeqCompactionPerformer(originSeqPerformer);
IoTDBDescriptor.getInstance()
@@ -153,7 +153,7 @@ public class SettleCompactionTaskTest extends
AbstractCompactionTest {
createFiles(5, 2, 3, 50, 0, 10000, 50, 50, isAligned, false);
// set ttl
- DataNodeTTLCache.getInstance().setTTL("root.**", 1);
+ DataNodeTTLCache.getInstance().setTTLForTree("root.**", 1);
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
@@ -188,7 +188,7 @@ public class SettleCompactionTaskTest extends
AbstractCompactionTest {
Assert.assertEquals(0, tsFileManager.getTsFileList(true).size());
Assert.assertEquals(0, tsFileManager.getTsFileList(false).size());
- DataNodeTTLCache.getInstance().clearAllTTL();
+ DataNodeTTLCache.getInstance().clearAllTTLForTree();
validateTargetDatas(sourceDatas, Collections.emptyList());
}
@@ -227,7 +227,7 @@ public class SettleCompactionTaskTest extends
AbstractCompactionTest {
Assert.assertEquals(1, tsFileManager.getTsFileList(true).size());
Assert.assertEquals(1, tsFileManager.getTsFileList(false).size());
- DataNodeTTLCache.getInstance().clearAllTTL();
+ DataNodeTTLCache.getInstance().clearAllTTLForTree();
validateTargetDatas(sourceDatas, Collections.emptyList());
}
@@ -259,7 +259,7 @@ public class SettleCompactionTaskTest extends
AbstractCompactionTest {
Assert.assertEquals(6, tsFileManager.getTsFileList(true).size());
Assert.assertEquals(1, tsFileManager.getTsFileList(false).size());
- DataNodeTTLCache.getInstance().clearAllTTL();
+ DataNodeTTLCache.getInstance().clearAllTTLForTree();
validateTargetDatas(sourceDatas, Collections.emptyList());
partialDeletedFiles.clear();
@@ -278,7 +278,7 @@ public class SettleCompactionTaskTest extends
AbstractCompactionTest {
Assert.assertEquals(TsFileResourceStatus.DELETED,
tsFileResource.getStatus());
}
- DataNodeTTLCache.getInstance().clearAllTTL();
+ DataNodeTTLCache.getInstance().clearAllTTLForTree();
validateTargetDatas(sourceDatas, Collections.emptyList());
}
@@ -314,7 +314,7 @@ public class SettleCompactionTaskTest extends
AbstractCompactionTest {
Assert.assertEquals(0, tsFileManager.getTsFileList(true).size());
Assert.assertEquals(0, tsFileManager.getTsFileList(false).size());
- DataNodeTTLCache.getInstance().clearAllTTL();
+ DataNodeTTLCache.getInstance().clearAllTTLForTree();
validateTargetDatas(sourceDatas, Collections.emptyList());
}
@@ -356,7 +356,7 @@ public class SettleCompactionTaskTest extends
AbstractCompactionTest {
Assert.assertEquals(0, tsFileManager.getTsFileList(true).size());
Assert.assertEquals(0, tsFileManager.getTsFileList(false).size());
- DataNodeTTLCache.getInstance().clearAllTTL();
+ DataNodeTTLCache.getInstance().clearAllTTLForTree();
validateTargetDatas(sourceDatas, Collections.emptyList());
}
@@ -403,7 +403,7 @@ public class SettleCompactionTaskTest extends
AbstractCompactionTest {
Assert.assertEquals(1, tsFileManager.getTsFileList(true).size());
Assert.assertEquals(1, tsFileManager.getTsFileList(false).size());
- DataNodeTTLCache.getInstance().clearAllTTL();
+ DataNodeTTLCache.getInstance().clearAllTTLForTree();
validateTargetDatas(sourceDatas, Collections.emptyList());
}
@@ -447,7 +447,7 @@ public class SettleCompactionTaskTest extends
AbstractCompactionTest {
Assert.assertEquals(1, tsFileManager.getTsFileList(true).size());
Assert.assertEquals(1, tsFileManager.getTsFileList(false).size());
- DataNodeTTLCache.getInstance().clearAllTTL();
+ DataNodeTTLCache.getInstance().clearAllTTLForTree();
validateTargetDatas(sourceDatas, Collections.emptyList());
}
@@ -511,7 +511,7 @@ public class SettleCompactionTaskTest extends
AbstractCompactionTest {
protected void generateTTL(int deviceNum, long ttl) throws
IllegalPathException {
for (int dIndex = 0; dIndex < deviceNum; dIndex++) {
DataNodeTTLCache.getInstance()
- .setTTL(
+ .setTTLForTree(
COMPACTION_TEST_SG
+ IoTDBConstant.PATH_SEPARATOR
+ "d"
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java
index 651caadb150..529afb8964a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java
@@ -102,16 +102,13 @@ public class CompactionWithAllNullRowsTest extends
AbstractCompactionTest {
}
public ICompactionPerformer getPerformer() {
- ICompactionPerformer performer;
if
(performerType.equalsIgnoreCase(InnerSeqCompactionPerformer.READ_CHUNK.toString()))
{
- performer = new ReadChunkCompactionPerformer();
+ return new ReadChunkCompactionPerformer();
} else if
(performerType.equalsIgnoreCase(InnerUnseqCompactionPerformer.FAST.toString()))
{
- performer = new FastCompactionPerformer(false);
+ return new FastCompactionPerformer(false);
} else {
- performer = new ReadPointCompactionPerformer();
+ return new ReadPointCompactionPerformer();
}
- performer.setIgnoreAllNullRows(false);
- return performer;
}
@Test
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelCompactionWithTTLTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelCompactionWithTTLTest.java
new file mode 100644
index 00000000000..af7ea63cce3
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelCompactionWithTTLTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.tablemodel;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.IdColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.MeasurementColumnSchema;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerSeqCompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerUnseqCompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.common.TimeRange;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+// Parameterized over three performer names; each test writes one table-model file, runs an InnerSpaceCompactionTask on it and checks what remains.
+@RunWith(Parameterized.class)
+public class TableModelCompactionWithTTLTest extends AbstractCompactionTest {
+ // performerType selects the performer built by getPerformer(); threadName holds the thread name saved in setUp.
+ private final String performerType;
+ private String threadName;
+ // Saves then replaces the current thread name, calls DataNodeTableCache.invalid(COMPACTION_TEST_SG), then runs the base setUp. NOTE(review): presumably the compaction code depends on the worker-style thread name - confirm.
+ @Before
+ public void setUp()
+ throws IOException, WriteProcessException, MetadataException,
InterruptedException {
+ this.threadName = Thread.currentThread().getName();
+ Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
+ DataNodeTableCache.getInstance().invalid(this.COMPACTION_TEST_SG);
+ super.setUp();
+ }
+ // Runs the base tearDown, then restores the saved thread name and calls invalid(...) again. NOTE(review): the last two steps are skipped if super.tearDown() throws.
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ super.tearDown();
+ Thread.currentThread().setName(threadName);
+ DataNodeTableCache.getInstance().invalid(this.COMPACTION_TEST_SG);
+ }
+ // Receives the performer type name for this parameterized run (one of the values returned by data()).
+ public TableModelCompactionWithTTLTest(String performerType) {
+ this.performerType = performerType;
+ }
+ // Supplies the three performer type names; the Parameterized runner creates one test instance per name.
+ @Parameterized.Parameters(name = "type={0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(
+ new Object[][] {
+ {"read_chunk"}, {"fast"}, {"read_point"},
+ });
+ }
+ // Maps performerType (compared case-insensitively) to a performer; any name not matching READ_CHUNK or FAST falls through to ReadPointCompactionPerformer.
+ public ICompactionPerformer getPerformer() {
+ if
(performerType.equalsIgnoreCase(InnerSeqCompactionPerformer.READ_CHUNK.toString()))
{
+ return new ReadChunkCompactionPerformer();
+ } else if
(performerType.equalsIgnoreCase(InnerUnseqCompactionPerformer.FAST.toString()))
{
+ return new FastCompactionPerformer(false);
+ } else {
+ return new ReadPointCompactionPerformer();
+ }
+ }
+ // Table t1 is created with ttl 1 and the file is written with TimeRange(10, 12); after the task succeeds, the sequence file list is expected to be empty.
+ @Test
+ public void testAllDataExpired() throws IOException {
+ createTable("t1", 1);
+ TsFileResource resource1 = createEmptyFileAndResource(true);
+ try (CompactionTableModelTestFileWriter writer =
+ new CompactionTableModelTestFileWriter(resource1)) {
+ writer.registerTableSchema("t1", Arrays.asList("id_column"));
+ writer.startChunkGroup("t1", Arrays.asList("id_field1"));
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s0", "s1"),
+ new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(false, false));
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ seqResources.add(resource1);
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(0, tsFileManager, seqResources, true,
getPerformer(), 0);
+ Assert.assertTrue(task.start());
+ Assert.assertEquals(0, tsFileManager.getTsFileList(true).size());
+ }
+ // Table t1 has ttl 1 and the data spans now-200s to now+200s; the target file must start strictly after startTime and still end at endTime.
+ @Test
+ public void testPartialDataExpired() throws IOException {
+ createTable("t1", 1);
+ TsFileResource resource1 = createEmptyFileAndResource(true);
+ long startTime = System.currentTimeMillis() -
TimeUnit.SECONDS.toMillis(200);
+ long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(200);
+ try (CompactionTableModelTestFileWriter writer =
+ new CompactionTableModelTestFileWriter(resource1)) {
+ writer.registerTableSchema("t1", Arrays.asList("id_column"));
+ writer.startChunkGroup("t1", Arrays.asList("id_field1"));
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s0", "s1"),
+ new TimeRange[][][] {
+ new TimeRange[][] {new TimeRange[] {new TimeRange(startTime,
endTime)}}
+ },
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(false, false));
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ seqResources.add(resource1);
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(0, tsFileManager, seqResources, true,
getPerformer(), 0);
+ Assert.assertTrue(task.start());
+ TsFileResource target = tsFileManager.getTsFileList(true).get(0);
+ Assert.assertTrue(target.getFileStartTime() > startTime &&
target.getFileEndTime() == endTime);
+ }
+ // createTable is not called here, so no table with a ttl is registered by this test; the target file is expected to keep the exact [startTime, endTime] range.
+ @Test
+ public void testTableNotExist() throws IOException {
+ TsFileResource resource1 = createEmptyFileAndResource(true);
+ long startTime = System.currentTimeMillis() -
TimeUnit.SECONDS.toMillis(200);
+ long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(200);
+ try (CompactionTableModelTestFileWriter writer =
+ new CompactionTableModelTestFileWriter(resource1)) {
+ writer.registerTableSchema("t1", Arrays.asList("id_column"));
+ writer.startChunkGroup("t1", Arrays.asList("id_field1"));
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s0", "s1"),
+ new TimeRange[][][] {
+ new TimeRange[][] {new TimeRange[] {new TimeRange(startTime,
endTime)}}
+ },
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(false, false));
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ seqResources.add(resource1);
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(0, tsFileManager, seqResources, true,
getPerformer(), 0);
+ Assert.assertTrue(task.start());
+ TsFileResource target = tsFileManager.getTsFileList(true).get(0);
+ Assert.assertTrue(target.getFileStartTime() == startTime &&
target.getFileEndTime() == endTime);
+ }
+ // Builds a TsTable with id column "id_column" and measurement column "s1", stores ttl as a string under the lower-cased TTL_PROPERTY key, then registers it via preUpdateTable and commitUpdateTable. NOTE(review): the tests write s0 and s1 but only s1 is declared; ttl unit is not visible here - confirm.
+ public void createTable(String tableName, long ttl) {
+ TsTable tsTable = new TsTable(tableName);
+ tsTable.addColumnSchema(new IdColumnSchema("id_column",
TSDataType.STRING));
+ tsTable.addColumnSchema(
+ new MeasurementColumnSchema(
+ "s1", TSDataType.STRING, TSEncoding.PLAIN, CompressionType.LZ4));
+ tsTable.addProp(TsTable.TTL_PROPERTY.toLowerCase(Locale.ENGLISH), ttl +
"");
+ DataNodeTableCache.getInstance().preUpdateTable(this.COMPACTION_TEST_SG,
tsTable);
+
DataNodeTableCache.getInstance().commitUpdateTable(this.COMPACTION_TEST_SG,
tableName);
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java
index 13ee5d97246..7c9b4ccd6e1 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionCheckerUtils.java
@@ -526,8 +526,7 @@ public class CompactionCheckerUtils {
public static List<IFullPath> getAllPathsOfResources(List<TsFileResource>
resources)
throws IOException, IllegalPathException {
Set<IFullPath> paths = new HashSet<>();
- try (MultiTsFileDeviceIterator deviceIterator =
- new MultiTsFileDeviceIterator(resources, false)) {
+ try (MultiTsFileDeviceIterator deviceIterator = new
MultiTsFileDeviceIterator(resources)) {
while (deviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> iDeviceIDBooleanPair =
deviceIterator.nextDevice();
IDeviceID deviceID = iDeviceIDBooleanPair.getLeft();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
index b9d653efc84..d26b03964b7 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java
@@ -126,7 +126,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
List<String> measurementSet = new ArrayList<>(4000);
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(seqResources, false)) {
+ new MultiTsFileDeviceIterator(seqResources)) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceIsAlignedPair =
multiTsFileDeviceIterator.nextDevice();
IDeviceID device = deviceIsAlignedPair.getLeft();
@@ -172,7 +172,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
int deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(seqResources, unseqResources, false)) {
+ new MultiTsFileDeviceIterator(seqResources, unseqResources)) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -203,7 +203,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
int deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(seqResources, unseqResources, false)) {
+ new MultiTsFileDeviceIterator(seqResources, unseqResources)) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -241,7 +241,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
int deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(seqResources, unseqResources, false)) {
+ new MultiTsFileDeviceIterator(seqResources, unseqResources)) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -280,7 +280,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
int deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(seqResources, unseqResources, false)) {
+ new MultiTsFileDeviceIterator(seqResources, unseqResources)) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -332,7 +332,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
int deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(seqResources, unseqResources, false)) {
+ new MultiTsFileDeviceIterator(seqResources, unseqResources)) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -384,7 +384,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
int deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(seqResources, false)) {
+ new MultiTsFileDeviceIterator(seqResources)) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -447,7 +447,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
int deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true),
false)) {
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -526,7 +526,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true),
false)) {
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -601,7 +601,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
int deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true),
false)) {
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -681,7 +681,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true),
false)) {
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -751,7 +751,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
int deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true),
false)) {
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -830,7 +830,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true),
false)) {
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -901,7 +901,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
int deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true),
false)) {
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -990,7 +990,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true),
false)) {
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -1061,7 +1061,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
int deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true),
false)) {
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -1140,7 +1140,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true),
false)) {
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -1211,7 +1211,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
int deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true),
false)) {
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
@@ -1300,7 +1300,7 @@ public class MultiTsFileDeviceIteratorTest extends
AbstractCompactionTest {
deviceNum = 0;
try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
- new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true),
false)) {
+ new MultiTsFileDeviceIterator(tsFileManager.getTsFileList(true))) {
while (multiTsFileDeviceIterator.hasNextDevice()) {
Pair<IDeviceID, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
index 84e7c05ed40..fa16c44d938 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
@@ -19,10 +19,12 @@
package org.apache.iotdb.commons.schema.table;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.schema.table.column.TimeColumnSchema;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchemaUtil;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
@@ -144,6 +146,19 @@ public class TsTable {
}
}
+ public long getTableTTL() { // Returns the table TTL after passing the millisecond value through convertMilliTimeWithPrecision with the configured timestamp precision.
+ long ttl = getTableTTLInMS();
+ return ttl == Long.MAX_VALUE // Long.MAX_VALUE is also what getTableTTLInMS() yields when no TTL property is set
+ ? ttl // returned as-is, without conversion (presumably to avoid overflow when scaling; TODO confirm)
+ : CommonDateTimeUtils.convertMilliTimeWithPrecision(
+ ttl,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+ }
+
+ public long getTableTTLInMS() { // Parses the table property stored under the lower-cased TTL_PROPERTY key as a long; getTableTTL() treats the result as milliseconds.
+ return Long.parseLong( // throws NumberFormatException if the stored property value is not a valid long
+
getPropValue(TTL_PROPERTY.toLowerCase(Locale.ENGLISH)).orElse(Long.MAX_VALUE +
"")); // falls back to Long.MAX_VALUE when the property is absent
+ }
+
public Optional<String> getPropValue(final String propKey) {
readWriteLock.readLock().lock();
try {