This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4747d5f243f Optimize local load TsFile piece dispatch (#17851)
4747d5f243f is described below

commit 4747d5f243f04d0ef293b29beb8209cb02c463e4
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 11:35:05 2026 +0800

    Optimize local load TsFile piece dispatch (#17851)
---
 .../scheduler/load/LoadTsFileDispatcherImpl.java   |  8 +-
 .../load/splitter/AlignedChunkData.java            |  9 ++
 .../load/splitter/NonAlignedChunkData.java         | 16 ++++
 .../load/LoadTsFileDispatcherImplTest.java         | 98 ++++++++++++++++++++++
 .../load/splitter/ChunkDataDirectWriteTest.java    | 91 ++++++++++++++++++++
 5 files changed, 215 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index fcd884d696d..bd5356ccff4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
-import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.load.LoadFileException;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
@@ -161,12 +160,7 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
     PlanNode planNode = instance.getFragment().getPlanNodeTree();
 
     if (planNode instanceof LoadTsFilePieceNode) { // split
-      LoadTsFilePieceNode pieceNode =
-          (LoadTsFilePieceNode) 
PlanNodeType.deserialize(planNode.serializeToByteBuffer());
-      if (pieceNode == null) {
-        throw new FragmentInstanceDispatchException(
-            new 
TSStatus(TSStatusCode.DESERIALIZE_PIECE_OF_TSFILE_ERROR.getStatusCode()));
-      }
+      LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) planNode;
       TSStatus resultStatus =
           StorageEngine.getInstance().writeLoadTsFileNode((DataRegionId) 
groupId, pieceNode, uuid);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
index 2c9cea06852..d852ec3c9b8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
@@ -290,6 +290,7 @@ public class AlignedChunkData implements ChunkData {
   }
 
   protected void writeTsFileData(TsFileIOWriter writer) throws IOException, 
PageException {
+    ensureDataReadyForWriting();
     final InputStream stream = new 
LoadTsFilePieceNode.ByteBufferInputStream(chunkData);
     if (needDecodeChunk) {
       writeChunkToWriter(stream, writer);
@@ -298,6 +299,14 @@ public class AlignedChunkData implements ChunkData {
     }
   }
 
+  private void ensureDataReadyForWriting() throws IOException {
+    if (chunkData != null) {
+      chunkData.rewind();
+      return;
+    }
+    chunkData = ByteBuffer.wrap(byteStream.getBuf(), 0, byteStream.size());
+  }
+
   protected void deserializeTsFileDataByte(final InputStream stream) throws 
IOException {
     final int size = ReadWriteIOUtils.readInt(stream);
     if (stream instanceof LoadTsFilePieceNode.ByteBufferInputStream) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
index 2310b9cb95c..7b3c2fd3f95 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.load.splitter;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 
 import org.apache.tsfile.exception.write.PageException;
 import org.apache.tsfile.file.header.ChunkHeader;
@@ -114,6 +115,7 @@ public class NonAlignedChunkData implements ChunkData {
 
   @Override
   public void writeToFileWriter(final TsFileIOWriter writer) throws 
IOException {
+    ensureDataReadyForWriting();
     if (chunk != null) {
       writer.writeChunk(chunk);
     } else {
@@ -121,6 +123,20 @@ public class NonAlignedChunkData implements ChunkData {
     }
   }
 
+  private void ensureDataReadyForWriting() throws IOException {
+    if (chunk != null || chunkWriter != null) {
+      return;
+    }
+
+    try {
+      deserializeTsFileData(
+          new LoadTsFilePieceNode.ByteBufferInputStream(
+              ByteBuffer.wrap(byteStream.getBuf(), 0, byteStream.size())));
+    } catch (final PageException e) {
+      throw new IOException(e);
+    }
+  }
+
   @Override
   public void serialize(final DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(getType().ordinal(), stream);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java
new file mode 100644
index 00000000000..2da982fd23d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImplTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.scheduler.load;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.partition.StorageExecutor;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.rpc.RpcUtils;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.File;
+import java.util.Collections;
+
+@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", 
"javax.management.*"})
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StorageEngine.class)
+public class LoadTsFileDispatcherImplTest {
+
+  @Test
+  public void testDispatchLocallyPieceNodeSkipsSerdeRoundTrip() throws 
Exception {
+    final StorageEngine storageEngine = Mockito.mock(StorageEngine.class);
+    PowerMockito.mockStatic(StorageEngine.class);
+    PowerMockito.when(StorageEngine.getInstance()).thenReturn(storageEngine);
+
+    final LoadTsFileDispatcherImpl dispatcher = new 
LoadTsFileDispatcherImpl(null, false);
+    dispatcher.setUuid("test-uuid");
+
+    final LoadTsFilePieceNode pieceNode =
+        new LoadTsFilePieceNode(new PlanNodeId("piece"), new 
File("test.tsfile"));
+    final FragmentInstance instance = createFragmentInstance(pieceNode);
+
+    Mockito.when(
+            storageEngine.writeLoadTsFileNode(
+                Mockito.eq(new DataRegionId(1)), Mockito.same(pieceNode), 
Mockito.eq("test-uuid")))
+        .thenReturn(RpcUtils.SUCCESS_STATUS);
+
+    dispatcher.dispatchLocally(instance);
+
+    Mockito.verify(storageEngine)
+        .writeLoadTsFileNode(
+            Mockito.eq(new DataRegionId(1)), Mockito.same(pieceNode), 
Mockito.eq("test-uuid"));
+  }
+
+  private static FragmentInstance createFragmentInstance(final 
LoadTsFilePieceNode pieceNode) {
+    final PlanFragmentId fragmentId = new PlanFragmentId("test", 0);
+    final FragmentInstance instance =
+        new FragmentInstance(
+            new PlanFragment(fragmentId, pieceNode),
+            fragmentId.genFragmentInstanceId(),
+            null,
+            null,
+            0,
+            null,
+            false,
+            false);
+    final TConsensusGroupId consensusGroupId = new 
DataRegionId(1).convertToTConsensusGroupId();
+    instance.setExecutorAndHost(
+        new StorageExecutor(
+            new TRegionReplicaSet(
+                consensusGroupId,
+                Collections.singletonList(
+                    new TDataNodeLocation().setInternalEndPoint(new 
TEndPoint("127.0.0.1", 1))))));
+    return instance;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkDataDirectWriteTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkDataDirectWriteTest.java
new file mode 100644
index 00000000000..c824a6c0ce9
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkDataDirectWriteTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.splitter;
+
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.nio.ByteBuffer;
+
+public class ChunkDataDirectWriteTest {
+
+  @Test
+  public void testNonAlignedChunkDataCanWriteWithoutSerdeRoundTrip() throws 
Exception {
+    final NonAlignedChunkData chunkData = createNonAlignedChunkData();
+    chunkData.setNotDecode();
+    final IChunkMetadata chunkMetadata = Mockito.mock(IChunkMetadata.class);
+    
Mockito.doReturn(createInt32Statistics()).when(chunkMetadata).getStatistics();
+    chunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata);
+
+    final TsFileIOWriter writer = Mockito.mock(TsFileIOWriter.class);
+    chunkData.writeToFileWriter(writer);
+
+    Mockito.verify(writer).writeChunk(Mockito.any(Chunk.class));
+  }
+
+  @Test
+  public void testAlignedChunkDataCanWriteWithoutSerdeRoundTrip() throws 
Exception {
+    final AlignedChunkData chunkData = createAlignedChunkData();
+    chunkData.setNotDecode();
+    final IChunkMetadata chunkMetadata = Mockito.mock(IChunkMetadata.class);
+    
Mockito.doReturn(createInt32Statistics()).when(chunkMetadata).getStatistics();
+    chunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata);
+
+    final TsFileIOWriter writer = Mockito.mock(TsFileIOWriter.class);
+    chunkData.writeToFileWriter(writer);
+
+    Mockito.verify(writer).writeChunk(Mockito.any(Chunk.class));
+  }
+
+  private static Statistics<?> createInt32Statistics() {
+    final Statistics<?> statistics = 
Statistics.getStatsByType(TSDataType.INT32);
+    statistics.update(1L, 1);
+    return statistics;
+  }
+
+  private static NonAlignedChunkData createNonAlignedChunkData() {
+    final IDeviceID device = new StringArrayDeviceID("root", "sg", "d1");
+    return (NonAlignedChunkData)
+        ChunkData.createChunkData(false, device, createChunkHeader(), new 
TTimePartitionSlot(0L));
+  }
+
+  private static AlignedChunkData createAlignedChunkData() {
+    final IDeviceID device = new StringArrayDeviceID("root", "sg", "d1");
+    return (AlignedChunkData)
+        ChunkData.createChunkData(true, device, createChunkHeader(), new 
TTimePartitionSlot(0L));
+  }
+
+  private static ChunkHeader createChunkHeader() {
+    return new ChunkHeader(
+        "temperature", 0, TSDataType.INT32, CompressionType.UNCOMPRESSED, 
TSEncoding.PLAIN, 0);
+  }
+}

Reply via email to