This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 636bd465714 delete data and files together
636bd465714 is described below
commit 636bd465714d7c0e1ef77f421f7025af496050f9
Author: Colin Li <[email protected]>
AuthorDate: Mon Dec 4 18:39:07 2023 +0800
delete data and files together
---
.../org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java | 34 +++++
.../dataregion/DataExecutionVisitor.java | 21 ++-
.../db/storageengine/dataregion/DataRegion.java | 167 +++++++++++++++++++++
.../storageengine/dataregion/DataRegionTest.java | 86 +++++++++++
4 files changed, 305 insertions(+), 3 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java
index 6afc4ae73bf..4923fbd1129 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java
@@ -250,6 +250,40 @@ public class IoTDBMultiDeviceIT {
}
assertEquals(2140, cnt);
}
+
+ statement.execute("DELETE FROM root.fans.** WHERE time <= 20000");
+ statement.execute("DELETE FROM root.car.** WHERE time <= 20000");
+
+ try (ResultSet resultSet = statement.executeQuery(selectSql)) {
+ int cnt = 0;
+ long before = -1;
+ while (resultSet.next()) {
+ long cur =
Long.parseLong(resultSet.getString(TestConstant.TIMESTAMP_STR));
+ if (cur <= before) {
+ fail("time order wrong!");
+ }
+ before = cur;
+ cnt++;
+ }
+ assertEquals(49, cnt);
+ }
+
+ statement.execute("DELETE FROM root.** WHERE time >= 20000");
+
+ try (ResultSet resultSet = statement.executeQuery(selectSql)) {
+ int cnt = 0;
+ long before = -1;
+ while (resultSet.next()) {
+ long cur =
Long.parseLong(resultSet.getString(TestConstant.TIMESTAMP_STR));
+ if (cur <= before) {
+ fail("time order wrong!");
+ }
+ before = cur;
+ cnt++;
+ }
+ assertEquals(0, cnt);
+ }
+
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 17059ff5fc6..53813fb93ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.consensus.statemachine.dataregion;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.exception.BatchProcessException;
@@ -191,11 +192,25 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
public TSStatus visitDeleteData(DeleteDataNode node, DataRegion dataRegion) {
try {
for (PartialPath path : node.getPathList()) {
- dataRegion.deleteByDevice(
- path, node.getDeleteStartTime(), node.getDeleteEndTime(),
node.getSearchIndex());
+ PartialPath databaseToDelete = new
PartialPath(dataRegion.getDatabaseName() + ".**");
+ if (path.matchFullPath(databaseToDelete)
+ || path.getFullPath().equals(databaseToDelete.getFullPath())) {
+ LOGGER.info(
+ "now try to delete directly, databasePath: {}, deletePath:{}",
+ databaseToDelete.getFullPath(),
+ path.getFullPath());
+ dataRegion.deleteDataDirectly(
+ databaseToDelete,
+ node.getDeleteStartTime(),
+ node.getDeleteEndTime(),
+ node.getSearchIndex());
+ } else {
+ dataRegion.deleteByDevice(
+ path, node.getDeleteStartTime(), node.getDeleteEndTime(),
node.getSearchIndex());
+ }
}
return StatusUtils.OK;
- } catch (IOException e) {
+ } catch (IOException | IllegalPathException e) {
LOGGER.error("Error in executing plan node: {}", node, e);
return new TSStatus(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 04a56569674..a884ba9fcd3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1904,6 +1904,51 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ public void deleteDataDirectly(
+ PartialPath pathToDelete, long startTime, long endTime, long
searchIndex) throws IOException { // deletes data of pathToDelete between startTime and endTime: schema cache invalidation, WAL logging, then unsealed and sealed files
+ logger.info(
+ "{} will delete data files directly for deleting data between {} and
{}",
+ databaseName + "-" + dataRegionId,
+ startTime,
+ endTime);
+
+ writeLock("deleteDataDirect");
+ boolean releasedLock = false;
+ try {
+ // invalidate this database's schema cache entries while holding the cache write lock
+ DataNodeSchemaCache.getInstance().takeWriteLock();
+ try {
+ DataNodeSchemaCache.getInstance().invalidate(databaseName);
+ } finally {
+ DataNodeSchemaCache.getInstance().releaseWriteLock();
+ }
+
+ // write the deletion to the WAL of impacted working TsFileProcessors; a failed flush below aborts the delete
+ List<WALFlushListener> walListeners =
+ logDeletionInWAL(startTime, endTime, searchIndex, pathToDelete);
+
+ for (WALFlushListener walFlushListener : walListeners) {
+ if (walFlushListener.waitForResult() ==
WALFlushListener.Status.FAILURE) {
+ logger.error("Fail to log delete to wal.",
walFlushListener.getCause());
+ throw walFlushListener.getCause();
+ }
+ }
+ List<TsFileResource> sealedTsFileResource = new ArrayList<>();
+ List<TsFileResource> unsealedTsFileResource = new ArrayList<>();
+ separateTsFile(sealedTsFileResource, unsealedTsFileResource, startTime,
endTime);
+ deleteDataDirectlyInFile(unsealedTsFileResource, pathToDelete,
startTime, endTime);
+ writeUnlock(); // unsealed files were handled under the region write lock; sealed files are processed after releasing it
+ releasedLock = true;
+ deleteDataDirectlyInFile(sealedTsFileResource, pathToDelete, startTime,
endTime);
+ } catch (Exception e) { // every failure, including a WAL flush cause, reaches the caller wrapped in an IOException
+ throw new IOException(e);
+ } finally {
+ if (!releasedLock) { // unlock here only when the try block did not already release the lock
+ writeUnlock();
+ }
+ }
+ }
+
private List<WALFlushListener> logDeletionInWAL(
long startTime, long endTime, long searchIndex, PartialPath path) {
List<WALFlushListener> walFlushListeners = new ArrayList<>();
@@ -2065,6 +2110,128 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ private void deleteDataDirectlyInFile(
+ List<TsFileResource> tsfileResourceList,
+ PartialPath pathToDelete,
+ long startTime,
+ long endTime)
+ throws IOException { // applies the deletion to the given files: whole-file removal where possible, otherwise a mods entry or an in-memory deletion
+ List<TsFileResource> deletedByMods = new ArrayList<>();
+ List<TsFileResource> deletedByFiles = new ArrayList<>();
+ separateTsFileToDelete(
+ new HashSet<>(pathToDelete.getDevicePathPattern()),
+ tsfileResourceList,
+ deletedByMods,
+ deletedByFiles,
+ startTime,
+ endTime);
+ Deletion deletion = new Deletion(pathToDelete,
MERGE_MOD_START_VERSION_NUM, startTime, endTime);
+ // files that are not removed outright: closed ones get a mods entry, unclosed ones an in-memory deletion
+ for (TsFileResource tsFileResource : deletedByMods) {
+ ModificationFile modFile = tsFileResource.getModFile();
+ if (tsFileResource.isClosed()) {
+ long originSize = -1;
+ synchronized (modFile) {
+ try {
+ originSize = modFile.getSize();
+ // delete data in sealed file
+ if (tsFileResource.isCompacting()) {
+ // we have to set modification offset to MAX_VALUE, as the
offset of source chunk
+ // may change after compaction
+ deletion.setFileOffset(Long.MAX_VALUE);
+ // write deletion into compaction modification file
+ tsFileResource.getCompactionModFile().write(deletion);
+ // write deletion into modification file to enable read during
compaction
+ modFile.write(deletion);
+ // close both mod files after writing
+ tsFileResource.getCompactionModFile().close();
+ modFile.close();
+ } else {
+ deletion.setFileOffset(tsFileResource.getTsFileSize());
+ // remember whether the mod file already existed (the file-count metric is only increased for a new one), then write the deletion
+ boolean modFileExists = modFile.exists();
+
+ modFile.write(deletion);
+
+ // remember to close mod file
+ modFile.close();
+
+ // if the file length is greater than 1M, execute compact.
+ modFile.compact();
+
+ if (!modFileExists) {
+ FileMetrics.getInstance().increaseModFileNum(1);
+ }
+
+ // The file size may be smaller than the original file, so the
increment here may be
+ // negative
+ FileMetrics.getInstance().increaseModFileSize(modFile.getSize()
- originSize);
+ }
+ } catch (Throwable t) { // on any failure, truncate the mod file back to its size before this write, then rethrow
+ if (originSize != -1) {
+ modFile.truncate(originSize);
+ }
+ throw t;
+ }
+ logger.info(
+ "[Deletion] Deletion with path:{}, time:{}-{} written into mods
file:{}.",
+ deletion.getPath(),
+ deletion.getStartTime(),
+ deletion.getEndTime(),
+ modFile.getFilePath());
+ }
+ } else {
+ // delete data in memory of unsealed file
+ tsFileResource
+ .getProcessor()
+ .deleteDataInMemory(deletion, new
HashSet<>(pathToDelete.getDevicePathPattern()));
+ }
+ }
+
+ // files selected for whole-file removal: drop them from the file manager, update metrics, then remove them under the resource write lock
+ for (TsFileResource tsFileResource : deletedByFiles) {
+ tsFileManager.remove(tsFileResource, tsFileResource.isSeq());
+ tsFileResource.writeLock();
+ try {
+ FileMetrics.getInstance()
+ .deleteTsFile(tsFileResource.isSeq(),
Collections.singletonList(tsFileResource));
+ if (tsFileResource.getModFile().exists()) {
+ FileMetrics.getInstance().decreaseModFileNum(1);
+
FileMetrics.getInstance().decreaseModFileSize(tsFileResource.getModFile().getSize());
+ }
+ tsFileResource.remove();
+ logger.info("Remove tsfile {} directly when delete data",
tsFileResource.getTsFilePath());
+ } finally {
+ tsFileResource.writeUnlock();
+ }
+ }
+ }
+
+ private void separateTsFileToDelete(
+ Set<PartialPath> pathToDelete,
+ List<TsFileResource> tsFileResourceList,
+ List<TsFileResource> deletedByMods,
+ List<TsFileResource> deletedByFiles,
+ long startTime,
+ long endTime) { // fills deletedByFiles (whole-file removal) and deletedByMods (mods or in-memory deletion) from tsFileResourceList
+ Set<String> deviceMatchInfo = new HashSet<>(); // one set shared by the canSkipDelete calls for all files
+ for (TsFileResource file : tsFileResourceList) {
+ long fileStartTime = file.getTimeIndex().getMinStartTime();
+ long fileEndTime = file.getTimeIndex().getMaxEndTime();
+
+ if (!canSkipDelete(file, pathToDelete, startTime, endTime,
deviceMatchInfo)) { // files for which canSkipDelete returns true are put in neither list
+ if (startTime <= fileStartTime
+ && endTime >= fileEndTime
+ && file.isClosed()
+ && file.setStatus(TsFileResourceStatus.DELETED)) { // evaluated last, so the status change is only attempted for closed files fully covered by [startTime, endTime]
+ deletedByFiles.add(file);
+ } else {
+ deletedByMods.add(file);
+ }
+ }
+ }
+ }
+
private void unsequenceFlushCallback(
TsFileProcessor processor, Map<String, Long> updateMap, long
systemFlushTime) {
TimePartitionManager.getInstance()
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 75f87c14159..88569a6329b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -1346,4 +1346,90 @@ public class DataRegionTest {
super(systemInfoDir, "0", new TsFileFlushPolicy.DirectFlushPolicy(),
storageGroupName);
}
}
+
+ // -- test for deleting data directly
+ // -- delete data and file only when:
+ // 1. tsfile is closed
+ // 2. tsfile is not compacting
+ // 3. tsfile's start time and end time must be a subinterval
+ // of the given time range.
+
+ @Test
+ public void testDeleteDataDirectlySeqWriteModsOrDeleteFiles()
+ throws IllegalPathException, WriteProcessException, IOException {
+ for (int j = 100; j < 200; j++) {
+ TSRecord record = new TSRecord(j, deviceId);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(j)));
+ dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+ }
+
+ TsFileResource tsFileResource =
dataRegion.getTsFileManager().getTsFileList(true).get(0);
+ // file not closed yet: the deletion is applied in working memory, so no mods file appears
+ dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 50,
100, 0);
+ Assert.assertTrue(tsFileResource.getTsFile().exists());
+ Assert.assertFalse(tsFileResource.getModFile().exists());
+
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+ // closed file, but 100-120 covers only part of the inserted times 100..199: a mods file is written
+ dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 100,
120, 0);
+ Assert.assertTrue(tsFileResource.getTsFile().exists());
+ Assert.assertTrue(tsFileResource.getModFile().exists());
+
+ // closed file and 100-199 covers all inserted times: both the tsfile and its mods file are removed
+ dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 100,
199, 0);
+ Assert.assertFalse(tsFileResource.getTsFile().exists());
+ Assert.assertFalse(tsFileResource.getModFile().exists());
+ }
+
+ @Test
+ public void testDeleteDataDirectlyUnseqWriteModsOrDeleteFiles()
+ throws IllegalPathException, WriteProcessException, IOException {
+ for (int j = 100; j < 200; j++) {
+ TSRecord record = new TSRecord(j, deviceId);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(j)));
+ dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+ }
+ TsFileResource tsFileResourceSeq =
dataRegion.getTsFileManager().getTsFileList(true).get(0);
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+ for (int j = 30; j < 100; j++) {
+ TSRecord record = new TSRecord(j, deviceId);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(j)));
+ dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+ }
+
+ for (TsFileProcessor processor :
dataRegion.getWorkSequenceTsFileProcessors()) {
+ processor.syncFlush();
+ }
+ TsFileResource tsFileResourceUnSeq =
dataRegion.getTsFileManager().getTsFileList(false).get(0);
+
+ Assert.assertTrue(tsFileResourceSeq.getTsFile().exists());
+ Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists());
+
+ // 40-60 lies in the times 30..99 written after the close above; tsFileResourceUnSeq gets no mods file from this (asserted below)
+ dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 40,
60, 0);
+ // 140-160 lies in the times 100..199 of the already closed tsFileResourceSeq, which gets a mods file (asserted below)
+ dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 140,
160, 0);
+
+ // both tsfiles still exist; only tsFileResourceSeq has a mods file at this point
+ Assert.assertTrue(tsFileResourceSeq.getTsFile().exists());
+ Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists());
+ Assert.assertTrue(tsFileResourceSeq.getModFile().exists());
+ Assert.assertFalse(tsFileResourceUnSeq.getModFile().exists());
+ dataRegion.syncCloseAllWorkingTsFileProcessors(); // after this close, the 40-80 deletion below leaves a mods file on tsFileResourceUnSeq
+
+ dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 40,
80, 0);
+ Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists());
+ Assert.assertTrue(tsFileResourceUnSeq.getModFile().exists());
+
+ // seq file and unseq file have data file and mod file now,
+ // these two deletions will remove both data files and both mod files.
+ dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 30,
100, 0);
+ dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 100,
199, 0);
+
+ Assert.assertFalse(tsFileResourceSeq.getTsFile().exists());
+ Assert.assertFalse(tsFileResourceUnSeq.getTsFile().exists());
+ Assert.assertFalse(tsFileResourceSeq.getModFile().exists());
+ Assert.assertFalse(tsFileResourceUnSeq.getModFile().exists());
+ }
}