This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d1404ca30 [core] Fix offset bug in StandardLineReader (#6577)
8d1404ca30 is described below

commit 8d1404ca302775cbe4f4ec5faffb07aa628bd763
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 10 16:51:25 2025 +0800

    [core] Fix offset bug in StandardLineReader (#6577)
---
 .../paimon/format/text/StandardLineReader.java     |   4 +-
 .../paimon/format/text/StandardLineReaderTest.java | 202 ++++++++-------------
 2 files changed, 75 insertions(+), 131 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
index eb4f020e43..a2f0684667 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
@@ -36,6 +36,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 public class StandardLineReader implements TextLineReader {
 
     private final InputStream in;
+    private final long offset;
     private final byte[] buffer;
     private final @Nullable Long length;
     private final ByteArrayOutputStream lineBuilder;
@@ -48,6 +49,7 @@ public class StandardLineReader implements TextLineReader {
     public StandardLineReader(InputStream in, long offset, @Nullable Long length)
             throws IOException {
         this.in = in;
+        this.offset = offset;
         this.buffer = new byte[8192];
         this.length = length;
         this.lineBuilder = new ByteArrayOutputStream();
@@ -193,7 +195,7 @@ public class StandardLineReader implements TextLineReader {
             }
             int currentBufferSize = bufferEnd - bufferPosition;
             long currentPosition = ((SeekableInputStream) in).getPos() - currentBufferSize;
-            return currentPosition > length;
+            return currentPosition > length + offset;
         }
         return false;
     }
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/text/StandardLineReaderTest.java b/paimon-format/src/test/java/org/apache/paimon/format/text/StandardLineReaderTest.java
index 30ef14688d..463ae132e3 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/text/StandardLineReaderTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/text/StandardLineReaderTest.java
@@ -19,44 +19,24 @@
 package org.apache.paimon.format.text;
 
 import org.apache.paimon.fs.ByteArraySeekableStream;
-import org.apache.paimon.fs.PositionOutputStream;
-import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.fs.local.LocalFileIO;
 
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link StandardLineReader}. */
 public class StandardLineReaderTest {
 
-    @TempDir java.nio.file.Path tempDir;
-
-    private LocalFileIO fileIO;
-    private java.nio.file.Path testFile;
-
-    @BeforeEach
-    public void setUp() {
-        fileIO = new LocalFileIO();
-        testFile = tempDir.resolve("test.txt");
-    }
-
-    @AfterEach
-    public void tearDown() throws IOException {
-        if (java.nio.file.Files.exists(testFile)) {
-            java.nio.file.Files.delete(testFile);
-        }
-    }
-
     /**
      * After the stream is closed, we cannot use getPos, so we need to ensure that it is manually
      * closed.
@@ -85,15 +65,10 @@ public class StandardLineReaderTest {
     }
 
     @Test
-    public void testReadLineWithLineFeedDelimiter() throws IOException {
+    public void testReadLineNoSplitting() throws IOException {
         // Test basic functionality with \n delimiter
         String content = "line1\nline2\nline3";
-        try (TextLineReader reader =
-                TextLineReader.create(
-                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
-                        "\n",
-                        0,
-                        null)) {
+        try (TextLineReader reader = create(content, 0, null)) {
             assertThat(reader.readLine()).isEqualTo("line1");
             assertThat(reader.readLine()).isEqualTo("line2");
             assertThat(reader.readLine()).isEqualTo("line3");
@@ -105,12 +80,7 @@ public class StandardLineReaderTest {
     public void testReadLineWithCarriageReturnDelimiter() throws IOException {
         // Test basic functionality with \r delimiter
         String content = "line1\rline2\rline3";
-        try (TextLineReader reader =
-                TextLineReader.create(
-                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
-                        "\r",
-                        0,
-                        null)) {
+        try (TextLineReader reader = create(content, 0, null)) {
             assertThat(reader.readLine()).isEqualTo("line1");
             assertThat(reader.readLine()).isEqualTo("line2");
             assertThat(reader.readLine()).isEqualTo("line3");
@@ -122,12 +92,7 @@ public class StandardLineReaderTest {
     public void testReadLineWithCRLF() throws IOException {
         // Test basic functionality with \r\n delimiter
         String content = "line1\r\nline2\r\nline3";
-        try (TextLineReader reader =
-                TextLineReader.create(
-                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
-                        "\r\n",
-                        0,
-                        null)) {
+        try (TextLineReader reader = create(content, 0, null)) {
             assertThat(reader.readLine()).isEqualTo("line1");
             assertThat(reader.readLine()).isEqualTo("line2");
             assertThat(reader.readLine()).isEqualTo("line3");
@@ -139,12 +104,7 @@ public class StandardLineReaderTest {
     public void testReadLineWithEmptyLines() throws IOException {
         // Test reading empty lines
         String content = "line1\n\nline3\n";
-        try (TextLineReader reader =
-                TextLineReader.create(
-                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
-                        null,
-                        0,
-                        null)) {
+        try (TextLineReader reader = create(content, 0, null)) {
             assertThat(reader.readLine()).isEqualTo("line1");
             assertThat(reader.readLine()).isEqualTo("");
             assertThat(reader.readLine()).isEqualTo("line3");
@@ -161,8 +121,7 @@ public class StandardLineReaderTest {
         System.arraycopy(bom, 0, fullContent, 0, bom.length);
         System.arraycopy(content, 0, fullContent, bom.length, content.length);
 
-        try (TextLineReader reader =
-                TextLineReader.create(new ByteArrayInputStream(fullContent), null, 0, null)) {
+        try (TextLineReader reader = create(fullContent, 0, null)) {
             assertThat(reader.readLine()).isEqualTo("line1");
             assertThat(reader.readLine()).isEqualTo("line2");
             assertThat(reader.readLine()).isNull();
@@ -171,63 +130,83 @@ public class StandardLineReaderTest {
 
     @Test
     public void testReadLineWithSplitAtBeginning() throws IOException {
-        // Test reading with split at the beginning
         String content = "line1\nline2\nline3\nline4";
-        try (TextLineReader reader =
-                TextLineReader.create(
-                        new ByteArraySeekableStream(content.getBytes(StandardCharsets.UTF_8)),
-                        null,
-                        0,
-                        10L)) {
+        try (TextLineReader reader = create(content, 0, 10L)) {
             assertThat(reader.readLine()).isEqualTo("line1");
             assertThat(reader.readLine()).isEqualTo("line2");
-            // Should stop reading after reaching the split length
+            assertThat(reader.readLine()).isNull();
+        }
+        try (TextLineReader reader = create(content, 10, 6L)) {
+            assertThat(reader.readLine()).isEqualTo("line3");
+            assertThat(reader.readLine()).isNull();
+        }
+        try (TextLineReader reader = create(content, 16, 7L)) {
+            assertThat(reader.readLine()).isEqualTo("line4");
             assertThat(reader.readLine()).isNull();
         }
     }
 
     @Test
-    public void testReadLineWithSplitInMiddle() throws IOException {
-        // Test reading with split in the middle
+    public void testReadLineWithSplitInEndTwoLines() throws IOException {
         String content = "line1\nline2\nline3\nline4";
-        try (TextLineReader reader =
-                TextLineReader.create(
-                        new ByteArraySeekableStream(content.getBytes(StandardCharsets.UTF_8)),
-                        null,
-                        3,
-                        null)) {
-            // Should skip the first line and start from the second line
+        try (TextLineReader reader = create(content, 10, 13L)) {
+            assertThat(reader.readLine()).isEqualTo("line3");
+            assertThat(reader.readLine()).isEqualTo("line4");
+            assertThat(reader.readLine()).isNull();
+        }
+    }
+
+    @Test
+    public void testReadLineWithSplitInEndEmpty() throws IOException {
+        String content = "line1\nline2\nline3\nline4";
+        try (TextLineReader reader = create(content, 0, 18L)) {
+            assertThat(reader.readLine()).isEqualTo("line1");
             assertThat(reader.readLine()).isEqualTo("line2");
             assertThat(reader.readLine()).isEqualTo("line3");
             assertThat(reader.readLine()).isEqualTo("line4");
             assertThat(reader.readLine()).isNull();
         }
+        try (TextLineReader reader = create(content, 18, 5L)) {
+            assertThat(reader.readLine()).isNull();
+        }
+    }
+
+    @Test
+    public void testReadLineWithRandomSplit() throws IOException {
+        String content = "line1\nline2\nline3\nline4";
+        int splitSize = ThreadLocalRandom.current().nextInt(content.length()) + 1;
+        int remainLen = content.length();
+        List<String> lines = new ArrayList<>();
+        while (remainLen > 0) {
+            int len = Math.min(remainLen, splitSize);
+            try (TextLineReader reader =
+                    create(content, content.length() - remainLen, (long) len)) {
+                while (true) {
+                    String line = reader.readLine();
+                    if (line != null) {
+                        lines.add(line);
+                    } else {
+                        break;
+                    }
+                }
+            }
+            remainLen -= len;
+        }
+        assertThat(lines).containsExactly("line1", "line2", "line3", "line4");
     }
 
     @Test
     public void testReadLineWithEmptyFile() throws IOException {
-        // Test reading an empty file
         String content = "";
-        try (TextLineReader reader =
-                TextLineReader.create(
-                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
-                        null,
-                        0,
-                        null)) {
+        try (TextLineReader reader = create(content, 0, null)) {
             assertThat(reader.readLine()).isNull();
         }
     }
 
     @Test
     public void testReadLineWithSingleLine() throws IOException {
-        // Test reading a single line without line terminator
         String content = "single line";
-        try (TextLineReader reader =
-                TextLineReader.create(
-                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
-                        null,
-                        0,
-                        null)) {
+        try (TextLineReader reader = create(content, 0, null)) {
             assertThat(reader.readLine()).isEqualTo("single line");
             assertThat(reader.readLine()).isNull();
         }
@@ -235,14 +214,8 @@ public class StandardLineReaderTest {
 
     @Test
     public void testReadLineWithSingleLineAndNewline() throws IOException {
-        // Test reading a single line with line terminator
         String content = "single line\n";
-        try (TextLineReader reader =
-                TextLineReader.create(
-                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
-                        null,
-                        0,
-                        null)) {
+        try (TextLineReader reader = create(content, 0, null)) {
             assertThat(reader.readLine()).isEqualTo("single line");
             assertThat(reader.readLine()).isNull();
         }
@@ -255,53 +228,22 @@ public class StandardLineReaderTest {
         for (int i = 0; i < 10000; i++) {
             longLine.append("a");
         }
-        String content = longLine.toString() + "\nline2";
-        try (TextLineReader reader =
-                TextLineReader.create(
-                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
-                        null,
-                        0,
-                        null)) {
+        String content = longLine + "\nline2";
+        try (TextLineReader reader = create(content, 0, null)) {
             assertThat(reader.readLine()).isEqualTo(longLine.toString());
             assertThat(reader.readLine()).isEqualTo("line2");
             assertThat(reader.readLine()).isNull();
         }
     }
 
-    @Test
-    public void testReadLineWithSeekableInputStream() throws IOException {
-        // Test reading with seekable input stream
-        String content = "line1\nline2\nline3";
-        createFile(content);
-
-        try (SeekableInputStream inputStream =
-                fileIO.newInputStream(new org.apache.paimon.fs.Path(testFile.toString()))) {
-            try (TextLineReader reader = TextLineReader.create(inputStream, null, 3, null)) {
-                // Should skip the first line and start from the second line
-                assertThat(reader.readLine()).isEqualTo("line2");
-                assertThat(reader.readLine()).isEqualTo("line3");
-                assertThat(reader.readLine()).isNull();
-            }
-        }
-    }
-
-    @Test
-    public void testReadLineWithNonSeekableInputStreamAndNonZeroOffset() throws IOException {
-        // Test that non-seekable input stream with non-zero offset throws exception
-        String content = "line1\nline2\nline3";
-        ByteArrayInputStream inputStream =
-                new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
-
-        assertThatThrownBy(() -> new StandardLineReader(inputStream, 1, null))
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessage("Current position only supported for uncompressed files");
+    private TextLineReader create(String content, long offset, @Nullable Long length)
+            throws IOException {
+        return create(content.getBytes(StandardCharsets.UTF_8), offset, length);
     }
 
-    /** Helper method to create a test file with the given content. */
-    private void createFile(String content) throws IOException {
-        try (PositionOutputStream out =
-                fileIO.newOutputStream(new org.apache.paimon.fs.Path(testFile.toString()), false)) {
-            out.write(content.getBytes(StandardCharsets.UTF_8));
-        }
+    private TextLineReader create(byte[] content, long offset, @Nullable Long length)
+            throws IOException {
+        ByteArraySeekableStream input = new ByteArraySeekableStream(content);
+        return TextLineReader.create(input, null, offset, length);
     }
 }

Reply via email to