This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 18e55fee245 DAL: Fix deletion channel close exception (#15302)
18e55fee245 is described below
commit 18e55fee24597fdea1a8e2f45b13911de2c30009
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Apr 9 16:35:43 2025 +0800
DAL: Fix deletion channel close exception (#15302)
* fix deletion channel close exception
* refine DAL unit test to cover switching DAL files and deleting DAL files
* improve
---
.../pipe/consensus/deletion/DeletionResource.java | 6 ++++
.../deletion/persist/PageCacheDeletionBuffer.java | 22 ++++++++-----
.../db/pipe/consensus/DeletionResourceTest.java | 38 ++++++++++++++++------
3 files changed, 47 insertions(+), 19 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
index 4755b2b962a..3bf5c1287b7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -198,4 +199,9 @@ public class DeletionResource implements PersistentResource
{
FAILURE,
RUNNING,
}
+
+ @TestOnly
+ public void setPipeTaskReferenceCount(int pipeTaskReferenceCount) {
+ this.pipeTaskReferenceCount.set(pipeTaskReferenceCount);
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
index d8dcfc83ab5..0bde64fd371 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
@@ -63,7 +64,7 @@ public class PageCacheDeletionBuffer implements
DeletionBuffer {
private static final long MAX_WAIT_CLOSE_TIME_IN_MS = 10000;
// Buffer config keep consistent with WAL.
- public static final int DAL_BUFFER_SIZE = config.getWalBufferSize() / 3;
+ public static int DAL_BUFFER_SIZE = config.getWalBufferSize() / 3;
// DeletionResources received from storage engine, which is waiting to be
persisted.
private final BlockingQueue<DeletionResource> deletionResources =
@@ -93,12 +94,10 @@ public class PageCacheDeletionBuffer implements
DeletionBuffer {
private volatile File logFile;
private volatile FileOutputStream logStream;
private volatile FileChannel logChannel;
- // Max progressIndex among current .deletion file.
- private ProgressIndex maxProgressIndexInCurrentFile =
MinimumProgressIndex.INSTANCE;
- // Max progressIndex among last .deletion file. Used by PersistTask for
naming .deletion file.
+ // Max progressIndex among current .deletion file. Used by PersistTask for
naming .deletion file.
// Since deletions are written serially, DAL is also written serially. This
ensures that the
// maxProgressIndex of each batch increases in the same order as the
physical time.
- private volatile ProgressIndex maxProgressIndexInLastFile =
MinimumProgressIndex.INSTANCE;
+ private ProgressIndex maxProgressIndexInCurrentFile =
MinimumProgressIndex.INSTANCE;
public PageCacheDeletionBuffer(String dataRegionId, String baseDirectory) {
this.dataRegionId = dataRegionId;
@@ -201,7 +200,6 @@ public class PageCacheDeletionBuffer implements
DeletionBuffer {
private void resetFileAttribute() {
// Reset file attributes.
this.totalSize.set(0);
- this.maxProgressIndexInLastFile = this.maxProgressIndexInCurrentFile;
this.maxProgressIndexInCurrentFile = MinimumProgressIndex.INSTANCE;
}
@@ -221,15 +219,16 @@ public class PageCacheDeletionBuffer implements
DeletionBuffer {
private void switchLoggingFile() throws IOException {
try {
- // PipeConsensus ensures that deleteDataNodes use recoverProgressIndex.
ProgressIndex curProgressIndex =
ReplicateProgressDataNodeManager.extractLocalSimpleProgressIndex(
- maxProgressIndexInLastFile);
+ maxProgressIndexInCurrentFile);
+ // PipeConsensus ensures that deleteDataNodes use recoverProgressIndex.
if (!(curProgressIndex instanceof SimpleProgressIndex)) {
throw new IOException("Invalid deletion progress index: " +
curProgressIndex);
}
SimpleProgressIndex progressIndex = (SimpleProgressIndex)
curProgressIndex;
- // Deletion file name format:
"_{rebootTimes}_{memTableFlushOrderId}.deletion"
+ // Deletion file name format:
+ // "_{lastFileMaxRebootTimes}_{lastFileMaxMemTableFlushOrderId}.deletion"
this.logFile =
new File(
baseDirectory,
@@ -381,4 +380,9 @@ public class PageCacheDeletionBuffer implements
DeletionBuffer {
}
}
}
+
+ @TestOnly
+ public static void setDalBufferSize(int dalBufferSize) {
+ DAL_BUFFER_SIZE = dalBufferSize;
+ }
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
index 37d2ea46bd8..8fff91a4c92 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
@@ -23,6 +23,8 @@ import
org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
@@ -31,6 +33,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource.Status;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
+import
org.apache.iotdb.db.pipe.consensus.deletion.persist.PageCacheDeletionBuffer;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
@@ -67,6 +70,7 @@ public class DeletionResourceTest {
private static final String DELETION_BASE_DIR =
IoTDBDescriptor.getInstance().getConfig().getIotConsensusV2DeletionFileDir();
private static final int THIS_DATANODE_ID = 0;
+ private static final int TEST_DAL_FILE_SIZE = 1024;
private DeletionResourceManager deletionResourceManager;
private int previousDataNodeId;
@@ -101,17 +105,17 @@ public class DeletionResourceTest {
@Test
public void testAddBatchDeletionResource()
throws IllegalPathException, IOException, InterruptedException {
- addBatchDeletionResource(true);
- addBatchDeletionResource(false);
+ addBatchDeletionResource(true, 0);
+ addBatchDeletionResource(false, 10);
}
- public void addBatchDeletionResource(boolean isRelational)
+ public void addBatchDeletionResource(final boolean isRelational, final int
initialIndex)
throws IllegalPathException, InterruptedException, IOException {
deletionResourceManager =
DeletionResourceManager.getInstance(FAKE_DATA_REGION_IDS[1]);
int deletionCount = 10;
int rebootTimes = 0;
MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0");
- for (int i = 0; i < deletionCount; i++) {
+ for (int i = initialIndex; i < initialIndex + deletionCount; i++) {
AbstractDeleteDataNode deleteDataNode;
if (isRelational) {
deleteDataNode =
@@ -175,11 +179,12 @@ public class DeletionResourceTest {
@Test
public void testDeletionRemove() throws IllegalPathException,
InterruptedException, IOException {
- deletionRemove(true);
- deletionRemove(false);
+ PageCacheDeletionBuffer.setDalBufferSize(TEST_DAL_FILE_SIZE);
+ deletionRemove(true, 0);
+ deletionRemove(false, 20);
}
- public void deletionRemove(final boolean isRelational)
+ public void deletionRemove(final boolean isRelational, final int
initialIndex)
throws IllegalPathException, InterruptedException, IOException {
deletionResourceManager =
DeletionResourceManager.getInstance(FAKE_DATA_REGION_IDS[3]);
// new a deletion
@@ -187,7 +192,7 @@ public class DeletionResourceTest {
final int deletionCount = 20;
final MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0");
final List<PipeDeleteDataNodeEvent> deletionEvents = new ArrayList<>();
- for (int i = 0; i < deletionCount; i++) {
+ for (int i = initialIndex; i < initialIndex + deletionCount; i++) {
final AbstractDeleteDataNode deleteDataNode;
if (isRelational) {
deleteDataNode =
@@ -204,16 +209,27 @@ public class DeletionResourceTest {
deleteDataNode.setProgressIndex(
new RecoverProgressIndex(THIS_DATANODE_ID, new
SimpleProgressIndex(rebootTimes, i)));
final PipeDeleteDataNodeEvent deletionEvent =
- new PipeDeleteDataNodeEvent(deleteDataNode, true);
+ new PipeDeleteDataNodeEvent(
+ deleteDataNode, "Test", 10, null, null, null, null, true, true);
+ deletionEvent.setCommitterKeyAndCommitId(
+ new CommitterKey("Test", 10,
Integer.parseInt(FAKE_DATA_REGION_IDS[3]), 0), i + 1);
deletionEvents.add(deletionEvent);
+
final DeletionResource deletionResource =
deletionResourceManager.registerDeletionResource(deleteDataNode);
+ deletionResource.setPipeTaskReferenceCount(1);
deletionEvent.setDeletionResource(
deletionResourceManager.getDeletionResource(deleteDataNode));
if (deletionResource.waitForResult() != Status.SUCCESS) {
Assert.fail();
}
}
+
+ // Register the committer once (first call only) so that committing events invokes onCommit() to remove DAL.
+ if (initialIndex == 0) {
+ PipeEventCommitManager.getInstance()
+ .register("Test", 10, Integer.parseInt(FAKE_DATA_REGION_IDS[3]),
"Test");
+ }
deletionEvents.forEach(deletionEvent ->
deletionEvent.increaseReferenceCount("test"));
final List<Path> paths =
Files.list(Paths.get(DELETION_BASE_DIR + File.separator +
FAKE_DATA_REGION_IDS[3]))
@@ -221,6 +237,7 @@ public class DeletionResourceTest {
Assert.assertTrue(paths.stream().anyMatch(Files::isRegularFile));
final int beforeFileCount = paths.size();
if (beforeFileCount < 2) {
+ // Not enough DAL files were generated to exercise removal; skip the check.
return;
}
// Remove deletion
@@ -231,7 +248,8 @@ public class DeletionResourceTest {
Files.list(Paths.get(DELETION_BASE_DIR + File.separator +
FAKE_DATA_REGION_IDS[3]))
.collect(Collectors.toList());
final int afterCount = newPaths.size();
- Assert.assertTrue(afterCount < beforeFileCount);
+ // Expect all DAL files except the last one to have been deleted.
+ Assert.assertTrue(afterCount < beforeFileCount && afterCount == 1);
}
@Test