This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 044620e7f0b [To dev/1.3] Pipe: Fixed the log of disruptor queue & deleted the useless binary buffer (#17341) (#17359)
044620e7f0b is described below

commit 044620e7f0b0893d27ad279986840bc43b15ad19
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 25 20:05:36 2026 +0800

    [To dev/1.3] Pipe: Fixed the log of disruptor queue & deleted the useless binary buffer (#17341) (#17359)
    
    * wz
    
    * fx
---
 .../tablet/PipeInsertNodeTabletInsertionEvent.java | 22 ++-------
 .../evolvable/batch/PipeTabletEventPlainBatch.java | 31 ++-----------
 .../request/PipeTransferTabletBatchReq.java        | 54 ++++------------------
 .../protocol/airgap/IoTDBDataRegionAirGapSink.java |  9 +---
 .../pipeconsensus/PipeConsensusAsyncSink.java      | 13 +-----
 .../pipeconsensus/PipeConsensusSyncSink.java       | 20 ++------
 .../PipeConsensusTransferBatchReqBuilder.java      | 16 ++-----
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |  7 +--
 .../thrift/sync/IoTDBDataRegionSyncSink.java       |  7 +--
 .../sink/protocol/writeback/WriteBackSink.java     | 21 ++-------
 .../realtime/assigner/DisruptorQueue.java          |  4 +-
 .../pipe/sink/PipeDataNodeThriftRequestTest.java   |  3 +-
 12 files changed, 37 insertions(+), 170 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index ade4d67b2dd..bc7040a0598 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -37,7 +37,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.collector.TabletCollector;
@@ -51,7 +50,8 @@ import org.apache.tsfile.write.record.Tablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -102,18 +102,11 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
     this.allocatedMemoryBlock = new AtomicReference<>();
   }
 
+  @Nonnull
   public InsertNode getInsertNode() {
     return insertNode;
   }
 
-  public ByteBuffer getByteBuffer() throws WALPipeException {
-    final InsertNode node = insertNode;
-    if (Objects.isNull(node)) {
-      throw new PipeException("InsertNode has been released");
-    }
-    return node.serializeToByteBuffer();
-  }
-
   public String getDeviceId() {
     final InsertNode node = insertNode;
     if (Objects.isNull(node)) {
@@ -214,9 +207,6 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   public boolean mayEventTimeOverlappedWithTimeRange() {
     try {
       final InsertNode insertNode = getInsertNode();
-      if (Objects.isNull(insertNode)) {
-        return true;
-      }
 
       if (insertNode instanceof InsertRowNode) {
         final long timestamp = ((InsertRowNode) insertNode).getTime();
@@ -258,9 +248,6 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   public boolean mayEventPathsOverlappedWithPattern() {
     try {
       final InsertNode insertNode = getInsertNode();
-      if (Objects.isNull(insertNode)) {
-        return true;
-      }
 
       if (insertNode instanceof InsertRowNode || insertNode instanceof 
InsertTabletNode) {
         final PartialPath devicePartialPath = insertNode.getDevicePath();
@@ -355,9 +342,6 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
       dataContainers = new ArrayList<>();
       final InsertNode node = getInsertNode();
-      if (Objects.isNull(node)) {
-        throw new PipeException("InsertNode has been released");
-      }
       switch (node.getType()) {
         case INSERT_ROW:
         case INSERT_TABLET:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index d450be8200c..837d4db639a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -24,14 +24,11 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBatchReq;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
-import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -40,23 +37,14 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 
 public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletEventBatch.class);
-
-  private final List<ByteBuffer> binaryBuffers = new ArrayList<>();
   private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
   private final List<ByteBuffer> tabletBuffers = new ArrayList<>();
 
   // Used to rate limit when transferring data
   private final Map<Pair<String, Long>, Long> pipe2BytesAccumulated = new 
HashMap<>();
 
-  PipeTabletEventPlainBatch(final int maxDelayInMs, final long 
requestMaxBatchSizeInBytes) {
-    super(maxDelayInMs, requestMaxBatchSizeInBytes, null);
-  }
-
   PipeTabletEventPlainBatch(
       final int maxDelayInMs,
       final long requestMaxBatchSizeInBytes,
@@ -65,8 +53,7 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
   }
 
   @Override
-  protected boolean constructBatch(final TabletInsertionEvent event)
-      throws WALPipeException, IOException {
+  protected boolean constructBatch(final TabletInsertionEvent event) throws 
IOException {
     final int bufferSize = buildTabletInsertionBuffer(event);
     totalBufferSize += bufferSize;
     pipe2BytesAccumulated.compute(
@@ -81,7 +68,6 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
   public synchronized void onSuccess() {
     super.onSuccess();
 
-    binaryBuffers.clear();
     insertNodeBuffers.clear();
     tabletBuffers.clear();
 
@@ -89,8 +75,7 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
   }
 
   public PipeTransferTabletBatchReq toTPipeTransferReq() throws IOException {
-    return PipeTransferTabletBatchReq.toTPipeTransferReq(
-        binaryBuffers, insertNodeBuffers, tabletBuffers);
+    return PipeTransferTabletBatchReq.toTPipeTransferReq(insertNodeBuffers, 
tabletBuffers);
   }
 
   public Map<Pair<String, Long>, Long> deepCopyPipeName2BytesAccumulated() {
@@ -101,8 +86,7 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
     return pipe2BytesAccumulated;
   }
 
-  private int buildTabletInsertionBuffer(final TabletInsertionEvent event)
-      throws IOException, WALPipeException {
+  private int buildTabletInsertionBuffer(final TabletInsertionEvent event) 
throws IOException {
     final ByteBuffer buffer;
     if (event instanceof PipeInsertNodeTabletInsertionEvent) {
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent =
@@ -110,13 +94,8 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
       // Read the bytebuffer from the wal file and transfer it directly 
without serializing or
       // deserializing if possible
       final InsertNode insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNode();
-      if (Objects.isNull(insertNode)) {
-        buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer();
-        binaryBuffers.add(buffer);
-      } else {
-        buffer = insertNode.serializeToByteBuffer();
-        insertNodeBuffers.add(buffer);
-      }
+      buffer = insertNode.serializeToByteBuffer();
+      insertNodeBuffers.add(buffer);
     } else {
       final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
           (PipeRawTabletInsertionEvent) event;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
index 48bd1016763..94a838ee0ad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -45,7 +45,6 @@ import java.util.Objects;
 
 public class PipeTransferTabletBatchReq extends TPipeTransferReq {
 
-  private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new 
ArrayList<>();
   private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs 
= new ArrayList<>();
   private final transient List<PipeTransferTabletRawReq> tabletReqs = new 
ArrayList<>();
 
@@ -61,26 +60,6 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
     final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
     final List<InsertTabletStatement> insertTabletStatementList = new 
ArrayList<>();
 
-    for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) {
-      final InsertBaseStatement statement = binaryReq.constructStatement();
-      if (statement.isEmpty()) {
-        continue;
-      }
-      if (statement instanceof InsertRowStatement) {
-        insertRowStatementList.add((InsertRowStatement) statement);
-      } else if (statement instanceof InsertTabletStatement) {
-        insertTabletStatementList.add((InsertTabletStatement) statement);
-      } else if (statement instanceof InsertRowsStatement) {
-        insertRowStatementList.addAll(
-            ((InsertRowsStatement) statement).getInsertRowStatementList());
-      } else {
-        throw new UnsupportedOperationException(
-            String.format(
-                "unknown InsertBaseStatement %s constructed from 
PipeTransferTabletBinaryReq.",
-                binaryReq));
-      }
-    }
-
     for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs) 
{
       final InsertBaseStatement statement = insertNodeReq.constructStatement();
       if (statement.isEmpty()) {
@@ -117,9 +96,7 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
   /////////////////////////////// Thrift ///////////////////////////////
 
   public static PipeTransferTabletBatchReq toTPipeTransferReq(
-      final List<ByteBuffer> binaryBuffers,
-      final List<ByteBuffer> insertNodeBuffers,
-      final List<ByteBuffer> tabletBuffers)
+      final List<ByteBuffer> insertNodeBuffers, final List<ByteBuffer> 
tabletBuffers)
       throws IOException {
     final PipeTransferTabletBatchReq batchReq = new 
PipeTransferTabletBatchReq();
 
@@ -130,11 +107,8 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
     batchReq.type = PipeRequestType.TRANSFER_TABLET_BATCH.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
-      ReadWriteIOUtils.write(binaryBuffers.size(), outputStream);
-      for (final ByteBuffer binaryBuffer : binaryBuffers) {
-        ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream);
-        outputStream.write(binaryBuffer.array(), 0, binaryBuffer.limit());
-      }
+      // Binary req, for rolling upgrading
+      ReadWriteIOUtils.write(0, outputStream);
 
       ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
       for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) {
@@ -157,16 +131,10 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
       final TPipeTransferReq transferReq) {
     final PipeTransferTabletBatchReq batchReq = new 
PipeTransferTabletBatchReq();
 
-    int size = ReadWriteIOUtils.readInt(transferReq.body);
-    for (int i = 0; i < size; ++i) {
-      final int length = ReadWriteIOUtils.readInt(transferReq.body);
-      final byte[] body = new byte[length];
-      transferReq.body.get(body);
-      batchReq.binaryReqs.add(
-          
PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
-    }
+    // Binary req, for rolling upgrading
+    ReadWriteIOUtils.readInt(transferReq.body);
 
-    size = ReadWriteIOUtils.readInt(transferReq.body);
+    int size = ReadWriteIOUtils.readInt(transferReq.body);
     for (int i = 0; i < size; ++i) {
       batchReq.insertNodeReqs.add(
           PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
@@ -188,11 +156,6 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
 
   /////////////////////////////// TestOnly ///////////////////////////////
 
-  @TestOnly
-  public List<PipeTransferTabletBinaryReq> getBinaryReqs() {
-    return binaryReqs;
-  }
-
   @TestOnly
   public List<PipeTransferTabletInsertNodeReq> getInsertNodeReqs() {
     return insertNodeReqs;
@@ -214,8 +177,7 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
       return false;
     }
     final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
-    return binaryReqs.equals(that.binaryReqs)
-        && insertNodeReqs.equals(that.insertNodeReqs)
+    return insertNodeReqs.equals(that.insertNodeReqs)
         && tabletReqs.equals(that.tabletReqs)
         && version == that.version
         && type == that.type
@@ -224,6 +186,6 @@ public class PipeTransferTabletBatchReq extends 
TPipeTransferReq {
 
   @Override
   public int hashCode() {
-    return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, 
body);
+    return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index a2f09cf5e50..918cf24fce5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -30,7 +30,6 @@ import 
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
 import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
-import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq;
@@ -194,13 +193,9 @@ public class IoTDBDataRegionAirGapSink extends 
IoTDBDataNodeAirGapSink {
   private void doTransfer(
       final AirGapSocket socket,
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
-      throws PipeException, WALPipeException, IOException {
+      throws PipeException, IOException {
     final InsertNode insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNode();
-    final byte[] bytes =
-        Objects.isNull(insertNode)
-            ? PipeTransferTabletBinaryReq.toTPipeTransferBytes(
-                pipeInsertNodeTabletInsertionEvent.getByteBuffer())
-            : PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode);
+    final byte[] bytes = 
PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode);
 
     if (!send(
         pipeInsertNodeTabletInsertionEvent.getPipeName(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java
index a34781dee2e..47380ec799d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java
@@ -48,7 +48,6 @@ import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.handler.PipeConsensu
 import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.handler.PipeConsensusTabletInsertNodeEventHandler;
 import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.handler.PipeConsensusTsFileInsertionEventHandler;
 import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.builder.PipeConsensusAsyncBatchReqBuilder;
-import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -64,7 +63,6 @@ import org.slf4j.LoggerFactory;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -280,15 +278,8 @@ public class PipeConsensusAsyncSink extends IoTDBSink 
implements ConsensusPipeSi
       final InsertNode insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNode();
       final ProgressIndex progressIndex = 
pipeInsertNodeTabletInsertionEvent.getProgressIndex();
       final TPipeConsensusTransferReq pipeConsensusTransferReq =
-          Objects.isNull(insertNode)
-              ? PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(
-                  pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
-                  tCommitId,
-                  tConsensusGroupId,
-                  progressIndex,
-                  thisDataNodeId)
-              : PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
-                  insertNode, tCommitId, tConsensusGroupId, progressIndex, 
thisDataNodeId);
+          PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
+              insertNode, tCommitId, tConsensusGroupId, progressIndex, 
thisDataNodeId);
       final PipeConsensusTabletInsertNodeEventHandler 
pipeConsensusInsertNodeReqHandler =
           new PipeConsensusTabletInsertNodeEventHandler(
               pipeInsertNodeTabletInsertionEvent,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
index b059e484734..3d992932e9d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
@@ -38,7 +38,6 @@ import 
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.builder.PipeConsensusSyncBatchReqBuilder;
-import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceReq;
 import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceWithModReq;
@@ -255,21 +254,10 @@ public class PipeConsensusSyncSink extends IoTDBSink {
       insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode();
       progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex();
 
-      if (insertNode != null) {
-        resp =
-            syncPipeConsensusServiceClient.pipeConsensusTransfer(
-                PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
-                    insertNode, tCommitId, tConsensusGroupId, progressIndex, 
thisDataNodeId));
-      } else {
-        resp =
-            syncPipeConsensusServiceClient.pipeConsensusTransfer(
-                PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(
-                    pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
-                    tCommitId,
-                    tConsensusGroupId,
-                    progressIndex,
-                    thisDataNodeId));
-      }
+      resp =
+          syncPipeConsensusServiceClient.pipeConsensusTransfer(
+              PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
+                  insertNode, tCommitId, tConsensusGroupId, progressIndex, 
thisDataNodeId));
     } catch (final Exception e) {
       throw new PipeConnectionException(
           String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
index eb5ce6fe48c..e13e1c1a802 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
@@ -28,7 +28,6 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletBatchReq;
-import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -199,17 +198,10 @@ public abstract class 
PipeConsensusTransferBatchReqBuilder implements AutoClosea
     final InsertNode insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNode();
     // PipeConsensus will transfer binary data to TPipeConsensusTransferReq
     final ProgressIndex progressIndex = 
pipeInsertNodeTabletInsertionEvent.getProgressIndex();
-    if (Objects.isNull(insertNode)) {
-      buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer();
-      batchReqs.add(
-          PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(
-              buffer, commitId, consensusGroupId, progressIndex, 
thisDataNodeId));
-    } else {
-      buffer = insertNode.serializeToByteBuffer();
-      batchReqs.add(
-          PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
-              insertNode, commitId, consensusGroupId, progressIndex, 
thisDataNodeId));
-    }
+    buffer = insertNode.serializeToByteBuffer();
+    batchReqs.add(
+        PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
+            insertNode, commitId, consensusGroupId, progressIndex, 
thisDataNodeId));
 
     return buffer.limit();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index f8e0684188a..6dc9a695b90 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -39,7 +39,6 @@ import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatc
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTransferBatchReqBuilder;
-import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler.PipeTransferTabletBatchEventHandler;
@@ -281,11 +280,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
 
       final InsertNode insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNode();
       final TPipeTransferReq pipeTransferReq =
-          compressIfNeeded(
-              Objects.isNull(insertNode)
-                  ? PipeTransferTabletBinaryReq.toTPipeTransferReq(
-                      pipeInsertNodeTabletInsertionEvent.getByteBuffer())
-                  : 
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
+          
compressIfNeeded(PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
       final PipeTransferTabletInsertNodeEventHandler 
pipeTransferInsertNodeReqHandler =
           new PipeTransferTabletInsertNodeEventHandler(
               pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index a40074392f4..a13f40b1b83 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -39,7 +39,6 @@ import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatc
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTransferBatchReqBuilder;
-import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq;
@@ -319,11 +318,7 @@ public class IoTDBDataRegionSyncSink extends 
IoTDBDataNodeSyncSink {
 
       final InsertNode insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNode();
       final TPipeTransferReq req =
-          compressIfNeeded(
-              insertNode != null
-                  ? 
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode)
-                  : PipeTransferTabletBinaryReq.toTPipeTransferReq(
-                      pipeInsertNodeTabletInsertionEvent.getByteBuffer()));
+          
compressIfNeeded(PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
       rateLimitIfNeeded(
           pipeInsertNodeTabletInsertionEvent.getPipeName(),
           pipeInsertNodeTabletInsertionEvent.getCreationTime(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 80c9e09f9ac..21976f63ab6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -22,12 +22,10 @@ package org.apache.iotdb.db.pipe.sink.protocol.writeback;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
-import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq;
 import org.apache.iotdb.db.protocol.session.SessionManager;
@@ -53,7 +51,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
-import java.util.Objects;
 
 public class WriteBackSink implements PipeConnector {
 
@@ -125,23 +122,13 @@ public class WriteBackSink implements PipeConnector {
 
   private void doTransfer(
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
-      throws PipeException, WALPipeException {
+      throws PipeException {
     final TSStatus status;
 
     final InsertNode insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNode();
-    if (Objects.isNull(insertNode)) {
-      status =
-          PipeDataNodeAgent.receiver()
-              .thrift()
-              .receive(
-                  PipeTransferTabletBinaryReq.toTPipeTransferReq(
-                      pipeInsertNodeTabletInsertionEvent.getByteBuffer()))
-              .getStatus();
-    } else {
-      final InsertBaseStatement statement =
-          
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(insertNode).constructStatement();
-      status = statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : 
executeStatement(statement);
-    }
+    final InsertBaseStatement statement =
+        
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(insertNode).constructStatement();
+    status = statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : 
executeStatement(statement);
 
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new PipeException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index c7add196e69..2a2fa110749 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -112,9 +112,9 @@ public class DisruptorQueue {
   private void mayPrintExceedingLog() {
     final long remainingCapacity = ringBuffer.remainingCapacity();
     final long bufferSize = ringBuffer.getBufferSize();
-    if ((double) remainingCapacity / bufferSize >= 0.5
+    if ((double) remainingCapacity / bufferSize <= 0.5
         && System.currentTimeMillis()
-                - 
PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds()
+                - 
PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds() * 1000L
             >= lastLogTime) {
       LOGGER.warn(
           "The assigner queue content has exceeded half, it may be stuck and 
may block insertion. regionId: {}, capacity: {}, bufferSize: {}",
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 9b7e38f629e..c69b4f53a46 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -253,8 +253,7 @@ public class PipeDataNodeThriftRequestTest {
     }
 
     final PipeTransferTabletBatchReq req =
-        PipeTransferTabletBatchReq.toTPipeTransferReq(
-            binaryBuffers, insertNodeBuffers, tabletBuffers);
+        PipeTransferTabletBatchReq.toTPipeTransferReq(insertNodeBuffers, 
tabletBuffers);
 
     final PipeTransferTabletBatchReq deserializedReq =
         PipeTransferTabletBatchReq.fromTPipeTransferReq(req);

Reply via email to