This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b33deab459 Pipe: Fixed the bug that mod may not be released in 
historical pipe (#17379)
6b33deab459 is described below

commit 6b33deab459283f11578ac4c5993e9432790ced7
Author: Caideyipi <[email protected]>
AuthorDate: Tue Mar 31 10:10:43 2026 +0800

    Pipe: Fixed the bug that mod may not be released in historical pipe (#17379)
    
    * z
    
    * shop
    
    * fix
    
    * fix
    
    * Update IoTDBPipeDataSinkIT.java
---
 .../treemodel/auto/basic/IoTDBPipeDataSinkIT.java  | 74 +++++++++++++++++-----
 .../resource/tsfile/PipeTsFileResourceManager.java | 15 +++--
 ...istoricalDataRegionTsFileAndDeletionSource.java |  6 +-
 3 files changed, 73 insertions(+), 22 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
index 36a0ea0cc92..de1acfa32c7 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
@@ -65,6 +65,7 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualTreeModelAutoIT {
   protected void setupConfig() {
     super.setupConfig();
     senderEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
+    senderEnv.getConfig().getCommonConfig().setPipeAutoSplitFullEnabled(true);
   }
 
   @Test
@@ -105,9 +106,6 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualTreeModelAutoIT {
 
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
 
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
-
       // Do not fail if the failure has nothing to do with pipe
       // Because the failures will randomly generate due to resource limitation
       TestUtils.executeNonQueries(
@@ -172,9 +170,6 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualTreeModelAutoIT {
                       .setProcessorAttributes(processorAttributes))
               .getCode());
 
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
-
       // Do not fail if the failure has nothing to do with pipe
       // Because the failures will randomly generate due to resource limitation
       TestUtils.executeNonQueries(
@@ -236,9 +231,6 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualTreeModelAutoIT {
 
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
-
       // Do not fail if the failure has nothing to do with pipe
       // Because the failures will randomly generate due to resource limitation
       TestUtils.executeNonQueries(
@@ -416,9 +408,6 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualTreeModelAutoIT {
                       .setProcessorAttributes(processorAttributes))
               .getCode());
 
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
-
       // Do not fail if the failure has nothing to do with pipe
       // Because the failures will randomly generate due to resource limitation
       TestUtils.executeNonQueries(
@@ -512,9 +501,6 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualTreeModelAutoIT {
                       .setProcessorAttributes(processorAttributes))
               .getCode());
 
-      Assert.assertEquals(
-          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
-
       TestUtils.executeNonQueries(
           senderEnv,
           Arrays.asList("insert into root.vehicle.d0(time, s1) values (2, 1)", 
"flush"),
@@ -576,4 +562,62 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualTreeModelAutoIT {
                 "1635232151960,null,null,2.0,2.1,null,",
                 "1635232143960,6.0,4.0,null,null,null,")));
   }
+
+  @Test
+  public void testTransferMods() {
+    try {
+      TestUtils.executeNonQueries(
+          senderEnv,
+          Arrays.asList(
+              "create database root.sg_nonAligned",
+              "create TIMESERIES root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s0 
with datatype=boolean, encoding=RLE,compressor=snappy",
+              "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s1 
with datatype=int32, encoding=PLAIN,compressor=LZ4",
+              "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s2 
with datatype=int64,encoding=gorilla,compressor=uncompressed",
+              "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s3 
with datatype=float,encoding=chimp,compressor=gzip",
+              "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s4 
with datatype=double,encoding=ts_2diff,compressor=zstd",
+              "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s5 
with datatype=text,encoding=dictionary,compressor=lzma2",
+              "insert into root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`(time,s1, 
s2,s3,s4,s0,s5) 
values(1,1,10,5.39,5.51234,true,''),(11,null,20,5.39,15.51234,false,'第2条 
device_nonAligned'),(21,3,null,5.39,25.51234,true,'第3条device_nonAligned'),(31,4,40,null,35.51234,true,'第4条device_nonAligned'),(41,5,50,5.39,null,false,'第5条device_nonAligned'),(51,6,60,5.39,55.51234,null,'第6条device_nonAligned'),(61,7,70,5.39,65.51234,false,null),(71,8,80,5.39,75.51234,false,'第8条device_nonAligned'),(81,9,90,5
 [...]
+              "flush",
+              "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s0",
+              String.format(
+                  "create pipe test with source 
('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true')
 with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
+                  
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
+          "count(timeseries),",
+          Collections.singleton("5,"));
+
+      TestUtils.executeNonQueries(
+          senderEnv, Arrays.asList("drop pipe test_history", "drop pipe 
test_realtime"));
+
+      TestUtils.executeNonQuery(receiverEnv, "drop database root.**");
+
+      TestUtils.executeNonQueries(
+          senderEnv,
+          Arrays.asList(
+              "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s1",
+              String.format(
+                  "create pipe test with source 
('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true')
 with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
+                  
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
+          "count(timeseries),",
+          Collections.singleton("4,"),
+          15);
+      TestUtils.assertDataAlwaysOnEnv(
+          receiverEnv,
+          "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
+          "count(timeseries),",
+          Collections.singleton("4,"));
+    } finally {
+      TestUtils.executeNonQueries(
+          senderEnv,
+          Arrays.asList(
+              "drop pipe test_history", "drop pipe test_realtime", "drop 
database root.**"));
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index c1e4ff9ddf1..c84504fa52b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -355,13 +355,18 @@ public class PipeTsFileResourceManager {
     }
   }
 
-  public void unpinTsFileResource(final TsFileResource resource, final 
@Nullable String pipeName)
+  public void unpinTsFileResource(
+      final TsFileResource resource,
+      final boolean shouldTransferModFile,
+      final @Nullable String pipeName)
       throws IOException {
-    final File pinnedFile = 
getHardlinkOrCopiedFileInPipeDir(resource.getTsFile(), pipeName);
-    decreaseFileReference(pinnedFile, pipeName);
+    decreaseFileReference(
+        getHardlinkOrCopiedFileInPipeDir(resource.getTsFile(), pipeName), 
pipeName);
 
-    if (resource.sharedModFileExists()) {
-      decreaseFileReference(resource.getSharedModFile().getFile(), pipeName);
+    if (shouldTransferModFile && resource.exclusiveModFileExists()) {
+      decreaseFileReference(
+          
getHardlinkOrCopiedFileInPipeDir(resource.getExclusiveModFile().getFile(), 
pipeName),
+          pipeName);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 32fcece4115..66f8d48ce28 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -904,7 +904,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
       return isReferenceCountIncreased ? event : null;
     } finally {
       try {
-        PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource, 
pipeName);
+        PipeDataNodeResourceManager.tsfile()
+            .unpinTsFileResource(resource, shouldTransferModFile, pipeName);
       } catch (final IOException e) {
         LOGGER.warn(
             "Pipe {}@{}: failed to unpin TsFileResource after creating event, 
original path: {}",
@@ -989,7 +990,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
             if (resource instanceof TsFileResource) {
               try {
                 PipeDataNodeResourceManager.tsfile()
-                    .unpinTsFileResource((TsFileResource) resource, pipeName);
+                    .unpinTsFileResource(
+                        (TsFileResource) resource, shouldTransferModFile, 
pipeName);
               } catch (final IOException e) {
                 LOGGER.warn(
                     "Pipe {}@{}: failed to unpin TsFileResource after dropping 
pipe, original path: {}",

Reply via email to