This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new aaa5729075a Fix PBTree flush for negative child address (#17955) 
(#17978)
aaa5729075a is described below

commit aaa5729075ab659b200747983a7f1b879eff19cc
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 18 18:40:59 2026 +0800

    Fix PBTree flush for negative child address (#17955) (#17978)
    
    * Fix PBTree flush for negative child address
    
    * Address PBTree flush review comments
    
    (cherry picked from commit 8d56ae71e5fda188854bf78463d4d4b45488d2ed)
---
 .../mtree/impl/pbtree/flush/Scheduler.java         | 69 +++++++++++++---------
 .../pbtree/schemafile/pagemgr/PageManager.java     |  9 ++-
 .../metadata/mtree/schemafile/SchemaFileTest.java  | 33 +++++++++++
 3 files changed, 80 insertions(+), 31 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
index b467b473347..9ef0fbf0f30 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/flush/Scheduler.java
@@ -76,7 +76,8 @@ public class Scheduler {
     this.releaseFlushStrategy = releaseFlushStrategy;
   }
 
-  private void executeFlush(CachedMTreeStore store, int regionId, 
AtomicInteger remainToFlush) {
+  private void executeFlush(
+      CachedMTreeStore store, int regionId, AtomicInteger remainToFlush, 
boolean propagateFailure) {
     IMemoryManager memoryManager = store.getMemoryManager();
     ISchemaFile file = store.getSchemaFile();
     LockManager lockManager = store.getLockManager();
@@ -97,6 +98,9 @@ public class Scheduler {
           regionId,
           e.getMessage(),
           e);
+      if (propagateFailure) {
+        throw new RuntimeException(e);
+      }
     } finally {
       long time = System.currentTimeMillis() - startTime;
       if (time > 10_000) {
@@ -145,22 +149,26 @@ public class Scheduler {
                     CompletableFuture.runAsync(
                         () -> {
                           int regionId = entry.getKey();
-                          CachedMTreeStore store = entry.getValue();
-                          if (store == null) {
-                            // store has been closed
-                            return;
-                          }
-                          LockManager lockManager = store.getLockManager();
-                          lockManager.globalReadLock();
-                          if (!regionToStore.containsKey(regionId)) {
-                            // double check store have not been closed
-                            return;
-                          }
                           try {
-                            executeFlush(store, regionId, null);
-                            executeRelease(store, false);
+                            CachedMTreeStore store = entry.getValue();
+                            if (store == null) {
+                              // store has been closed
+                              return;
+                            }
+                            LockManager lockManager = store.getLockManager();
+                            lockManager.globalReadLock();
+                            try {
+                              if (!regionToStore.containsKey(regionId)) {
+                                // double check store have not been closed
+                                return;
+                              }
+                              executeFlush(store, regionId, null, true);
+                              executeRelease(store, false);
+                            } finally {
+                              lockManager.globalReadUnlock();
+                            }
                           } finally {
-                            lockManager.globalReadUnlock();
+                            flushingRegionSet.remove(regionId);
                           }
                         },
                         workerPool))
@@ -221,22 +229,25 @@ public class Scheduler {
       flushingRegionSet.add(regionId);
       workerPool.submit(
           () -> {
-            CachedMTreeStore store = regionToStore.get(regionId);
-            if (store == null) {
-              // store has been closed
-              return;
-            }
-            LockManager lockManager = store.getLockManager();
-            lockManager.globalReadLock();
-            if (!regionToStore.containsKey(regionId)) {
-              // double check store have not been closed
-              return;
-            }
             try {
-
-              executeFlush(store, regionId, remainToFlush);
+              CachedMTreeStore store = regionToStore.get(regionId);
+              if (store == null) {
+                // store has been closed
+                return;
+              }
+              LockManager lockManager = store.getLockManager();
+              lockManager.globalReadLock();
+              try {
+                if (!regionToStore.containsKey(regionId)) {
+                  // double check store have not been closed
+                  return;
+                }
+                executeFlush(store, regionId, remainToFlush, false);
+              } finally {
+                lockManager.globalReadUnlock();
+              }
             } finally {
-              lockManager.globalReadUnlock();
+              flushingRegionSet.remove(regionId);
             }
           });
       if (remainToFlush.get() <= 0) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
index 06d8c279844..56e0b5a6dc2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageManager.java
@@ -292,8 +292,6 @@ public abstract class PageManager implements IPageManager {
             .entrySet()) {
       child = entry.getValue();
       actualAddress = getTargetSegmentAddress(curSegAddr, entry.getKey(), cxt);
-      childBuffer = RecordUtils.node2Buffer(child);
-
       curPage = getPageInstance(SchemaFile.getPageIndex(actualAddress), cxt);
       if 
(curPage.getAsSegmentedPage().read(SchemaFile.getSegIndex(actualAddress), 
entry.getKey())
           == null) {
@@ -302,6 +300,13 @@ public abstract class PageManager implements IPageManager {
                 "Node[%s] has no child[%s] in pbtree file.", node.getName(), 
entry.getKey()));
       }
 
+      if (!child.isMeasurement() && getNodeAddress(child) < 0) {
+        short estSegSize = estimateSegmentSize(child);
+        long glbIndex = preAllocateSegment(estSegSize, cxt);
+        SchemaFile.setNodeAddress(child, glbIndex);
+      }
+      childBuffer = RecordUtils.node2Buffer(child);
+
       // prepare alias comparison
       if (child.isMeasurement() && child.getAsMeasurementMNode().getAlias() != 
null) {
         alias = child.getAsMeasurementMNode().getAlias();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
index 75a8ab8bc00..bb25ea10092 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
@@ -315,6 +315,39 @@ public class SchemaFileTest {
     sf.close();
   }
 
+  @Test
+  public void testFlushUpdatedChildWithNegativeSegmentAddress() throws 
Exception {
+    ISchemaFile sf = SchemaFile.initSchemaFile("root.sg", 
TEST_SCHEMA_REGION_ID);
+    ICachedMNode sgNode = nodeFactory.createDatabaseDeviceMNode(null, 
"sg").getAsMNode();
+    ICachedMNode device = nodeFactory.createDeviceMNode(sgNode, 
"d1").getAsMNode();
+    sgNode.addChild(device);
+
+    writeMNodeInTest(sf, sgNode);
+
+    // Typical flush order: the parent already has this child record on disk, 
while the updated
+    // child object in memory may still have no valid segment address.
+    
ICachedMNodeContainer.getCachedMNodeContainer(device).setSegmentAddress(-1L);
+    addNodeToUpdateBuffer(sgNode, device);
+    writeMNodeInTest(sf, sgNode);
+
+    Assert.assertTrue(getSegAddrInContainer(device) >= 0);
+
+    device.addChild(getMeasurementNode(device, "s1", "alias_s1"));
+    writeMNodeInTest(sf, device);
+    Assert.assertEquals(
+        "alias_s1", sf.getChildNode(device, 
"s1").getAsMeasurementMNode().getAlias());
+
+    long deviceSegmentAddress = getSegAddrInContainer(device);
+    sf.close();
+
+    sf = SchemaFile.loadSchemaFile("root.sg", TEST_SCHEMA_REGION_ID);
+    ICachedMNode loadedDevice = sf.getChildNode(sgNode, "d1");
+    Assert.assertEquals(deviceSegmentAddress, 
getSegAddrInContainer(loadedDevice));
+    Assert.assertEquals(
+        "alias_s1", sf.getChildNode(loadedDevice, 
"s1").getAsMeasurementMNode().getAlias());
+    sf.close();
+  }
+
   @Test
   public void testMassiveSegment() throws MetadataException, IOException {
     ICachedMNode dbNode = nodeFactory.createDatabaseDeviceMNode(null, 
"sgRoot");

Reply via email to