This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch batch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 534e4931ba0add083563d3a9c28ce50d7061868f
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 14 14:48:15 2026 +0800

    Schema
---
 .../metric/schema/PipeSchemaRegionSinkMetrics.java |  58 +++
 .../batch/PipeSchemaRegionWritePlanEventBatch.java | 526 +++++++++++++++++++++
 .../airgap/IoTDBSchemaRegionAirGapSink.java        | 143 +++++-
 .../thrift/sync/IoTDBSchemaRegionSink.java         | 151 +++++-
 .../schema/PipeSchemaRegionSinkMetricsTest.java    | 118 +++++
 .../db/pipe/sink/PipeSchemaRegionSinkTest.java     | 317 +++++++++++++
 .../PipeSchemaRegionWritePlanEventBatchTest.java   | 464 ++++++++++++++++++
 7 files changed, 1747 insertions(+), 30 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
index c6e8ad52950..4b2de4e7de0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Histogram;
 import org.apache.iotdb.metrics.type.Rate;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
@@ -56,6 +57,7 @@ public class PipeSchemaRegionSinkMetrics implements 
IMetricSet {
 
   private void createMetrics(final String taskID) {
     createRate(taskID);
+    createHistogram(taskID);
   }
 
   private void createRate(final String taskID) {
@@ -72,6 +74,38 @@ public class PipeSchemaRegionSinkMetrics implements 
IMetricSet {
             String.valueOf(connector.getCreationTime())));
   }
 
