This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 636bd465714 delete data and files together
636bd465714 is described below

commit 636bd465714d7c0e1ef77f421f7025af496050f9
Author: Colin Li <[email protected]>
AuthorDate: Mon Dec 4 18:39:07 2023 +0800

    delete data and files together
---
 .../org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java |  34 +++++
 .../dataregion/DataExecutionVisitor.java           |  21 ++-
 .../db/storageengine/dataregion/DataRegion.java    | 167 +++++++++++++++++++++
 .../storageengine/dataregion/DataRegionTest.java   |  86 +++++++++++
 4 files changed, 305 insertions(+), 3 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java
index 6afc4ae73bf..4923fbd1129 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMultiDeviceIT.java
@@ -250,6 +250,40 @@ public class IoTDBMultiDeviceIT {
         }
         assertEquals(2140, cnt);
       }
+
+      statement.execute("DELETE FROM root.fans.** WHERE time <= 20000");
+      statement.execute("DELETE FROM root.car.** WHERE time <= 20000");
+
+      try (ResultSet resultSet = statement.executeQuery(selectSql)) {
+        int cnt = 0;
+        long before = -1;
+        while (resultSet.next()) {
+          long cur = 
Long.parseLong(resultSet.getString(TestConstant.TIMESTAMP_STR));
+          if (cur <= before) {
+            fail("time order wrong!");
+          }
+          before = cur;
+          cnt++;
+        }
+        assertEquals(49, cnt);
+      }
+
+      statement.execute("DELETE FROM root.** WHERE time >= 20000");
+
+      try (ResultSet resultSet = statement.executeQuery(selectSql)) {
+        int cnt = 0;
+        long before = -1;
+        while (resultSet.next()) {
+          long cur = 
Long.parseLong(resultSet.getString(TestConstant.TIMESTAMP_STR));
+          if (cur <= before) {
+            fail("time order wrong!");
+          }
+          before = cur;
+          cnt++;
+        }
+        assertEquals(0, cnt);
+      }
+
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 17059ff5fc6..53813fb93ea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.consensus.statemachine.dataregion;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.exception.BatchProcessException;
@@ -191,11 +192,25 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
   public TSStatus visitDeleteData(DeleteDataNode node, DataRegion dataRegion) {
     try {
+      // Pattern covering every series of this region's database; identical for all paths.
+      PartialPath databaseToDelete = new PartialPath(dataRegion.getDatabaseName() + ".**");
       for (PartialPath path : node.getPathList()) {
-        dataRegion.deleteByDevice(
-            path, node.getDeleteStartTime(), node.getDeleteEndTime(), node.getSearchIndex());
+        // Whole files may only be dropped when the pattern covers the entire database.
+        // matchFullPath() treats the trailing "**" of databaseToDelete as a literal node, so a
+        // single-level wildcard such as "root.db.*" would also "match" "root.db.**" although it
+        // does not cover deeper series. Requiring the pattern itself to end with "**" rules that
+        // case out (and subsumes an exact "root.db.**" match).
+        if ("**".equals(path.getTailNode()) && path.matchFullPath(databaseToDelete)) {
+          LOGGER.info(
+              "now try to delete directly, databasePath: {}, deletePath:{}",
+              databaseToDelete.getFullPath(),
+              path.getFullPath());
+          dataRegion.deleteDataDirectly(
+              databaseToDelete,
+              node.getDeleteStartTime(),
+              node.getDeleteEndTime(),
+              node.getSearchIndex());
+        } else {
+          dataRegion.deleteByDevice(
+              path, node.getDeleteStartTime(), node.getDeleteEndTime(), node.getSearchIndex());
+        }
       }
       return StatusUtils.OK;
-    } catch (IOException e) {
+    } catch (IOException | IllegalPathException e) {
       LOGGER.error("Error in executing plan node: {}", node, e);
       return new TSStatus(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 04a56569674..a884ba9fcd3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1904,6 +1904,51 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
+  /**
+   * Deletes the data of {@code pathToDelete} between {@code startTime} and {@code endTime} in this
+   * data region, removing whole TsFiles where possible instead of only writing mods entries.
+   *
+   * <p>The deletion is first logged to the WAL of the impacted working processors. Unsealed files
+   * are then handled while the region write lock is held; sealed files are handled after the lock
+   * has been released.
+   *
+   * @param pathToDelete path pattern whose data is deleted
+   * @param startTime lower bound of the time range to delete
+   * @param endTime upper bound of the time range to delete
+   * @param searchIndex search index passed along with the WAL deletion entry
+   * @throws IOException if WAL logging or a file operation fails; non-IO failures are wrapped
+   */
+  public void deleteDataDirectly(
+      PartialPath pathToDelete, long startTime, long endTime, long searchIndex) throws IOException {
+    logger.info(
+        "{} will delete data files directly for deleting data between {} and {}",
+        databaseName + "-" + dataRegionId,
+        startTime,
+        endTime);
+
+    writeLock("deleteDataDirect");
+    boolean releasedLock = false;
+    try {
+      // delete last cache record if necessary
+      DataNodeSchemaCache.getInstance().takeWriteLock();
+      try {
+        DataNodeSchemaCache.getInstance().invalidate(databaseName);
+      } finally {
+        DataNodeSchemaCache.getInstance().releaseWriteLock();
+      }
+
+      // write log to impacted working TsFileProcessors
+      List<WALFlushListener> walListeners =
+          logDeletionInWAL(startTime, endTime, searchIndex, pathToDelete);
+
+      for (WALFlushListener walFlushListener : walListeners) {
+        if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
+          logger.error("Fail to log delete to wal.", walFlushListener.getCause());
+          throw walFlushListener.getCause();
+        }
+      }
+      List<TsFileResource> sealedTsFileResource = new ArrayList<>();
+      List<TsFileResource> unsealedTsFileResource = new ArrayList<>();
+      separateTsFile(sealedTsFileResource, unsealedTsFileResource, startTime, endTime);
+      // unsealed files are processed under the region write lock (presumably because their
+      // working processors may still accept writes); sealed files are processed after release
+      deleteDataDirectlyInFile(unsealedTsFileResource, pathToDelete, startTime, endTime);
+      writeUnlock();
+      releasedLock = true;
+      deleteDataDirectlyInFile(sealedTsFileResource, pathToDelete, startTime, endTime);
+    } catch (IOException e) {
+      // already the declared exception type: propagate as-is instead of wrapping it again
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(e);
+    } finally {
+      if (!releasedLock) {
+        writeUnlock();
+      }
+    }
+  }
+
   private List<WALFlushListener> logDeletionInWAL(
       long startTime, long endTime, long searchIndex, PartialPath path) {
     List<WALFlushListener> walFlushListeners = new ArrayList<>();
@@ -2065,6 +2110,128 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
+  /**
+   * Applies the deletion of {@code pathToDelete} between {@code startTime} and {@code endTime} to
+   * the given files.
+   *
+   * <p>Files selected by {@code separateTsFileToDelete} for whole-file removal are detached from
+   * {@code tsFileManager} and removed together with their metrics. Every other affected file gets
+   * the deletion written to its mods file (closed files; while compacting, also to the compaction
+   * mods file) or applied in memory through its processor (unclosed files).
+   *
+   * @param tsfileResourceList candidate files
+   * @param pathToDelete path pattern whose data is deleted
+   * @param startTime lower bound of the time range to delete
+   * @param endTime upper bound of the time range to delete
+   * @throws IOException if writing, compacting or truncating a mods file fails
+   */
+  private void deleteDataDirectlyInFile(
+      List<TsFileResource> tsfileResourceList,
+      PartialPath pathToDelete,
+      long startTime,
+      long endTime)
+      throws IOException {
+    // computed once and shared by file selection and every in-memory deletion below
+    Set<PartialPath> devicePathPatterns = new HashSet<>(pathToDelete.getDevicePathPattern());
+    List<TsFileResource> deletedByMods = new ArrayList<>();
+    List<TsFileResource> deletedByFiles = new ArrayList<>();
+    separateTsFileToDelete(
+        devicePathPatterns,
+        tsfileResourceList,
+        deletedByMods,
+        deletedByFiles,
+        startTime,
+        endTime);
+    Deletion deletion = new Deletion(pathToDelete, MERGE_MOD_START_VERSION_NUM, startTime, endTime);
+    // can be deleted by mods.
+    for (TsFileResource tsFileResource : deletedByMods) {
+      ModificationFile modFile = tsFileResource.getModFile();
+      if (tsFileResource.isClosed()) {
+        long originSize = -1;
+        synchronized (modFile) {
+          try {
+            originSize = modFile.getSize();
+            // delete data in sealed file
+            if (tsFileResource.isCompacting()) {
+              // we have to set modification offset to MAX_VALUE, as the offset of source chunk
+              // may change after compaction
+              deletion.setFileOffset(Long.MAX_VALUE);
+              // write deletion into compaction modification file
+              tsFileResource.getCompactionModFile().write(deletion);
+              // write deletion into modification file to enable read during compaction
+              modFile.write(deletion);
+              // remember to close mod file
+              tsFileResource.getCompactionModFile().close();
+              modFile.close();
+            } else {
+              deletion.setFileOffset(tsFileResource.getTsFileSize());
+              // write deletion into modification file
+              boolean modFileExists = modFile.exists();
+
+              modFile.write(deletion);
+
+              // remember to close mod file
+              modFile.close();
+
+              // compact the mods file (per the original author: only once it exceeds 1M)
+              modFile.compact();
+
+              if (!modFileExists) {
+                FileMetrics.getInstance().increaseModFileNum(1);
+              }
+
+              // The file size may be smaller than the original file, so the increment here may be
+              // negative
+              FileMetrics.getInstance().increaseModFileSize(modFile.getSize() - originSize);
+            }
+          } catch (Throwable t) {
+            // roll the mods file back to its size before this deletion, if that size is known
+            if (originSize != -1) {
+              modFile.truncate(originSize);
+            }
+            throw t;
+          }
+          logger.info(
+              "[Deletion] Deletion with path:{}, time:{}-{} written into mods file:{}.",
+              deletion.getPath(),
+              deletion.getStartTime(),
+              deletion.getEndTime(),
+              modFile.getFilePath());
+        }
+      } else {
+        // delete data in memory of unsealed file
+        tsFileResource.getProcessor().deleteDataInMemory(deletion, devicePathPatterns);
+      }
+    }
+
+    // can be deleted by files
+    for (TsFileResource tsFileResource : deletedByFiles) {
+      tsFileManager.remove(tsFileResource, tsFileResource.isSeq());
+      tsFileResource.writeLock();
+      try {
+        FileMetrics.getInstance()
+            .deleteTsFile(tsFileResource.isSeq(), Collections.singletonList(tsFileResource));
+        if (tsFileResource.getModFile().exists()) {
+          FileMetrics.getInstance().decreaseModFileNum(1);
+          FileMetrics.getInstance().decreaseModFileSize(tsFileResource.getModFile().getSize());
+        }
+        tsFileResource.remove();
+        logger.info("Remove tsfile {} directly when delete data", tsFileResource.getTsFilePath());
+      } finally {
+        tsFileResource.writeUnlock();
+      }
+    }
+  }
+
+  /**
+   * Sorts the affected files into those that must receive a deletion entry ({@code
+   * deletedByMods}) and those that can be removed as a whole ({@code deletedByFiles}). Files for
+   * which {@code canSkipDelete} returns true are added to neither list.
+   *
+   * <p>A file is removed as a whole only when its overall time range lies within [startTime,
+   * endTime], it is closed, and its status could be switched to {@code DELETED}. The status switch
+   * is attempted last, only after the other conditions hold.
+   *
+   * <p>NOTE(review): apart from {@code canSkipDelete} (not visible here), the whole-file decision
+   * uses only the file's overall min/max time, not which devices the file contains. That is safe
+   * for a database-wide pattern, but a narrower pattern might drop data of non-matching devices.
+   * Confirm that callers only pass database-wide patterns.
+   */
+  private void separateTsFileToDelete(
+      Set<PartialPath> pathToDelete,
+      List<TsFileResource> tsFileResourceList,
+      List<TsFileResource> deletedByMods,
+      List<TsFileResource> deletedByFiles,
+      long startTime,
+      long endTime) {
+    Set<String> deviceMatchInfo = new HashSet<>();
+    for (TsFileResource candidate : tsFileResourceList) {
+      if (canSkipDelete(candidate, pathToDelete, startTime, endTime, deviceMatchInfo)) {
+        continue;
+      }
+      long minTime = candidate.getTimeIndex().getMinStartTime();
+      long maxTime = candidate.getTimeIndex().getMaxEndTime();
+      boolean wholeFileInRange = startTime <= minTime && endTime >= maxTime;
+      // short-circuit order matters: setStatus has a side effect and must be evaluated last
+      boolean removable =
+          wholeFileInRange
+              && candidate.isClosed()
+              && candidate.setStatus(TsFileResourceStatus.DELETED);
+      if (removable) {
+        deletedByFiles.add(candidate);
+      } else {
+        deletedByMods.add(candidate);
+      }
+    }
+  }
+
   private void unsequenceFlushCallback(
       TsFileProcessor processor, Map<String, Long> updateMap, long 
systemFlushTime) {
     TimePartitionManager.getInstance()
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 75f87c14159..88569a6329b 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -1346,4 +1346,90 @@ public class DataRegionTest {
       super(systemInfoDir, "0", new TsFileFlushPolicy.DirectFlushPolicy(), 
storageGroupName);
     }
   }
+
+  // -- tests for deleting data directly
+  // -- a TsFile and its mods file are removed as a whole only when:
+  // 1. the TsFile is closed;
+  // 2. the TsFile is not being compacted;
+  // 3. the TsFile's [start time, end time] interval lies within the
+  // given deletion time range.
+
+  @Test
+  public void testDeleteDataDirectlySeqWriteModsOrDeleteFiles()
+      throws IllegalPathException, WriteProcessException, IOException {
+    // one sequence file holding timestamps 100..199 of a single series
+    for (int t = 100; t <= 199; t++) {
+      TSRecord row = new TSRecord(t, deviceId);
+      row.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(t)));
+      dataRegion.insert(buildInsertRowNodeByTSRecord(row));
+    }
+
+    PartialPath d0Pattern = new PartialPath("root.vehicle.d0.**");
+    TsFileResource seqFile = dataRegion.getTsFileManager().getTsFileList(true).get(0);
+
+    // file still open: the deletion is applied to the working memtable, no mods file appears
+    dataRegion.deleteDataDirectly(d0Pattern, 50, 100, 0);
+    Assert.assertTrue(seqFile.getTsFile().exists());
+    Assert.assertFalse(seqFile.getModFile().exists());
+
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+    // file closed, range covers only part of it: the deletion goes into a mods file
+    dataRegion.deleteDataDirectly(d0Pattern, 100, 120, 0);
+    Assert.assertTrue(seqFile.getTsFile().exists());
+    Assert.assertTrue(seqFile.getModFile().exists());
+
+    // file closed, range covers all of it: data file and mods file are both removed
+    dataRegion.deleteDataDirectly(d0Pattern, 100, 199, 0);
+    Assert.assertFalse(seqFile.getTsFile().exists());
+    Assert.assertFalse(seqFile.getModFile().exists());
+  }
+
+  @Test
+  public void testDeleteDataDirectlyUnseqWriteModsOrDeleteFiles()
+      throws IllegalPathException, WriteProcessException, IOException {
+    // sequence data 100..199, then the sequence file is closed
+    for (int j = 100; j < 200; j++) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+    }
+    TsFileResource tsFileResourceSeq = dataRegion.getTsFileManager().getTsFileList(true).get(0);
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+    // older timestamps 30..99 arrive afterwards and land in an unsequence file
+    for (int j = 30; j < 100; j++) {
+      TSRecord record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+    }
+
+    // NOTE(review): only sequence processors are flushed here; after the close above there
+    // appear to be none, so this loop likely has no effect. Confirm whether the unsequence
+    // processors were meant.
+    for (TsFileProcessor processor : dataRegion.getWorkSequenceTsFileProcessors()) {
+      processor.syncFlush();
+    }
+    TsFileResource tsFileResourceUnSeq = dataRegion.getTsFileManager().getTsFileList(false).get(0);
+
+    Assert.assertTrue(tsFileResourceSeq.getTsFile().exists());
+    Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists());
+
+    // 40..60 does not overlap the sequence file; the unsequence file is still open, so the data
+    // is deleted in memory and no mods file is written.
+    dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 40, 60, 0);
+    // 140..160 does not overlap the unsequence file; the sequence file is already closed, so the
+    // deletion is written into its mods file.
+    dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 140, 160, 0);
+
+    // only the closed sequence file has a mods file
+    Assert.assertTrue(tsFileResourceSeq.getTsFile().exists());
+    Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists());
+    Assert.assertTrue(tsFileResourceSeq.getModFile().exists());
+    Assert.assertFalse(tsFileResourceUnSeq.getModFile().exists());
+    dataRegion.syncCloseAllWorkingTsFileProcessors();
+
+    // the unsequence file is closed now and 40..80 covers only part of it: mods file written
+    dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 40, 80, 0);
+    Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists());
+    Assert.assertTrue(tsFileResourceUnSeq.getModFile().exists());
+
+    // seq file and unseq file have data file and mod file now;
+    // each range below covers one file completely, removing its data file and mod file.
+    dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 30, 100, 0);
+    dataRegion.deleteDataDirectly(new PartialPath("root.vehicle.d0.**"), 100, 199, 0);
+
+    Assert.assertFalse(tsFileResourceSeq.getTsFile().exists());
+    Assert.assertFalse(tsFileResourceUnSeq.getTsFile().exists());
+    Assert.assertFalse(tsFileResourceSeq.getModFile().exists());
+    Assert.assertFalse(tsFileResourceUnSeq.getModFile().exists());
+  }
 }

Reply via email to