This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new d39d22fa63a Support sink config key for pipe request slicing (#17858) 
(#17883)
d39d22fa63a is described below

commit d39d22fa63afe7885e77cd93ea74ddbf8576f9cf
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 18:17:49 2026 +0800

    Support sink config key for pipe request slicing (#17858) (#17883)
    
    * Support sink config key for pipe request slicing
    
    * Support processor output series aliases
---
 .../it/env/cluster/config/MppCommonConfig.java     |  2 +-
 .../twostage/plugin/TwoStageCountProcessor.java    | 22 ++++++--
 .../plugin/TwoStageCountProcessorTest.java         | 57 +++++++++++++++++++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  8 ++-
 .../commons/pipe/config/PipeDescriptorTest.java    | 65 ++++++++++++++++++++++
 5 files changed, 146 insertions(+), 8 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index d9017df36e7..8109bf02c7e 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -508,7 +508,7 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
   public CommonConfig setPipeConnectorRequestSliceThresholdBytes(
       int pipeConnectorRequestSliceThresholdBytes) {
     setProperty(
-        "pipe_connector_request_slice_threshold_bytes",
+        "pipe_sink_request_slice_threshold_bytes",
         String.valueOf(pipeConnectorRequestSliceThresholdBytes));
 
     return this;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
index 16a20a5f509..340ac085a01 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
@@ -74,6 +74,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public class TwoStageCountProcessor implements PipeProcessor {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TwoStageCountProcessor.class);
+  private static final String LEGACY_PROCESSOR_OUTPUT_SERIES_KEY = 
"processor.output.series";
 
   private String pipeName;
   private long creationTime;
@@ -98,10 +99,17 @@ public class TwoStageCountProcessor implements 
PipeProcessor {
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
-    
validator.validateRequiredAttribute(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY);
+    validator.validateSynonymAttributes(
+        
Collections.singletonList(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY),
+        Collections.singletonList(LEGACY_PROCESSOR_OUTPUT_SERIES_KEY),
+        true);
 
     final String rawOutputSeries =
-        
validator.getParameters().getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY);
+        validator
+            .getParameters()
+            .getStringByKeys(
+                PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY,
+                LEGACY_PROCESSOR_OUTPUT_SERIES_KEY);
     try {
       PathUtils.isLegalPath(rawOutputSeries);
     } catch (IllegalPathException e) {
@@ -119,8 +127,7 @@ public class TwoStageCountProcessor implements 
PipeProcessor {
     regionId = runtimeEnvironment.getRegionId();
     pipeTaskMeta = runtimeEnvironment.getPipeTaskMeta();
 
-    outputSeries =
-        new 
PartialPath(parameters.getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY));
+    outputSeries = parseOutputSeries(parameters);
 
     if (Objects.nonNull(pipeTaskMeta) && 
Objects.nonNull(pipeTaskMeta.getProgressIndex())) {
       if (pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex) {
@@ -152,6 +159,13 @@ public class TwoStageCountProcessor implements 
PipeProcessor {
     twoStageAggregateSender = new TwoStageAggregateSender(pipeName, 
creationTime);
   }
 
+  static PartialPath parseOutputSeries(final PipeParameters parameters)
+      throws IllegalPathException {
+    return new PartialPath(
+        parameters.getStringByKeys(
+            PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY, 
LEGACY_PROCESSOR_OUTPUT_SERIES_KEY));
+  }
+
   @Override
   public void process(TabletInsertionEvent tabletInsertionEvent, 
EventCollector eventCollector)
       throws Exception {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessorTest.java
new file mode 100644
index 00000000000..2957ffd4e3e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessorTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.twostage.plugin;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class TwoStageCountProcessorTest {
+
+  @Test
+  public void testOutputSeriesSupportsNewAndLegacyKeys() throws Exception {
+    Assert.assertEquals(
+        "root.db.d.s1", parseOutputSeries("processor.output.series", 
"root.db.d.s1").getFullPath());
+    Assert.assertEquals(
+        "root.db.d.s2", parseOutputSeries("processor.output-series", 
"root.db.d.s2").getFullPath());
+  }
+
+  @Test
+  public void testValidateOutputSeriesSupportsNewAndLegacyKeys() throws 
Exception {
+    validateOutputSeries("processor.output.series", "root.db.d.s1");
+    validateOutputSeries("processor.output-series", "root.db.d.s2");
+  }
+
+  private PartialPath parseOutputSeries(final String key, final String value) 
throws Exception {
+    return TwoStageCountProcessor.parseOutputSeries(
+        new PipeParameters(Collections.singletonMap(key, value)));
+  }
+
+  private void validateOutputSeries(final String key, final String value) 
throws Exception {
+    new TwoStageCountProcessor()
+        .validate(
+            new PipeParameterValidator(new 
PipeParameters(Collections.singletonMap(key, value))));
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 83db711e5ec..ee035ba3eb1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -445,9 +445,11 @@ public class PipeDescriptor {
 
     config.setPipeSinkRequestSliceThresholdBytes(
         Integer.parseInt(
-            properties.getProperty(
-                "pipe_connector_request_slice_threshold_bytes",
-                
String.valueOf(config.getPipeSinkRequestSliceThresholdBytes()))));
+            
Optional.ofNullable(properties.getProperty("pipe_sink_request_slice_threshold_bytes"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_connector_request_slice_threshold_bytes",
+                        
String.valueOf(config.getPipeSinkRequestSliceThresholdBytes())))));
 
     config.setPipeReceiverLoginPeriodicVerificationIntervalMs(
         Long.parseLong(
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/config/PipeDescriptorTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/config/PipeDescriptorTest.java
new file mode 100644
index 00000000000..00d98212271
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/config/PipeDescriptorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.config;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.TrimProperties;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PipeDescriptorTest {
+
+  private final CommonConfig config = 
CommonDescriptor.getInstance().getConfig();
+
+  private int originalRequestSliceThresholdBytes;
+
+  @Before
+  public void setUp() {
+    originalRequestSliceThresholdBytes = 
config.getPipeSinkRequestSliceThresholdBytes();
+  }
+
+  @After
+  public void tearDown() {
+    
config.setPipeSinkRequestSliceThresholdBytes(originalRequestSliceThresholdBytes);
+  }
+
+  @Test
+  public void testPipeRequestSliceThresholdSupportsSinkAndConnectorKeys() {
+    final TrimProperties connectorProperties = new TrimProperties();
+    
connectorProperties.setProperty("pipe_connector_request_slice_threshold_bytes", 
"123");
+    PipeDescriptor.loadPipeInternalConfig(config, connectorProperties);
+    Assert.assertEquals(123, config.getPipeSinkRequestSliceThresholdBytes());
+
+    final TrimProperties sinkProperties = new TrimProperties();
+    sinkProperties.setProperty("pipe_sink_request_slice_threshold_bytes", 
"456");
+    PipeDescriptor.loadPipeInternalConfig(config, sinkProperties);
+    Assert.assertEquals(456, config.getPipeSinkRequestSliceThresholdBytes());
+
+    final TrimProperties bothProperties = new TrimProperties();
+    bothProperties.setProperty("pipe_connector_request_slice_threshold_bytes", 
"123");
+    bothProperties.setProperty("pipe_sink_request_slice_threshold_bytes", 
"456");
+    PipeDescriptor.loadPipeInternalConfig(config, bothProperties);
+    Assert.assertEquals(456, config.getPipeSinkRequestSliceThresholdBytes());
+  }
+}

Reply via email to