This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 0698664d845 Pipe: Optimized insert node cache hit possibility & Pipe: Set thread name for pipe receiver (#15263) (#15264)
0698664d845 is described below

commit 0698664d845d8b5233f2c1c54da2908f0f3a8af8
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Apr 3 00:21:51 2025 +0800

    Pipe: Optimized insert node cache hit possibility & Pipe: Set thread name for pipe receiver (#15263) (#15264)
    
    * Pipe: Optimized insert node cache hit possibility
    
    * Pipe: Set thread name for pipe receiver
---
 .../db/storageengine/dataregion/wal/utils/WALEntryHandler.java | 10 ++++++++--
 .../storageengine/dataregion/wal/utils/WALEntryPosition.java   |  4 ++--
 .../storageengine/dataregion/wal/utils/WALInsertNodeCache.java |  6 ++++++
 .../apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java  |  4 ++++
 4 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
index 333842e38ef..f5d7406f5a6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.MemTablePinExc
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
 
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,8 +91,13 @@ public class WALEntryHandler {
   public InsertNode getInsertNodeViaCacheIfPossible() {
     try {
       final WALEntryValue finalValue = value;
-      return finalValue instanceof InsertNode ? (InsertNode) finalValue : null;
-    } catch (Exception e) {
+      if (finalValue instanceof InsertNode) {
+        return (InsertNode) finalValue;
+      }
+      final Pair<ByteBuffer, InsertNode> byteBufferInsertNodePair =
+          walEntryPosition.getByteBufferOrInsertNodeIfPossible();
+      return byteBufferInsertNodePair == null ? null : 
byteBufferInsertNodePair.getRight();
+    } catch (final Exception e) {
       logger.warn("Fail to get insert node via cache. {}", this, e);
       throw e;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
index 438e6a897e2..1370745cea2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
@@ -64,8 +64,8 @@ public class WALEntryPosition {
    * Try to read the wal entry directly from the cache. No need to check if the wal entry is ready
    * for read.
    */
-  public Pair<ByteBuffer, InsertNode> 
readByteBufferOrInsertNodeViaCacheDirectly() {
-    return cache.getByteBufferOrInsertNode(this);
+  public Pair<ByteBuffer, InsertNode> getByteBufferOrInsertNodeIfPossible() {
+    return cache.getByteBufferOrInsertNodeIfPossible(this);
   }
 
   /**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index efba8703072..983363689c7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -224,6 +224,12 @@ public class WALInsertNodeCache {
     return pair;
   }
 
+  public Pair<ByteBuffer, InsertNode> getByteBufferOrInsertNodeIfPossible(
+      final WALEntryPosition position) {
+    hasPipeRunning = true;
+    return lruCache.getIfPresent(position);
+  }
+
   public void cacheInsertNodeIfNeeded(
       final WALEntryPosition walEntryPosition, final InsertNode insertNode) {
     // reduce memory usage
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 60cff71fbfa..b5df0d98266 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -111,6 +111,10 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     }
 
     receiverId.set(RECEIVER_ID_GENERATOR.incrementAndGet());
+    Thread.currentThread()
+        .setName(
+            String.format(
+                "Pipe-Receiver-%s-%s:%s", receiverId.get(), getSenderHost(), 
getSenderPort()));
 
     // Clear the original receiver file dir if exists
     if (receiverFileDirWithIdSuffix.get() != null) {

Reply via email to