This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch 1.3-pipe
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/1.3-pipe by this push:
     new 3c9511c003e Cleaned multiple potential problems in pipe module (#17396)
3c9511c003e is described below

commit 3c9511c003e3cf0d5a53291c9b2bef51a2439c2b
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 1 09:34:25 2026 +0800

    Cleaned multiple potential problems in pipe module (#17396)
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * gras-shop
    
    * fix
    
    * spls
    
    * fix
    
    * pipe-dn
    
    * logger-bug
    
    * fix
---
 .../runtime/heartbeat/PipeHeartbeat.java           |  2 +-
 .../pipe/source/IoTDBConfigRegionSource.java       |  6 ++--
 .../db/pipe/agent/runtime/PipeAgentLauncher.java   |  2 +-
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  2 +-
 .../agent/task/connection/PipeEventCollector.java  |  2 +-
 .../sink/PipeRealtimePriorityBlockingQueue.java    | 11 ++----
 .../pipe/metric/overview/PipeResourceMetrics.java  |  2 +-
 .../receiver/PipeDataNodeReceiverMetrics.java      |  4 +--
 .../processor/aggregate/AggregateProcessor.java    |  4 +--
 .../resource/memory/PipeDynamicMemoryBlock.java    |  4 +--
 .../client/IoTDBDataNodeAsyncClientManager.java    |  4 +--
 .../protocol/airgap/IoTDBDataRegionAirGapSink.java |  2 +-
 .../websocket/WebSocketConnectorServer.java        | 23 ++++++++----
 .../PipeRealtimeDataRegionHybridSource.java        | 34 ++----------------
 .../realtime/PipeRealtimeDataRegionLogSource.java  | 42 ++++------------------
 .../realtime/PipeRealtimeDataRegionSource.java     | 35 ++----------------
 .../PipeRealtimeDataRegionTsFileSource.java        | 18 ++--------
 .../TsFileDeduplicationBlockingPendingQueue.java   |  2 +-
 .../task/connection/BlockingPendingQueue.java      | 21 +----------
 .../pipe/datastructure/pattern/PipePattern.java    | 13 +++++--
 .../pipe/sink/client/IoTDBSyncClientManager.java   |  2 +-
 21 files changed, 65 insertions(+), 170 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
index 13a3c4b83d6..e7e9d2cd97d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -55,7 +55,7 @@ public class PipeHeartbeat {
       // the final results and namely these dataNodes are omitted in 
calculation.
       remainingEventCountMap.put(
           pipeMeta.getStaticMeta(),
-          Objects.nonNull(pipeCompletedListFromAgent)
+          Objects.nonNull(pipeRemainingEventCountListFromAgent)
               ? pipeRemainingEventCountListFromAgent.get(i)
               : 0L);
       remainingTimeMap.put(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
index a89af14c8c0..4295aa9b16f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java
@@ -101,8 +101,10 @@ public class IoTDBConfigRegionSource extends 
IoTDBNonDataRegionSource {
   @Override
   public synchronized EnrichedEvent supply() throws Exception {
     final EnrichedEvent event = super.supply();
-    PipeEventCommitManager.getInstance()
-        .enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
+    if (Objects.nonNull(event)) {
+      PipeEventCommitManager.getInstance()
+          .enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
+    }
     return event;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 1e1d3388f75..01289109371 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -78,7 +78,7 @@ class PipeAgentLauncher {
         curList.add(uninstalledOrConflictedPipePluginMetaList.get(index + 
offset));
         offset++;
       }
-      index += (offset + 1);
+      index += offset;
       fetchAndSavePipePluginJars(curList);
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 64d88d22683..c437507f4b0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -722,7 +722,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
               MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
               needMemory,
               freeMemorySizeInBytes,
-              freeMemorySizeInBytes,
+              reservedMemorySizeInBytes,
               PipeMemoryManager.getTotalMemorySizeInBytes());
       LOGGER.warn(message);
       throw new PipeException(message);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index e3f8203251c..4728f62c94f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -196,7 +196,7 @@ public class PipeEventCollector implements EventCollector {
       ((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
     }
 
-    pendingQueue.directOffer(event);
+    pendingQueue.offer(event);
     collectInvocationCount.incrementAndGet();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
index 2baebeedc18..6d227ac31fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
@@ -72,7 +72,7 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
   }
 
   @Override
-  public boolean directOffer(final Event event) {
+  public boolean offer(final Event event) {
     checkBeforeOffer(event);
 
     if (event instanceof TsFileInsertionEvent) {
@@ -85,18 +85,13 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
       ((EnrichedEvent) 
event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
       return false;
     } else {
-      return super.directOffer(event);
+      return super.offer(event);
     }
   }
 
-  @Override
-  public boolean waitedOffer(final Event event) {
-    return directOffer(event);
-  }
-
   @Override
   public boolean put(final Event event) {
-    directOffer(event);
+    offer(event);
     return true;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
index 19a8060f89c..48c55fba59d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java
@@ -137,7 +137,7 @@ public class PipeResourceMetrics implements IMetricSet {
     // phantom reference count
     metricService.remove(MetricType.AUTO_GAUGE, 
Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString());
 
-    metricService.remove(MetricType.RATE, 
Metric.PIPE_TSFILE_SEND_DISK_IO.toString());
+    metricService.remove(MetricType.COUNTER, 
Metric.PIPE_TSFILE_SEND_DISK_IO.toString());
   }
 
   public void recordDiskIO(final long bytes) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
index 741fa302aec..9c2ecad3cc0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java
@@ -282,14 +282,14 @@ public class PipeDataNodeReceiverMetrics implements 
IMetricSet {
         Tag.NAME.toString(),
         RECEIVER,
         Tag.TYPE.toString(),
-        "handshakeDatanodeV1");
+        "handshakeDataNodeV1");
     metricService.remove(
         MetricType.TIMER,
         Metric.PIPE_DATANODE_RECEIVER.toString(),
         Tag.NAME.toString(),
         RECEIVER,
         Tag.TYPE.toString(),
-        "handshakeDatanodeV2");
+        "handshakeDataNodeV2");
     metricService.remove(
         MetricType.TIMER,
         Metric.PIPE_DATANODE_RECEIVER.toString(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index 2b4414ed75d..191ec7ee712 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -728,7 +728,7 @@ public class AggregateProcessor implements PipeProcessor {
               throw new UnsupportedOperationException(
                   String.format(
                       "The output tablet does not support column type %s",
-                      valueColumnTypes[rowIndex]));
+                      valueColumnTypes[columnIndex]));
           }
         } else {
           bitMaps[columnIndex].mark(rowIndex);
@@ -742,7 +742,7 @@ public class AggregateProcessor implements PipeProcessor {
     int filteredCount = 0;
     for (int i = 0; i < columnNameStringList.length; ++i) {
       if (!bitMaps[i].isAllMarked()) {
-        originColumnIndex2FilteredColumnIndexMapperList[i] = ++filteredCount;
+        originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++;
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
index 4e33b871828..3dbac912db3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java
@@ -44,7 +44,7 @@ public class PipeDynamicMemoryBlock {
 
   PipeDynamicMemoryBlock(
       final @NotNull PipeModelFixedMemoryBlock fixedMemoryBlock, final long 
memoryUsageInBytes) {
-    this.memoryUsageInBytes = Math.min(memoryUsageInBytes, 0);
+    this.memoryUsageInBytes = Math.max(memoryUsageInBytes, 0);
     this.fixedMemoryBlock = fixedMemoryBlock;
   }
 
@@ -116,7 +116,7 @@ public class PipeDynamicMemoryBlock {
       if (Double.isNaN(historyMemoryEfficiency)
           || Double.isInfinite(historyMemoryEfficiency)
           || historyMemoryEfficiency < 0.0) {
-        currentMemoryEfficiency = 0.0;
+        historyMemoryEfficiency = 0.0;
       }
 
       this.historyMemoryEfficiency = Math.min(historyMemoryEfficiency, 1.0);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 97a4d2621b0..435f890f3a2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -465,7 +465,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
       while (true) {
         final TEndPoint targetNodeUrl = endPointList.get((int) (Math.random() 
* clientSize));
-        if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+        if (isUnhealthy(targetNodeUrl) && n < clientSize) {
           n++;
           continue;
         }
@@ -486,7 +486,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       long n = 0;
       while (true) {
         for (final TEndPoint targetNodeUrl : endPointList) {
-          if (isUnhealthy(targetNodeUrl) && n <= clientSize) {
+          if (isUnhealthy(targetNodeUrl) && n < clientSize) {
             n++;
             continue;
           }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 918cf24fce5..6396c710abd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -168,7 +168,7 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
       throw new PipeConnectionException(
           String.format(
               "Network error when transfer tsfile event %s, because %s.",
-              ((PipeSchemaRegionWritePlanEvent) event).coreReportMessage(), 
e.getMessage()),
+              ((EnrichedEvent) event).coreReportMessage(), e.getMessage()),
           e);
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
index 940d6ff63db..6da64460495 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
@@ -35,8 +35,10 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -90,14 +92,18 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
       final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
           eventsWaitingForTransfer.remove(pipeName);
       while (!eventTransferQueue.isEmpty()) {
-        eventTransferQueue.forEach(
+        final List<EventWaitingForTransfer> eventWrappers;
+        synchronized (eventTransferQueue) {
+          eventWrappers = new ArrayList<>(eventTransferQueue);
+          eventTransferQueue.clear();
+        }
+        eventWrappers.forEach(
             (eventWrapper) -> {
               if (eventWrapper.event instanceof EnrichedEvent) {
                 ((EnrichedEvent) eventWrapper.event)
                     
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
               }
             });
-        eventTransferQueue.clear();
         synchronized (eventTransferQueue) {
           eventTransferQueue.notifyAll();
         }
@@ -270,8 +276,10 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
 
     LOGGER.warn(
         "The tablet of commitId: {} can't be parsed by client, it will be 
retried later.", eventId);
-    eventTransferQueue.put(
-        new EventWaitingForTransfer(eventId, eventWrapper.connector, 
eventWrapper.event));
+    synchronized (eventTransferQueue) {
+      eventTransferQueue.put(
+          new EventWaitingForTransfer(eventId, eventWrapper.connector, 
eventWrapper.event));
+    }
   }
 
   @Override
@@ -321,7 +329,9 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
       }
     }
 
-    queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), 
connector, event));
+    synchronized (queue) {
+      queue.put(new 
EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event));
+    }
   }
 
   private class TransferThread extends Thread {
@@ -347,7 +357,8 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
           }
 
           try {
-            final EventWaitingForTransfer queueElement = queue.take();
+            EventWaitingForTransfer queueElement;
+            queueElement = queue.take();
             synchronized (queue) {
               queue.notifyAll();
             }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 95e00eb9b15..faabf8b68f6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -58,7 +58,7 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
     } else if (eventToExtract instanceof PipeHeartbeatEvent) {
       extractHeartbeat(event);
     } else if (eventToExtract instanceof PipeSchemaRegionWritePlanEvent) {
-      extractDirectly(event);
+      pendingQueue.offer(event);
     } else {
       throw new UnsupportedOperationException(
           String.format(
@@ -116,21 +116,7 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
         if (state == TsFileEpoch.State.USING_BOTH) {
           event.skipReportOnCommit();
         }
-        if (!pendingQueue.waitedOffer(event)) {
-          // This would not happen, but just in case.
-          // pendingQueue is unbounded, so it should never reach capacity.
-          final String errorMessage =
-              String.format(
-                  "extractTabletInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor %s "
-                      + "has reached capacity, discard tablet event %s, 
current state %s",
-                  this, event, event.getTsFileEpoch().getState(this));
-          LOGGER.error(errorMessage);
-          PipeDataNodeAgent.runtime()
-              .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-          // Ignore the tablet event.
-          
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(),
 false);
-        }
+        pendingQueue.offer(event);
         break;
       default:
         throw new UnsupportedOperationException(
@@ -176,21 +162,7 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
       case EMPTY:
       case USING_TSFILE:
       case USING_BOTH:
-        if (!pendingQueue.waitedOffer(event)) {
-          // This would not happen, but just in case.
-          // pendingQueue is unbounded, so it should never reach capacity.
-          final String errorMessage =
-              String.format(
-                  "extractTsFileInsertion: pending queue of 
PipeRealtimeDataRegionHybridExtractor %s "
-                      + "has reached capacity, discard TsFile event %s, 
current state %s",
-                  this, event, event.getTsFileEpoch().getState(this));
-          LOGGER.error(errorMessage);
-          PipeDataNodeAgent.runtime()
-              .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-          // Ignore the tsfile event.
-          
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(),
 false);
-        }
+        pendingQueue.offer(event);
         break;
       default:
         throw new UnsupportedOperationException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
index ac7883fd2b0..d3adc4dc3b3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
@@ -40,7 +40,7 @@ public class PipeRealtimeDataRegionLogSource extends 
PipeRealtimeDataRegionSourc
       LoggerFactory.getLogger(PipeRealtimeDataRegionLogSource.class);
 
   @Override
-  protected void doExtract(PipeRealtimeEvent event) {
+  protected void doExtract(final PipeRealtimeEvent event) {
     final Event eventToExtract = event.getEvent();
 
     if (eventToExtract instanceof TabletInsertionEvent) {
@@ -51,7 +51,7 @@ public class PipeRealtimeDataRegionLogSource extends 
PipeRealtimeDataRegionSourc
     } else if (eventToExtract instanceof PipeHeartbeatEvent) {
       extractHeartbeat(event);
     } else if (eventToExtract instanceof PipeSchemaRegionWritePlanEvent) {
-      extractDirectly(event);
+      pendingQueue.offer(event);
     } else {
       throw new UnsupportedOperationException(
           String.format(
@@ -60,27 +60,12 @@ public class PipeRealtimeDataRegionLogSource extends 
PipeRealtimeDataRegionSourc
     }
   }
 
-  private void extractTabletInsertion(PipeRealtimeEvent event) {
+  private void extractTabletInsertion(final PipeRealtimeEvent event) {
     event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TABLET);
-
-    if (!pendingQueue.waitedOffer(event)) {
-      // this would not happen, but just in case.
-      // pendingQueue is unbounded, so it should never reach capacity.
-      final String errorMessage =
-          String.format(
-              "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s 
"
-                  + "has reached capacity, discard tablet event %s, current 
state %s",
-              this, event, event.getTsFileEpoch().getState(this));
-      LOGGER.error(errorMessage);
-      PipeDataNodeAgent.runtime()
-          .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-      // ignore this event.
-      
event.decreaseReferenceCount(PipeRealtimeDataRegionLogSource.class.getName(), 
false);
-    }
+    pendingQueue.offer(event);
   }
 
-  private void extractTsFileInsertion(PipeRealtimeEvent event) {
+  private void extractTsFileInsertion(final PipeRealtimeEvent event) {
     final PipeTsFileInsertionEvent tsFileInsertionEvent =
         (PipeTsFileInsertionEvent) event.getEvent();
     if (!(tsFileInsertionEvent.isLoaded())) {
@@ -91,22 +76,7 @@ public class PipeRealtimeDataRegionLogSource extends 
PipeRealtimeDataRegionSourc
     }
 
     event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
-
-    if (!pendingQueue.waitedOffer(event)) {
-      // this would not happen, but just in case.
-      // pendingQueue is unbounded, so it should never reach capacity.
-      final String errorMessage =
-          String.format(
-              "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s 
"
-                  + "has reached capacity, discard loaded tsFile event %s, 
current state %s",
-              this, event, event.getTsFileEpoch().getState(this));
-      LOGGER.error(errorMessage);
-      PipeDataNodeAgent.runtime()
-          .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-      // ignore this event.
-      
event.decreaseReferenceCount(PipeRealtimeDataRegionLogSource.class.getName(), 
false);
-    }
+    pendingQueue.offer(event);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index b9bc57b977a..b6dcf3f615b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -367,21 +367,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
       return;
     }
 
-    if (!pendingQueue.waitedOffer(event)) {
-      // This would not happen, but just in case.
-      // pendingQueue is unbounded, so it should never reach capacity.
-      LOGGER.error(
-          "extract: pending queue of PipeRealtimeDataRegionHybridExtractor {} "
-              + "has reached capacity, discard heartbeat event {}",
-          this,
-          event);
-
-      // Do not report exception since the PipeHeartbeatEvent doesn't affect
-      // the correction of pipe progress.
-
-      // Ignore this event.
-      
event.decreaseReferenceCount(PipeRealtimeDataRegionSource.class.getName(), 
false);
-    }
+    pendingQueue.offer(event);
   }
 
   protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
@@ -393,24 +379,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
               
.updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex()));
       return;
     }
-    extractDirectly(event);
-  }
-
-  protected void extractDirectly(final PipeRealtimeEvent event) {
-    if (!pendingQueue.waitedOffer(event)) {
-      // This would not happen, but just in case.
-      // Pending is unbounded, so it should never reach capacity.
-      final String errorMessage =
-          String.format(
-              "extract: pending queue of %s %s " + "has reached capacity, 
discard event %s",
-              this.getClass().getSimpleName(), this, event);
-      LOGGER.error(errorMessage);
-      PipeDataNodeAgent.runtime()
-          .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-      // Ignore the event.
-      
event.decreaseReferenceCount(PipeRealtimeDataRegionSource.class.getName(), 
false);
-    }
+    pendingQueue.offer(event);
   }
 
   protected void maySkipIndex4Event(final PipeRealtimeEvent event) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index 94e7b549c3f..dd9f8cbbe20 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -46,7 +46,7 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
     }
 
     if (event.getEvent() instanceof PipeSchemaRegionWritePlanEvent) {
-      extractDirectly(event);
+      pendingQueue.offer(event);
       return;
     }
 
@@ -59,21 +59,7 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
       return;
     }
 
-    if (!pendingQueue.waitedOffer(event)) {
-      // This would not happen, but just in case.
-      // Pending is unbounded, so it should never reach capacity.
-      final String errorMessage =
-          String.format(
-              "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor 
%s "
-                  + "has reached capacity, discard TsFile event %s, current 
state %s",
-              this, event, event.getTsFileEpoch().getState(this));
-      LOGGER.error(errorMessage);
-      PipeDataNodeAgent.runtime()
-          .report(pipeTaskMeta, new 
PipeRuntimeNonCriticalException(errorMessage));
-
-      // Ignore the event.
-      
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(),
 false);
-    }
+    pendingQueue.offer(event);
 
     event.getTsFileEpoch().clearState(this);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
index e4a3a545a58..b1677c9ffe6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
@@ -65,7 +65,7 @@ public class TsFileDeduplicationBlockingPendingQueue extends 
SubscriptionBlockin
 
   @Override
   public void directOffer(final Event event) {
-    inputPendingQueue.directOffer(event);
+    inputPendingQueue.offer(event);
   }
 
   private synchronized Event filter(final Event event) { // make it 
synchronized
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index aa93b093254..8773b03f9f3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -50,26 +50,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
     this.eventCounter = eventCounter;
   }
 
-  public boolean waitedOffer(final E event) {
-    checkBeforeOffer(event);
-    try {
-      final boolean offered =
-          pendingQueue.offer(
-              event,
-              
PIPE_CONFIG.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs(),
-              TimeUnit.MILLISECONDS);
-      if (offered) {
-        eventCounter.increaseEventCount(event);
-      }
-      return offered;
-    } catch (final InterruptedException e) {
-      LOGGER.info("pending queue offer is interrupted.", e);
-      Thread.currentThread().interrupt();
-      return false;
-    }
-  }
-
-  public boolean directOffer(final E event) {
+  public boolean offer(final E event) {
     checkBeforeOffer(event);
     final boolean offered = pendingQueue.offer(event);
     if (offered) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
index 8c2b4ca0409..c0118397b3c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
@@ -328,8 +328,17 @@ public abstract class PipePattern {
     final List<PipePattern> sortedPatterns = new ArrayList<>(patterns);
     sortedPatterns.sort(
         (o1, o2) -> {
-          final PartialPath p1 = o1.getBaseInclusionPaths().get(0);
-          final PartialPath p2 = o2.getBaseInclusionPaths().get(0);
+          final List<PartialPath> p1List = o1.getBaseInclusionPaths();
+          final List<PartialPath> p2List = o2.getBaseInclusionPaths();
+
+          if (p1List.isEmpty()) {
+            return p2List.isEmpty() ? 1 : -1;
+          }
+
+          // We can only approximate comparison here since TreePattern 
represents multiple paths.
+          // We use the first inclusion path as a representative.
+          final PartialPath p1 = p1List.get(0);
+          final PartialPath p2 = p2List.get(0);
 
           final int lenCompare = Integer.compare(p1.getNodeLength(), 
p2.getNodeLength());
           if (lenCompare != 0) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index 969f42bfcec..eb25c7553db 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -361,7 +361,7 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
         final Pair<IoTDBSyncClient, Boolean> nextClientAndStatus =
             endPoint2ClientAndStatus.get(endPointList.get(nextClientIndex));
         if (Boolean.TRUE.equals(nextClientAndStatus.getRight())
-            && clientAndStatus.getLeft() != null) {
+            && nextClientAndStatus.getLeft() != null) {
           return nextClientAndStatus;
         }
       }

Reply via email to