This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new becc0c4a305 Optimized the clear logic of Schema Region && Pipe: Fixed the bug of config listen type & The historical pipe does not work for deletion-only sync (#17553)
becc0c4a305 is described below

commit becc0c4a30523411f86d12943b13039edc05dd44
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 29 10:08:53 2026 +0800

    Optimized the clear logic of Schema Region && Pipe: Fixed the bug of config listen type & The historical pipe does not work for deletion-only sync (#17553)
    
    * Fix
    
    * tag
---
 .../pipe/source/ConfigRegionListeningFilter.java   |   8 +-
 .../source/ConfigRegionListeningFilterTest.java    | 101 +++++++++++++++++++++
 ...istoricalDataRegionTsFileAndDeletionSource.java |   6 +-
 .../schemaregion/impl/SchemaRegionMemoryImpl.java  |   4 +-
 ...ricalDataRegionTsFileAndDeletionSourceTest.java |  44 +++++++++
 5 files changed, 153 insertions(+), 10 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilter.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilter.java
index 5911ee8fc89..ff2cdd5649f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilter.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilter.java
@@ -229,8 +229,8 @@ public class ConfigRegionListeningFilter {
                   ConfigPhysicalPlanType.RevokeUser,
                   ConfigPhysicalPlanType.RevokeRoleFromUser,
                   ConfigPhysicalPlanType.RRevokeUserRole,
-                  ConfigPhysicalPlanType.RGrantUserAll,
-                  ConfigPhysicalPlanType.RGrantUserSysPri)));
+                  ConfigPhysicalPlanType.RRevokeUserAll,
+                  ConfigPhysicalPlanType.RRevokeUserSysPri)));
 
       // Table
       OPTION_PLAN_MAP.put(
@@ -266,7 +266,7 @@ public class ConfigRegionListeningFilter {
             && !((DeleteDatabasePlan) plan).getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE);
       case CreateDatabase:
       case AlterDatabase:
-        return !(((DatabaseSchemaPlan) plan)
+        return !((DatabaseSchemaPlan) plan)
                 .getSchema()
                 .getName()
                 .equals(SchemaConstant.SYSTEM_DATABASE)
@@ -277,7 +277,7 @@ public class ConfigRegionListeningFilter {
             && !((DatabaseSchemaPlan) plan)
                 .getSchema()
                 .getName()
-                .equals(Audit.TABLE_MODEL_AUDIT_DATABASE));
+                .equals(Audit.TABLE_MODEL_AUDIT_DATABASE);
       // Table under audit db
       case PipeCreateTableOrView:
         return !((PipeCreateTableOrViewPlan) plan)
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilterTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilterTest.java
new file mode 100644
index 00000000000..46796a6da62
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilterTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.source;
+
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.Audit;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Set;
+
+public class ConfigRegionListeningFilterTest {
+
+  @Test
+  public void testAuthUserRevokeUsesRevokeRelationalPlans() throws Exception {
+    final Set<ConfigPhysicalPlanType> listenedPlanTypes =
+        ConfigRegionListeningFilter.parseListeningPlanTypeSet(
+            new PipeParameters(
+                new HashMap<String, String>() {
+                  {
+                    put(PipeSourceConstant.EXTRACTOR_INCLUSION_KEY, "auth.user.revoke");
+                  }
+                }));
+
+    Assert.assertTrue(listenedPlanTypes.contains(ConfigPhysicalPlanType.RevokeUser));
+    Assert.assertTrue(listenedPlanTypes.contains(ConfigPhysicalPlanType.RevokeRoleFromUser));
+    Assert.assertTrue(listenedPlanTypes.contains(ConfigPhysicalPlanType.RRevokeUserRole));
+    Assert.assertTrue(listenedPlanTypes.contains(ConfigPhysicalPlanType.RRevokeUserAll));
+    Assert.assertTrue(listenedPlanTypes.contains(ConfigPhysicalPlanType.RRevokeUserSysPri));
+    Assert.assertFalse(listenedPlanTypes.contains(ConfigPhysicalPlanType.RGrantUserAll));
+    Assert.assertFalse(listenedPlanTypes.contains(ConfigPhysicalPlanType.RGrantUserSysPri));
+  }
+
+  @Test
+  public void testInternalDatabasesAreFilteredForCreateAndAlter() {
+    Assert.assertFalse(
+        ConfigRegionListeningFilter.shouldPlanBeListened(
+            new DatabaseSchemaPlan(
+                ConfigPhysicalPlanType.CreateDatabase,
+                new TDatabaseSchema(SchemaConstant.SYSTEM_DATABASE))));
+    Assert.assertFalse(
+        ConfigRegionListeningFilter.shouldPlanBeListened(
+            new DatabaseSchemaPlan(
+                ConfigPhysicalPlanType.CreateDatabase,
+                new TDatabaseSchema(SchemaConstant.AUDIT_DATABASE))));
+    Assert.assertFalse(
+        ConfigRegionListeningFilter.shouldPlanBeListened(
+            new DatabaseSchemaPlan(
+                ConfigPhysicalPlanType.CreateDatabase,
+                new TDatabaseSchema(Audit.TABLE_MODEL_AUDIT_DATABASE))));
+
+    Assert.assertFalse(
+        ConfigRegionListeningFilter.shouldPlanBeListened(
+            new DatabaseSchemaPlan(
+                ConfigPhysicalPlanType.AlterDatabase,
+                new TDatabaseSchema(SchemaConstant.SYSTEM_DATABASE))));
+    Assert.assertFalse(
+        ConfigRegionListeningFilter.shouldPlanBeListened(
+            new DatabaseSchemaPlan(
+                ConfigPhysicalPlanType.AlterDatabase,
+                new TDatabaseSchema(SchemaConstant.AUDIT_DATABASE))));
+    Assert.assertFalse(
+        ConfigRegionListeningFilter.shouldPlanBeListened(
+            new DatabaseSchemaPlan(
+                ConfigPhysicalPlanType.AlterDatabase,
+                new TDatabaseSchema(Audit.TABLE_MODEL_AUDIT_DATABASE))));
+
+    Assert.assertTrue(
+        ConfigRegionListeningFilter.shouldPlanBeListened(
+            new DatabaseSchemaPlan(
+                ConfigPhysicalPlanType.CreateDatabase, new TDatabaseSchema("root.db"))));
+    Assert.assertTrue(
+        ConfigRegionListeningFilter.shouldPlanBeListened(
+            new DatabaseSchemaPlan(
+                ConfigPhysicalPlanType.AlterDatabase, new TDatabaseSchema("root.db"))));
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 9fcae5c8144..6da04faeb3a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -303,10 +303,6 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
       throws IllegalPathException {
     shouldExtractInsertion = listeningOptionPair.getLeft();
     shouldExtractDeletion = listeningOptionPair.getRight();
-    // Do nothing if extract deletion
-    if (!shouldExtractInsertion) {
-      return;
-    }
 
     final PipeRuntimeEnvironment environment = configuration.getRuntimeEnvironment();
 
@@ -478,7 +474,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
 
   @Override
   public synchronized void start() {
-    if (!shouldExtractInsertion) {
+    if (!shouldExtractInsertion && !shouldExtractDeletion) {
       hasBeenStarted = true;
       return;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 754cca02092..8b37d467492 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -454,7 +454,9 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
         logWriter.close();
         logWriter = null;
       }
-      tagManager.clear();
+      if (tagManager != null) {
+        tagManager.clear();
+      }
 
       isRecovering = true;
       initialized = false;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
index 14f97ef79d6..93580e639eb 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
@@ -19,10 +19,17 @@
 
 package org.apache.iotdb.db.pipe.source.dataregion.historical;
 
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import org.junit.Assert;
@@ -35,12 +42,40 @@ import java.nio.file.Files;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 public class PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
 
+  @Test
+  public void testDeletionOnlyCustomizeInitializesSourceContext() throws Exception {
+    final PipeHistoricalDataRegionTsFileAndDeletionSource source =
+        new PipeHistoricalDataRegionTsFileAndDeletionSource();
+    final PipeParameters parameters =
+        new PipeParameters(
+            new HashMap<String, String>() {
+              {
+                put(PipeSourceConstant.EXTRACTOR_INCLUSION_KEY, "data.delete");
+              }
+            });
+
+    source.validate(new PipeParameterValidator(parameters));
+    source.customize(
+        parameters,
+        new PipeTaskRuntimeConfiguration(
+            new PipeTaskSourceRuntimeEnvironment(
+                "pipe", 1, 123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))));
+
+    Assert.assertEquals("pipe", getPrivateField(source, "pipeName"));
+    Assert.assertEquals(123, getPrivateField(source, "dataRegionId"));
+    Assert.assertEquals(false, getPrivateField(source, "shouldExtractInsertion"));
+    Assert.assertEquals(true, getPrivateField(source, "shouldExtractDeletion"));
+    Assert.assertNotNull(getPrivateField(source, "treePattern"));
+    Assert.assertNotNull(getPrivateField(source, "tablePattern"));
+  }
+
   @Test
   public void testSupplyReturnsProgressReportEventAfterSkippingDuplicateHistoricalTsFile()
       throws Exception {
@@ -118,6 +153,15 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
     field.set(source, value);
   }
 
+  private static Object getPrivateField(
+      final PipeHistoricalDataRegionTsFileAndDeletionSource source, final String fieldName)
+      throws ReflectiveOperationException {
+    final Field field =
+        PipeHistoricalDataRegionTsFileAndDeletionSource.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    return field.get(source);
+  }
+
   private static class TestablePipeHistoricalDataRegionTsFileAndDeletionSource
       extends PipeHistoricalDataRegionTsFileAndDeletionSource {
 

Reply via email to