+  private void createHistogram(final String taskID) {
+    final PipeSinkSubtask connector = connectorMap.get(taskID);
+
+    final Histogram schemaBatchSizeHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            connector.getAttributeSortedString(),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(connector.getCreationTime()));
+    connector.setTabletBatchSizeHistogram(schemaBatchSizeHistogram);
+
+    final Histogram schemaBatchTimeIntervalHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            connector.getAttributeSortedString(),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(connector.getCreationTime()));
+    
connector.setTabletBatchTimeIntervalHistogram(schemaBatchTimeIntervalHistogram);
+
+    final Histogram schemaBatchEventSizeHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            connector.getAttributeSortedString());
+    connector.setEventSizeHistogram(schemaBatchEventSizeHistogram);
+  }
+
   @Override
   public void unbindFrom(final AbstractMetricService metricService) {
     ImmutableSet.copyOf(connectorMap.keySet()).forEach(this::deregister);
@@ -83,6 +117,7 @@ public class PipeSchemaRegionSinkMetrics implements 
IMetricSet {
 
   private void removeMetrics(final String taskID) {
     removeRate(taskID);
+    removeHistogram(taskID);
   }
 
   private void removeRate(final String taskID) {
@@ -98,6 +133,29 @@ public class PipeSchemaRegionSinkMetrics implements 
IMetricSet {
     schemaRateMap.remove(taskID);
   }
 
+  private void removeHistogram(final String taskID) {
+    final PipeSinkSubtask connector = connectorMap.get(taskID);
+    metricService.remove(
+        MetricType.HISTOGRAM,
+        Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    metricService.remove(
+        MetricType.HISTOGRAM,
+        Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString(),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
+    metricService.remove(
+        MetricType.HISTOGRAM,
+        Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
+        Tag.NAME.toString(),
+        connector.getAttributeSortedString());
+  }
+
   //////////////////////////// Register & deregister (pipe integration) 
////////////////////////////
 
   public void register(final PipeSinkSubtask pipeSinkSubtask) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeSchemaRegionWritePlanEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeSchemaRegionWritePlanEventBatch.java
new file mode 100644
index 00000000000..e03ac6b63ef
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeSchemaRegionWritePlanEventBatch.java
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.ActivateTemplateNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.BatchActivateTemplateNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateAlignedTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateMultiTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalBatchActivateTemplateNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateMultiTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.MeasurementGroup;
+import org.apache.iotdb.metrics.impl.DoNothingHistogram;
+import org.apache.iotdb.metrics.type.Histogram;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_SECONDS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_BATCH_DELAY_MS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_BATCH_DELAY_SECONDS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_BATCH_SIZE_KEY;
+
+public class PipeSchemaRegionWritePlanEventBatch implements AutoCloseable {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeSchemaRegionWritePlanEventBatch.class);
+
+  private static final PlanNodeId EMPTY_PLAN_NODE_ID = new PlanNodeId("");
+
+  private final int maxDelayInMs;
+  private final long maxBatchSizeInBytes;
+  private final PipeMemoryBlock allocatedMemoryBlock;
+
+  private final List<EnrichedEvent> events = new ArrayList<>();
+
+  private final Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap = 
new HashMap<>();
+  private final Map<PartialPath, Pair<Integer, Integer>> templateActivationMap 
= new HashMap<>();
+
+  private BatchType batchType = BatchType.NONE;
+  private String pipeName;
+  private long creationTime;
+
+  private long totalBufferSize = 0;
+  private long firstEventProcessingTime = Long.MIN_VALUE;
+
+  private volatile boolean isClosed = false;
+
+  private Histogram batchSizeHistogram = new DoNothingHistogram();
+  private Histogram batchTimeIntervalHistogram = new DoNothingHistogram();
+  private Histogram eventSizeHistogram = new DoNothingHistogram();
+
+  public PipeSchemaRegionWritePlanEventBatch(final PipeParameters parameters) {
+    final Integer requestMaxDelayInMillis =
+        parameters.getIntByKeys(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, 
SINK_IOTDB_BATCH_DELAY_MS_KEY);
+    if (Objects.isNull(requestMaxDelayInMillis)) {
+      final int requestMaxDelayConfig =
+          parameters.getIntOrDefault(
+              Arrays.asList(
+                  CONNECTOR_IOTDB_BATCH_DELAY_SECONDS_KEY, 
SINK_IOTDB_BATCH_DELAY_SECONDS_KEY),
+              CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE);
+      maxDelayInMs = requestMaxDelayConfig < 0 ? Integer.MAX_VALUE : 
requestMaxDelayConfig;
+    } else {
+      maxDelayInMs = requestMaxDelayInMillis < 0 ? Integer.MAX_VALUE : 
requestMaxDelayInMillis;
+    }
+
+    maxBatchSizeInBytes =
+        parameters.getLongOrDefault(
+            Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
+            CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
+    allocatedMemoryBlock = 
PipeDataNodeResourceManager.memory().forceAllocate(maxBatchSizeInBytes);
+  }
+
+  public synchronized boolean onEvent(final PipeSchemaRegionWritePlanEvent 
event) {
+    if (isClosed || !canBatch(event)) {
+      return false;
+    }
+
+    if (events.isEmpty() || !Objects.equals(events.get(events.size() - 1), 
event)) {
+      if 
(!event.increaseReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName()))
 {
+        LOGGER.warn("Cannot increase reference count for event: {}, ignore it 
in batch.", event);
+        return true;
+      }
+
+      try {
+        if (Objects.isNull(pipeName)) {
+          pipeName = event.getPipeName();
+          creationTime = event.getCreationTime();
+        }
+        appendPlanNode(event.getPlanNode());
+        totalBufferSize += event.getPlanNode().serializeToByteBuffer().limit();
+        events.add(event);
+      } catch (final Exception e) {
+        
event.decreaseReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName(),
 false);
+        throw e;
+      }
+
+      if (firstEventProcessingTime == Long.MIN_VALUE) {
+        firstEventProcessingTime = System.currentTimeMillis();
+      }
+    }
+
+    return true;
+  }
+
+  private boolean canBatch(final PipeSchemaRegionWritePlanEvent event) {
+    final BatchType eventBatchType = resolveBatchType(event.getPlanNode());
+    if (eventBatchType == BatchType.NONE || 
containsNonEmptyProps(event.getPlanNode())) {
+      return false;
+    }
+
+    if (events.isEmpty()) {
+      return !hasAlignmentConflict(event.getPlanNode());
+    }
+
+    return Objects.equals(pipeName, event.getPipeName())
+        && creationTime == event.getCreationTime()
+        && batchType == eventBatchType
+        && !hasAlignmentConflict(event.getPlanNode());
+  }
+
+  private BatchType resolveBatchType(final PlanNode planNode) {
+    switch (planNode.getType()) {
+      case CREATE_TIME_SERIES:
+      case CREATE_ALIGNED_TIME_SERIES:
+      case CREATE_MULTI_TIME_SERIES:
+      case INTERNAL_CREATE_TIME_SERIES:
+      case INTERNAL_CREATE_MULTI_TIMESERIES:
+        return BatchType.TIMESERIES;
+      case ACTIVATE_TEMPLATE:
+      case BATCH_ACTIVATE_TEMPLATE:
+      case INTERNAL_BATCH_ACTIVATE_TEMPLATE:
+        return BatchType.TEMPLATE_ACTIVATE;
+      default:
+        return BatchType.NONE;
+    }
+  }
+
+  private boolean containsNonEmptyProps(final PlanNode planNode) {
+    switch (planNode.getType()) {
+      case CREATE_TIME_SERIES:
+        return hasNonEmptyProps(((CreateTimeSeriesNode) planNode).getProps());
+      case CREATE_MULTI_TIME_SERIES:
+        return ((CreateMultiTimeSeriesNode) planNode)
+            .getMeasurementGroupMap().values().stream()
+                
.anyMatch(PipeSchemaRegionWritePlanEventBatch::hasNonEmptyProps);
+      case INTERNAL_CREATE_TIME_SERIES:
+        return hasNonEmptyProps(((InternalCreateTimeSeriesNode) 
planNode).getMeasurementGroup());
+      case INTERNAL_CREATE_MULTI_TIMESERIES:
+        return ((InternalCreateMultiTimeSeriesNode) planNode)
+            .getDeviceMap().values().stream()
+                .map(Pair::getRight)
+                
.anyMatch(PipeSchemaRegionWritePlanEventBatch::hasNonEmptyProps);
+      default:
+        return false;
+    }
+  }
+
+  private static boolean hasNonEmptyProps(final MeasurementGroup 
measurementGroup) {
+    return Objects.nonNull(measurementGroup.getPropsList())
+        && measurementGroup.getPropsList().stream()
+            .anyMatch(PipeSchemaRegionWritePlanEventBatch::hasNonEmptyProps);
+  }
+
+  private static boolean hasNonEmptyProps(final Map<String, String> props) {
+    return Objects.nonNull(props) && !props.isEmpty();
+  }
+
+  private boolean hasAlignmentConflict(final PlanNode planNode) {
+    switch (planNode.getType()) {
+      case CREATE_TIME_SERIES:
+        return hasAlignmentConflict(
+            ((CreateTimeSeriesNode) planNode).getPath().getDevicePath(), 
false);
+      case CREATE_ALIGNED_TIME_SERIES:
+        return hasAlignmentConflict(((CreateAlignedTimeSeriesNode) 
planNode).getDevicePath(), true);
+      case CREATE_MULTI_TIME_SERIES:
+        return ((CreateMultiTimeSeriesNode) planNode)
+            .getMeasurementGroupMap().keySet().stream()
+                .anyMatch(devicePath -> hasAlignmentConflict(devicePath, 
false));
+      case INTERNAL_CREATE_TIME_SERIES:
+        return hasAlignmentConflict(
+            ((InternalCreateTimeSeriesNode) planNode).getDevicePath(),
+            ((InternalCreateTimeSeriesNode) planNode).isAligned());
+      case INTERNAL_CREATE_MULTI_TIMESERIES:
+        return ((InternalCreateMultiTimeSeriesNode) planNode)
+            .getDeviceMap().entrySet().stream()
+                .anyMatch(
+                    entry -> hasAlignmentConflict(entry.getKey(), 
entry.getValue().getLeft()));
+      default:
+        return false;
+    }
+  }
+
+  private boolean hasAlignmentConflict(final PartialPath devicePath, final 
boolean isAligned) {
+    final Pair<Boolean, MeasurementGroup> existing = deviceMap.get(devicePath);
+    return Objects.nonNull(existing) && !Objects.equals(existing.getLeft(), 
isAligned);
+  }
+
+  private void appendPlanNode(final PlanNode planNode) {
+    if (batchType == BatchType.NONE) {
+      batchType = resolveBatchType(planNode);
+    }
+
+    switch (planNode.getType()) {
+      case CREATE_TIME_SERIES:
+        appendCreateTimeSeriesNode((CreateTimeSeriesNode) planNode);
+        break;
+      case CREATE_ALIGNED_TIME_SERIES:
+        appendCreateAlignedTimeSeriesNode((CreateAlignedTimeSeriesNode) 
planNode);
+        break;
+      case CREATE_MULTI_TIME_SERIES:
+        appendCreateMultiTimeSeriesNode((CreateMultiTimeSeriesNode) planNode);
+        break;
+      case INTERNAL_CREATE_TIME_SERIES:
+        appendInternalCreateTimeSeriesNode((InternalCreateTimeSeriesNode) 
planNode);
+        break;
+      case INTERNAL_CREATE_MULTI_TIMESERIES:
+        
appendInternalCreateMultiTimeSeriesNode((InternalCreateMultiTimeSeriesNode) 
planNode);
+        break;
+      case ACTIVATE_TEMPLATE:
+        appendActivateTemplateNode((ActivateTemplateNode) planNode);
+        break;
+      case BATCH_ACTIVATE_TEMPLATE:
+        appendBatchActivateTemplateNode((BatchActivateTemplateNode) planNode);
+        break;
+      case INTERNAL_BATCH_ACTIVATE_TEMPLATE:
+        
appendInternalBatchActivateTemplateNode((InternalBatchActivateTemplateNode) 
planNode);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported schema plan node " + 
planNode.getType());
+    }
+  }
+
+  private void appendCreateTimeSeriesNode(final CreateTimeSeriesNode node) {
+    appendMeasurement(
+        node.getPath().getDevicePath(),
+        false,
+        node.getPath().getMeasurement(),
+        node.getDataType(),
+        node.getEncoding(),
+        node.getCompressor(),
+        node.getAlias(),
+        node.getTags(),
+        node.getAttributes());
+  }
+
+  private void appendCreateAlignedTimeSeriesNode(final 
CreateAlignedTimeSeriesNode node) {
+    for (int i = 0; i < node.getMeasurements().size(); ++i) {
+      appendMeasurement(
+          node.getDevicePath(),
+          true,
+          node.getMeasurements().get(i),
+          node.getDataTypes().get(i),
+          node.getEncodings().get(i),
+          node.getCompressors().get(i),
+          Objects.nonNull(node.getAliasList()) ? node.getAliasList().get(i) : 
null,
+          Objects.nonNull(node.getTagsList()) ? node.getTagsList().get(i) : 
null,
+          Objects.nonNull(node.getAttributesList()) ? 
node.getAttributesList().get(i) : null);
+    }
+  }
+
+  private void appendCreateMultiTimeSeriesNode(final CreateMultiTimeSeriesNode 
node) {
+    node.getMeasurementGroupMap()
+        .forEach(
+            (devicePath, measurementGroup) ->
+                appendMeasurementGroup(devicePath, false, measurementGroup));
+  }
+
+  private void appendInternalCreateTimeSeriesNode(final 
InternalCreateTimeSeriesNode node) {
+    appendMeasurementGroup(node.getDevicePath(), node.isAligned(), 
node.getMeasurementGroup());
+  }
+
+  private void appendInternalCreateMultiTimeSeriesNode(
+      final InternalCreateMultiTimeSeriesNode node) {
+    node.getDeviceMap()
+        .forEach(
+            (devicePath, isAlignedAndMeasurementGroup) ->
+                appendMeasurementGroup(
+                    devicePath,
+                    isAlignedAndMeasurementGroup.getLeft(),
+                    isAlignedAndMeasurementGroup.getRight()));
+  }
+
+  private void appendMeasurementGroup(
+      final PartialPath devicePath,
+      final boolean isAligned,
+      final MeasurementGroup measurementGroup) {
+    for (int i = 0; i < measurementGroup.size(); ++i) {
+      appendMeasurement(
+          devicePath,
+          isAligned,
+          measurementGroup.getMeasurements().get(i),
+          measurementGroup.getDataTypes().get(i),
+          measurementGroup.getEncodings().get(i),
+          measurementGroup.getCompressors().get(i),
+          Objects.nonNull(measurementGroup.getAliasList())
+              ? measurementGroup.getAliasList().get(i)
+              : null,
+          Objects.nonNull(measurementGroup.getTagsList())
+              ? measurementGroup.getTagsList().get(i)
+              : null,
+          Objects.nonNull(measurementGroup.getAttributesList())
+              ? measurementGroup.getAttributesList().get(i)
+              : null);
+    }
+  }
+
+  private void appendMeasurement(
+      final PartialPath devicePath,
+      final boolean isAligned,
+      final String measurement,
+      final TSDataType dataType,
+      final TSEncoding encoding,
+      final CompressionType compressor,
+      final String alias,
+      final Map<String, String> tags,
+      final Map<String, String> attributes) {
+    final MeasurementGroup group =
+        deviceMap
+            .computeIfAbsent(devicePath, key -> new Pair<>(isAligned, new 
MeasurementGroup()))
+            .getRight();
+    if (group.addMeasurement(measurement, dataType, encoding, compressor)) {
+      group.addAlias(alias);
+      group.addTags(tags);
+      group.addAttributes(attributes);
+    }
+  }
+
+  private void appendActivateTemplateNode(final ActivateTemplateNode node) {
+    templateActivationMap.putIfAbsent(
+        node.getActivatePath(), new Pair<>(node.getTemplateId(), 
node.getTemplateSetLevel()));
+  }
+
+  private void appendBatchActivateTemplateNode(final BatchActivateTemplateNode 
node) {
+    
node.getTemplateActivationMap().forEach(templateActivationMap::putIfAbsent);
+  }
+
+  private void appendInternalBatchActivateTemplateNode(
+      final InternalBatchActivateTemplateNode node) {
+    
node.getTemplateActivationMap().forEach(templateActivationMap::putIfAbsent);
+  }
+
+  public synchronized boolean shouldEmit() {
+    if (events.isEmpty() || firstEventProcessingTime == Long.MIN_VALUE) {
+      return false;
+    }
+    return totalBufferSize >= maxBatchSizeInBytes
+        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
+  }
+
+  public synchronized void recordBatchMetrics() {
+    if (events.isEmpty() || firstEventProcessingTime == Long.MIN_VALUE) {
+      return;
+    }
+    batchTimeIntervalHistogram.update(System.currentTimeMillis() - 
firstEventProcessingTime);
+    batchSizeHistogram.update(totalBufferSize);
+    eventSizeHistogram.update(events.size());
+  }
+
+  public synchronized PlanNode toPlanNode() {
+    switch (batchType) {
+      case TIMESERIES:
+        return new InternalCreateMultiTimeSeriesNode(EMPTY_PLAN_NODE_ID, new 
HashMap<>(deviceMap));
+      case TEMPLATE_ACTIVATE:
+        return new BatchActivateTemplateNode(
+            EMPTY_PLAN_NODE_ID, new HashMap<>(templateActivationMap));
+      default:
+        throw new IllegalStateException("Cannot build schema batch plan node 
from empty batch.");
+    }
+  }
+
+  public synchronized void onSuccess() {
+    events.clear();
+    deviceMap.clear();
+    templateActivationMap.clear();
+    batchType = BatchType.NONE;
+    pipeName = null;
+    creationTime = 0;
+    totalBufferSize = 0;
+    firstEventProcessingTime = Long.MIN_VALUE;
+  }
+
+  public synchronized void decreaseEventsReferenceCount(
+      final String holderMessage, final boolean shouldReport) {
+    events.forEach(event -> event.decreaseReferenceCount(holderMessage, 
shouldReport));
+  }
+
+  public synchronized void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    final boolean removed =
+        events.removeIf(
+            event -> {
+              if (pipeNameToDrop.equals(event.getPipeName())
+                  && creationTimeToDrop == event.getCreationTime()
+                  && regionId == event.getRegionId()) {
+                
event.clearReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName());
+                return true;
+              }
+              return false;
+            });
+    if (removed) {
+      rebuildFromEvents();
+    }
+  }
+
+  private void rebuildFromEvents() {
+    deviceMap.clear();
+    templateActivationMap.clear();
+    batchType = BatchType.NONE;
+    pipeName = null;
+    creationTime = 0;
+    totalBufferSize = 0;
+
+    if (events.isEmpty()) {
+      firstEventProcessingTime = Long.MIN_VALUE;
+      return;
+    }
+
+    // After a partial discard, the enqueue timestamp of the oldest remaining 
event is unknown.
+    // Reset the emit window conservatively to avoid flushing immediately 
because of removed events.
+    firstEventProcessingTime = System.currentTimeMillis();
+    batchType = resolveBatchType(((PipeSchemaRegionWritePlanEvent) 
events.get(0)).getPlanNode());
+
+    for (final EnrichedEvent event : events) {
+      final PipeSchemaRegionWritePlanEvent schemaEvent = 
(PipeSchemaRegionWritePlanEvent) event;
+      if (Objects.isNull(pipeName)) {
+        pipeName = schemaEvent.getPipeName();
+        creationTime = schemaEvent.getCreationTime();
+      }
+      appendPlanNode(schemaEvent.getPlanNode());
+      totalBufferSize += 
schemaEvent.getPlanNode().serializeToByteBuffer().limit();
+    }
+  }
+
+  public synchronized boolean isEmpty() {
+    return events.isEmpty();
+  }
+
+  public synchronized int size() {
+    return events.size();
+  }
+
+  public synchronized String getPipeName() {
+    return pipeName;
+  }
+
+  public synchronized long getCreationTime() {
+    return creationTime;
+  }
+
+  public void setBatchSizeHistogram(final Histogram batchSizeHistogram) {
+    if (Objects.nonNull(batchSizeHistogram)) {
+      this.batchSizeHistogram = batchSizeHistogram;
+    }
+  }
+
+  public void setBatchTimeIntervalHistogram(final Histogram 
batchTimeIntervalHistogram) {
+    if (Objects.nonNull(batchTimeIntervalHistogram)) {
+      this.batchTimeIntervalHistogram = batchTimeIntervalHistogram;
+    }
+  }
+
+  public void setEventSizeHistogram(final Histogram eventSizeHistogram) {
+    if (Objects.nonNull(eventSizeHistogram)) {
+      this.eventSizeHistogram = eventSizeHistogram;
+    }
+  }
+
+  @Override
+  public synchronized void close() {
+    isClosed = true;
+    events.forEach(
+        event -> 
event.clearReferenceCount(PipeSchemaRegionWritePlanEventBatch.class.getName()));
+    events.clear();
+    deviceMap.clear();
+    templateActivationMap.clear();
+    allocatedMemoryBlock.close();
+  }
+
+  private enum BatchType {
+    NONE,
+    TIMESERIES,
+    TEMPLATE_ACTIVATE
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
index bc056857c17..874acdd6de6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java
@@ -24,11 +24,15 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
+import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotSealReq;
+import org.apache.iotdb.metrics.type.Histogram;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -49,6 +53,19 @@ public class IoTDBSchemaRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSchemaRegionAirGapSink.class);
 
+  private PipeSchemaRegionWritePlanEventBatch schemaRegionWritePlanEventBatch;
+
+  @Override
+  public void customize(
+      final PipeParameters parameters, final PipeConnectorRuntimeConfiguration 
configuration)
+      throws Exception {
+    super.customize(parameters, configuration);
+
+    if (isTabletBatchModeEnabled) {
+      schemaRegionWritePlanEventBatch = new 
PipeSchemaRegionWritePlanEventBatch(parameters);
+    }
+  }
+
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
     throw new UnsupportedOperationException(
@@ -68,12 +85,21 @@ public class IoTDBSchemaRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
 
     try {
       if (event instanceof PipeSchemaRegionWritePlanEvent) {
-        doTransferWrapper(socket, (PipeSchemaRegionWritePlanEvent) event);
+        if (isTabletBatchModeEnabled && 
Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+          doTransferWithBatch(socket, (PipeSchemaRegionWritePlanEvent) event);
+        } else {
+          doTransferWrapper(socket, (PipeSchemaRegionWritePlanEvent) event);
+        }
       } else if (event instanceof PipeSchemaRegionSnapshotEvent) {
+        flushBatchedEventsIfNecessary(socket);
         doTransferWrapper(socket, (PipeSchemaRegionSnapshotEvent) event);
-      } else if (!(event instanceof PipeHeartbeatEvent)) {
-        LOGGER.warn(
-            "IoTDBSchemaRegionAirGapSink does not support transferring generic 
event: {}.", event);
+      } else {
+        flushBatchedEventsIfNecessary(socket);
+        if (!(event instanceof PipeHeartbeatEvent)) {
+          LOGGER.warn(
+              "IoTDBSchemaRegionAirGapSink does not support transferring 
generic event: {}.",
+              event);
+        }
       }
     } catch (final IOException e) {
       isSocketAlive.set(socketIndex, false);
@@ -86,6 +112,43 @@ public class IoTDBSchemaRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
     }
   }
 
+  private void doTransferWithBatch(
+      final AirGapSocket socket, final PipeSchemaRegionWritePlanEvent event)
+      throws PipeException, IOException {
+    if (schemaRegionWritePlanEventBatch.onEvent(event)) {
+      if (schemaRegionWritePlanEventBatch.shouldEmit()) {
+        flushBatchedEventsIfNecessary(socket);
+      }
+      return;
+    }
+
+    if (!schemaRegionWritePlanEventBatch.isEmpty()) {
+      flushBatchedEventsIfNecessary(socket);
+      if (schemaRegionWritePlanEventBatch.onEvent(event)) {
+        if (schemaRegionWritePlanEventBatch.shouldEmit()) {
+          flushBatchedEventsIfNecessary(socket);
+        }
+        return;
+      }
+    }
+
+    doTransferWrapper(socket, event);
+  }
+
+  private void flushBatchedEventsIfNecessary(final AirGapSocket socket)
+      throws PipeException, IOException {
+    if (Objects.isNull(schemaRegionWritePlanEventBatch)
+        || schemaRegionWritePlanEventBatch.isEmpty()) {
+      return;
+    }
+
+    schemaRegionWritePlanEventBatch.recordBatchMetrics();
+    doTransfer(socket, schemaRegionWritePlanEventBatch);
+    schemaRegionWritePlanEventBatch.decreaseEventsReferenceCount(
+        IoTDBSchemaRegionAirGapSink.class.getName(), true);
+    schemaRegionWritePlanEventBatch.onSuccess();
+  }
+
   private void doTransferWrapper(
       final AirGapSocket socket,
       final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
@@ -107,21 +170,39 @@ public class IoTDBSchemaRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
       final AirGapSocket socket,
       final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
       throws PipeException, IOException {
-    if (!send(
+    doTransfer(
+        socket,
+        pipeSchemaRegionWritePlanEvent.getPlanNode(),
         pipeSchemaRegionWritePlanEvent.getPipeName(),
         pipeSchemaRegionWritePlanEvent.getCreationTime(),
-        socket,
-        PipeTransferPlanNodeReq.toTPipeTransferBytes(
-            pipeSchemaRegionWritePlanEvent.getPlanNode()))) {
+        pipeSchemaRegionWritePlanEvent.toString());
+  }
+
+  private void doTransfer(
+      final AirGapSocket socket, final PipeSchemaRegionWritePlanEventBatch 
batch)
+      throws PipeException, IOException {
+    final org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode 
planNode =
+        batch.toPlanNode();
+    doTransfer(socket, planNode, batch.getPipeName(), batch.getCreationTime(), 
planNode.toString());
+  }
+
+  private void doTransfer(
+      final AirGapSocket socket,
+      final 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode planNode,
+      final String pipeName,
+      final long creationTime,
+      final String eventDescription)
+      throws PipeException, IOException {
+    if (!send(
+        pipeName, creationTime, socket, 
PipeTransferPlanNodeReq.toTPipeTransferBytes(planNode))) {
       final String errorMessage =
           String.format(
-              "Transfer data node write plan %s error. Socket: %s.",
-              pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), socket);
+              "Transfer data node write plan %s error. Socket: %s.", 
planNode.getType(), socket);
       receiverStatusHandler.handle(
           new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
               .setMessage(errorMessage),
           errorMessage,
-          pipeSchemaRegionWritePlanEvent.toString(),
+          eventDescription,
           true);
     }
   }
@@ -215,4 +296,44 @@ public class IoTDBSchemaRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
       final String fileName, final long position, final byte[] payLoad) throws 
IOException {
     return PipeTransferSchemaSnapshotPieceReq.toTPipeTransferBytes(fileName, 
position, payLoad);
   }
+
+  @Override
+  public synchronized void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+      schemaRegionWritePlanEventBatch.discardEventsOfPipe(
+          pipeNameToDrop, creationTimeToDrop, regionId);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+      schemaRegionWritePlanEventBatch.close();
+    }
+    super.close();
+  }
+
+  @Override
+  public void setTabletBatchSizeHistogram(final Histogram 
tabletBatchSizeHistogram) {
+    if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+      
schemaRegionWritePlanEventBatch.setBatchSizeHistogram(tabletBatchSizeHistogram);
+    }
+  }
+
+  @Override
+  public void setTabletBatchTimeIntervalHistogram(
+      final Histogram tabletBatchTimeIntervalHistogram) {
+    if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+      schemaRegionWritePlanEventBatch.setBatchTimeIntervalHistogram(
+          tabletBatchTimeIntervalHistogram);
+    }
+  }
+
+  @Override
+  public void setBatchEventSizeHistogram(final Histogram eventSizeHistogram) {
+    if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+      
schemaRegionWritePlanEventBatch.setEventSizeHistogram(eventSizeHistogram);
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
index 15a72d1193a..d8965dce0dc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java
@@ -25,11 +25,15 @@ import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFil
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
+import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotSealReq;
+import org.apache.iotdb.metrics.type.Histogram;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -54,6 +58,19 @@ public class IoTDBSchemaRegionSink extends 
IoTDBDataNodeSyncSink {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSchemaRegionSink.class);
 
+  private PipeSchemaRegionWritePlanEventBatch schemaRegionWritePlanEventBatch;
+
  /**
   * Initializes the sink; additionally creates the write-plan event batch when tablet batch
   * mode is enabled, so schema write plans can be coalesced before transfer.
   */
  @Override
  public void customize(
      final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
      throws Exception {
    super.customize(parameters, configuration);

    // The batch is only ever non-null in batch mode; all batch hooks below null-check it.
    if (isTabletBatchModeEnabled) {
      schemaRegionWritePlanEventBatch = new PipeSchemaRegionWritePlanEventBatch(parameters);
    }
  }
+
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
     throw new UnsupportedOperationException(
@@ -69,13 +86,56 @@ public class IoTDBSchemaRegionSink extends 
IoTDBDataNodeSyncSink {
   @Override
   public void transfer(final Event event) throws Exception {
     if (event instanceof PipeSchemaRegionWritePlanEvent) {
-      doTransferWrapper((PipeSchemaRegionWritePlanEvent) event);
+      if (isTabletBatchModeEnabled && 
Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+        doTransferWithBatch((PipeSchemaRegionWritePlanEvent) event);
+      } else {
+        doTransferWrapper((PipeSchemaRegionWritePlanEvent) event);
+      }
     } else if (event instanceof PipeSchemaRegionSnapshotEvent) {
+      flushBatchedEventsIfNecessary();
       doTransferWrapper((PipeSchemaRegionSnapshotEvent) event);
-    } else if (!(event instanceof PipeHeartbeatEvent)) {
-      LOGGER.warn(
-          "IoTDBSchemaRegionConnector does not support transferring generic 
event: {}.", event);
+    } else {
+      flushBatchedEventsIfNecessary();
+      if (!(event instanceof PipeHeartbeatEvent)) {
+        LOGGER.warn(
+            "IoTDBSchemaRegionConnector does not support transferring generic 
event: {}.", event);
+      }
+    }
+  }
+
+  private void doTransferWithBatch(final PipeSchemaRegionWritePlanEvent event)
+      throws PipeException {
+    if (schemaRegionWritePlanEventBatch.onEvent(event)) {
+      if (schemaRegionWritePlanEventBatch.shouldEmit()) {
+        flushBatchedEventsIfNecessary();
+      }
+      return;
     }
+
+    if (!schemaRegionWritePlanEventBatch.isEmpty()) {
+      flushBatchedEventsIfNecessary();
+      if (schemaRegionWritePlanEventBatch.onEvent(event)) {
+        if (schemaRegionWritePlanEventBatch.shouldEmit()) {
+          flushBatchedEventsIfNecessary();
+        }
+        return;
+      }
+    }
+
+    doTransferWrapper(event);
+  }
+
  /**
   * Transfers the currently buffered batch, if any. No-op when batch mode is disabled
   * ({@code schemaRegionWritePlanEventBatch} is null) or when nothing is buffered.
   */
  private void flushBatchedEventsIfNecessary() throws PipeException {
    if (Objects.isNull(schemaRegionWritePlanEventBatch)
        || schemaRegionWritePlanEventBatch.isEmpty()) {
      return;
    }

    // Record metrics before transferring so they reflect exactly what is about to be sent.
    schemaRegionWritePlanEventBatch.recordBatchMetrics();
    doTransfer(schemaRegionWritePlanEventBatch);
    // Release references and reset the batch only after a successful transfer; NOTE(review):
    // if doTransfer throws, the events stay referenced — presumably retried later; confirm.
    schemaRegionWritePlanEventBatch.decreaseEventsReferenceCount(
        IoTDBSchemaRegionSink.class.getName(), true);
    schemaRegionWritePlanEventBatch.onSuccess();
  }
 
   private void doTransferWrapper(
@@ -95,43 +155,56 @@ public class IoTDBSchemaRegionSink extends 
IoTDBDataNodeSyncSink {
 
   private void doTransfer(final PipeSchemaRegionWritePlanEvent 
pipeSchemaRegionWritePlanEvent)
       throws PipeException {
+    doTransfer(
+        pipeSchemaRegionWritePlanEvent.getPlanNode(),
+        pipeSchemaRegionWritePlanEvent.getPipeName(),
+        pipeSchemaRegionWritePlanEvent.getCreationTime(),
+        pipeSchemaRegionWritePlanEvent.getPlanNode().toString());
+    LOGGER.info("Successfully transferred schema event {}.", 
pipeSchemaRegionWritePlanEvent);
+  }
+
+  private void doTransfer(final PipeSchemaRegionWritePlanEventBatch batch) 
throws PipeException {
+    final org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode 
planNode =
+        batch.toPlanNode();
+    doTransfer(planNode, batch.getPipeName(), batch.getCreationTime(), 
planNode.toString());
+    LOGGER.info("Successfully transferred batched schema events, batch size 
{}.", batch.size());
+  }
+
+  private void doTransfer(
+      final 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode planNode,
+      final String pipeName,
+      final long creationTime,
+      final String eventDescription)
+      throws PipeException {
     final Pair<IoTDBSyncClient, Boolean> clientAndStatus = 
clientManager.getClient();
 
     final TPipeTransferResp resp;
     try {
       final TPipeTransferReq req =
-          compressIfNeeded(
-              PipeTransferPlanNodeReq.toTPipeTransferReq(
-                  pipeSchemaRegionWritePlanEvent.getPlanNode()));
+          
compressIfNeeded(PipeTransferPlanNodeReq.toTPipeTransferReq(planNode));
       rateLimitIfNeeded(
-          pipeSchemaRegionWritePlanEvent.getPipeName(),
-          pipeSchemaRegionWritePlanEvent.getCreationTime(),
-          clientAndStatus.getLeft().getEndPoint(),
-          req.getBody().length);
+          pipeName, creationTime, clientAndStatus.getLeft().getEndPoint(), 
req.getBody().length);
       resp = clientAndStatus.getLeft().pipeTransfer(req);
     } catch (final Exception e) {
       clientAndStatus.setRight(false);
       throw new PipeConnectionException(
           String.format(
               "Network error when transfer schema region write plan %s, 
because %s.",
-              pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), 
e.getMessage()),
+              planNode.getType(), e.getMessage()),
           e);
     }
 
     final TSStatus status = resp.getStatus();
-    // Only handle the failed statuses to avoid string format performance 
overhead
-    if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && resp.getStatus().getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
       receiverStatusHandler.handle(
           status,
           String.format(
               "Transfer data node write plan %s error, result status %s.",
-              pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), status),
-          pipeSchemaRegionWritePlanEvent.getPlanNode().toString(),
+              planNode.getType(), status),
+          eventDescription,
           true);
     }
-
-    LOGGER.info("Successfully transferred schema event {}.", 
pipeSchemaRegionWritePlanEvent);
   }
 
   private void doTransferWrapper(final PipeSchemaRegionSnapshotEvent 
pipeSchemaRegionSnapshotEvent)
@@ -245,6 +318,46 @@ public class IoTDBSchemaRegionSink extends 
IoTDBDataNodeSyncSink {
     return PipeTransferSchemaSnapshotPieceReq.toTPipeTransferReq(fileName, 
position, payLoad);
   }
 
+  @Override
+  public synchronized void discardEventsOfPipe(
+      final String pipeNameToDrop, final long creationTimeToDrop, final int 
regionId) {
+    if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+      schemaRegionWritePlanEventBatch.discardEventsOfPipe(
+          pipeNameToDrop, creationTimeToDrop, regionId);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+      schemaRegionWritePlanEventBatch.close();
+    }
+    super.close();
+  }
+
+  @Override
+  public void setTabletBatchSizeHistogram(final Histogram 
tabletBatchSizeHistogram) {
+    if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+      
schemaRegionWritePlanEventBatch.setBatchSizeHistogram(tabletBatchSizeHistogram);
+    }
+  }
+
+  @Override
+  public void setTabletBatchTimeIntervalHistogram(
+      final Histogram tabletBatchTimeIntervalHistogram) {
+    if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+      schemaRegionWritePlanEventBatch.setBatchTimeIntervalHistogram(
+          tabletBatchTimeIntervalHistogram);
+    }
+  }
+
+  @Override
+  public void setBatchEventSizeHistogram(final Histogram eventSizeHistogram) {
+    if (Objects.nonNull(schemaRegionWritePlanEventBatch)) {
+      
schemaRegionWritePlanEventBatch.setEventSizeHistogram(eventSizeHistogram);
+    }
+  }
+
   @Override
   protected void mayLimitRateAndRecordIO(final long requiredBytes) {
     // Do nothing
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java
new file mode 100644
index 00000000000..d61fda93be5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetricsTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric.schema;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.type.Histogram;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
/**
 * Verifies that {@link PipeSchemaRegionSinkMetrics} creates the rate and the three histograms
 * on register/bind, pushes them into the subtask, and removes all of them on deregister.
 */
public class PipeSchemaRegionSinkMetricsTest {

  @Test
  public void testRegisterAndDeregisterCreateAndRemoveHistograms() throws Exception {
    // Unique task id so repeated runs do not collide in the singleton's internal map.
    final String taskId = "schema-task-" + System.nanoTime();
    boolean deregistered = false;
    final AbstractMetricService metricService = Mockito.mock(AbstractMetricService.class);
    final PipeSinkSubtask subtask = Mockito.mock(PipeSinkSubtask.class);
    final Rate rate = Mockito.mock(Rate.class);
    final Histogram batchSizeHistogram = Mockito.mock(Histogram.class);
    final Histogram batchTimeHistogram = Mockito.mock(Histogram.class);
    final Histogram eventSizeHistogram = Mockito.mock(Histogram.class);

    when(subtask.getTaskID()).thenReturn(taskId);
    when(subtask.getAttributeSortedString()).thenReturn("schema_test");
    when(subtask.getCreationTime()).thenReturn(1L);
    when(metricService.getOrCreateRate(anyString(), any(MetricLevel.class), any(String[].class)))
        .thenReturn(rate);
    // Consecutive-call stubbing: the three histograms are returned in the order the metric
    // set creates them (batch size, batch time interval, event size) — order-sensitive.
    when(metricService.getOrCreateHistogram(
            anyString(), any(MetricLevel.class), any(String[].class)))
        .thenReturn(batchSizeHistogram, batchTimeHistogram, eventSizeHistogram);

    final PipeSchemaRegionSinkMetrics metrics = PipeSchemaRegionSinkMetrics.getInstance();

    // Reset the singleton's cached metric service so bindTo() below performs a fresh bind.
    final Field metricServiceField =
        PipeSchemaRegionSinkMetrics.class.getDeclaredField("metricService");
    metricServiceField.setAccessible(true);
    metricServiceField.set(metrics, null);

    try {
      metrics.register(subtask);
      metrics.bindTo(metricService);

      // Binding must push each created histogram into the subtask.
      verify(subtask).setTabletBatchSizeHistogram(batchSizeHistogram);
      verify(subtask).setTabletBatchTimeIntervalHistogram(batchTimeHistogram);
      verify(subtask).setEventSizeHistogram(eventSizeHistogram);

      metrics.deregister(taskId);

      // Deregistration must remove every metric that was created for the task.
      verify(metricService)
          .remove(
              MetricType.HISTOGRAM,
              Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(),
              Tag.NAME.toString(),
              "schema_test",
              Tag.CREATION_TIME.toString(),
              "1");
      verify(metricService)
          .remove(
              MetricType.HISTOGRAM,
              Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(),
              Tag.NAME.toString(),
              "schema_test",
              Tag.CREATION_TIME.toString(),
              "1");
      verify(metricService)
          .remove(
              MetricType.HISTOGRAM,
              Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
              Tag.NAME.toString(),
              "schema_test");
      verify(metricService)
          .remove(
              MetricType.RATE,
              Metric.PIPE_CONNECTOR_SCHEMA_TRANSFER.toString(),
              Tag.NAME.toString(),
              "schema_test",
              Tag.CREATION_TIME.toString(),
              "1");
      deregistered = true;
    } finally {
      // Clean up the singleton even if an assertion failed, so other tests see fresh state.
      if (!deregistered) {
        metrics.deregister(taskId);
      }
      metricServiceField.set(metrics, null);
    }
  }
}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java
new file mode 100644
index 00000000000..d6caae2503a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionSinkTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
+import org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeSyncClientManager;
+import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch;
+import 
org.apache.iotdb.db.pipe.sink.protocol.airgap.IoTDBSchemaRegionAirGapSink;
+import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBSchemaRegionSink;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class PipeSchemaRegionSinkTest {
+
+  @Test
+  public void testSyncSinkFlushesBatchedEventsOnHeartbeat() throws Exception {
+    final TestIoTDBSchemaRegionSyncSink sink = new 
TestIoTDBSchemaRegionSyncSink();
+    try {
+      final IoTDBDataNodeSyncClientManager clientManager =
+          Mockito.mock(IoTDBDataNodeSyncClientManager.class);
+      final org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient client =
+          
Mockito.mock(org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient.class);
+      when(clientManager.getClient()).thenAnswer(invocation -> new 
Pair<>(client, true));
+      when(client.getEndPoint()).thenReturn(new TEndPoint("127.0.0.1", 6667));
+      
when(client.pipeTransfer(any(TPipeTransferReq.class))).thenReturn(createSuccessResp());
+
+      setField(sink, "clientManager", clientManager);
+      enableBatching(sink);
+
+      final PipeSchemaRegionWritePlanEvent event =
+          createBatchableEvent("root.db.d1.s1", "pipeA", 1L);
+
+      sink.transfer(event);
+      Assert.assertFalse(event.isReleased());
+      verify(client, never()).pipeTransfer(any(TPipeTransferReq.class));
+
+      sink.transfer(new PipeHeartbeatEvent(-1, false));
+
+      verify(client, times(1)).pipeTransfer(any(TPipeTransferReq.class));
+      Assert.assertTrue(event.isReleased());
+      Assert.assertTrue(getBatch(sink).isEmpty());
+    } finally {
+      sink.close();
+    }
+  }
+
+  @Test
+  public void testSyncSinkFlushesBufferedEventsBeforeStandaloneTransfer() 
throws Exception {
+    final TestIoTDBSchemaRegionSyncSink sink = new 
TestIoTDBSchemaRegionSyncSink();
+    try {
+      final IoTDBDataNodeSyncClientManager clientManager =
+          Mockito.mock(IoTDBDataNodeSyncClientManager.class);
+      final org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient client =
+          
Mockito.mock(org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient.class);
+      when(clientManager.getClient()).thenAnswer(invocation -> new 
Pair<>(client, true));
+      when(client.getEndPoint()).thenReturn(new TEndPoint("127.0.0.1", 6667));
+      
when(client.pipeTransfer(any(TPipeTransferReq.class))).thenReturn(createSuccessResp());
+
+      setField(sink, "clientManager", clientManager);
+      enableBatching(sink);
+
+      final PipeSchemaRegionWritePlanEvent batchedEvent =
+          createBatchableEvent("root.db.d1.s1", "pipeA", 1L);
+      final PipeSchemaRegionWritePlanEvent standaloneEvent =
+          createNonBatchableEvent("root.db.d2.s1", "pipeA", 1L);
+
+      sink.transfer(batchedEvent);
+      sink.transfer(standaloneEvent);
+
+      verify(client, times(2)).pipeTransfer(any(TPipeTransferReq.class));
+      Assert.assertTrue(batchedEvent.isReleased());
+      Assert.assertTrue(standaloneEvent.isReleased());
+      Assert.assertTrue(getBatch(sink).isEmpty());
+    } finally {
+      sink.close();
+    }
+  }
+
+  @Test
+  public void testAirGapSinkFlushesBatchedEventsOnHeartbeat() throws Exception 
{
+    final TestIoTDBSchemaRegionAirGapSink sink = new 
TestIoTDBSchemaRegionAirGapSink();
+    try {
+      enableBatching(sink);
+
+      final PipeSchemaRegionWritePlanEvent event =
+          createBatchableEvent("root.db.d1.s1", "pipeA", 1L);
+
+      sink.transfer(event);
+      Assert.assertFalse(event.isReleased());
+      Assert.assertEquals(0, sink.getSendCount());
+
+      sink.transfer(new PipeHeartbeatEvent(-1, false));
+
+      Assert.assertEquals(1, sink.getSendCount());
+      Assert.assertTrue(event.isReleased());
+      Assert.assertTrue(getBatch(sink).isEmpty());
+    } finally {
+      sink.close();
+    }
+  }
+
+  @Test
+  public void testAirGapSinkFlushesBufferedEventsBeforeStandaloneTransfer() 
throws Exception {
+    final TestIoTDBSchemaRegionAirGapSink sink = new 
TestIoTDBSchemaRegionAirGapSink();
+    try {
+      enableBatching(sink);
+
+      final PipeSchemaRegionWritePlanEvent batchedEvent =
+          createBatchableEvent("root.db.d1.s1", "pipeA", 1L);
+      final PipeSchemaRegionWritePlanEvent standaloneEvent =
+          createNonBatchableEvent("root.db.d2.s1", "pipeA", 1L);
+
+      sink.transfer(batchedEvent);
+      sink.transfer(standaloneEvent);
+
+      Assert.assertEquals(2, sink.getSendCount());
+      Assert.assertTrue(batchedEvent.isReleased());
+      Assert.assertTrue(standaloneEvent.isReleased());
+      Assert.assertTrue(getBatch(sink).isEmpty());
+    } finally {
+      sink.close();
+    }
+  }
+
+  private PipeParameters createParameters() {
+    return new PipeParameters(
+        new HashMap<String, String>() {
+          {
+            put(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, "100000");
+            put(CONNECTOR_IOTDB_BATCH_SIZE_KEY, "1048576");
+          }
+        });
+  }
+
+  private void enableBatching(final Object sink) throws Exception {
+    setField(sink, "isTabletBatchModeEnabled", true);
+    setField(
+        sink,
+        "schemaRegionWritePlanEventBatch",
+        new PipeSchemaRegionWritePlanEventBatch(createParameters()));
+  }
+
+  private PipeSchemaRegionWritePlanEventBatch getBatch(final Object sink) 
throws Exception {
+    return (PipeSchemaRegionWritePlanEventBatch)
+        getFieldValue(sink, "schemaRegionWritePlanEventBatch");
+  }
+
+  private PipeSchemaRegionWritePlanEvent createBatchableEvent(
+      final String path, final String pipeName, final long creationTime) 
throws Exception {
+    return new PipeSchemaRegionWritePlanEvent(
+        new CreateTimeSeriesNode(
+            new 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId(path),
+            new org.apache.iotdb.commons.path.MeasurementPath(path),
+            TSDataType.INT64,
+            TSEncoding.PLAIN,
+            CompressionType.LZ4,
+            null,
+            null,
+            null,
+            null),
+        pipeName,
+        creationTime,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        true,
+        false);
+  }
+
+  private PipeSchemaRegionWritePlanEvent createNonBatchableEvent(
+      final String path, final String pipeName, final long creationTime) 
throws Exception {
+    return new PipeSchemaRegionWritePlanEvent(
+        new CreateTimeSeriesNode(
+            new 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId(path + 
"-p"),
+            new org.apache.iotdb.commons.path.MeasurementPath(path),
+            TSDataType.INT64,
+            TSEncoding.PLAIN,
+            CompressionType.LZ4,
+            java.util.Collections.singletonMap("prop", "v1"),
+            null,
+            null,
+            null),
+        pipeName,
+        creationTime,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        true,
+        false);
+  }
+
+  private TPipeTransferResp createSuccessResp() {
+    final TPipeTransferResp resp = new TPipeTransferResp();
+    resp.setStatus(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()).setMessage("success"));
+    return resp;
+  }
+
+  private void setField(final Object target, final String fieldName, final 
Object value)
+      throws Exception {
+    Field field = findField(target.getClass(), fieldName);
+    field.setAccessible(true);
+    field.set(target, value);
+  }
+
+  private Object getFieldValue(final Object target, final String fieldName) 
throws Exception {
+    Field field = findField(target.getClass(), fieldName);
+    field.setAccessible(true);
+    return field.get(target);
+  }
+
+  private Field findField(final Class<?> clazz, final String fieldName)
+      throws NoSuchFieldException {
+    Class<?> current = clazz;
+    while (current != null) {
+      try {
+        return current.getDeclaredField(fieldName);
+      } catch (NoSuchFieldException ignored) {
+        current = current.getSuperclass();
+      }
+    }
+    throw new NoSuchFieldException(fieldName);
+  }
+
+  private static class TestIoTDBSchemaRegionSyncSink extends 
IoTDBSchemaRegionSink {
+
+    @Override
+    public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) {
+      return req;
+    }
+
+    @Override
+    public void rateLimitIfNeeded(
+        final String pipeName,
+        final long creationTime,
+        final TEndPoint endPoint,
+        final long bytesLength) {
+      // Do nothing in tests.
+    }
+  }
+
+  private static class TestIoTDBSchemaRegionAirGapSink extends 
IoTDBSchemaRegionAirGapSink {
+
+    private int sendCount = 0;
+
+    private TestIoTDBSchemaRegionAirGapSink() {
+      sockets.add(new AirGapSocket("127.0.0.1", 6667));
+      isSocketAlive.add(true);
+    }
+
+    @Override
+    protected int nextSocketIndex() {
+      return 0;
+    }
+
+    @Override
+    protected boolean send(
+        final String pipeName,
+        final long creationTime,
+        final AirGapSocket socket,
+        final byte[] bytes) {
+      sendCount++;
+      return true;
+    }
+
+    private int getSendCount() {
+      return sendCount;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionWritePlanEventBatchTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionWritePlanEventBatchTest.java
new file mode 100644
index 00000000000..2f31fdb41f7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSchemaRegionWritePlanEventBatchTest.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink;
+
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
+import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeSchemaRegionWritePlanEventBatch;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.ActivateTemplateNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.BatchActivateTemplateNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateAlignedTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateMultiTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalBatchActivateTemplateNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateMultiTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.InternalCreateTimeSeriesNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.MeasurementGroup;
+import org.apache.iotdb.metrics.type.Histogram;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class PipeSchemaRegionWritePlanEventBatchTest {
+
  /**
   * Verifies that a non-aligned and an aligned creation event from the same
   * pipe merge into one {@code InternalCreateMultiTimeSeriesNode} whose device
   * map keeps each device's alignment flag, measurements, aliases, tags and
   * attributes.
   */
  @Test
  public void testBatchTimeSeriesEvents() throws Exception {
    try (PipeSchemaRegionWritePlanEventBatch batch =
        new PipeSchemaRegionWritePlanEventBatch(createParameters(1000, 1048576))) {
      // d1: single non-aligned series with alias, tags and attributes.
      Assert.assertTrue(
          batch.onEvent(
              createEvent(
                  new CreateTimeSeriesNode(
                      new PlanNodeId("1"),
                      new MeasurementPath("root.db.d1.s1"),
                      TSDataType.INT64,
                      TSEncoding.PLAIN,
                      CompressionType.LZ4,
                      null,
                      Collections.singletonMap("tag", "v1"),
                      Collections.singletonMap("attr", "a1"),
                      "alias1"),
                  "pipeA",
                  1L)));

      // d2: two aligned series, second one without alias/tags/attributes.
      Assert.assertTrue(
          batch.onEvent(
              createEvent(
                  new CreateAlignedTimeSeriesNode(
                      new PlanNodeId("2"),
                      new PartialPath("root.db.d2"),
                      Arrays.asList("s1", "s2"),
                      Arrays.asList(TSDataType.INT32, TSDataType.DOUBLE),
                      Arrays.asList(TSEncoding.RLE, TSEncoding.GORILLA),
                      Arrays.asList(CompressionType.SNAPPY, CompressionType.ZSTD),
                      Arrays.asList("alias2", null),
                      Arrays.asList(Collections.singletonMap("tag", "v2"), null),
                      Arrays.asList(Collections.singletonMap("attr", "a2"), null)),
                  "pipeA",
                  1L)));

      // Both events are expected to batch into a single multi-series node.
      final PlanNode planNode = batch.toPlanNode();
      Assert.assertTrue(planNode instanceof InternalCreateMultiTimeSeriesNode);

      final Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap =
          ((InternalCreateMultiTimeSeriesNode) planNode).getDeviceMap();
      Assert.assertEquals(2, deviceMap.size());

      // d1 keeps its non-aligned flag (left = false) and all metadata lists.
      final Pair<Boolean, MeasurementGroup> d1Group = deviceMap.get(new PartialPath("root.db.d1"));
      Assert.assertNotNull(d1Group);
      Assert.assertFalse(d1Group.getLeft());
      Assert.assertEquals(Collections.singletonList("s1"), d1Group.getRight().getMeasurements());
      Assert.assertEquals(Collections.singletonList("alias1"), d1Group.getRight().getAliasList());
      Assert.assertEquals(
          Collections.singletonList(Collections.singletonMap("tag", "v1")),
          d1Group.getRight().getTagsList());
      Assert.assertEquals(
          Collections.singletonList(Collections.singletonMap("attr", "a1")),
          d1Group.getRight().getAttributesList());

      // d2 keeps its aligned flag (left = true) and per-measurement aliases.
      final Pair<Boolean, MeasurementGroup> d2Group = deviceMap.get(new PartialPath("root.db.d2"));
      Assert.assertNotNull(d2Group);
      Assert.assertTrue(d2Group.getLeft());
      Assert.assertEquals(Arrays.asList("s1", "s2"), d2Group.getRight().getMeasurements());
      Assert.assertEquals(Arrays.asList("alias2", null), d2Group.getRight().getAliasList());
    }
  }
+
  /**
   * Verifies that {@code CreateMultiTimeSeriesNode},
   * {@code InternalCreateTimeSeriesNode} and
   * {@code InternalCreateMultiTimeSeriesNode} events are all folded into a
   * single {@code InternalCreateMultiTimeSeriesNode}, preserving each device's
   * alignment flag and measurement metadata.
   */
  @Test
  public void testBatchAdditionalTimeSeriesNodeTypes() throws Exception {
    try (PipeSchemaRegionWritePlanEventBatch batch =
        new PipeSchemaRegionWritePlanEventBatch(createParameters(1000, 1048576))) {
      // d1: plain multi-series creation (non-aligned).
      final Map<PartialPath, MeasurementGroup> createMultiMap = new HashMap<>();
      createMultiMap.put(new PartialPath("root.db.d1"), createMeasurementGroup("s1", "alias1"));
      Assert.assertTrue(
          batch.onEvent(
              createEvent(
                  new CreateMultiTimeSeriesNode(new PlanNodeId("1"), createMultiMap),
                  "pipeA",
                  1L)));

      // d2: internal single-group creation flagged as aligned (last arg true).
      Assert.assertTrue(
          batch.onEvent(
              createEvent(
                  new InternalCreateTimeSeriesNode(
                      new PlanNodeId("2"),
                      new PartialPath("root.db.d2"),
                      createMeasurementGroup("s2", "alias2"),
                      true),
                  "pipeA",
                  1L)));

      // d3: internal multi-series creation, explicitly non-aligned.
      final Map<PartialPath, Pair<Boolean, MeasurementGroup>> internalCreateMultiMap =
          new HashMap<>();
      internalCreateMultiMap.put(
          new PartialPath("root.db.d3"), new Pair<>(false, createMeasurementGroup("s3", "alias3")));
      Assert.assertTrue(
          batch.onEvent(
              createEvent(
                  new InternalCreateMultiTimeSeriesNode(
                      new PlanNodeId("3"), internalCreateMultiMap),
                  "pipeA",
                  1L)));

      // All three node types batch into one InternalCreateMultiTimeSeriesNode.
      final PlanNode planNode = batch.toPlanNode();
      Assert.assertTrue(planNode instanceof InternalCreateMultiTimeSeriesNode);

      final Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap =
          ((InternalCreateMultiTimeSeriesNode) planNode).getDeviceMap();
      Assert.assertEquals(3, deviceMap.size());
      // Per-device alignment flags survive the merge.
      Assert.assertFalse(deviceMap.get(new PartialPath("root.db.d1")).getLeft());
      Assert.assertTrue(deviceMap.get(new PartialPath("root.db.d2")).getLeft());
      Assert.assertFalse(deviceMap.get(new PartialPath("root.db.d3")).getLeft());
      Assert.assertEquals(
          Collections.singletonList("s2"),
          deviceMap.get(new PartialPath("root.db.d2")).getRight().getMeasurements());
      Assert.assertEquals(
          Collections.singletonList("alias3"),
          deviceMap.get(new PartialPath("root.db.d3")).getRight().getAliasList());
    }
  }
+
  /**
   * Verifies that template-activation events of all three node types merge
   * into one {@code BatchActivateTemplateNode}, and that a time-series
   * creation event is rejected once the batch already holds activations.
   */
  @Test
  public void testBatchTemplateActivationEvents() throws Exception {
    try (PipeSchemaRegionWritePlanEventBatch batch =
        new PipeSchemaRegionWritePlanEventBatch(createParameters(1000, 1048576))) {
      // d1: single activation node.
      Assert.assertTrue(
          batch.onEvent(
              createEvent(
                  new ActivateTemplateNode(
                      new PlanNodeId("1"), new PartialPath("root.db.d1"), 1, 10),
                  "pipeA",
                  1L)));

      // d2: batched activation node.
      final Map<PartialPath, Pair<Integer, Integer>> templateActivationMap = new HashMap<>();
      templateActivationMap.put(new PartialPath("root.db.d2"), new Pair<>(2, 20));
      Assert.assertTrue(
          batch.onEvent(
              createEvent(
                  new BatchActivateTemplateNode(new PlanNodeId("2"), templateActivationMap),
                  "pipeA",
                  1L)));

      // d3: internal batched activation node.
      final Map<PartialPath, Pair<Integer, Integer>> internalTemplateActivationMap =
          new HashMap<>();
      internalTemplateActivationMap.put(new PartialPath("root.db.d3"), new Pair<>(3, 30));
      Assert.assertTrue(
          batch.onEvent(
              createEvent(
                  new InternalBatchActivateTemplateNode(
                      new PlanNodeId("3"), internalTemplateActivationMap),
                  "pipeA",
                  1L)));

      final PlanNode planNode = batch.toPlanNode();
      Assert.assertTrue(planNode instanceof BatchActivateTemplateNode);

      final Map<PartialPath, Pair<Integer, Integer>> batchedMap =
          ((BatchActivateTemplateNode) planNode).getTemplateActivationMap();
      Assert.assertEquals(3, batchedMap.size());
      // NOTE(review): d1 was created via ActivateTemplateNode(..., 1, 10) yet is
      // expected back as Pair(10, 1) — presumably the node stores the two ints in
      // the opposite order of its constructor arguments; confirm against the node.
      Assert.assertEquals(new Pair<>(10, 1), batchedMap.get(new PartialPath("root.db.d1")));
      Assert.assertEquals(new Pair<>(2, 20), batchedMap.get(new PartialPath("root.db.d2")));
      Assert.assertEquals(new Pair<>(3, 30), batchedMap.get(new PartialPath("root.db.d3")));

      // A batch already holding template activations must refuse creation events.
      Assert.assertFalse(
          batch.onEvent(
              createEvent(
                  new CreateTimeSeriesNode(
                      new PlanNodeId("4"),
                      new MeasurementPath("root.db.d4.s1"),
                      TSDataType.INT64,
                      TSEncoding.PLAIN,
                      CompressionType.LZ4,
                      null,
                      null,
                      null,
                      null),
                  "pipeA",
                  1L)));
    }
  }
+
  /**
   * Verifies the batch's rejection rules: nodes carrying legacy "props",
   * alignment conflicts on the same device, and events from a different pipe
   * are all refused by {@code onEvent}.
   */
  @Test
  public void testRejectDifferentPipePropsAndAlignmentConflict() throws Exception {
    try (PipeSchemaRegionWritePlanEventBatch batch =
        new PipeSchemaRegionWritePlanEventBatch(createParameters(1000, 1048576))) {
      // Rejected: CreateTimeSeriesNode with a non-null props map.
      Assert.assertFalse(
          batch.onEvent(
              createEvent(
                  new CreateTimeSeriesNode(
                      new PlanNodeId("1"),
                      new MeasurementPath("root.db.d1.s1"),
                      TSDataType.INT64,
                      TSEncoding.PLAIN,
                      CompressionType.LZ4,
                      Collections.singletonMap("prop", "v1"),
                      null,
                      null,
                      null),
                  "pipeA",
                  1L)));

      // Rejected: CreateMultiTimeSeriesNode whose measurement group carries props.
      final Map<PartialPath, MeasurementGroup> measurementGroupMapWithProps = new HashMap<>();
      measurementGroupMapWithProps.put(
          new PartialPath("root.db.d2"), createMeasurementGroupWithProps("s1"));
      Assert.assertFalse(
          batch.onEvent(
              createEvent(
                  new CreateMultiTimeSeriesNode(new PlanNodeId("2"), measurementGroupMapWithProps),
                  "pipeA",
                  1L)));

      // Accepted: clean non-aligned creation on d3.
      Assert.assertTrue(
          batch.onEvent(
              createEvent(
                  new CreateTimeSeriesNode(
                      new PlanNodeId("3"),
                      new MeasurementPath("root.db.d3.s1"),
                      TSDataType.INT64,
                      TSEncoding.PLAIN,
                      CompressionType.LZ4,
                      null,
                      null,
                      null,
                      null),
                  "pipeA",
                  1L)));

      // Rejected: aligned creation on d3 conflicts with its non-aligned series above.
      Assert.assertFalse(
          batch.onEvent(
              createEvent(
                  new CreateAlignedTimeSeriesNode(
                      new PlanNodeId("4"),
                      new PartialPath("root.db.d3"),
                      Collections.singletonList("s2"),
                      Collections.singletonList(TSDataType.INT32),
                      Collections.singletonList(TSEncoding.RLE),
                      Collections.singletonList(CompressionType.SNAPPY),
                      null,
                      null,
                      null),
                  "pipeA",
                  1L)));

      // Rejected: otherwise-valid event but from a different pipe ("pipeB").
      Assert.assertFalse(
          batch.onEvent(
              createEvent(
                  new CreateTimeSeriesNode(
                      new PlanNodeId("5"),
                      new MeasurementPath("root.db.d4.s1"),
                      TSDataType.INT64,
                      TSEncoding.PLAIN,
                      CompressionType.LZ4,
                      null,
                      null,
                      null,
                      null),
                  "pipeB",
                  1L)));
    }
  }
+
  /**
   * Verifies that {@code discardEventsOfPipe} releases and removes only the
   * events of the targeted region, rebuilds the batched plan from the
   * survivors, and resets the emit-delay window.
   */
  @Test
  public void testDiscardEventsOfPipeRebuildsBatchAndResetsEmitWindow() throws Exception {
    try (PipeSchemaRegionWritePlanEventBatch batch =
        new PipeSchemaRegionWritePlanEventBatch(createParameters(10000, 1048576))) {
      // Bound to region 1: will be discarded below.
      final PipeSchemaRegionWritePlanEvent removedEvent =
          createEvent(
              new CreateTimeSeriesNode(
                  new PlanNodeId("1"),
                  new MeasurementPath("root.db.d1.s1"),
                  TSDataType.INT64,
                  TSEncoding.PLAIN,
                  CompressionType.LZ4,
                  null,
                  null,
                  null,
                  null),
              "pipeA",
              1L,
              1);
      // Bound to region 2: must survive the discard.
      final PipeSchemaRegionWritePlanEvent remainingEvent =
          createEvent(
              new CreateTimeSeriesNode(
                  new PlanNodeId("2"),
                  new MeasurementPath("root.db.d2.s1"),
                  TSDataType.INT64,
                  TSEncoding.PLAIN,
                  CompressionType.LZ4,
                  null,
                  null,
                  null,
                  null),
              "pipeA",
              1L,
              2);

      Assert.assertTrue(batch.onEvent(removedEvent));
      Assert.assertTrue(batch.onEvent(remainingEvent));

      // Pretend the first event arrived 20 s ago so the 10 s delay has expired.
      setField(batch, "firstEventProcessingTime", System.currentTimeMillis() - 20000);
      Assert.assertTrue(batch.shouldEmit());

      batch.discardEventsOfPipe("pipeA", 1L, 1);

      // Only the region-1 event is released; pipe identity is preserved.
      Assert.assertTrue(removedEvent.isReleased());
      Assert.assertFalse(remainingEvent.isReleased());
      Assert.assertEquals(1, batch.size());
      Assert.assertEquals("pipeA", batch.getPipeName());
      Assert.assertEquals(1L, batch.getCreationTime());
      // Discarding restarts the delay window, so the batch must not emit yet.
      Assert.assertFalse(batch.shouldEmit());

      // The rebuilt plan only contains the surviving device.
      final PlanNode planNode = batch.toPlanNode();
      final Map<PartialPath, Pair<Boolean, MeasurementGroup>> deviceMap =
          ((InternalCreateMultiTimeSeriesNode) planNode).getDeviceMap();
      Assert.assertEquals(1, deviceMap.size());
      Assert.assertTrue(deviceMap.containsKey(new PartialPath("root.db.d2")));
    }
  }
+
  /**
   * Verifies that {@code recordBatchMetrics} updates all three injected
   * histograms exactly once per batch, and that closing the batch releases the
   * buffered events.
   */
  @Test
  public void testRecordMetricsAndCloseReleaseEvents() throws Exception {
    try (PipeSchemaRegionWritePlanEventBatch batch =
        new PipeSchemaRegionWritePlanEventBatch(createParameters(1000, 1048576))) {
      // Inject mock histograms so metric updates can be verified.
      final Histogram batchSizeHistogram = Mockito.mock(Histogram.class);
      final Histogram batchTimeIntervalHistogram = Mockito.mock(Histogram.class);
      final Histogram eventSizeHistogram = Mockito.mock(Histogram.class);
      batch.setBatchSizeHistogram(batchSizeHistogram);
      batch.setBatchTimeIntervalHistogram(batchTimeIntervalHistogram);
      batch.setEventSizeHistogram(eventSizeHistogram);

      final PipeSchemaRegionWritePlanEvent event =
          createEvent(
              new CreateTimeSeriesNode(
                  new PlanNodeId("1"),
                  new MeasurementPath("root.db.d1.s1"),
                  TSDataType.INT64,
                  TSEncoding.PLAIN,
                  CompressionType.LZ4,
                  null,
                  null,
                  null,
                  null),
              "pipeA",
              1L);

      Assert.assertTrue(batch.onEvent(event));
      batch.recordBatchMetrics();

      // One batch of one event: each histogram sees exactly one sample. The
      // event-size histogram is updated with 1L (presumably the event count of
      // this batch — confirm against recordBatchMetrics).
      verify(batchTimeIntervalHistogram, times(1)).update(anyLong());
      verify(batchSizeHistogram, times(1)).update(anyLong());
      verify(eventSizeHistogram, times(1)).update(1L);

      // Closing the batch must release every buffered event.
      batch.close();
      Assert.assertTrue(event.isReleased());
    }
  }
+
+  private PipeParameters createParameters(final int delayInMs, final long 
batchSizeInBytes) {
+    return new PipeParameters(
+        new HashMap<String, String>() {
+          {
+            put(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, String.valueOf(delayInMs));
+            put(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
String.valueOf(batchSizeInBytes));
+          }
+        });
+  }
+
+  private MeasurementGroup createMeasurementGroup(final String measurement, 
final String alias) {
+    final MeasurementGroup measurementGroup = new MeasurementGroup();
+    measurementGroup.addMeasurement(
+        measurement, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.LZ4);
+    measurementGroup.addAlias(alias);
+    measurementGroup.addTags(Collections.singletonMap("tag", alias));
+    measurementGroup.addAttributes(Collections.singletonMap("attr", alias));
+    return measurementGroup;
+  }
+
+  private MeasurementGroup createMeasurementGroupWithProps(final String 
measurement) {
+    final MeasurementGroup measurementGroup = new MeasurementGroup();
+    measurementGroup.addMeasurement(
+        measurement, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.LZ4);
+    measurementGroup.addProps(Collections.singletonMap("prop", "v1"));
+    return measurementGroup;
+  }
+
  /** Convenience overload: creates an event with no specific region id (-1). */
  private PipeSchemaRegionWritePlanEvent createEvent(
      final PlanNode planNode, final String pipeName, final long creationTime) {
    return createEvent(planNode, pipeName, creationTime, -1);
  }
+
+  private PipeSchemaRegionWritePlanEvent createEvent(
+      final PlanNode planNode, final String pipeName, final long creationTime, 
final int regionId) {
+    final PipeSchemaRegionWritePlanEvent event =
+        new PipeSchemaRegionWritePlanEvent(
+            planNode, pipeName, creationTime, null, null, null, null, null, 
null, true, false);
+    event.setCommitterKeyAndCommitId(new CommitterKey(pipeName, creationTime, 
regionId, -1), 1L);
+    return event;
+  }
+
+  private void setField(final Object target, final String fieldName, final 
Object value)
+      throws Exception {
+    final Field field = target.getClass().getDeclaredField(fieldName);
+    field.setAccessible(true);
+    field.set(target, value);
+  }
+}

Reply via email to