This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new dfb7cf11f0 [text] Introduce TextLineReader to abstract implementations
dfb7cf11f0 is described below
commit dfb7cf11f06c668968732b8c04e647a330703444
Author: JingsongLi <[email protected]>
AuthorDate: Wed Nov 5 23:05:19 2025 +0800
[text] Introduce TextLineReader to abstract implementations
---
.../apache/paimon/format/csv/CsvFileReader.java | 4 +-
.../apache/paimon/format/csv/CsvFormatWriter.java | 4 +-
.../apache/paimon/format/json/JsonFileReader.java | 4 +-
.../paimon/format/json/JsonFormatWriter.java | 4 +-
.../paimon/format/text/CustomLineReader.java | 88 ++++++++++++++++++++++
.../paimon/format/text/StandardLineReader.java | 47 ++++++++++++
...BaseTextFileReader.java => TextFileReader.java} | 81 ++------------------
...BaseTextFileWriter.java => TextFileWriter.java} | 5 +-
.../apache/paimon/format/text/TextLineReader.java | 42 +++++++++++
...FileReaderTest.java => TextFileReaderTest.java} | 6 +-
10 files changed, 198 insertions(+), 87 deletions(-)
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index 0b7e305740..1d3ceeafce 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -19,7 +19,7 @@
package org.apache.paimon.format.csv;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.BaseTextFileReader;
+import org.apache.paimon.format.text.TextFileReader;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.types.RowType;
@@ -27,7 +27,7 @@ import org.apache.paimon.types.RowType;
import java.io.IOException;
/** CSV file reader implementation. */
-public class CsvFileReader extends BaseTextFileReader {
+public class CsvFileReader extends TextFileReader {
private final boolean includeHeader;
private final CsvParser csvParser;
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
index c5c5885d47..81d81b874f 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
@@ -21,7 +21,7 @@ package org.apache.paimon.format.csv;
import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.BaseTextFileWriter;
+import org.apache.paimon.format.text.TextFileWriter;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
@@ -33,7 +33,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** CSV format writer implementation. */
-public class CsvFormatWriter extends BaseTextFileWriter {
+public class CsvFormatWriter extends TextFileWriter {
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
// Performance optimization: Cache frequently used cast executors
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
index 313465280c..99b433805c 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
@@ -25,7 +25,7 @@ import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.BaseTextFileReader;
+import org.apache.paimon.format.text.TextFileReader;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.types.ArrayType;
@@ -47,7 +47,7 @@ import java.util.List;
import java.util.Map;
/** JSON file reader. */
-public class JsonFileReader extends BaseTextFileReader {
+public class JsonFileReader extends TextFileReader {
private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
index a145e95419..87580b5c87 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
@@ -24,7 +24,7 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.BaseTextFileWriter;
+import org.apache.paimon.format.text.TextFileWriter;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
@@ -45,7 +45,7 @@ import java.util.List;
import java.util.Map;
/** Json format writer implementation. */
-public class JsonFormatWriter extends BaseTextFileWriter {
+public class JsonFormatWriter extends TextFileWriter {
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/text/CustomLineReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/text/CustomLineReader.java
new file mode 100644
index 0000000000..9a71ca585d
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/text/CustomLineReader.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/** A {@link TextLineReader} to read lines by custom delimiter. */
+public class CustomLineReader implements TextLineReader {
+
+ private static final int MAX_LINE_LENGTH = Integer.MAX_VALUE;
+
+ private final InputStream inputStream;
+ private final byte[] delimiter;
+
+ public CustomLineReader(InputStream inputStream, byte[] delimiter) {
+ this.inputStream = inputStream;
+ this.delimiter = delimiter;
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream(1024);
+ int matchIndex = 0;
+
+ while (true) {
+ int b = inputStream.read();
+ if (b == -1) {
+ // End of stream: flush any partially matched delimiter bytes
to output
+ if (matchIndex > 0) {
+ out.write(delimiter, 0, matchIndex);
+ }
+ // Return null if nothing was read, otherwise return the
accumulated line
+ return out.size() == 0 ? null :
out.toString(StandardCharsets.UTF_8.name());
+ }
+
+ // Guard against extremely long lines that could cause memory
issues
+ if (MAX_LINE_LENGTH - matchIndex < out.size()) {
+ throw new IOException("Line exceeds maximum length: " +
MAX_LINE_LENGTH);
+ }
+
+ byte current = (byte) b;
+ if (current == delimiter[matchIndex]) {
+ // Current byte matches the next expected delimiter byte
+ matchIndex++;
+ if (matchIndex == delimiter.length) {
+ // Complete delimiter found, return the line without the
delimiter
+ return out.toString(StandardCharsets.UTF_8.name());
+ }
+ } else if (matchIndex > 0) {
+ // Mismatch: handle partial matches
+ out.write(delimiter, 0, matchIndex);
+ if (current == delimiter[0]) {
+ matchIndex = 1;
+ } else {
+ out.write(current);
+ matchIndex = 0;
+ }
+ } else {
+ // just add the current byte to output
+ out.write(current);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ inputStream.close();
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
new file mode 100644
index 0000000000..2c48c7149b
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+/** A {@link TextLineReader} using {@link BufferedReader} to read by '\n'. */
+public class StandardLineReader implements TextLineReader {
+
+ private final BufferedReader bufferedReader;
+
+ public StandardLineReader(InputStream inputStream) {
+ this.bufferedReader =
+ new BufferedReader(new InputStreamReader(inputStream,
StandardCharsets.UTF_8));
+ ;
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ return bufferedReader.readLine();
+ }
+
+ @Override
+ public void close() throws IOException {
+ bufferedReader.close();
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
similarity index 59%
rename from
paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileReader.java
rename to
paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
index 22f187ee12..982ff998b6 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
@@ -27,43 +27,28 @@ import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
/** Base class for text-based file readers that provides common functionality.
*/
-public abstract class BaseTextFileReader implements
FileRecordReader<InternalRow> {
-
- private static final int MAX_LINE_LENGTH = Integer.MAX_VALUE;
+public abstract class TextFileReader implements FileRecordReader<InternalRow> {
private final Path filePath;
- private final InputStream decompressedStream;
private final TextRecordIterator reader;
protected final RowType rowType;
- protected final BufferedReader bufferedReader;
- protected final byte[] recordDelimiterBytes;
+ protected final TextLineReader lineReader;
protected boolean readerClosed = false;
- protected BaseTextFileReader(
- FileIO fileIO, Path filePath, RowType rowType, String
recordDelimiter)
+ protected TextFileReader(FileIO fileIO, Path filePath, RowType rowType,
String delimiter)
throws IOException {
this.filePath = filePath;
this.rowType = rowType;
- this.recordDelimiterBytes =
- recordDelimiter != null && !"\n".equals(recordDelimiter)
- ? recordDelimiter.getBytes(StandardCharsets.UTF_8)
- : null;
- this.decompressedStream =
+ InputStream decompressedStream =
HadoopCompressionUtils.createDecompressedInputStream(
fileIO.newInputStream(filePath), filePath);
- this.bufferedReader =
- new BufferedReader(
- new InputStreamReader(this.decompressedStream,
StandardCharsets.UTF_8));
+ this.lineReader = TextLineReader.create(decompressedStream, delimiter);
this.reader = new TextRecordIterator();
}
@@ -101,13 +86,8 @@ public abstract class BaseTextFileReader implements
FileRecordReader<InternalRow
@Override
public void close() throws IOException {
if (!readerClosed) {
- // Close the buffered reader first
- if (bufferedReader != null) {
- bufferedReader.close();
- }
- // Explicitly close the decompressed stream to prevent resource
leaks
- if (decompressedStream != null) {
- decompressedStream.close();
+ if (lineReader != null) {
+ lineReader.close();
}
readerClosed = true;
}
@@ -175,51 +155,6 @@ public abstract class BaseTextFileReader implements
FileRecordReader<InternalRow
* @throws IOException if an I/O error occurs or line exceeds maximum
length
*/
protected String readLine() throws IOException {
- // Fast path: use BufferedReader for standard delimiters
- if (recordDelimiterBytes == null || recordDelimiterBytes.length == 0) {
- return bufferedReader.readLine();
- }
-
- ByteArrayOutputStream out = new ByteArrayOutputStream(1024);
- int matchIndex = 0;
-
- while (true) {
- int b = decompressedStream.read();
- if (b == -1) {
- // End of stream: flush any partially matched delimiter bytes
to output
- if (matchIndex > 0) {
- out.write(recordDelimiterBytes, 0, matchIndex);
- }
- // Return null if nothing was read, otherwise return the
accumulated line
- return out.size() == 0 ? null :
out.toString(StandardCharsets.UTF_8.name());
- }
-
- // Guard against extremely long lines that could cause memory
issues
- if (MAX_LINE_LENGTH - matchIndex < out.size()) {
- throw new IOException("Line exceeds maximum length: " +
MAX_LINE_LENGTH);
- }
-
- byte current = (byte) b;
- if (current == recordDelimiterBytes[matchIndex]) {
- // Current byte matches the next expected delimiter byte
- matchIndex++;
- if (matchIndex == recordDelimiterBytes.length) {
- // Complete delimiter found, return the line without the
delimiter
- return out.toString(StandardCharsets.UTF_8.name());
- }
- } else if (matchIndex > 0) {
- // Mismatch: handle partial matches
- out.write(recordDelimiterBytes, 0, matchIndex);
- if (current == recordDelimiterBytes[0]) {
- matchIndex = 1;
- } else {
- out.write(current);
- matchIndex = 0;
- }
- } else {
- // just add the current byte to output
- out.write(current);
- }
- }
+ return lineReader.readLine();
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileWriter.java
similarity index 94%
rename from
paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileWriter.java
rename to
paimon-format/src/main/java/org/apache/paimon/format/text/TextFileWriter.java
index e36acb318a..8cf56da017 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileWriter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileWriter.java
@@ -31,14 +31,13 @@ import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
/** Base class for text-based format writers that provides common
functionality. */
-public abstract class BaseTextFileWriter implements FormatWriter {
+public abstract class TextFileWriter implements FormatWriter {
protected final PositionOutputStream outputStream;
protected final BufferedWriter writer;
protected final RowType rowType;
- protected BaseTextFileWriter(
- PositionOutputStream outputStream, RowType rowType, String
compression)
+ protected TextFileWriter(PositionOutputStream outputStream, RowType
rowType, String compression)
throws IOException {
this.outputStream = outputStream;
OutputStream compressedStream =
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
new file mode 100644
index 0000000000..15c671adc1
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/** Reader to read lines. */
+public interface TextLineReader extends Closeable {
+
+ String readLine() throws IOException;
+
+ static TextLineReader create(InputStream inputStream, String delimiter) {
+ byte[] delimiterBytes =
+ delimiter != null && !"\n".equals(delimiter)
+ ? delimiter.getBytes(StandardCharsets.UTF_8)
+ : null;
+ if (delimiterBytes == null || delimiterBytes.length == 0) {
+ return new StandardLineReader(inputStream);
+ } else {
+ return new CustomLineReader(inputStream, delimiterBytes);
+ }
+ }
+}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/text/BaseTextFileReaderTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
similarity index 97%
rename from
paimon-format/src/test/java/org/apache/paimon/format/text/BaseTextFileReaderTest.java
rename to
paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
index efb6cb1e4d..2f87831fd5 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/text/BaseTextFileReaderTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
@@ -40,8 +40,8 @@ import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for {@link BaseTextFileReader}. */
-public class BaseTextFileReaderTest {
+/** Test for {@link TextFileReader}. */
+public class TextFileReaderTest {
@TempDir java.nio.file.Path tempDir;
@@ -199,7 +199,7 @@ public class BaseTextFileReaderTest {
}
/** Concrete implementation of TextFileReader for testing. */
- private static class TestTextFileReader extends BaseTextFileReader {
+ private static class TestTextFileReader extends TextFileReader {
public TestTextFileReader(
FileIO fileIO, Path filePath, RowType rowType, String
recordDelimiter)