This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 073625ad9c1 Fix short reads in fixed-length deserialization (#17870) (#18007)
073625ad9c1 is described below

commit 073625ad9c19e491eefad5296c3761f92574a176
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 24 10:02:29 2026 +0800

    Fix short reads in fixed-length deserialization (#17870) (#18007)
    
    * Fix short reads in fixed-length deserialization
    
    * Fix tag log append EOF handling
    
    * spotless
    
    ---------
    
    (cherry picked from commit aa2acd6b4f550f1138348b5d04283b18521b63d9)
---
 .../confignode/persistence/ProcedureInfo.java      |  13 +-
 .../writelog/io/SingleFileLogReader.java           |   5 +-
 .../datastructure/SerializableList.java            |   3 +-
 .../logfile/FakeCRC32Deserializer.java             |   5 +-
 .../mtree/impl/pbtree/schemafile/SchemaFile.java   |   3 +-
 .../pbtree/schemafile/log/SchemaFileLogReader.java |   7 +-
 .../pbtree/schemafile/pagemgr/PageIOChannel.java   |   3 +-
 .../schemaengine/schemaregion/tag/TagLogFile.java  |  12 +-
 .../dataregion/wal/io/WALFileVersion.java          |   4 +-
 .../dataregion/wal/io/WALInputStream.java          |  32 +++--
 .../dataregion/wal/io/WALMetaData.java             |  12 +-
 .../dataregion/wal/recover/WALRepairWriter.java    |   3 +-
 .../load/splitter/AlignedChunkData.java            |   5 +-
 .../response/SubscriptionEventTsFileResponse.java  |  11 +-
 .../iotdb/db/utils/sort/FileSpillerReader.java     |   4 +-
 .../logfile/FakeCRC32DeserializerTest.java         | 108 +++++++++++++++
 .../schemaregion/tag/TagLogFileTest.java           |  57 ++++++++
 .../index/impl/TimeWindowStateProgressIndex.java   |   9 +-
 .../commons/executable/ExecutableManager.java      |   3 +-
 .../queue/serializer/PlainQueueSerializer.java     |   3 +-
 .../pipe/sink/protocol/IoTDBAirGapSink.java        |   5 +-
 .../org/apache/iotdb/commons/utils/IOUtils.java    |  24 +++-
 .../apache/iotdb/commons/utils/IOUtilsTest.java    | 147 +++++++++++++++++++++
 23 files changed, 414 insertions(+), 64 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
index 5589fdd3799..605bd65c8a0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.IOUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
@@ -163,8 +164,16 @@ public class ProcedureInfo implements SnapshotProcessor {
     try (FileInputStream fis = new 
FileInputStream(procedureFilePath.toFile())) {
       Procedure procedure = null;
       try (FileChannel channel = fis.getChannel()) {
-        ByteBuffer byteBuffer = 
ByteBuffer.allocate(PROCEDURE_LOAD_BUFFER_SIZE);
-        if (channel.read(byteBuffer) > 0) {
+        final long fileSize = channel.size();
+        if (fileSize > PROCEDURE_LOAD_BUFFER_SIZE) {
+          throw new IOException(
+              String.format(
+                  "Procedure file %s exceeds the load buffer limit %s, actual 
size %s",
+                  procedureFilePath, PROCEDURE_LOAD_BUFFER_SIZE, fileSize));
+        }
+        ByteBuffer byteBuffer = ByteBuffer.allocate((int) fileSize);
+        if (fileSize > 0) {
+          IOUtils.readFully(channel, byteBuffer);
           byteBuffer.flip();
           procedure = ProcedureFactory.getInstance().create(byteBuffer);
           byteBuffer.clear();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
index f67cfdc286a..8e8df07cf36 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/writelog/io/SingleFileLogReader.java
@@ -75,10 +75,7 @@ public class SingleFileLogReader implements ILogReader {
       }
       buffer = new byte[logSize];
 
-      int readLen = logStream.read(buffer, 0, logSize);
-      if (readLen < logSize) {
-        throw new IOException("Reach eof");
-      }
+      logStream.readFully(buffer, 0, logSize);
 
       final long checkSum = logStream.readLong();
       checkSummer.reset();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/SerializableList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/SerializableList.java
index c6b7cee0f3a..3870975fa1e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/SerializableList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/SerializableList.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.transformation.datastructure;
 
 import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.utils.IOUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.service.TemporaryQueryDataFileService;
 
@@ -60,7 +61,7 @@ public interface SerializableList {
     }
     init();
     ByteBuffer byteBuffer = 
ByteBuffer.allocate(recorder.getSerializedByteLength());
-    recorder.getFileChannel().read(byteBuffer);
+    IOUtils.readFully(recorder.getFileChannel(), byteBuffer);
     byteBuffer.flip();
     deserialize(byteBuffer);
     recorder.closeFile();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java
index 46ce88825ec..9ecca265a9d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32Deserializer.java
@@ -27,7 +27,6 @@ import javax.validation.constraints.NotNull;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -67,9 +66,7 @@ public class FakeCRC32Deserializer<T> implements 
IDeserializer<T> {
     }
 
     byte[] logBuffer = new byte[logLength];
-    if (logLength < inputStream.read(logBuffer, 0, logLength)) {
-      throw new EOFException();
-    }
+    dataInputStream.readFully(logBuffer, 0, logLength);
 
     T result = deserializer.deserialize(ByteBuffer.wrap(logBuffer));
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
index 692736474ba..1501f50acc1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/SchemaFile.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
 import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
+import org.apache.iotdb.commons.utils.IOUtils;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.ConsensusFactory;
@@ -360,7 +361,7 @@ public class SchemaFile implements ISchemaFile {
       lastSGAddr = 0L;
       pageManager = new BTreePageManager(channel, pmtFile, -1, logPath);
     } else {
-      channel.read(headerContent);
+      IOUtils.readFully(channel, headerContent);
       headerContent.clear();
       lastPageIndex = ReadWriteIOUtils.readInt(headerContent);
       dataTTL = ReadWriteIOUtils.readLong(headerContent);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java
index c723e38dfaf..227ea27f505 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/log/SchemaFileLogReader.java
@@ -26,6 +26,8 @@ import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafil
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -91,8 +93,9 @@ public class SchemaFileLogReader {
         }
       }
 
-      // corrupted within one entry
-      if (inputStream.read(tempBytes, 1, tempBytes.length - 1) < 
tempBytes.length - 2) {
+      try {
+        new DataInputStream(inputStream).readFully(tempBytes, 1, 
tempBytes.length - 1);
+      } catch (EOFException e) {
         throw new SchemaFileLogCorruptedException(logFile.getAbsolutePath(), 
"incomplete entry.");
       }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
index 044b7339a82..4e9d6524f19 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/schemafile/pagemgr/PageIOChannel.java
@@ -19,6 +19,7 @@
 package 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr;
 
 import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.utils.IOUtils;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.log.SchemaFileLogReader;
@@ -107,7 +108,7 @@ public class PageIOChannel {
     if (!readChannel.isOpen()) {
       readChannel = FileChannel.open(pmtFile.toPath(), 
StandardOpenOption.READ);
     }
-    readChannel.read(dst, getPageAddress(pageIndex));
+    IOUtils.readFully(readChannel, dst, getPageAddress(pageIndex));
   }
 
   // region Flush Strategy
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
index 9cf9d51cf21..512c9fcea57 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFile.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.schemaengine.schemaregion.tag;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.utils.IOUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 
 import org.apache.commons.io.FileUtils;
@@ -114,7 +115,7 @@ public class TagLogFile implements AutoCloseable {
       throws IOException {
     // Read the first block
     ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
-    fileChannel.read(byteBuffer, position);
+    IOUtils.readFully(fileChannel, byteBuffer, position);
     byteBuffer.flip();
     if (byteBuffer.limit() > 0) { // This indicates that there is data at this 
position
       int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int
@@ -129,7 +130,7 @@ public class TagLogFile implements AutoCloseable {
           // read one offset, then use filechannel's read to read it
           byteBuffers.position(MAX_LENGTH * i);
           byteBuffers.limit(MAX_LENGTH * (i + 1));
-          fileChannel.read(byteBuffers, nextPosition);
+          IOUtils.readFully(fileChannel, byteBuffers, nextPosition);
           byteBuffers.position(4 + i * Long.BYTES);
         }
         byteBuffers.limit(byteBuffers.capacity());
@@ -144,7 +145,10 @@ public class TagLogFile implements AutoCloseable {
     blockOffset.add(position);
     // Read the first block
     ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
-    fileChannel.read(byteBuffer, position);
+    if (position == fileChannel.size()) {
+      return blockOffset;
+    }
+    IOUtils.readFully(fileChannel, byteBuffer, position);
     byteBuffer.flip();
     if (byteBuffer.limit() > 0) { // This indicates that there is data at this 
position
       int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int
@@ -167,7 +171,7 @@ public class TagLogFile implements AutoCloseable {
             // read
             blockBuffer.position(MAX_LENGTH * i);
             blockBuffer.limit(MAX_LENGTH * (i + 1));
-            fileChannel.read(blockBuffer, blockOffset.get(i));
+            IOUtils.readFully(fileChannel, blockBuffer, blockOffset.get(i));
             blockBuffer.position(4 + i * Long.BYTES);
           }
         }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java
index e3d374551b1..8a6d2fbf890 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 
+import org.apache.iotdb.commons.utils.IOUtils;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -63,7 +65,7 @@ public enum WALFileVersion {
           continue;
         }
         ByteBuffer buffer = ByteBuffer.allocate(version.versionBytes.length);
-        channel.read(buffer);
+        IOUtils.readFully(channel, buffer);
         buffer.flip();
         String versionString = new String(buffer.array(), 
StandardCharsets.UTF_8);
         if (version.versionString.equals(versionString)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index 1827bfc9365..a56c145fe2b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.utils.IOUtils;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
 import org.apache.iotdb.db.utils.MmapUtil;
 
@@ -85,7 +86,8 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
       if (version == WALFileVersion.V2) {
         // New Version
         ByteBuffer magicStringBuffer = 
ByteBuffer.allocate(version.getVersionBytes().length);
-        channel.read(magicStringBuffer, channel.size() - 
version.getVersionBytes().length);
+        IOUtils.readFully(
+            channel, magicStringBuffer, channel.size() - 
version.getVersionBytes().length);
         magicStringBuffer.flip();
         if 
(logFile.getName().endsWith(IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX)
             || !new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
@@ -105,7 +107,8 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
         }
         // Old version
         ByteBuffer magicStringBuffer = 
ByteBuffer.allocate(version.getVersionBytes().length);
-        channel.read(magicStringBuffer, channel.size() - 
version.getVersionBytes().length);
+        IOUtils.readFully(
+            channel, magicStringBuffer, channel.size() - 
version.getVersionBytes().length);
         magicStringBuffer.flip();
         if (!new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
             .equals(version.getVersionString())) {
@@ -117,7 +120,7 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
         }
       }
       // Read the metadata size
-      channel.read(metadataSizeBuf, position);
+      IOUtils.readFully(channel, metadataSizeBuf, position);
       metadataSizeBuf.flip();
       int metadataSize = metadataSizeBuf.getInt();
       endOffset = channel.size() - version.getVersionBytes().length - 
Integer.BYTES - metadataSize;
@@ -237,9 +240,7 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
       compressedBuffer.clear();
       // limit the buffer to prevent it from reading too much byte than 
expected
       compressedBuffer.limit(segmentInfo.dataInDiskSize);
-      if (readWALBufferFromChannel(compressedBuffer) != 
segmentInfo.dataInDiskSize) {
-        throw new IOException("Unexpected end of file");
-      }
+      readWALBufferFullyFromChannel(compressedBuffer);
       compressedBuffer.flip();
       IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
       uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor);
@@ -255,9 +256,7 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
       // limit the buffer to prevent it from reading too much byte than 
expected
       dataBuffer.limit(segmentInfo.dataInDiskSize);
 
-      if (readWALBufferFromChannel(dataBuffer) != segmentInfo.dataInDiskSize) {
-        throw new IOException("Unexpected end of file");
-      }
+      readWALBufferFullyFromChannel(dataBuffer);
     }
     dataBuffer.flip();
   }
@@ -301,7 +300,7 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
 
       if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) {
         compressedBuffer = 
ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
-        readWALBufferFromChannel(compressedBuffer);
+        readWALBufferFullyFromChannel(compressedBuffer);
         compressedBuffer.flip();
         IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
         dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
@@ -310,7 +309,7 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
         compressedBuffer = null;
       } else {
         dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
-        readWALBufferFromChannel(dataBuffer);
+        readWALBufferFullyFromChannel(dataBuffer);
         dataBuffer.flip();
       }
 
@@ -349,7 +348,7 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
 
   private SegmentInfo getNextSegmentInfo() throws IOException {
     segmentHeaderWithoutCompressedSizeBuffer.clear();
-    channel.read(segmentHeaderWithoutCompressedSizeBuffer);
+    readWALBufferFullyFromChannel(segmentHeaderWithoutCompressedSizeBuffer);
     segmentHeaderWithoutCompressedSizeBuffer.flip();
     SegmentInfo info = new SegmentInfo();
     info.compressionType =
@@ -357,7 +356,7 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
     info.dataInDiskSize = segmentHeaderWithoutCompressedSizeBuffer.getInt();
     if (info.compressionType != CompressionType.UNCOMPRESSED) {
       compressedSizeBuffer.clear();
-      readWALBufferFromChannel(compressedSizeBuffer);
+      readWALBufferFullyFromChannel(compressedSizeBuffer);
       compressedSizeBuffer.flip();
       info.uncompressedSize = compressedSizeBuffer.getInt();
     } else {
@@ -373,6 +372,13 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
     return size;
   }
 
+  private void readWALBufferFullyFromChannel(ByteBuffer buffer) throws 
IOException {
+    long startTime = System.nanoTime();
+    int size = buffer.remaining();
+    IOUtils.readFully(channel, buffer);
+    WritingMetrics.getInstance().recordWALRead(size, System.nanoTime() - 
startTime);
+  }
+
   private void uncompressWALBuffer(
       ByteBuffer compressed, ByteBuffer uncompressed, IUnCompressor 
unCompressor)
       throws IOException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
index ba9211656ef..9c707c788a3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 
+import org.apache.iotdb.commons.utils.IOUtils;
 import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.BrokenWALFileException;
 import org.apache.iotdb.db.utils.SerializedSize;
@@ -143,12 +144,12 @@ public class WALMetaData implements SerializedSize {
       ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
       WALFileVersion version = WALFileVersion.getVersion(channel);
       position = channel.size() - Integer.BYTES - 
(version.getVersionBytes().length);
-      channel.read(metadataSizeBuf, position);
+      IOUtils.readFully(channel, metadataSizeBuf, position);
       metadataSizeBuf.flip();
       // load metadata
       int metadataSize = metadataSizeBuf.getInt();
       ByteBuffer metadataBuf = ByteBuffer.allocate(metadataSize);
-      channel.read(metadataBuf, position - metadataSize);
+      IOUtils.readFully(channel, metadataBuf, position - metadataSize);
       metadataBuf.flip();
       metaData = WALMetaData.deserialize(metadataBuf);
       // versions before V1.3, should recover memTable ids from entries
@@ -157,8 +158,8 @@ public class WALMetaData implements SerializedSize {
         for (int size : metaData.buffersSize) {
           channel.position(offset);
           ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
-          channel.read(buffer);
-          buffer.clear();
+          IOUtils.readFully(channel, buffer);
+          buffer.flip();
           metaData.memTablesId.add(buffer.getLong());
           offset += size;
         }
@@ -175,7 +176,8 @@ public class WALMetaData implements SerializedSize {
 
   private static boolean isValidMagicString(FileChannel channel) throws 
IOException {
     ByteBuffer magicStringBytes = 
ByteBuffer.allocate(WALFileVersion.V2.getVersionBytes().length);
-    channel.read(magicStringBytes, channel.size() - 
WALFileVersion.V2.getVersionBytes().length);
+    IOUtils.readFully(
+        channel, magicStringBytes, channel.size() - 
WALFileVersion.V2.getVersionBytes().length);
     magicStringBytes.flip();
     String magicString = new String(magicStringBytes.array(), 
StandardCharsets.UTF_8);
     return magicString.equals(WALFileVersion.V2.getVersionString())
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
index d1b27060c90..46598561a5b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.recover;
 
+import org.apache.iotdb.commons.utils.IOUtils;
 import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALFileVersion;
 import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
 import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter;
@@ -65,7 +66,7 @@ public class WALRepairWriter {
     }
     try (FileChannel channel = FileChannel.open(logFile.toPath(), 
StandardOpenOption.READ)) {
       ByteBuffer magicStringBytes = ByteBuffer.allocate(size);
-      channel.read(magicStringBytes, channel.size() - size);
+      IOUtils.readFully(channel, magicStringBytes, channel.size() - size);
       magicStringBytes.flip();
       return new String(magicStringBytes.array(), StandardCharsets.UTF_8);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
index 8567d30660d..1395eaa420d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
@@ -44,6 +44,7 @@ import org.apache.tsfile.write.writer.TsFileIOWriter;
 import javax.annotation.Nonnull;
 
 import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -304,9 +305,7 @@ public class AlignedChunkData implements ChunkData {
   protected void deserializeTsFileDataByte(final InputStream stream) throws 
IOException {
     final int size = ReadWriteIOUtils.readInt(stream);
     this.chunkData = new byte[size];
-    if (size != stream.read(chunkData)) {
-      throw new IOException("TsFileData byte array read error, size 
mismatch.");
-    }
+    new DataInputStream(stream).readFully(chunkData);
   }
 
   private void deserializeEntireChunk(final InputStream stream, final 
TsFileIOWriter writer)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
index 5cc7f40cd79..3d94e1a4933 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/response/SubscriptionEventTsFileResponse.java
@@ -189,20 +189,13 @@ public class SubscriptionEventTsFileResponse extends 
SubscriptionEventExtendable
           
PipeDataNodeResourceManager.memory().forceAllocateForTsFileWithRetry(bufferSize);
       final byte[] readBuffer = new byte[(int) bufferSize];
 
-      final int readLength = reader.read(readBuffer);
-      if (readLength != bufferSize) {
-        memoryBlock.close();
-        throw new SubscriptionException(
-            String.format(
-                "inconsistent read length (broken invariant), expected: %s, 
actual: %s",
-                bufferSize, readLength));
-      }
+      reader.readFully(readBuffer);
 
       // generate subscription poll response with piece payload
       final CachedSubscriptionPollResponse response =
           new CachedSubscriptionPollResponse(
               SubscriptionPollResponseType.FILE_PIECE.getType(),
-              new FilePiecePayload(tsFile.getName(), writingOffset + 
readLength, readBuffer),
+              new FilePiecePayload(tsFile.getName(), writingOffset + 
bufferSize, readBuffer),
               commitContext);
 
       // set fixed memory block for response
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java
index 98ca07dcd03..e622194265a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.utils.sort;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.utils.IOUtils;
 import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -93,10 +94,11 @@ public class FileSpillerReader implements SortReader {
       if (readLen == -1) {
         return -1;
       }
+      IOUtils.readFully(fileChannel, bytes);
       bytes.flip();
       int capacity = bytes.getInt();
       ByteBuffer tsBlockBytes = ByteBuffer.allocate(capacity);
-      fileChannel.read(tsBlockBytes);
+      IOUtils.readFully(fileChannel, tsBlockBytes);
       tsBlockBytes.flip();
       TsBlock cachedTsBlock = serde.deserialize(tsBlockBytes);
       cacheBlocks.add(cachedTsBlock);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java
new file mode 100644
index 00000000000..b9f58311977
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/logfile/FakeCRC32DeserializerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.schemaengine.schemaregion.logfile;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public class FakeCRC32DeserializerTest {
+
+  @Test
+  public void deserializeReadsCompletePayloadAfterShortRead() throws IOException {
+    byte[] payload = new byte[] {1, 2, 3, 4};
+
+    byte[] deserialized =
+        new FakeCRC32Deserializer<>(new ByteBufferDeserializer())
+            .deserialize(new OneByteAtATimeInputStream(serialize(payload, true)));
+
+    Assert.assertArrayEquals(payload, deserialized);
+  }
+
+  @Test
+  public void deserializeThrowsWhenPayloadIsTruncated() throws IOException {
+    byte[] bytes = serialize(new byte[] {1, 2}, false, false);
+
+    Assert.assertThrows(
+        EOFException.class,
+        () ->
+            new FakeCRC32Deserializer<>(new ByteBufferDeserializer())
+                .deserialize(new OneByteAtATimeInputStream(bytes)));
+  }
+
+  private static byte[] serialize(byte[] payload, boolean complete) throws IOException {
+    return serialize(payload, complete, true);
+  }
+
+  private static byte[] serialize(byte[] payload, boolean complete, boolean writeValidationCode)
+      throws IOException {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
+      dataOutputStream.writeInt(complete ? payload.length : payload.length + 1);
+      dataOutputStream.write(payload);
+      if (writeValidationCode) {
+        dataOutputStream.writeLong(0L);
+      }
+    }
+    return outputStream.toByteArray();
+  }
+
+  private static class ByteBufferDeserializer implements IDeserializer<byte[]> {
+
+    @Override
+    public byte[] deserialize(ByteBuffer buffer) {
+      byte[] bytes = new byte[buffer.remaining()];
+      buffer.get(bytes);
+      return bytes;
+    }
+  }
+
+  private static class OneByteAtATimeInputStream extends InputStream {
+
+    private final byte[] bytes;
+    private int index;
+
+    private OneByteAtATimeInputStream(byte[] bytes) {
+      this.bytes = bytes;
+    }
+
+    @Override
+    public int read() {
+      return index < bytes.length ? bytes[index++] & 0xFF : -1;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) {
+      if (len == 0) {
+        return 0;
+      }
+      if (index >= bytes.length) {
+        return -1;
+      }
+      b[off] = bytes[index++];
+      return 1;
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java
new file mode 100644
index 00000000000..9f3688d76fa
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/schemaregion/tag/TagLogFileTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.schemaengine.schemaregion.tag;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.utils.Pair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+
+public class TagLogFileTest {
+
+  private File tempDir;
+
+  @After
+  public void tearDown() throws Exception {
+    if (tempDir != null) {
+      FileUtils.deleteDirectory(tempDir);
+    }
+  }
+
+  @Test
+  public void writeAppendsFirstRecordWithoutReadingPastFileEnd() throws Exception {
+    tempDir = Files.createTempDirectory("tag-log-file").toFile();
+    Map<String, String> tags = Collections.singletonMap("tag", "value");
+    Map<String, String> attributes = Collections.singletonMap("attr", "value");
+
+    try (TagLogFile tagLogFile = new TagLogFile(tempDir.getAbsolutePath(), "tag.log")) {
+      long offset = tagLogFile.write(tags, attributes);
+      Pair<Map<String, String>, Map<String, String>> result = tagLogFile.read(offset);
+
+      Assert.assertEquals(tags, result.left);
+      Assert.assertEquals(attributes, result.right);
+    }
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
index bb1b2c1ce5e..02139d2058d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
@@ -29,6 +29,7 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import javax.annotation.Nonnull;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -279,13 +280,7 @@ public class TimeWindowStateProgressIndex extends ProgressIndex {
         continue;
       }
       final byte[] body = new byte[length];
-      final int readLen = stream.read(body);
-      if (readLen != length) {
-        throw new IOException(
-            String.format(
-                "The intended read length is %s but %s is actually read when deserializing TimeProgressIndex, ProgressIndex: %s",
-                length, readLen, timeWindowStateProgressIndex));
-      }
+      new DataInputStream(stream).readFully(body);
       final ByteBuffer dstBuffer = ByteBuffer.wrap(body);
       timeWindowStateProgressIndex.timeSeries2TimestampWindowBufferPairMap.put(
           timeSeries, new Pair<>(timestamp, dstBuffer));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
index fc739b58ea6..011a99e8a1d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.executable;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.trigger.exception.TriggerJarTooLargeException;
+import org.apache.iotdb.commons.utils.IOUtils;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.tsfile.fileSystem.FSFactoryProducer;
@@ -214,7 +215,7 @@ public class ExecutableManager {
             String.format("Size of file exceed %d bytes", Integer.MAX_VALUE));
       }
       ByteBuffer byteBuffer = ByteBuffer.allocate((int) size);
-      fileChannel.read(byteBuffer);
+      IOUtils.readFully(fileChannel, byteBuffer);
       byteBuffer.flip();
       return byteBuffer;
     } catch (Exception e) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java
index e2279ce06f4..7798acf28e0 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/serializer/PlainQueueSerializer.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.datastructure.queue.serializer;
 
 import org.apache.iotdb.commons.pipe.datastructure.queue.ConcurrentIterableLinkedQueue;
+import org.apache.iotdb.commons.utils.IOUtils;
 
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
@@ -68,7 +69,7 @@ public class PlainQueueSerializer<E> implements QueueSerializer<E> {
         }
         int capacity = ReadWriteIOUtils.readInt(inputStream);
         ByteBuffer buffer = ByteBuffer.allocate(capacity);
-        channel.read(buffer);
+        IOUtils.readFully(channel, buffer);
         buffer.flip();
         E element = elementDeserializationFunction.apply(buffer);
         if (element == null) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
index 498baeda582..f2c54cd2c94 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -332,8 +333,8 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
     outputStream.flush();
 
     final byte[] response = new byte[1];
-    final int size = socket.getInputStream().read(response);
-    return size > 0 && Arrays.equals(AirGapOneByteResponse.OK, response);
+    new DataInputStream(socket.getInputStream()).readFully(response);
+    return Arrays.equals(AirGapOneByteResponse.OK, response);
   }
 
   protected boolean send(final AirGapSocket socket, final byte[] bytes) throws IOException {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java
index f96ae8443b9..047dd6bfea5 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java
@@ -28,10 +28,12 @@ import com.google.common.base.Supplier;
 import org.apache.tsfile.utils.Pair;
 
 import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -94,6 +96,26 @@ public class IOUtils {
     outputStream.write(encodingBuffer.array(), 0, Integer.BYTES);
   }
 
+  public static void readFully(FileChannel fileChannel, ByteBuffer buffer) throws IOException {
+    while (buffer.hasRemaining()) {
+      if (fileChannel.read(buffer) <= 0) {
+        throw new EOFException();
+      }
+    }
+  }
+
+  public static void readFully(FileChannel fileChannel, ByteBuffer buffer, long position)
+      throws IOException {
+    long currentPosition = position;
+    while (buffer.hasRemaining()) {
+      final int readBytes = fileChannel.read(buffer, currentPosition);
+      if (readBytes <= 0) {
+        throw new EOFException();
+      }
+      currentPosition += readBytes;
+    }
+  }
+
   /**
    * Read a string from the given stream.
    *
@@ -120,7 +142,7 @@ public class IOUtils {
         strBuffer = new byte[length];
       }
 
-      inputStream.read(strBuffer, 0, length);
+      inputStream.readFully(strBuffer, 0, length);
       return new String(strBuffer, 0, length, encoding);
     }
     return null;
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java
new file mode 100644
index 00000000000..bf2cf546628
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/IOUtilsTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class IOUtilsTest {
+
+  private static final String ENCODING = "UTF-8";
+
+  @Test
+  public void readStringReadsCompletePayloadAfterShortRead() throws IOException {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    IOUtils.writeString(outputStream, "abcdefg", ENCODING, null);
+
+    try (DataInputStream inputStream =
+        new DataInputStream(new OneByteAtATimeInputStream(outputStream.toByteArray()))) {
+      Assert.assertEquals("abcdefg", IOUtils.readString(inputStream, ENCODING, null));
+    }
+  }
+
+  @Test
+  public void readStringThrowsWhenPayloadIsTruncated() throws IOException {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    IOUtils.writeInt(outputStream, 7, null);
+    outputStream.write(new byte[] {'a', 'b', 'c'});
+
+    try (DataInputStream inputStream =
+        new DataInputStream(new OneByteAtATimeInputStream(outputStream.toByteArray()))) {
+      Assert.assertThrows(
+          EOFException.class, () -> IOUtils.readString(inputStream, ENCODING, null));
+    }
+  }
+
+  @Test
+  public void readFullyReadsCompleteByteBufferAfterShortChannelRead() throws IOException {
+    byte[] bytes = new byte[] {1, 2, 3};
+    FileChannel channel = mockOneByteAtATimeChannel(bytes);
+    ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+
+    IOUtils.readFully(channel, buffer);
+
+    Assert.assertArrayEquals(bytes, buffer.array());
+  }
+
+  @Test
+  public void readFullyReadsCompleteByteBufferFromPositionAfterShortChannelRead()
+      throws IOException {
+    byte[] bytes = new byte[] {1, 2, 3, 4, 5};
+    FileChannel channel = mockOneByteAtATimeChannel(bytes);
+    ByteBuffer buffer = ByteBuffer.allocate(3);
+
+    IOUtils.readFully(channel, buffer, 2);
+
+    Assert.assertArrayEquals(new byte[] {3, 4, 5}, buffer.array());
+  }
+
+  @Test
+  public void readFullyThrowsWhenChannelIsTruncated() throws IOException {
+    FileChannel channel = mockOneByteAtATimeChannel(new byte[] {1, 2});
+    ByteBuffer buffer = ByteBuffer.allocate(3);
+
+    Assert.assertThrows(EOFException.class, () -> IOUtils.readFully(channel, buffer));
+  }
+
+  private static FileChannel mockOneByteAtATimeChannel(byte[] bytes) throws IOException {
+    FileChannel channel = Mockito.mock(FileChannel.class);
+    AtomicInteger index = new AtomicInteger();
+    Mockito.when(channel.read(Mockito.any(ByteBuffer.class)))
+        .thenAnswer(
+            invocation -> {
+              ByteBuffer buffer = invocation.getArgument(0);
+              int currentIndex = index.getAndIncrement();
+              if (currentIndex >= bytes.length) {
+                return -1;
+              }
+              buffer.put(bytes[currentIndex]);
+              return 1;
+            });
+    Mockito.when(channel.read(Mockito.any(ByteBuffer.class), Mockito.anyLong()))
+        .thenAnswer(
+            invocation -> {
+              ByteBuffer buffer = invocation.getArgument(0);
+              long position = invocation.getArgument(1);
+              if (position >= bytes.length) {
+                return -1;
+              }
+              buffer.put(bytes[(int) position]);
+              return 1;
+            });
+    return channel;
+  }
+
+  private static class OneByteAtATimeInputStream extends InputStream {
+
+    private final byte[] bytes;
+    private int index;
+
+    private OneByteAtATimeInputStream(byte[] bytes) {
+      this.bytes = bytes;
+    }
+
+    @Override
+    public int read() {
+      return index < bytes.length ? bytes[index++] & 0xFF : -1;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) {
+      if (len == 0) {
+        return 0;
+      }
+      if (index >= bytes.length) {
+        return -1;
+      }
+      b[off] = bytes[index++];
+      return 1;
+    }
+  }
+}


Reply via email to