This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 0698664d845 Pipe: Optimized insert node cache hit possibility & Pipe:
Set thread name for pipe receiver (#15263) (#15264)
0698664d845 is described below
commit 0698664d845d8b5233f2c1c54da2908f0f3a8af8
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Apr 3 00:21:51 2025 +0800
Pipe: Optimized insert node cache hit possibility & Pipe: Set thread name
for pipe receiver (#15263) (#15264)
* Pipe: Optimized insert node cache hit possibility
* Pipe: Set thread name for pipe receiver
---
.../db/storageengine/dataregion/wal/utils/WALEntryHandler.java | 10 ++++++++--
.../storageengine/dataregion/wal/utils/WALEntryPosition.java | 4 ++--
.../storageengine/dataregion/wal/utils/WALInsertNodeCache.java | 6 ++++++
.../apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java | 4 ++++
4 files changed, 20 insertions(+), 4 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
index 333842e38ef..f5d7406f5a6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.MemTablePinExc
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
+import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,8 +91,13 @@ public class WALEntryHandler {
public InsertNode getInsertNodeViaCacheIfPossible() {
try {
final WALEntryValue finalValue = value;
- return finalValue instanceof InsertNode ? (InsertNode) finalValue : null;
- } catch (Exception e) {
+ if (finalValue instanceof InsertNode) {
+ return (InsertNode) finalValue;
+ }
+ final Pair<ByteBuffer, InsertNode> byteBufferInsertNodePair =
+ walEntryPosition.getByteBufferOrInsertNodeIfPossible();
+ return byteBufferInsertNodePair == null ? null :
byteBufferInsertNodePair.getRight();
+ } catch (final Exception e) {
logger.warn("Fail to get insert node via cache. {}", this, e);
throw e;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
index 438e6a897e2..1370745cea2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
@@ -64,8 +64,8 @@ public class WALEntryPosition {
* Try to read the wal entry directly from the cache. No need to check if
the wal entry is ready
* for read.
*/
- public Pair<ByteBuffer, InsertNode>
readByteBufferOrInsertNodeViaCacheDirectly() {
- return cache.getByteBufferOrInsertNode(this);
+ public Pair<ByteBuffer, InsertNode> getByteBufferOrInsertNodeIfPossible() {
+ return cache.getByteBufferOrInsertNodeIfPossible(this);
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index efba8703072..983363689c7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -224,6 +224,12 @@ public class WALInsertNodeCache {
return pair;
}
+ public Pair<ByteBuffer, InsertNode> getByteBufferOrInsertNodeIfPossible(
+ final WALEntryPosition position) {
+ hasPipeRunning = true;
+ return lruCache.getIfPresent(position);
+ }
+
public void cacheInsertNodeIfNeeded(
final WALEntryPosition walEntryPosition, final InsertNode insertNode) {
// reduce memory usage
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 60cff71fbfa..b5df0d98266 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -111,6 +111,10 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
}
receiverId.set(RECEIVER_ID_GENERATOR.incrementAndGet());
+ Thread.currentThread()
+ .setName(
+ String.format(
+ "Pipe-Receiver-%s-%s:%s", receiverId.get(), getSenderHost(),
getSenderPort()));
// Clear the original receiver file dir if exists
if (receiverFileDirWithIdSuffix.get() != null) {