This is an automated email from the ASF dual-hosted git repository.
jerryjing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f371835e36 [text] Text formats support split file to read by offset
and len (#6568)
f371835e36 is described below
commit f371835e361ae07608626b5ea2588dda88b4417f
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 10 12:53:25 2025 +0800
[text] Text formats support split file to read by offset and len (#6568)
---
.../apache/paimon/format/FormatReaderFactory.java | 8 +
.../main/java/org/apache/paimon/utils/IOUtils.java | 12 +
.../apache/paimon/format/csv/CsvFileFormat.java | 17 +-
.../apache/paimon/format/csv/CsvFileReader.java | 8 +-
.../apache/paimon/format/json/JsonFileFormat.java | 19 +-
.../apache/paimon/format/json/JsonFileReader.java | 12 +-
.../paimon/format/text/CustomLineReader.java | 3 +
.../paimon/format/text/StandardLineReader.java | 183 +++++++++++++-
.../apache/paimon/format/text/TextFileReader.java | 21 +-
.../apache/paimon/format/text/TextLineReader.java | 13 +-
.../paimon/format/text/StandardLineReaderTest.java | 279 +++++++++++++++++++++
.../paimon/format/text/TextFileReaderTest.java | 2 +-
12 files changed, 552 insertions(+), 25 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
index 8e64cc63a4..fae5f6d8a5 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
@@ -34,6 +34,14 @@ public interface FormatReaderFactory {
FileRecordReader<InternalRow> createReader(Context context) throws
IOException;
+ default FileRecordReader<InternalRow> createReader(Context context, long
offset, long length)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Format %s does not support create reader with offset
and length.",
+ getClass().getName()));
+ }
+
/** Context for creating reader. */
interface Context {
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
index 29352b4e81..e0e51053fd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java
@@ -101,6 +101,18 @@ public final class IOUtils {
return output.toByteArray();
}
+ public static int readNBytes(InputStream in, byte[] b, int off, int len)
throws IOException {
+ int n = 0;
+ while (n < len) {
+ int count = in.read(b, off + n, len - n);
+ if (count < 0) {
+ break;
+ }
+ n += count;
+ }
+ return n;
+ }
+
/**
* Reads len bytes in a loop.
*
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
index 5ee4483b70..5e7b669559 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
@@ -119,7 +119,22 @@ public class CsvFileFormat extends FileFormat {
context.filePath(),
dataSchemaRowType,
projectedRowType,
- options);
+ options,
+ 0,
+ null);
+ }
+
+ @Override
+ public FileRecordReader<InternalRow> createReader(Context context,
long offset, long length)
+ throws IOException {
+ return new CsvFileReader(
+ context.fileIO(),
+ context.filePath(),
+ dataSchemaRowType,
+ projectedRowType,
+ options,
+ offset,
+ length);
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index 1d3ceeafce..2c7477cc0d 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -24,6 +24,8 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.types.RowType;
+import javax.annotation.Nullable;
+
import java.io.IOException;
/** CSV file reader implementation. */
@@ -39,9 +41,11 @@ public class CsvFileReader extends TextFileReader {
Path filePath,
RowType rowReadType,
RowType projectedRowType,
- CsvOptions options)
+ CsvOptions options,
+ long offset,
+ @Nullable Long length)
throws IOException {
- super(fileIO, filePath, projectedRowType, options.lineDelimiter());
+ super(fileIO, filePath, projectedRowType, options.lineDelimiter(),
offset, length);
this.includeHeader = options.includeHeader();
this.csvParser =
new CsvParser(
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
index 1d6bb15930..c1892a944e 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
@@ -25,8 +25,6 @@ import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.fs.CloseShieldOutputStream;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.FileRecordReader;
@@ -116,9 +114,20 @@ public class JsonFileFormat extends FileFormat {
@Override
public FileRecordReader<InternalRow> createReader(Context context)
throws IOException {
- FileIO fileIO = context.fileIO();
- Path filePath = context.filePath();
- return new JsonFileReader(fileIO, filePath, projectedRowType,
options);
+ return new JsonFileReader(
+ context.fileIO(), context.filePath(), projectedRowType,
options, 0, null);
+ }
+
+ @Override
+ public FileRecordReader<InternalRow> createReader(Context context,
long offset, long length)
+ throws IOException {
+ return new JsonFileReader(
+ context.fileIO(),
+ context.filePath(),
+ projectedRowType,
+ options,
+ offset,
+ length);
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
index 99b433805c..b10dd8f689 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
@@ -39,6 +39,8 @@ import org.apache.paimon.utils.JsonSerdeUtil;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
@@ -53,9 +55,15 @@ public class JsonFileReader extends TextFileReader {
private final JsonOptions options;
/**
 * Creates a reader over the whole file. Equivalent to an offset of 0 and no length limit; kept so
 * that callers of the original public constructor keep compiling.
 */
public JsonFileReader(FileIO fileIO, Path filePath, RowType rowType, JsonOptions options)
        throws IOException {
    this(fileIO, filePath, rowType, options, 0, null);
}

/**
 * Creates a reader over a byte range of the file.
 *
 * @param fileIO file system access
 * @param filePath the JSON file to read
 * @param rowType the row type produced by this reader
 * @param options JSON options; the line delimiter is taken from here
 * @param offset byte offset where the range starts; 0 reads from the beginning
 * @param length number of bytes in the range, or {@code null} to read to the end of file
 */
public JsonFileReader(
        FileIO fileIO,
        Path filePath,
        RowType rowType,
        JsonOptions options,
        long offset,
        @Nullable Long length)
        throws IOException {
    super(fileIO, filePath, rowType, options.getLineDelimiter(), offset, length);
    this.options = options;
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/text/CustomLineReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/text/CustomLineReader.java
index 9a71ca585d..1652798ed5 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/text/CustomLineReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/text/CustomLineReader.java
@@ -18,6 +18,8 @@
package org.apache.paimon.format.text;
+import javax.annotation.Nullable;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -36,6 +38,7 @@ public class CustomLineReader implements TextLineReader {
this.delimiter = delimiter;
}
+ @Nullable
@Override
public String readLine() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream(1024);
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
index 4b5acb9b1e..49cfea4556 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
@@ -18,29 +18,196 @@
package org.apache.paimon.format.text;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
+import java.io.UnsupportedEncodingException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** A {@link TextLineReader} using {@link BufferedReader} to read by '\n'. */
public class StandardLineReader implements TextLineReader {
- private final BufferedReader bufferedReader;
+ private final InputStream in;
+ private final byte[] buffer;
+ private final @Nullable Long length;
+ private final ByteArrayOutputStream lineBuilder;
- public StandardLineReader(InputStream inputStream) {
- this.bufferedReader =
- new BufferedReader(new InputStreamReader(inputStream,
StandardCharsets.UTF_8));
+ private int bufferStart;
+ private int bufferEnd;
+ private int bufferPosition;
+ private boolean closed;
+
+ public StandardLineReader(InputStream in, long offset, @Nullable Long
length)
+ throws IOException {
+ this.in = in;
+ this.buffer = new byte[8192];
+ this.length = length;
+ this.lineBuilder = new ByteArrayOutputStream();
+ if (offset == 0) {
+ readAtBeginning();
+ } else {
+ readAtOffset(offset);
+ }
}
+ private void readAtBeginning() throws IOException {
+ fillBuffer();
+ // skipping UTF-8 BOM
+ if (bufferEnd >= 3
+ && buffer[0] == (byte) 0xEF
+ && (buffer[1] == (byte) 0xBB)
+ && (buffer[2] == (byte) 0xBF)) {
+ bufferStart = 3;
+ bufferPosition = 3;
+ }
+ }
+
+ private void readAtOffset(long offset) throws IOException {
+ if (!(in instanceof SeekableInputStream)) {
+ throw new IllegalStateException(
+ "Current position only supported for uncompressed files");
+ }
+ ((SeekableInputStream) in).seek(offset);
+ skipFirstLine();
+ }
+
+ private void skipFirstLine() throws IOException {
+ while (!closed) {
+ if (reachLengthLimit()) {
+ close();
+ return;
+ }
+
+ // fill buffer if necessary
+ if (bufferPosition >= bufferEnd) {
+ fillBuffer();
+ if (closed) {
+ return;
+ }
+ }
+
+ if (seekToStartOfLineTerminator()) {
+ seekPastLineTerminator();
+ return;
+ }
+ }
+ }
+
+ @Nullable
@Override
public String readLine() throws IOException {
- return bufferedReader.readLine();
+ lineBuilder.reset();
+
+ if (reachLengthLimit()) {
+ close();
+ return null;
+ }
+
+ if (bufferPosition >= bufferEnd) {
+ fillBuffer();
+ }
+
+ while (!closed) {
+ if (seekToStartOfLineTerminator()) {
+ copyToLineBuilder();
+ seekPastLineTerminator();
+ return buildLine();
+ }
+
+ checkArgument(bufferPosition == bufferEnd, "expected to be at the
end of the buffer");
+ copyToLineBuilder();
+ fillBuffer();
+ }
+ String line = buildLine();
+ if (line.isEmpty()) {
+ return null;
+ }
+ return line;
+ }
+
+ private boolean seekToStartOfLineTerminator() {
+ while (bufferPosition < bufferEnd) {
+ if (isEndOfLineCharacter(buffer[bufferPosition])) {
+ return true;
+ }
+ bufferPosition++;
+ }
+ return false;
+ }
+
+ private static boolean isEndOfLineCharacter(byte currentByte) {
+ return currentByte == '\n' || currentByte == '\r';
+ }
+
+ private void seekPastLineTerminator() throws IOException {
+ checkArgument(
+ isEndOfLineCharacter(buffer[bufferPosition]), "Stream is not
at a line terminator");
+
+ // skip carriage return if present
+ if (buffer[bufferPosition] == '\r') {
+ bufferPosition++;
+
+ // fill buffer if necessary
+ if (bufferPosition >= bufferEnd) {
+ fillBuffer();
+ if (closed) {
+ return;
+ }
+ }
+ }
+
+ // skip newline if present
+ if (buffer[bufferPosition] == '\n') {
+ bufferPosition++;
+ }
+ bufferStart = bufferPosition;
+ }
+
+ private void fillBuffer() throws IOException {
+ if (closed) {
+ return;
+ }
+ checkArgument(bufferPosition >= bufferEnd, "Buffer is not empty");
+ bufferStart = 0;
+ bufferPosition = 0;
+ bufferEnd = IOUtils.readNBytes(in, buffer, 0, buffer.length);
+ if (bufferEnd == 0) {
+ close();
+ }
+ }
+
+ private boolean reachLengthLimit() throws IOException {
+ if (length != null) {
+ if (!(in instanceof SeekableInputStream)) {
+ throw new IllegalStateException(
+ "Current position only supported for uncompressed
files");
+ }
+ int currentBufferSize = bufferEnd - bufferPosition;
+ long currentPosition = ((SeekableInputStream) in).getPos() -
currentBufferSize;
+ return currentPosition > length;
+ }
+ return false;
+ }
+
+ private void copyToLineBuilder() {
+ lineBuilder.write(buffer, bufferStart, bufferPosition - bufferStart);
+ }
+
+ private String buildLine() throws UnsupportedEncodingException {
+ return lineBuilder.toString(UTF_8.name());
}
@Override
public void close() throws IOException {
- bufferedReader.close();
+ closed = true;
+ in.close();
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
index 982ff998b6..d9cbd0ba0e 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
@@ -30,6 +30,8 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
+import static
org.apache.paimon.format.text.HadoopCompressionUtils.createDecompressedInputStream;
+
/** Base class for text-based file readers that provides common functionality.
*/
public abstract class TextFileReader implements FileRecordReader<InternalRow> {
@@ -37,18 +39,25 @@ public abstract class TextFileReader implements
FileRecordReader<InternalRow> {
private final TextRecordIterator reader;
protected final RowType rowType;
protected final long offset;
protected final TextLineReader lineReader;
protected boolean readerClosed = false;

/**
 * Creates a reader over the whole file. Kept so that subclasses written against the constructor
 * without range information keep compiling.
 */
protected TextFileReader(FileIO fileIO, Path filePath, RowType rowType, String delimiter)
        throws IOException {
    this(fileIO, filePath, rowType, delimiter, 0, null);
}

/**
 * Creates a reader over a byte range of the (possibly compressed) file.
 *
 * @param fileIO file system access
 * @param filePath the text file to read
 * @param rowType the row type produced by this reader
 * @param delimiter line delimiter; see {@link TextLineReader#create}
 * @param offset byte offset where the range starts; 0 reads from the beginning
 * @param length number of bytes in the range, or {@code null} to read to the end of file
 */
protected TextFileReader(
        FileIO fileIO,
        Path filePath,
        RowType rowType,
        String delimiter,
        long offset,
        @Nullable Long length)
        throws IOException {
    this.filePath = filePath;
    this.rowType = rowType;
    this.offset = offset;
    InputStream decompressedStream =
            createDecompressedInputStream(fileIO.newInputStream(filePath), filePath);
    try {
        // Fails e.g. for a non-zero offset on a compressed file or with a custom delimiter.
        this.lineReader = TextLineReader.create(decompressedStream, delimiter, offset, length);
    } catch (IOException | RuntimeException e) {
        // Without a line reader nobody owns the stream, so close it here to avoid a leak.
        try {
            decompressedStream.close();
        } catch (IOException closeException) {
            e.addSuppressed(closeException);
        }
        throw e;
    }
    this.reader = new TextRecordIterator();
}
@@ -131,6 +140,10 @@ public abstract class TextFileReader implements
FileRecordReader<InternalRow> {
@Override
public long returnedPosition() {
+ if (offset > 0) {
+ throw new UnsupportedOperationException(
+ "Cannot return position with reading offset.");
+ }
return Math.max(0, currentPosition - 1);
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
index 15c671adc1..50fb00c66b 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
@@ -18,6 +18,8 @@
package org.apache.paimon.format.text;
+import javax.annotation.Nullable;
+
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@@ -26,16 +28,23 @@ import java.nio.charset.StandardCharsets;
/** Reader to read lines. */
public interface TextLineReader extends Closeable {
+ @Nullable
String readLine() throws IOException;
- static TextLineReader create(InputStream inputStream, String delimiter) {
+ static TextLineReader create(
+ InputStream inputStream, String delimiter, long offset, @Nullable
Long length)
+ throws IOException {
byte[] delimiterBytes =
delimiter != null && !"\n".equals(delimiter)
? delimiter.getBytes(StandardCharsets.UTF_8)
: null;
if (delimiterBytes == null || delimiterBytes.length == 0) {
- return new StandardLineReader(inputStream);
+ return new StandardLineReader(inputStream, offset, length);
} else {
+ if (offset != 0 || length != null) {
+ throw new UnsupportedOperationException(
+ "Custom line text file does not support offset and
length.");
+ }
return new CustomLineReader(inputStream, delimiterBytes);
}
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/text/StandardLineReaderTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/text/StandardLineReaderTest.java
new file mode 100644
index 0000000000..f8b94f7cc6
--- /dev/null
+++
b/paimon-format/src/test/java/org/apache/paimon/format/text/StandardLineReaderTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import org.apache.paimon.fs.ByteArraySeekableStream;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link StandardLineReader}. */
+public class StandardLineReaderTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private LocalFileIO fileIO;
+ private java.nio.file.Path testFile;
+
+ @BeforeEach
+ public void setUp() {
+ fileIO = new LocalFileIO();
+ testFile = tempDir.resolve("test.txt");
+ }
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ if (java.nio.file.Files.exists(testFile)) {
+ java.nio.file.Files.delete(testFile);
+ }
+ }
+
+ @Test
+ public void testReadLineWithLineFeedDelimiter() throws IOException {
+ // Test basic functionality with \n delimiter
+ String content = "line1\nline2\nline3";
+ try (TextLineReader reader =
+ TextLineReader.create(
+ new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+ "\n",
+ 0,
+ null)) {
+ assertThat(reader.readLine()).isEqualTo("line1");
+ assertThat(reader.readLine()).isEqualTo("line2");
+ assertThat(reader.readLine()).isEqualTo("line3");
+ assertThat(reader.readLine()).isNull();
+ }
+ }
+
+ @Test
+ public void testReadLineWithCarriageReturnDelimiter() throws IOException {
+ // Test basic functionality with \r delimiter
+ String content = "line1\rline2\rline3";
+ try (TextLineReader reader =
+ TextLineReader.create(
+ new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+ "\r",
+ 0,
+ null)) {
+ assertThat(reader.readLine()).isEqualTo("line1");
+ assertThat(reader.readLine()).isEqualTo("line2");
+ assertThat(reader.readLine()).isEqualTo("line3");
+ assertThat(reader.readLine()).isNull();
+ }
+ }
+
+ @Test
+ public void testReadLineWithCRLF() throws IOException {
+ // Test basic functionality with \r\n delimiter
+ String content = "line1\r\nline2\r\nline3";
+ try (TextLineReader reader =
+ TextLineReader.create(
+ new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+ "\r\n",
+ 0,
+ null)) {
+ assertThat(reader.readLine()).isEqualTo("line1");
+ assertThat(reader.readLine()).isEqualTo("line2");
+ assertThat(reader.readLine()).isEqualTo("line3");
+ assertThat(reader.readLine()).isNull();
+ }
+ }
+
+ @Test
+ public void testReadLineWithEmptyLines() throws IOException {
+ // Test reading empty lines
+ String content = "line1\n\nline3\n";
+ try (TextLineReader reader =
+ TextLineReader.create(
+ new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+ null,
+ 0,
+ null)) {
+ assertThat(reader.readLine()).isEqualTo("line1");
+ assertThat(reader.readLine()).isEqualTo("");
+ assertThat(reader.readLine()).isEqualTo("line3");
+ assertThat(reader.readLine()).isNull();
+ }
+ }
+
+ @Test
+ public void testReadLineWithBOM() throws IOException {
+ // Test reading file with BOM
+ byte[] bom = {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+ byte[] content = "line1\nline2".getBytes(StandardCharsets.UTF_8);
+ byte[] fullContent = new byte[bom.length + content.length];
+ System.arraycopy(bom, 0, fullContent, 0, bom.length);
+ System.arraycopy(content, 0, fullContent, bom.length, content.length);
+
+ try (TextLineReader reader =
+ TextLineReader.create(new ByteArrayInputStream(fullContent),
null, 0, null)) {
+ assertThat(reader.readLine()).isEqualTo("line1");
+ assertThat(reader.readLine()).isEqualTo("line2");
+ assertThat(reader.readLine()).isNull();
+ }
+ }
+
+ @Test
+ public void testReadLineWithSplitAtBeginning() throws IOException {
+ // Test reading with split at the beginning
+ String content = "line1\nline2\nline3\nline4";
+ try (TextLineReader reader =
+ TextLineReader.create(
+ new
ByteArraySeekableStream(content.getBytes(StandardCharsets.UTF_8)),
+ null,
+ 0,
+ 10L)) {
+ assertThat(reader.readLine()).isEqualTo("line1");
+ assertThat(reader.readLine()).isEqualTo("line2");
+ // Should stop reading after reaching the split length
+ assertThat(reader.readLine()).isNull();
+ }
+ }
+
+ @Test
+ public void testReadLineWithSplitInMiddle() throws IOException {
+ // Test reading with split in the middle
+ String content = "line1\nline2\nline3\nline4";
+ try (TextLineReader reader =
+ TextLineReader.create(
+ new
ByteArraySeekableStream(content.getBytes(StandardCharsets.UTF_8)),
+ null,
+ 3,
+ null)) {
+ // Should skip the first line and start from the second line
+ assertThat(reader.readLine()).isEqualTo("line2");
+ assertThat(reader.readLine()).isEqualTo("line3");
+ assertThat(reader.readLine()).isEqualTo("line4");
+ assertThat(reader.readLine()).isNull();
+ }
+ }
+
+ @Test
+ public void testReadLineWithEmptyFile() throws IOException {
+ // Test reading an empty file
+ String content = "";
+ try (TextLineReader reader =
+ TextLineReader.create(
+ new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+ null,
+ 0,
+ null)) {
+ assertThat(reader.readLine()).isNull();
+ }
+ }
+
+ @Test
+ public void testReadLineWithSingleLine() throws IOException {
+ // Test reading a single line without line terminator
+ String content = "single line";
+ try (TextLineReader reader =
+ TextLineReader.create(
+ new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+ null,
+ 0,
+ null)) {
+ assertThat(reader.readLine()).isEqualTo("single line");
+ assertThat(reader.readLine()).isNull();
+ }
+ }
+
+ @Test
+ public void testReadLineWithSingleLineAndNewline() throws IOException {
+ // Test reading a single line with line terminator
+ String content = "single line\n";
+ try (TextLineReader reader =
+ TextLineReader.create(
+ new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+ null,
+ 0,
+ null)) {
+ assertThat(reader.readLine()).isEqualTo("single line");
+ assertThat(reader.readLine()).isNull();
+ }
+ }
+
+ @Test
+ public void testReadLineWithVeryLongLine() throws IOException {
+ // Test reading a very long line
+ StringBuilder longLine = new StringBuilder();
+ for (int i = 0; i < 10000; i++) {
+ longLine.append("a");
+ }
+ String content = longLine.toString() + "\nline2";
+ try (TextLineReader reader =
+ TextLineReader.create(
+ new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
+ null,
+ 0,
+ null)) {
+ assertThat(reader.readLine()).isEqualTo(longLine.toString());
+ assertThat(reader.readLine()).isEqualTo("line2");
+ assertThat(reader.readLine()).isNull();
+ }
+ }
+
+ @Test
+ public void testReadLineWithSeekableInputStream() throws IOException {
+ // Test reading with seekable input stream
+ String content = "line1\nline2\nline3";
+ createFile(content);
+
+ try (SeekableInputStream inputStream =
+ fileIO.newInputStream(new
org.apache.paimon.fs.Path(testFile.toString()))) {
+ try (TextLineReader reader = TextLineReader.create(inputStream,
null, 3, null)) {
+ // Should skip the first line and start from the second line
+ assertThat(reader.readLine()).isEqualTo("line2");
+ assertThat(reader.readLine()).isEqualTo("line3");
+ assertThat(reader.readLine()).isNull();
+ }
+ }
+ }
+
+ @Test
+ public void testReadLineWithNonSeekableInputStreamAndNonZeroOffset()
throws IOException {
+ // Test that non-seekable input stream with non-zero offset throws
exception
+ String content = "line1\nline2\nline3";
+ ByteArrayInputStream inputStream =
+ new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
+
+ assertThatThrownBy(() -> new StandardLineReader(inputStream, 1, null))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Current position only supported for uncompressed
files");
+ }
+
+ /** Helper method to create a test file with the given content. */
+ private void createFile(String content) throws IOException {
+ try (PositionOutputStream out =
+ fileIO.newOutputStream(new
org.apache.paimon.fs.Path(testFile.toString()), false)) {
+ out.write(content.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
index 2f87831fd5..c4ca1bf0b5 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
@@ -204,7 +204,7 @@ public class TextFileReaderTest {
/** Test reader that always reads the whole file (offset 0, no length limit). */
public TestTextFileReader(FileIO fileIO, Path filePath, RowType rowType, String recordDelimiter)
        throws IOException {
    super(fileIO, filePath, rowType, recordDelimiter, 0L, (Long) null);
}
@Nullable