This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8d56ae71e5f Fix PBTree flush for negative child address (#17955)
8d56ae71e5f is described below
commit 8d56ae71e5fda188854bf78463d4d4b45488d2ed
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 18 08:50:02 2026 +0800
Fix PBTree flush for negative child address (#17955)
* Fix PBTree flush for negative child address
* Address PBTree flush review comments
---
.../mtree/impl/pbtree/flush/Scheduler.java | 69 +++++++++++++---------
.../pbtree/schemafile/pagemgr/PageManager.java | 9 ++-
.../metadata/mtree/schemafile/SchemaFileTest.java | 33 +++++++++++
3 files changed, 80 insertions(+), 31 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
index d7fde20990e..c85e9a7074f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
@@ -77,7 +77,8 @@ public class Scheduler {
this.releaseFlushStrategy = releaseFlushStrategy;
}
- private void executeFlush(CachedMTreeStore store, int regionId, AtomicInteger remainToFlush) {
+ private void executeFlush(
+ CachedMTreeStore store, int regionId, AtomicInteger remainToFlush, boolean propagateFailure) {
IMemoryManager memoryManager = store.getMemoryManager();
ISchemaFile file = store.getSchemaFile();
LockManager lockManager = store.getLockManager();
@@ -98,6 +99,9 @@ public class Scheduler {
regionId,
e.getMessage(),
e);
+ if (propagateFailure) {
+ throw new RuntimeException(e);
+ }
} finally {
long time = System.currentTimeMillis() - startTime;
if (time > 10_000) {
@@ -146,22 +150,26 @@ public class Scheduler {
CompletableFuture.runAsync(
() -> {
int regionId = entry.getKey();
- CachedMTreeStore store = entry.getValue();
- if (store == null) {
- // store has been closed
- return;
- }
- LockManager lockManager = store.getLockManager();
- lockManager.globalReadLock();
- if (!regionToStore.containsKey(regionId)) {
- // double check store have not been closed
- return;
- }
try {
- executeFlush(store, regionId, null);
- executeRelease(store, false);
+ CachedMTreeStore store = entry.getValue();
+ if (store == null) {
+ // store has been closed
+ return;
+ }
+ LockManager lockManager = store.getLockManager();
+ lockManager.globalReadLock();
+ try {
+ if (!regionToStore.containsKey(regionId)) {
+ // double check store have not been closed
+ return;
+ }
+ executeFlush(store, regionId, null, true);
+ executeRelease(store, false);
+ } finally {
+ lockManager.globalReadUnlock();
+ }
} finally {
- lockManager.globalReadUnlock();
+ flushingRegionSet.remove(regionId);
}
},
workerPool))
@@ -222,22 +230,25 @@ public class Scheduler {
flushingRegionSet.add(regionId);
workerPool.submit(
() -> {
- CachedMTreeStore store = regionToStore.get(regionId);
- if (store == null) {
- // store has been closed
- return;
- }
- LockManager lockManager = store.getLockManager();
- lockManager.globalReadLock();
- if (!regionToStore.containsKey(regionId)) {
- // double check store have not been closed
- return;
- }
try {
-
- executeFlush(store, regionId, remainToFlush);
+ CachedMTreeStore store = regionToStore.get(regionId);
+ if (store == null) {
+ // store has been closed
+ return;
+ }
+ LockManager lockManager = store.getLockManager();
+ lockManager.globalReadLock();
+ try {
+ if (!regionToStore.containsKey(regionId)) {
+ // double check store have not been closed
+ return;
+ }
+ executeFlush(store, regionId, remainToFlush, false);
+ } finally {
+ lockManager.globalReadUnlock();
+ }
} finally {
- lockManager.globalReadUnlock();
+ flushingRegionSet.remove(regionId);
}
});
if (remainToFlush.get() <= 0) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index 997698591e6..ea2f1f27f28 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -293,8 +293,6 @@ public abstract class PageManager implements IPageManager {
.entrySet()) {
child = entry.getValue();
actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey(), cxt);
- childBuffer = RecordUtils.node2Buffer(child);
-
curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress), cxt);
if (curPage.getAsSegmentedPage().read(SchemaFile.getSegIndex(actualAddress), entry.getKey())
== null) {
@@ -305,6 +303,13 @@ public abstract class PageManager implements IPageManager {
entry.getKey()));
}
+ if (!child.isMeasurement() && getNodeAddress(child) < 0) {
+ short estSegSize = estimateSegmentSize(child);
+ long glbIndex = preAllocateSegment(estSegSize, cxt);
+ SchemaFile.setNodeAddress(child, glbIndex);
+ }
+ childBuffer = RecordUtils.node2Buffer(child);
+
// prepare alias comparison
if (child.isMeasurement() && child.getAsMeasurementMNode().getAlias() != null) {
alias = child.getAsMeasurementMNode().getAlias();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
index 771caf7504c..7813b0dd9a9 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
@@ -315,6 +315,39 @@ public class SchemaFileTest {
sf.close();
}
+ @Test
+ public void testFlushUpdatedChildWithNegativeSegmentAddress() throws Exception {
+ ISchemaFile sf = SchemaFile.initSchemaFile("root.sg", TEST_SCHEMA_REGION_ID);
+ ICachedMNode sgNode = nodeFactory.createDatabaseDeviceMNode(null, "sg").getAsMNode();
+ ICachedMNode device = nodeFactory.createDeviceMNode(sgNode, "d1").getAsMNode();
+ sgNode.addChild(device);
+
+ writeMNodeInTest(sf, sgNode);
+
+ // Typical flush order: the parent already has this child record on disk, while the updated
+ // child object in memory may still have no valid segment address.
+ ICachedMNodeContainer.getCachedMNodeContainer(device).setSegmentAddress(-1L);
+ addNodeToUpdateBuffer(sgNode, device);
+ writeMNodeInTest(sf, sgNode);
+
+ Assert.assertTrue(getSegAddrInContainer(device) >= 0);
+
+ device.addChild(getMeasurementNode(device, "s1", "alias_s1"));
+ writeMNodeInTest(sf, device);
+ Assert.assertEquals(
+ "alias_s1", sf.getChildNode(device, "s1").getAsMeasurementMNode().getAlias());
+
+ long deviceSegmentAddress = getSegAddrInContainer(device);
+ sf.close();
+
+ sf = SchemaFile.loadSchemaFile("root.sg", TEST_SCHEMA_REGION_ID);
+ ICachedMNode loadedDevice = sf.getChildNode(sgNode, "d1");
+ Assert.assertEquals(deviceSegmentAddress, getSegAddrInContainer(loadedDevice));
+ Assert.assertEquals(
+ "alias_s1", sf.getChildNode(loadedDevice, "s1").getAsMeasurementMNode().getAlias());
+ sf.close();
+ }
+
@Test
public void testMassiveSegment() throws MetadataException, IOException {
ICachedMNode dbNode = nodeFactory.createDatabaseDeviceMNode(null, "sgRoot");