This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new aaa5729075a Fix PBTree flush for negative child address (#17955) (#17978)
aaa5729075a is described below
commit aaa5729075ab659b200747983a7f1b879eff19cc
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 18 18:40:59 2026 +0800
Fix PBTree flush for negative child address (#17955) (#17978)
* Fix PBTree flush for negative child address
* Address PBTree flush review comments
(cherry picked from commit 8d56ae71e5fda188854bf78463d4d4b45488d2ed)
---
.../mtree/impl/pbtree/flush/Scheduler.java | 69 +++++++++++++---------
.../pbtree/schemafile/pagemgr/PageManager.java | 9 ++-
.../metadata/mtree/schemafile/SchemaFileTest.java | 33 +++++++++++
3 files changed, 80 insertions(+), 31 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
index b467b473347..9ef0fbf0f30 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
@@ -76,7 +76,8 @@ public class Scheduler {
this.releaseFlushStrategy = releaseFlushStrategy;
}
- private void executeFlush(CachedMTreeStore store, int regionId, AtomicInteger remainToFlush) {
+ private void executeFlush(
+ CachedMTreeStore store, int regionId, AtomicInteger remainToFlush, boolean propagateFailure) {
IMemoryManager memoryManager = store.getMemoryManager();
ISchemaFile file = store.getSchemaFile();
LockManager lockManager = store.getLockManager();
@@ -97,6 +98,9 @@ public class Scheduler {
regionId,
e.getMessage(),
e);
+ if (propagateFailure) {
+ throw new RuntimeException(e);
+ }
} finally {
long time = System.currentTimeMillis() - startTime;
if (time > 10_000) {
@@ -145,22 +149,26 @@ public class Scheduler {
CompletableFuture.runAsync(
() -> {
int regionId = entry.getKey();
- CachedMTreeStore store = entry.getValue();
- if (store == null) {
- // store has been closed
- return;
- }
- LockManager lockManager = store.getLockManager();
- lockManager.globalReadLock();
- if (!regionToStore.containsKey(regionId)) {
- // double check store have not been closed
- return;
- }
try {
- executeFlush(store, regionId, null);
- executeRelease(store, false);
+ CachedMTreeStore store = entry.getValue();
+ if (store == null) {
+ // store has been closed
+ return;
+ }
+ LockManager lockManager = store.getLockManager();
+ lockManager.globalReadLock();
+ try {
+ if (!regionToStore.containsKey(regionId)) {
+ // double check store have not been closed
+ return;
+ }
+ executeFlush(store, regionId, null, true);
+ executeRelease(store, false);
+ } finally {
+ lockManager.globalReadUnlock();
+ }
} finally {
- lockManager.globalReadUnlock();
+ flushingRegionSet.remove(regionId);
}
},
workerPool))
@@ -221,22 +229,25 @@ public class Scheduler {
flushingRegionSet.add(regionId);
workerPool.submit(
() -> {
- CachedMTreeStore store = regionToStore.get(regionId);
- if (store == null) {
- // store has been closed
- return;
- }
- LockManager lockManager = store.getLockManager();
- lockManager.globalReadLock();
- if (!regionToStore.containsKey(regionId)) {
- // double check store have not been closed
- return;
- }
try {
-
- executeFlush(store, regionId, remainToFlush);
+ CachedMTreeStore store = regionToStore.get(regionId);
+ if (store == null) {
+ // store has been closed
+ return;
+ }
+ LockManager lockManager = store.getLockManager();
+ lockManager.globalReadLock();
+ try {
+ if (!regionToStore.containsKey(regionId)) {
+ // double check store have not been closed
+ return;
+ }
+ executeFlush(store, regionId, remainToFlush, false);
+ } finally {
+ lockManager.globalReadUnlock();
+ }
} finally {
- lockManager.globalReadUnlock();
+ flushingRegionSet.remove(regionId);
}
});
if (remainToFlush.get() <= 0) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index 06d8c279844..56e0b5a6dc2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -292,8 +292,6 @@ public abstract class PageManager implements IPageManager {
.entrySet()) {
child = entry.getValue();
actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey(), cxt);
- childBuffer = RecordUtils.node2Buffer(child);
-
curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress), cxt);
if (curPage.getAsSegmentedPage().read(SchemaFile.getSegIndex(actualAddress), entry.getKey())
== null) {
@@ -302,6 +300,13 @@ public abstract class PageManager implements IPageManager {
"Node[%s] has no child[%s] in pbtree file.", node.getName(), entry.getKey()));
}
+ if (!child.isMeasurement() && getNodeAddress(child) < 0) {
+ short estSegSize = estimateSegmentSize(child);
+ long glbIndex = preAllocateSegment(estSegSize, cxt);
+ SchemaFile.setNodeAddress(child, glbIndex);
+ }
+ childBuffer = RecordUtils.node2Buffer(child);
+
// prepare alias comparison
if (child.isMeasurement() && child.getAsMeasurementMNode().getAlias() != null) {
alias = child.getAsMeasurementMNode().getAlias();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
index 75a8ab8bc00..bb25ea10092 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
@@ -315,6 +315,39 @@ public class SchemaFileTest {
sf.close();
}
+ @Test
+ public void testFlushUpdatedChildWithNegativeSegmentAddress() throws Exception {
+ ISchemaFile sf = SchemaFile.initSchemaFile("root.sg", TEST_SCHEMA_REGION_ID);
+ ICachedMNode sgNode = nodeFactory.createDatabaseDeviceMNode(null, "sg").getAsMNode();
+ ICachedMNode device = nodeFactory.createDeviceMNode(sgNode, "d1").getAsMNode();
+ sgNode.addChild(device);
+
+ writeMNodeInTest(sf, sgNode);
+
+ // Typical flush order: the parent already has this child record on disk, while the updated
+ // child object in memory may still have no valid segment address.
+ ICachedMNodeContainer.getCachedMNodeContainer(device).setSegmentAddress(-1L);
+ addNodeToUpdateBuffer(sgNode, device);
+ writeMNodeInTest(sf, sgNode);
+
+ Assert.assertTrue(getSegAddrInContainer(device) >= 0);
+
+ device.addChild(getMeasurementNode(device, "s1", "alias_s1"));
+ writeMNodeInTest(sf, device);
+ Assert.assertEquals(
+ "alias_s1", sf.getChildNode(device, "s1").getAsMeasurementMNode().getAlias());
+
+ long deviceSegmentAddress = getSegAddrInContainer(device);
+ sf.close();
+
+ sf = SchemaFile.loadSchemaFile("root.sg", TEST_SCHEMA_REGION_ID);
+ ICachedMNode loadedDevice = sf.getChildNode(sgNode, "d1");
+ Assert.assertEquals(deviceSegmentAddress, getSegAddrInContainer(loadedDevice));
+ Assert.assertEquals(
+ "alias_s1", sf.getChildNode(loadedDevice, "s1").getAsMeasurementMNode().getAlias());
+ sf.close();
+ }
+
@Test
public void testMassiveSegment() throws MetadataException, IOException {
ICachedMNode dbNode = nodeFactory.createDatabaseDeviceMNode(null, "sgRoot");