This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8d1404ca30 [core] Fix offset bug in StandardLineReader (#6577)
8d1404ca30 is described below
commit 8d1404ca302775cbe4f4ec5faffb07aa628bd763
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 10 16:51:25 2025 +0800
[core] Fix offset bug in StandardLineReader (#6577)
---
.../paimon/format/text/StandardLineReader.java | 4 +-
.../paimon/format/text/StandardLineReaderTest.java | 202 ++++++++-------------
2 files changed, 75 insertions(+), 131 deletions(-)
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
index eb4f020e43..a2f0684667 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
@@ -36,6 +36,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
public class StandardLineReader implements TextLineReader {
private final InputStream in;
+ private final long offset;
private final byte[] buffer;
private final @Nullable Long length;
private final ByteArrayOutputStream lineBuilder;
@@ -48,6 +49,7 @@ public class StandardLineReader implements TextLineReader {
public StandardLineReader(InputStream in, long offset, @Nullable Long length)
throws IOException {
this.in = in;
+ this.offset = offset;
this.buffer = new byte[8192];
this.length = length;
this.lineBuilder = new ByteArrayOutputStream();
@@ -193,7 +195,7 @@ public class StandardLineReader implements TextLineReader {
}
int currentBufferSize = bufferEnd - bufferPosition;
long currentPosition = ((SeekableInputStream) in).getPos() - currentBufferSize;
- return currentPosition > length;
+ return currentPosition > length + offset;
}
return false;
}
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/text/StandardLineReaderTest.java b/paimon-format/src/test/java/org/apache/paimon/format/text/StandardLineReaderTest.java
index 30ef14688d..463ae132e3 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/text/StandardLineReaderTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/text/StandardLineReaderTest.java
@@ -19,44 +19,24 @@
package org.apache.paimon.format.text;
import org.apache.paimon.fs.ByteArraySeekableStream;
-import org.apache.paimon.fs.PositionOutputStream;
-import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link StandardLineReader}. */
public class StandardLineReaderTest {
- @TempDir java.nio.file.Path tempDir;
-
- private LocalFileIO fileIO;
- private java.nio.file.Path testFile;
-
- @BeforeEach
- public void setUp() {
- fileIO = new LocalFileIO();
- testFile = tempDir.resolve("test.txt");
- }
-
- @AfterEach
- public void tearDown() throws IOException {
- if (java.nio.file.Files.exists(testFile)) {
- java.nio.file.Files.delete(testFile);
- }
- }
-
/**
* After the stream is closed, we cannot use getPos, so we need to ensure that it is manually
* closed.
@@ -85,15 +65,10 @@ public class StandardLineReaderTest {
}
@Test
- public void testReadLineWithLineFeedDelimiter() throws IOException {
+ public void testReadLineNoSplitting() throws IOException {
// Test basic functionality with \n delimiter
String content = "line1\nline2\nline3";
- try (TextLineReader reader =
- TextLineReader.create(
- new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
- "\n",
- 0,
- null)) {
+ try (TextLineReader reader = create(content, 0, null)) {
assertThat(reader.readLine()).isEqualTo("line1");
assertThat(reader.readLine()).isEqualTo("line2");
assertThat(reader.readLine()).isEqualTo("line3");
@@ -105,12 +80,7 @@ public class StandardLineReaderTest {
public void testReadLineWithCarriageReturnDelimiter() throws IOException {
// Test basic functionality with \r delimiter
String content = "line1\rline2\rline3";
- try (TextLineReader reader =
- TextLineReader.create(
- new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
- "\r",
- 0,
- null)) {
+ try (TextLineReader reader = create(content, 0, null)) {
assertThat(reader.readLine()).isEqualTo("line1");
assertThat(reader.readLine()).isEqualTo("line2");
assertThat(reader.readLine()).isEqualTo("line3");
@@ -122,12 +92,7 @@ public class StandardLineReaderTest {
public void testReadLineWithCRLF() throws IOException {
// Test basic functionality with \r\n delimiter
String content = "line1\r\nline2\r\nline3";
- try (TextLineReader reader =
- TextLineReader.create(
- new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
- "\r\n",
- 0,
- null)) {
+ try (TextLineReader reader = create(content, 0, null)) {
assertThat(reader.readLine()).isEqualTo("line1");
assertThat(reader.readLine()).isEqualTo("line2");
assertThat(reader.readLine()).isEqualTo("line3");
@@ -139,12 +104,7 @@ public class StandardLineReaderTest {
public void testReadLineWithEmptyLines() throws IOException {
// Test reading empty lines
String content = "line1\n\nline3\n";
- try (TextLineReader reader =
- TextLineReader.create(
- new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
- null,
- 0,
- null)) {
+ try (TextLineReader reader = create(content, 0, null)) {
assertThat(reader.readLine()).isEqualTo("line1");
assertThat(reader.readLine()).isEqualTo("");
assertThat(reader.readLine()).isEqualTo("line3");
@@ -161,8 +121,7 @@ public class StandardLineReaderTest {
System.arraycopy(bom, 0, fullContent, 0, bom.length);
System.arraycopy(content, 0, fullContent, bom.length, content.length);
- try (TextLineReader reader =
- TextLineReader.create(new ByteArrayInputStream(fullContent),
null, 0, null)) {
+ try (TextLineReader reader = create(fullContent, 0, null)) {
assertThat(reader.readLine()).isEqualTo("line1");
assertThat(reader.readLine()).isEqualTo("line2");
assertThat(reader.readLine()).isNull();
@@ -171,63 +130,83 @@ public class StandardLineReaderTest {
@Test
public void testReadLineWithSplitAtBeginning() throws IOException {
- // Test reading with split at the beginning
String content = "line1\nline2\nline3\nline4";
- try (TextLineReader reader =
- TextLineReader.create(
- new
ByteArraySeekableStream(content.getBytes(StandardCharsets.UTF_8)),
- null,
- 0,
- 10L)) {
+ try (TextLineReader reader = create(content, 0, 10L)) {
assertThat(reader.readLine()).isEqualTo("line1");
assertThat(reader.readLine()).isEqualTo("line2");
- // Should stop reading after reaching the split length
+ assertThat(reader.readLine()).isNull();
+ }
+ try (TextLineReader reader = create(content, 10, 6L)) {
+ assertThat(reader.readLine()).isEqualTo("line3");
+ assertThat(reader.readLine()).isNull();
+ }
+ try (TextLineReader reader = create(content, 16, 7L)) {
+ assertThat(reader.readLine()).isEqualTo("line4");
assertThat(reader.readLine()).isNull();
}
}
@Test
- public void testReadLineWithSplitInMiddle() throws IOException {
- // Test reading with split in the middle
+ public void testReadLineWithSplitInEndTwoLines() throws IOException {
String content = "line1\nline2\nline3\nline4";
- try (TextLineReader reader =
- TextLineReader.create(
- new
ByteArraySeekableStream(content.getBytes(StandardCharsets.UTF_8)),
- null,
- 3,
- null)) {
- // Should skip the first line and start from the second line
+ try (TextLineReader reader = create(content, 10, 13L)) {
+ assertThat(reader.readLine()).isEqualTo("line3");
+ assertThat(reader.readLine()).isEqualTo("line4");
+ assertThat(reader.readLine()).isNull();
+ }
+ }
+
+ @Test
+ public void testReadLineWithSplitInEndEmpty() throws IOException {
+ String content = "line1\nline2\nline3\nline4";
+ try (TextLineReader reader = create(content, 0, 18L)) {
+ assertThat(reader.readLine()).isEqualTo("line1");
assertThat(reader.readLine()).isEqualTo("line2");
assertThat(reader.readLine()).isEqualTo("line3");
assertThat(reader.readLine()).isEqualTo("line4");
assertThat(reader.readLine()).isNull();
}
+ try (TextLineReader reader = create(content, 18, 5L)) {
+ assertThat(reader.readLine()).isNull();
+ }
+ }
+
+ @Test
+ public void testReadLineWithRandomSplit() throws IOException {
+ String content = "line1\nline2\nline3\nline4";
+ int splitSize = ThreadLocalRandom.current().nextInt(content.length()) + 1;
+ int remainLen = content.length();
+ List<String> lines = new ArrayList<>();
+ while (remainLen > 0) {
+ int len = Math.min(remainLen, splitSize);
+ try (TextLineReader reader =
+ create(content, content.length() - remainLen, (long) len)) {
+ while (true) {
+ String line = reader.readLine();
+ if (line != null) {
+ lines.add(line);
+ } else {
+ break;
+ }
+ }
+ }
+ remainLen -= len;
+ }
+ assertThat(lines).containsExactly("line1", "line2", "line3", "line4");
}
@Test
public void testReadLineWithEmptyFile() throws IOException {
- // Test reading an empty file
String content = "";
- try (TextLineReader reader =
- TextLineReader.create(
- new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
- null,
- 0,
- null)) {
+ try (TextLineReader reader = create(content, 0, null)) {
assertThat(reader.readLine()).isNull();
}
}
@Test
public void testReadLineWithSingleLine() throws IOException {
- // Test reading a single line without line terminator
String content = "single line";
- try (TextLineReader reader =
- TextLineReader.create(
- new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
- null,
- 0,
- null)) {
+ try (TextLineReader reader = create(content, 0, null)) {
assertThat(reader.readLine()).isEqualTo("single line");
assertThat(reader.readLine()).isNull();
}
@@ -235,14 +214,8 @@ public class StandardLineReaderTest {
@Test
public void testReadLineWithSingleLineAndNewline() throws IOException {
- // Test reading a single line with line terminator
String content = "single line\n";
- try (TextLineReader reader =
- TextLineReader.create(
- new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
- null,
- 0,
- null)) {
+ try (TextLineReader reader = create(content, 0, null)) {
assertThat(reader.readLine()).isEqualTo("single line");
assertThat(reader.readLine()).isNull();
}
@@ -255,53 +228,22 @@ public class StandardLineReaderTest {
for (int i = 0; i < 10000; i++) {
longLine.append("a");
}
- String content = longLine.toString() + "\nline2";
- try (TextLineReader reader =
- TextLineReader.create(
- new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),
- null,
- 0,
- null)) {
+ String content = longLine + "\nline2";
+ try (TextLineReader reader = create(content, 0, null)) {
assertThat(reader.readLine()).isEqualTo(longLine.toString());
assertThat(reader.readLine()).isEqualTo("line2");
assertThat(reader.readLine()).isNull();
}
}
- @Test
- public void testReadLineWithSeekableInputStream() throws IOException {
- // Test reading with seekable input stream
- String content = "line1\nline2\nline3";
- createFile(content);
-
- try (SeekableInputStream inputStream =
- fileIO.newInputStream(new
org.apache.paimon.fs.Path(testFile.toString()))) {
- try (TextLineReader reader = TextLineReader.create(inputStream,
null, 3, null)) {
- // Should skip the first line and start from the second line
- assertThat(reader.readLine()).isEqualTo("line2");
- assertThat(reader.readLine()).isEqualTo("line3");
- assertThat(reader.readLine()).isNull();
- }
- }
- }
-
- @Test
- public void testReadLineWithNonSeekableInputStreamAndNonZeroOffset() throws IOException {
- // Test that non-seekable input stream with non-zero offset throws exception
- String content = "line1\nline2\nline3";
- ByteArrayInputStream inputStream =
- new
ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
-
- assertThatThrownBy(() -> new StandardLineReader(inputStream, 1, null))
- .isInstanceOf(IllegalStateException.class)
- .hasMessage("Current position only supported for uncompressed files");
+ private TextLineReader create(String content, long offset, @Nullable Long length)
+ throws IOException {
+ return create(content.getBytes(StandardCharsets.UTF_8), offset, length);
}
- /** Helper method to create a test file with the given content. */
- private void createFile(String content) throws IOException {
- try (PositionOutputStream out =
- fileIO.newOutputStream(new
org.apache.paimon.fs.Path(testFile.toString()), false)) {
- out.write(content.getBytes(StandardCharsets.UTF_8));
- }
+ private TextLineReader create(byte[] content, long offset, @Nullable Long length)
+ throws IOException {
+ ByteArraySeekableStream input = new ByteArraySeekableStream(content);
+ return TextLineReader.create(input, null, offset, length);
}
}