This is an automated email from the ASF dual-hosted git repository.

jerryjing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f371835e36 [text] Text formats support split file to read by offset and len (#6568)
f371835e36 is described below

commit f371835e361ae07608626b5ea2588dda88b4417f
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 10 12:53:25 2025 +0800

    [text] Text formats support split file to read by offset and len (#6568)
---
 .../apache/paimon/format/FormatReaderFactory.java  |   8 +
 .../main/java/org/apache/paimon/utils/IOUtils.java |  12 +
 .../apache/paimon/format/csv/CsvFileFormat.java    |  17 +-
 .../apache/paimon/format/csv/CsvFileReader.java    |   8 +-
 .../apache/paimon/format/json/JsonFileFormat.java  |  19 +-
 .../apache/paimon/format/json/JsonFileReader.java  |  12 +-
 .../paimon/format/text/CustomLineReader.java       |   3 +
 .../paimon/format/text/StandardLineReader.java     | 183 +++++++++++++-
 .../apache/paimon/format/text/TextFileReader.java  |  21 +-
 .../apache/paimon/format/text/TextLineReader.java  |  13 +-
 .../paimon/format/text/StandardLineReaderTest.java | 279 +++++++++++++++++++++
 .../paimon/format/text/TextFileReaderTest.java     |   2 +-
 12 files changed, 552 insertions(+), 25 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
index 8e64cc63a4..fae5f6d8a5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
@@ -34,6 +34,14 @@ public interface FormatReaderFactory {
 
     FileRecordReader<InternalRow> createReader(Context context) throws IOException;
 
+    default FileRecordReader<InternalRow> createReader(Context context, long offset, long length)
+            throws IOException {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Format %s does not support create reader with offset and length.",
+                        getClass().getName()));
+    }
+
     /** Context for creating reader. */
     interface Context {
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
index 29352b4e81..e0e51053fd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
@@ -101,6 +101,18 @@ public final class IOUtils {
         return output.toByteArray();
     }
 
+    public static int readNBytes(InputStream in, byte[] b, int off, int len) throws IOException {
+        int n = 0;
+        while (n < len) {
+            int count = in.read(b, off + n, len - n);
+            if (count < 0) {
+                break;
+            }
+            n += count;
+        }
+        return n;
+    }
+
     /**
      * Reads len bytes in a loop.
      *
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
index 5ee4483b70..5e7b669559 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
@@ -119,7 +119,22 @@ public class CsvFileFormat extends FileFormat {
                     context.filePath(),
                     dataSchemaRowType,
                     projectedRowType,
-                    options);
+                    options,
+                    0,
+                    null);
+        }
+
+        @Override
+        public FileRecordReader<InternalRow> createReader(Context context, long offset, long length)
+                throws IOException {
+            return new CsvFileReader(
+                    context.fileIO(),
+                    context.filePath(),
+                    dataSchemaRowType,
+                    projectedRowType,
+                    options,
+                    offset,
+                    length);
         }
     }
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index 1d3ceeafce..2c7477cc0d 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -24,6 +24,8 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.types.RowType;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 /** CSV file reader implementation. */
@@ -39,9 +41,11 @@ public class CsvFileReader extends TextFileReader {
             Path filePath,
             RowType rowReadType,
             RowType projectedRowType,
-            CsvOptions options)
+            CsvOptions options,
+            long offset,
+            @Nullable Long length)
             throws IOException {
-        super(fileIO, filePath, projectedRowType, options.lineDelimiter());
+        super(fileIO, filePath, projectedRowType, options.lineDelimiter(), offset, length);
         this.includeHeader = options.includeHeader();
         this.csvParser =
                 new CsvParser(
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
index 1d6bb15930..c1892a944e 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
@@ -25,8 +25,6 @@ import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriter;
 import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.fs.CloseShieldOutputStream;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.FileRecordReader;
@@ -116,9 +114,20 @@ public class JsonFileFormat extends FileFormat {
 
         @Override
         public FileRecordReader<InternalRow> createReader(Context context) throws IOException {
-            FileIO fileIO = context.fileIO();
-            Path filePath = context.filePath();
-            return new JsonFileReader(fileIO, filePath, projectedRowType, options);
+            return new JsonFileReader(
+                    context.fileIO(), context.filePath(), projectedRowType, options, 0, null);
+        }
+
+        @Override
+        public FileRecordReader<InternalRow> createReader(Context context, long offset, long length)
+                throws IOException {
+            return new JsonFileReader(
+                    context.fileIO(),
+                    context.filePath(),
+                    projectedRowType,
+                    options,
+                    offset,
+                    length);
         }
     }
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
index 99b433805c..b10dd8f689 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
@@ -39,6 +39,8 @@ import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Base64;
@@ -53,9 +55,15 @@ public class JsonFileReader extends TextFileReader {
 
     private final JsonOptions options;
 
-    public JsonFileReader(FileIO fileIO, Path filePath, RowType rowType, JsonOptions options)
+    public JsonFileReader(
+            FileIO fileIO,
+            Path filePath,
+            RowType rowType,
+            JsonOptions options,
+            long offset,
+            @Nullable Long length)
             throws IOException {
-        super(fileIO, filePath, rowType, options.getLineDelimiter());
+        super(fileIO, filePath, rowType, options.getLineDelimiter(), offset, length);
         this.options = options;
     }
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/CustomLineReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/CustomLineReader.java
index 9a71ca585d..1652798ed5 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/CustomLineReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/CustomLineReader.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.format.text;
 
+import javax.annotation.Nullable;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -36,6 +38,7 @@ public class CustomLineReader implements TextLineReader {
         this.delimiter = delimiter;
     }
 
+    @Nullable
     @Override
     public String readLine() throws IOException {
         ByteArrayOutputStream out = new ByteArrayOutputStream(1024);
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
index 4b5acb9b1e..49cfea4556 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
@@ -18,29 +18,196 @@
 
 package org.apache.paimon.format.text;
 
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
 import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
+import java.io.UnsupportedEncodingException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** A {@link TextLineReader} using {@link BufferedReader} to read by '\n'. */
 public class StandardLineReader implements TextLineReader {
 
-    private final BufferedReader bufferedReader;
+    private final InputStream in;
+    private final byte[] buffer;
+    private final @Nullable Long length;
+    private final ByteArrayOutputStream lineBuilder;
 
-    public StandardLineReader(InputStream inputStream) {
-        this.bufferedReader =
-                new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
+    private int bufferStart;
+    private int bufferEnd;
+    private int bufferPosition;
+    private boolean closed;
+
+    public StandardLineReader(InputStream in, long offset, @Nullable Long length)
+            throws IOException {
+        this.in = in;
+        this.buffer = new byte[8192];
+        this.length = length;
+        this.lineBuilder = new ByteArrayOutputStream();
+        if (offset == 0) {
+            readAtBeginning();
+        } else {
+            readAtOffset(offset);
+        }
     }
 
+    private void readAtBeginning() throws IOException {
+        fillBuffer();
+        // skipping UTF-8 BOM
+        if (bufferEnd >= 3
+                && buffer[0] == (byte) 0xEF
+                && (buffer[1] == (byte) 0xBB)
+                && (buffer[2] == (byte) 0xBF)) {
+            bufferStart = 3;
+            bufferPosition = 3;
+        }
+    }
+
+    private void readAtOffset(long offset) throws IOException {
+        if (!(in instanceof SeekableInputStream)) {
+            throw new IllegalStateException(
+                    "Current position only supported for uncompressed files");
+        }
+        ((SeekableInputStream) in).seek(offset);
+        skipFirstLine();
+    }
+
+    private void skipFirstLine() throws IOException {
+        while (!closed) {
+            if (reachLengthLimit()) {
+                close();
+                return;
+            }
+
+            // fill buffer if necessary
+            if (bufferPosition >= bufferEnd) {
+                fillBuffer();
+                if (closed) {
+                    return;
+                }
+            }
+
+            if (seekToStartOfLineTerminator()) {
+                seekPastLineTerminator();
+                return;
+            }
+        }
+    }
+
+    @Nullable
     @Override
     public String readLine() throws IOException {
-        return bufferedReader.readLine();
+        lineBuilder.reset();
+
+        if (reachLengthLimit()) {
+            close();
+            return null;
+        }
+
+        if (bufferPosition >= bufferEnd) {
+            fillBuffer();
+        }
+
+        while (!closed) {
+            if (seekToStartOfLineTerminator()) {
+                copyToLineBuilder();
+                seekPastLineTerminator();
+                return buildLine();
+            }
+
+            checkArgument(bufferPosition == bufferEnd, "expected to be at the end of the buffer");
+            copyToLineBuilder();
+            fillBuffer();
+        }
+        String line = buildLine();
+        if (line.isEmpty()) {
+            return null;
+        }
+        return line;
+    }
+
+    private boolean seekToStartOfLineTerminator() {
+        while (bufferPosition < bufferEnd) {
+            if (isEndOfLineCharacter(buffer[bufferPosition])) {
+                return true;
+            }
+            bufferPosition++;
+        }
+        return false;
+    }
+
+    private static boolean isEndOfLineCharacter(byte currentByte) {
+        return currentByte == '\n' || currentByte == '\r';
+    }
+
+    private void seekPastLineTerminator() throws IOException {
+        checkArgument(
+                isEndOfLineCharacter(buffer[bufferPosition]), "Stream is not at a line terminator");
+
+        // skip carriage return if present
+        if (buffer[bufferPosition] == '\r') {
+            bufferPosition++;
+
+            // fill buffer if necessary
+            if (bufferPosition >= bufferEnd) {
+                fillBuffer();
+                if (closed) {
+                    return;
+                }
+            }
+        }
+
+        // skip newline if present
+        if (buffer[bufferPosition] == '\n') {
+            bufferPosition++;
+        }
+        bufferStart = bufferPosition;
+    }
+
+    private void fillBuffer() throws IOException {
+        if (closed) {
+            return;
+        }
+        checkArgument(bufferPosition >= bufferEnd, "Buffer is not empty");
+        bufferStart = 0;
+        bufferPosition = 0;
+        bufferEnd = IOUtils.readNBytes(in, buffer, 0, buffer.length);
+        if (bufferEnd == 0) {
+            close();
+        }
+    }
+
+    private boolean reachLengthLimit() throws IOException {
+        if (length != null) {
+            if (!(in instanceof SeekableInputStream)) {
+                throw new IllegalStateException(
+                        "Current position only supported for uncompressed files");
+            }
+            int currentBufferSize = bufferEnd - bufferPosition;
+            long currentPosition = ((SeekableInputStream) in).getPos() - currentBufferSize;
+            return currentPosition > length;
+        }
+        return false;
+    }
+
+    private void copyToLineBuilder() {
+        lineBuilder.write(buffer, bufferStart, bufferPosition - bufferStart);
+    }
+
+    private String buildLine() throws UnsupportedEncodingException {
+        return lineBuilder.toString(UTF_8.name());
     }
 
     @Override
     public void close() throws IOException {
-        bufferedReader.close();
+        closed = true;
+        in.close();
     }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
index 982ff998b6..d9cbd0ba0e 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
@@ -30,6 +30,8 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
 
+import static org.apache.paimon.format.text.HadoopCompressionUtils.createDecompressedInputStream;
+
 /** Base class for text-based file readers that provides common functionality. */
 public abstract class TextFileReader implements FileRecordReader<InternalRow> {
 
@@ -37,18 +39,25 @@ public abstract class TextFileReader implements FileRecordReader<InternalRow> {
     private final TextRecordIterator reader;
 
     protected final RowType rowType;
+    protected final long offset;
     protected final TextLineReader lineReader;
 
     protected boolean readerClosed = false;
 
-    protected TextFileReader(FileIO fileIO, Path filePath, RowType rowType, String delimiter)
+    protected TextFileReader(
+            FileIO fileIO,
+            Path filePath,
+            RowType rowType,
+            String delimiter,
+            long offset,
+            @Nullable Long length)
             throws IOException {
         this.filePath = filePath;
         this.rowType = rowType;
+        this.offset = offset;
         InputStream decompressedStream =
-                HadoopCompressionUtils.createDecompressedInputStream(
-                        fileIO.newInputStream(filePath), filePath);
-        this.lineReader = TextLineReader.create(decompressedStream, delimiter);
+                createDecompressedInputStream(fileIO.newInputStream(filePath), filePath);
+        this.lineReader = TextLineReader.create(decompressedStream, delimiter, offset, length);
         this.reader = new TextRecordIterator();
     }
 
@@ -131,6 +140,10 @@ public abstract class TextFileReader implements FileRecordReader<InternalRow> {
 
         @Override
         public long returnedPosition() {
+            if (offset > 0) {
+                throw new UnsupportedOperationException(
+                        "Cannot return position with reading offset.");
+            }
             return Math.max(0, currentPosition - 1);
         }
     }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
index 15c671adc1..50fb00c66b 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.format.text;
 
+import javax.annotation.Nullable;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -26,16 +28,23 @@ import java.nio.charset.StandardCharsets;
 /** Reader to read lines. */
 public interface TextLineReader extends Closeable {
 
+    @Nullable
     String readLine() throws IOException;
 
-    static TextLineReader create(InputStream inputStream, String delimiter) {
+    static TextLineReader create(
+            InputStream inputStream, String delimiter, long offset, @Nullable Long length)
+            throws IOException {
         byte[] delimiterBytes =
                 delimiter != null && !"\n".equals(delimiter)
                         ? delimiter.getBytes(StandardCharsets.UTF_8)
                         : null;
         if (delimiterBytes == null || delimiterBytes.length == 0) {
-            return new StandardLineReader(inputStream);
+            return new StandardLineReader(inputStream, offset, length);
         } else {
+            if (offset != 0 || length != null) {
+                throw new UnsupportedOperationException(
+                        "Custom line text file does not support offset and length.");
+            }
             return new CustomLineReader(inputStream, delimiterBytes);
         }
     }
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/text/StandardLineReaderTest.java b/paimon-format/src/test/java/org/apache/paimon/format/text/StandardLineReaderTest.java
new file mode 100644
index 0000000000..f8b94f7cc6
--- /dev/null
+++ b/paimon-format/src/test/java/org/apache/paimon/format/text/StandardLineReaderTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import org.apache.paimon.fs.ByteArraySeekableStream;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link StandardLineReader}. */
+public class StandardLineReaderTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private LocalFileIO fileIO;
+    private java.nio.file.Path testFile;
+
+    @BeforeEach
+    public void setUp() {
+        fileIO = new LocalFileIO();
+        testFile = tempDir.resolve("test.txt");
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        if (java.nio.file.Files.exists(testFile)) {
+            java.nio.file.Files.delete(testFile);
+        }
+    }
+
+    @Test
+    public void testReadLineWithLineFeedDelimiter() throws IOException {
+        // Test basic functionality with \n delimiter
+        String content = "line1\nline2\nline3";
+        try (TextLineReader reader =
+                TextLineReader.create(
+                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+                        "\n",
+                        0,
+                        null)) {
+            assertThat(reader.readLine()).isEqualTo("line1");
+            assertThat(reader.readLine()).isEqualTo("line2");
+            assertThat(reader.readLine()).isEqualTo("line3");
+            assertThat(reader.readLine()).isNull();
+        }
+    }
+
+    @Test
+    public void testReadLineWithCarriageReturnDelimiter() throws IOException {
+        // Test basic functionality with \r delimiter
+        String content = "line1\rline2\rline3";
+        try (TextLineReader reader =
+                TextLineReader.create(
+                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+                        "\r",
+                        0,
+                        null)) {
+            assertThat(reader.readLine()).isEqualTo("line1");
+            assertThat(reader.readLine()).isEqualTo("line2");
+            assertThat(reader.readLine()).isEqualTo("line3");
+            assertThat(reader.readLine()).isNull();
+        }
+    }
+
+    @Test
+    public void testReadLineWithCRLF() throws IOException {
+        // Test basic functionality with \r\n delimiter
+        String content = "line1\r\nline2\r\nline3";
+        try (TextLineReader reader =
+                TextLineReader.create(
+                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+                        "\r\n",
+                        0,
+                        null)) {
+            assertThat(reader.readLine()).isEqualTo("line1");
+            assertThat(reader.readLine()).isEqualTo("line2");
+            assertThat(reader.readLine()).isEqualTo("line3");
+            assertThat(reader.readLine()).isNull();
+        }
+    }
+
+    @Test
+    public void testReadLineWithEmptyLines() throws IOException {
+        // Test reading empty lines
+        String content = "line1\n\nline3\n";
+        try (TextLineReader reader =
+                TextLineReader.create(
+                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+                        null,
+                        0,
+                        null)) {
+            assertThat(reader.readLine()).isEqualTo("line1");
+            assertThat(reader.readLine()).isEqualTo("");
+            assertThat(reader.readLine()).isEqualTo("line3");
+            assertThat(reader.readLine()).isNull();
+        }
+    }
+
+    @Test
+    public void testReadLineWithBOM() throws IOException {
+        // Test reading file with BOM
+        byte[] bom = {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+        byte[] content = "line1\nline2".getBytes(StandardCharsets.UTF_8);
+        byte[] fullContent = new byte[bom.length + content.length];
+        System.arraycopy(bom, 0, fullContent, 0, bom.length);
+        System.arraycopy(content, 0, fullContent, bom.length, content.length);
+
+        try (TextLineReader reader =
+                TextLineReader.create(new ByteArrayInputStream(fullContent), null, 0, null)) {
+            assertThat(reader.readLine()).isEqualTo("line1");
+            assertThat(reader.readLine()).isEqualTo("line2");
+            assertThat(reader.readLine()).isNull();
+        }
+    }
+
+    @Test
+    public void testReadLineWithSplitAtBeginning() throws IOException {
+        // Test reading with split at the beginning
+        String content = "line1\nline2\nline3\nline4";
+        try (TextLineReader reader =
+                TextLineReader.create(
+                        new ByteArraySeekableStream(content.getBytes(StandardCharsets.UTF_8)),
+                        null,
+                        0,
+                        10L)) {
+            assertThat(reader.readLine()).isEqualTo("line1");
+            assertThat(reader.readLine()).isEqualTo("line2");
+            // Should stop reading after reaching the split length
+            assertThat(reader.readLine()).isNull();
+        }
+    }
+
+    @Test
+    public void testReadLineWithSplitInMiddle() throws IOException {
+        // Test reading with split in the middle
+        String content = "line1\nline2\nline3\nline4";
+        try (TextLineReader reader =
+                TextLineReader.create(
+                        new ByteArraySeekableStream(content.getBytes(StandardCharsets.UTF_8)),
+                        null,
+                        3,
+                        null)) {
+            // Should skip the first line and start from the second line
+            assertThat(reader.readLine()).isEqualTo("line2");
+            assertThat(reader.readLine()).isEqualTo("line3");
+            assertThat(reader.readLine()).isEqualTo("line4");
+            assertThat(reader.readLine()).isNull();
+        }
+    }
+
+    @Test
+    public void testReadLineWithEmptyFile() throws IOException {
+        // Test reading an empty file
+        String content = "";
+        try (TextLineReader reader =
+                TextLineReader.create(
+                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+                        null,
+                        0,
+                        null)) {
+            assertThat(reader.readLine()).isNull();
+        }
+    }
+
+    @Test
+    public void testReadLineWithSingleLine() throws IOException {
+        // Test reading a single line without line terminator
+        String content = "single line";
+        try (TextLineReader reader =
+                TextLineReader.create(
+                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+                        null,
+                        0,
+                        null)) {
+            assertThat(reader.readLine()).isEqualTo("single line");
+            assertThat(reader.readLine()).isNull();
+        }
+    }
+
+    @Test
+    public void testReadLineWithSingleLineAndNewline() throws IOException {
+        // Test reading a single line with line terminator
+        String content = "single line\n";
+        try (TextLineReader reader =
+                TextLineReader.create(
+                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+                        null,
+                        0,
+                        null)) {
+            assertThat(reader.readLine()).isEqualTo("single line");
+            assertThat(reader.readLine()).isNull();
+        }
+    }
+
+    @Test
+    public void testReadLineWithVeryLongLine() throws IOException {
+        // Test reading a very long line
+        StringBuilder longLine = new StringBuilder();
+        for (int i = 0; i < 10000; i++) {
+            longLine.append("a");
+        }
+        String content = longLine.toString() + "\nline2";
+        try (TextLineReader reader =
+                TextLineReader.create(
+                        new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+                        null,
+                        0,
+                        null)) {
+            assertThat(reader.readLine()).isEqualTo(longLine.toString());
+            assertThat(reader.readLine()).isEqualTo("line2");
+            assertThat(reader.readLine()).isNull();
+        }
+    }
+
+    @Test
+    public void testReadLineWithSeekableInputStream() throws IOException {
+        // Test reading with seekable input stream
+        String content = "line1\nline2\nline3";
+        createFile(content);
+
+        try (SeekableInputStream inputStream =
+                fileIO.newInputStream(new org.apache.paimon.fs.Path(testFile.toString()))) {
+            try (TextLineReader reader = TextLineReader.create(inputStream, null, 3, null)) {
+                // Should skip the first line and start from the second line
+                assertThat(reader.readLine()).isEqualTo("line2");
+                assertThat(reader.readLine()).isEqualTo("line3");
+                assertThat(reader.readLine()).isNull();
+            }
+        }
+    }
+
+    @Test
+    public void testReadLineWithNonSeekableInputStreamAndNonZeroOffset() throws IOException {
+        // Test that non-seekable input stream with non-zero offset throws exception
+        String content = "line1\nline2\nline3";
+        ByteArrayInputStream inputStream =
+                new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
+
+        assertThatThrownBy(() -> new StandardLineReader(inputStream, 1, null))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Current position only supported for uncompressed files");
+    }
+
+    /** Helper method to create a test file with the given content. */
+    private void createFile(String content) throws IOException {
+        try (PositionOutputStream out =
+                fileIO.newOutputStream(new org.apache.paimon.fs.Path(testFile.toString()), false)) {
+            out.write(content.getBytes(StandardCharsets.UTF_8));
+        }
+    }
+}
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java b/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
index 2f87831fd5..c4ca1bf0b5 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
@@ -204,7 +204,7 @@ public class TextFileReaderTest {
         public TestTextFileReader(
                 FileIO fileIO, Path filePath, RowType rowType, String recordDelimiter)
                 throws IOException {
-            super(fileIO, filePath, rowType, recordDelimiter);
+            super(fileIO, filePath, rowType, recordDelimiter, 0, null);
         }
 
         @Nullable

Reply via email to