This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 18e55fee245 DAL: Fix deletion channel close exception (#15302)
18e55fee245 is described below

commit 18e55fee24597fdea1a8e2f45b13911de2c30009
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Apr 9 16:35:43 2025 +0800

    DAL: Fix deletion channel close exception (#15302)
    
    * fix deletion channel close exception
    
    * refine DAL UT to cover switching DAL and delete DAL
    
    * improve
---
 .../pipe/consensus/deletion/DeletionResource.java  |  6 ++++
 .../deletion/persist/PageCacheDeletionBuffer.java  | 22 ++++++++-----
 .../db/pipe/consensus/DeletionResourceTest.java    | 38 ++++++++++++++++------
 3 files changed, 47 insertions(+), 19 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
index 4755b2b962a..3bf5c1287b7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
 import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -198,4 +199,9 @@ public class DeletionResource implements PersistentResource {
     FAILURE,
     RUNNING,
   }
+
+  @TestOnly
+  public void setPipeTaskReferenceCount(int pipeTaskReferenceCount) {
+    this.pipeTaskReferenceCount.set(pipeTaskReferenceCount);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
index d8dcfc83ab5..0bde64fd371 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
@@ -63,7 +64,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer {
   private static final long MAX_WAIT_CLOSE_TIME_IN_MS = 10000;
 
   // Buffer config keep consistent with WAL.
-  public static final int DAL_BUFFER_SIZE = config.getWalBufferSize() / 3;
+  public static int DAL_BUFFER_SIZE = config.getWalBufferSize() / 3;
 
   // DeletionResources received from storage engine, which is waiting to be persisted.
   private final BlockingQueue<DeletionResource> deletionResources =
@@ -93,12 +94,10 @@ public class PageCacheDeletionBuffer implements DeletionBuffer {
   private volatile File logFile;
   private volatile FileOutputStream logStream;
   private volatile FileChannel logChannel;
-  // Max progressIndex among current .deletion file.
-  private ProgressIndex maxProgressIndexInCurrentFile = MinimumProgressIndex.INSTANCE;
-  // Max progressIndex among last .deletion file. Used by PersistTask for naming .deletion file.
+  // Max progressIndex among current .deletion file. Used by PersistTask for naming .deletion file.
   // Since deletions are written serially, DAL is also written serially. This ensures that the
   // maxProgressIndex of each batch increases in the same order as the physical time.
-  private volatile ProgressIndex maxProgressIndexInLastFile = MinimumProgressIndex.INSTANCE;
+  private ProgressIndex maxProgressIndexInCurrentFile = MinimumProgressIndex.INSTANCE;
 
   public PageCacheDeletionBuffer(String dataRegionId, String baseDirectory) {
     this.dataRegionId = dataRegionId;
@@ -201,7 +200,6 @@ public class PageCacheDeletionBuffer implements DeletionBuffer {
   private void resetFileAttribute() {
     // Reset file attributes.
     this.totalSize.set(0);
-    this.maxProgressIndexInLastFile = this.maxProgressIndexInCurrentFile;
     this.maxProgressIndexInCurrentFile = MinimumProgressIndex.INSTANCE;
   }
 
@@ -221,15 +219,16 @@ public class PageCacheDeletionBuffer implements DeletionBuffer {
 
   private void switchLoggingFile() throws IOException {
     try {
-      // PipeConsensus ensures that deleteDataNodes use recoverProgressIndex.
       ProgressIndex curProgressIndex =
           ReplicateProgressDataNodeManager.extractLocalSimpleProgressIndex(
-              maxProgressIndexInLastFile);
+              maxProgressIndexInCurrentFile);
+      // PipeConsensus ensures that deleteDataNodes use recoverProgressIndex.
       if (!(curProgressIndex instanceof SimpleProgressIndex)) {
         throw new IOException("Invalid deletion progress index: " + curProgressIndex);
       }
       SimpleProgressIndex progressIndex = (SimpleProgressIndex) curProgressIndex;
-      // Deletion file name format: "_{rebootTimes}_{memTableFlushOrderId}.deletion"
+      // Deletion file name format:
+      // "_{lastFileMaxRebootTimes}_{lastFileMaxMemTableFlushOrderId}.deletion"
       this.logFile =
           new File(
               baseDirectory,
@@ -381,4 +380,9 @@ public class PageCacheDeletionBuffer implements DeletionBuffer {
       }
     }
   }
+
+  @TestOnly
+  public static void setDalBufferSize(int dalBufferSize) {
+    DAL_BUFFER_SIZE = dalBufferSize;
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
index 37d2ea46bd8..8fff91a4c92 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
@@ -31,6 +33,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource.Status;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
+import org.apache.iotdb.db.pipe.consensus.deletion.persist.PageCacheDeletionBuffer;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
 import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
@@ -67,6 +70,7 @@ public class DeletionResourceTest {
   private static final String DELETION_BASE_DIR =
       IoTDBDescriptor.getInstance().getConfig().getIotConsensusV2DeletionFileDir();
   private static final int THIS_DATANODE_ID = 0;
+  private static final int TEST_DAL_FILE_SIZE = 1024;
   private DeletionResourceManager deletionResourceManager;
   private int previousDataNodeId;
 
@@ -101,17 +105,17 @@ public class DeletionResourceTest {
   @Test
   public void testAddBatchDeletionResource()
       throws IllegalPathException, IOException, InterruptedException {
-    addBatchDeletionResource(true);
-    addBatchDeletionResource(false);
+    addBatchDeletionResource(true, 0);
+    addBatchDeletionResource(false, 10);
   }
 
-  public void addBatchDeletionResource(boolean isRelational)
+  public void addBatchDeletionResource(final boolean isRelational, final int initialIndex)
       throws IllegalPathException, InterruptedException, IOException {
     deletionResourceManager = DeletionResourceManager.getInstance(FAKE_DATA_REGION_IDS[1]);
     int deletionCount = 10;
     int rebootTimes = 0;
     MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0");
-    for (int i = 0; i < deletionCount; i++) {
+    for (int i = initialIndex; i < initialIndex + deletionCount; i++) {
       AbstractDeleteDataNode deleteDataNode;
       if (isRelational) {
         deleteDataNode =
@@ -175,11 +179,12 @@ public class DeletionResourceTest {
 
   @Test
   public void testDeletionRemove() throws IllegalPathException, InterruptedException, IOException {
-    deletionRemove(true);
-    deletionRemove(false);
+    PageCacheDeletionBuffer.setDalBufferSize(TEST_DAL_FILE_SIZE);
+    deletionRemove(true, 0);
+    deletionRemove(false, 20);
   }
 
-  public void deletionRemove(final boolean isRelational)
+  public void deletionRemove(final boolean isRelational, final int initialIndex)
       throws IllegalPathException, InterruptedException, IOException {
     deletionResourceManager = DeletionResourceManager.getInstance(FAKE_DATA_REGION_IDS[3]);
     // new a deletion
@@ -187,7 +192,7 @@ public class DeletionResourceTest {
     final int deletionCount = 20;
     final MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0");
     final List<PipeDeleteDataNodeEvent> deletionEvents = new ArrayList<>();
-    for (int i = 0; i < deletionCount; i++) {
+    for (int i = initialIndex; i < initialIndex + deletionCount; i++) {
       final AbstractDeleteDataNode deleteDataNode;
       if (isRelational) {
         deleteDataNode =
@@ -204,16 +209,27 @@ public class DeletionResourceTest {
       deleteDataNode.setProgressIndex(
           new RecoverProgressIndex(THIS_DATANODE_ID, new SimpleProgressIndex(rebootTimes, i)));
       final PipeDeleteDataNodeEvent deletionEvent =
-          new PipeDeleteDataNodeEvent(deleteDataNode, true);
+          new PipeDeleteDataNodeEvent(
+              deleteDataNode, "Test", 10, null, null, null, null, true, true);
+      deletionEvent.setCommitterKeyAndCommitId(
+          new CommitterKey("Test", 10, Integer.parseInt(FAKE_DATA_REGION_IDS[3]), 0), i + 1);
       deletionEvents.add(deletionEvent);
+
       final DeletionResource deletionResource =
           deletionResourceManager.registerDeletionResource(deleteDataNode);
+      deletionResource.setPipeTaskReferenceCount(1);
       deletionEvent.setDeletionResource(
           deletionResourceManager.getDeletionResource(deleteDataNode));
       if (deletionResource.waitForResult() != Status.SUCCESS) {
         Assert.fail();
       }
     }
+
+    // for event commit to invoke onCommit() to removeDAL
+    if (initialIndex == 0) {
+      PipeEventCommitManager.getInstance()
+          .register("Test", 10, Integer.parseInt(FAKE_DATA_REGION_IDS[3]), "Test");
+    }
     deletionEvents.forEach(deletionEvent -> deletionEvent.increaseReferenceCount("test"));
     final List<Path> paths =
         Files.list(Paths.get(DELETION_BASE_DIR + File.separator + FAKE_DATA_REGION_IDS[3]))
@@ -221,6 +237,7 @@ public class DeletionResourceTest {
     Assert.assertTrue(paths.stream().anyMatch(Files::isRegularFile));
     final int beforeFileCount = paths.size();
     if (beforeFileCount < 2) {
+      // not generate enough DAL file
       return;
     }
     // Remove deletion
@@ -231,7 +248,8 @@ public class DeletionResourceTest {
         Files.list(Paths.get(DELETION_BASE_DIR + File.separator + FAKE_DATA_REGION_IDS[3]))
             .collect(Collectors.toList());
     final int afterCount = newPaths.size();
-    Assert.assertTrue(afterCount < beforeFileCount);
+    // assume all DAL are deleted except for the last one.
+    Assert.assertTrue(afterCount < beforeFileCount && afterCount == 1);
   }
 
   @Test

Reply via email to