This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new dfb7cf11f0 [text] Introduce TextLineReader to abstract implementations
dfb7cf11f0 is described below

commit dfb7cf11f06c668968732b8c04e647a330703444
Author: JingsongLi <[email protected]>
AuthorDate: Wed Nov 5 23:05:19 2025 +0800

    [text] Introduce TextLineReader to abstract implementations
---
 .../apache/paimon/format/csv/CsvFileReader.java    |  4 +-
 .../apache/paimon/format/csv/CsvFormatWriter.java  |  4 +-
 .../apache/paimon/format/json/JsonFileReader.java  |  4 +-
 .../paimon/format/json/JsonFormatWriter.java       |  4 +-
 .../paimon/format/text/CustomLineReader.java       | 88 ++++++++++++++++++++++
 .../paimon/format/text/StandardLineReader.java     | 47 ++++++++++++
 ...BaseTextFileReader.java => TextFileReader.java} | 81 ++------------------
 ...BaseTextFileWriter.java => TextFileWriter.java} |  5 +-
 .../apache/paimon/format/text/TextLineReader.java  | 42 +++++++++++
 ...FileReaderTest.java => TextFileReaderTest.java} |  6 +-
 10 files changed, 198 insertions(+), 87 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index 0b7e305740..1d3ceeafce 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.format.csv;
 
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.BaseTextFileReader;
+import org.apache.paimon.format.text.TextFileReader;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.types.RowType;
@@ -27,7 +27,7 @@ import org.apache.paimon.types.RowType;
 import java.io.IOException;
 
 /** CSV file reader implementation. */
-public class CsvFileReader extends BaseTextFileReader {
+public class CsvFileReader extends TextFileReader {
 
     private final boolean includeHeader;
     private final CsvParser csvParser;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
index c5c5885d47..81d81b874f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
@@ -21,7 +21,7 @@ package org.apache.paimon.format.csv;
 import org.apache.paimon.casting.CastExecutor;
 import org.apache.paimon.casting.CastExecutors;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.BaseTextFileWriter;
+import org.apache.paimon.format.text.TextFileWriter;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeRoot;
@@ -33,7 +33,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /** CSV format writer implementation. */
-public class CsvFormatWriter extends BaseTextFileWriter {
+public class CsvFormatWriter extends TextFileWriter {
 
     private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
     // Performance optimization: Cache frequently used cast executors
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
index 313465280c..99b433805c 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
@@ -25,7 +25,7 @@ import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericMap;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.BaseTextFileReader;
+import org.apache.paimon.format.text.TextFileReader;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.types.ArrayType;
@@ -47,7 +47,7 @@ import java.util.List;
 import java.util.Map;
 
 /** JSON file reader. */
-public class JsonFileReader extends BaseTextFileReader {
+public class JsonFileReader extends TextFileReader {
 
     private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
index a145e95419..87580b5c87 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
@@ -24,7 +24,7 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.BaseTextFileWriter;
+import org.apache.paimon.format.text.TextFileWriter;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
@@ -45,7 +45,7 @@ import java.util.List;
 import java.util.Map;
 
 /** Json format writer implementation. */
-public class JsonFormatWriter extends BaseTextFileWriter {
+public class JsonFormatWriter extends TextFileWriter {
 
     private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/CustomLineReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/CustomLineReader.java
new file mode 100644
index 0000000000..9a71ca585d
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/CustomLineReader.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/** A {@link TextLineReader} to read lines by custom delimiter. */
+public class CustomLineReader implements TextLineReader {
+
+    private static final int MAX_LINE_LENGTH = Integer.MAX_VALUE;
+
+    private final InputStream inputStream;
+    private final byte[] delimiter;
+
+    public CustomLineReader(InputStream inputStream, byte[] delimiter) {
+        this.inputStream = inputStream;
+        this.delimiter = delimiter;
+    }
+
+    @Override
+    public String readLine() throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream(1024);
+        int matchIndex = 0;
+
+        while (true) {
+            int b = inputStream.read();
+            if (b == -1) {
+                // End of stream: flush any partially matched delimiter bytes to output
+                if (matchIndex > 0) {
+                    out.write(delimiter, 0, matchIndex);
+                }
+                // Return null if nothing was read, otherwise return the accumulated line
+                return out.size() == 0 ? null : out.toString(StandardCharsets.UTF_8.name());
+            }
+
+            // Guard against extremely long lines that could cause memory issues
+            if (MAX_LINE_LENGTH - matchIndex < out.size()) {
+                throw new IOException("Line exceeds maximum length: " + MAX_LINE_LENGTH);
+            }
+
+            byte current = (byte) b;
+            if (current == delimiter[matchIndex]) {
+                // Current byte matches the next expected delimiter byte
+                matchIndex++;
+                if (matchIndex == delimiter.length) {
+                    // Complete delimiter found, return the line without the delimiter
+                    return out.toString(StandardCharsets.UTF_8.name());
+                }
+            } else if (matchIndex > 0) {
+                // Mismatch: handle partial matches
+                out.write(delimiter, 0, matchIndex);
+                if (current == delimiter[0]) {
+                    matchIndex = 1;
+                } else {
+                    out.write(current);
+                    matchIndex = 0;
+                }
+            } else {
+                // just add the current byte to output
+                out.write(current);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        inputStream.close();
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
new file mode 100644
index 0000000000..2c48c7149b
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+/** A {@link TextLineReader} using {@link BufferedReader} to read by '\n'. */
+public class StandardLineReader implements TextLineReader {
+
+    private final BufferedReader bufferedReader;
+
+    public StandardLineReader(InputStream inputStream) {
+        this.bufferedReader =
+                new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
+        ;
+    }
+
+    @Override
+    public String readLine() throws IOException {
+        return bufferedReader.readLine();
+    }
+
+    @Override
+    public void close() throws IOException {
+        bufferedReader.close();
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
similarity index 59%
rename from paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileReader.java
rename to paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
index 22f187ee12..982ff998b6 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
@@ -27,43 +27,28 @@ import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
 
 /** Base class for text-based file readers that provides common functionality. */
-public abstract class BaseTextFileReader implements FileRecordReader<InternalRow> {
-
-    private static final int MAX_LINE_LENGTH = Integer.MAX_VALUE;
+public abstract class TextFileReader implements FileRecordReader<InternalRow> {
 
     private final Path filePath;
-    private final InputStream decompressedStream;
     private final TextRecordIterator reader;
 
     protected final RowType rowType;
-    protected final BufferedReader bufferedReader;
-    protected final byte[] recordDelimiterBytes;
+    protected final TextLineReader lineReader;
 
     protected boolean readerClosed = false;
 
-    protected BaseTextFileReader(
-            FileIO fileIO, Path filePath, RowType rowType, String recordDelimiter)
+    protected TextFileReader(FileIO fileIO, Path filePath, RowType rowType, String delimiter)
             throws IOException {
         this.filePath = filePath;
         this.rowType = rowType;
-        this.recordDelimiterBytes =
-                recordDelimiter != null && !"\n".equals(recordDelimiter)
-                        ? recordDelimiter.getBytes(StandardCharsets.UTF_8)
-                        : null;
-        this.decompressedStream =
+        InputStream decompressedStream =
                 HadoopCompressionUtils.createDecompressedInputStream(
                         fileIO.newInputStream(filePath), filePath);
-        this.bufferedReader =
-                new BufferedReader(
-                        new InputStreamReader(this.decompressedStream, StandardCharsets.UTF_8));
+        this.lineReader = TextLineReader.create(decompressedStream, delimiter);
         this.reader = new TextRecordIterator();
     }
 
@@ -101,13 +86,8 @@ public abstract class BaseTextFileReader implements FileRecordReader<InternalRow
     @Override
     public void close() throws IOException {
         if (!readerClosed) {
-            // Close the buffered reader first
-            if (bufferedReader != null) {
-                bufferedReader.close();
-            }
-            // Explicitly close the decompressed stream to prevent resource leaks
-            if (decompressedStream != null) {
-                decompressedStream.close();
+            if (lineReader != null) {
+                lineReader.close();
             }
             readerClosed = true;
         }
@@ -175,51 +155,6 @@ public abstract class BaseTextFileReader implements FileRecordReader<InternalRow
      * @throws IOException if an I/O error occurs or line exceeds maximum length
      */
     protected String readLine() throws IOException {
-        // Fast path: use BufferedReader for standard delimiters
-        if (recordDelimiterBytes == null || recordDelimiterBytes.length == 0) {
-            return bufferedReader.readLine();
-        }
-
-        ByteArrayOutputStream out = new ByteArrayOutputStream(1024);
-        int matchIndex = 0;
-
-        while (true) {
-            int b = decompressedStream.read();
-            if (b == -1) {
-                // End of stream: flush any partially matched delimiter bytes to output
-                if (matchIndex > 0) {
-                    out.write(recordDelimiterBytes, 0, matchIndex);
-                }
-                // Return null if nothing was read, otherwise return the accumulated line
-                return out.size() == 0 ? null : out.toString(StandardCharsets.UTF_8.name());
-            }
-
-            // Guard against extremely long lines that could cause memory issues
-            if (MAX_LINE_LENGTH - matchIndex < out.size()) {
-                throw new IOException("Line exceeds maximum length: " + MAX_LINE_LENGTH);
-            }
-
-            byte current = (byte) b;
-            if (current == recordDelimiterBytes[matchIndex]) {
-                // Current byte matches the next expected delimiter byte
-                matchIndex++;
-                if (matchIndex == recordDelimiterBytes.length) {
-                    // Complete delimiter found, return the line without the delimiter
-                    return out.toString(StandardCharsets.UTF_8.name());
-                }
-            } else if (matchIndex > 0) {
-                // Mismatch: handle partial matches
-                out.write(recordDelimiterBytes, 0, matchIndex);
-                if (current == recordDelimiterBytes[0]) {
-                    matchIndex = 1;
-                } else {
-                    out.write(current);
-                    matchIndex = 0;
-                }
-            } else {
-                // just add the current byte to output
-                out.write(current);
-            }
-        }
+        return lineReader.readLine();
     }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileWriter.java
similarity index 94%
rename from paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileWriter.java
rename to paimon-format/src/main/java/org/apache/paimon/format/text/TextFileWriter.java
index e36acb318a..8cf56da017 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileWriter.java
@@ -31,14 +31,13 @@ import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
 
 /** Base class for text-based format writers that provides common functionality. */
-public abstract class BaseTextFileWriter implements FormatWriter {
+public abstract class TextFileWriter implements FormatWriter {
 
     protected final PositionOutputStream outputStream;
     protected final BufferedWriter writer;
     protected final RowType rowType;
 
-    protected BaseTextFileWriter(
-            PositionOutputStream outputStream, RowType rowType, String compression)
+    protected TextFileWriter(PositionOutputStream outputStream, RowType rowType, String compression)
             throws IOException {
         this.outputStream = outputStream;
         OutputStream compressedStream =
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
new file mode 100644
index 0000000000..15c671adc1
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextLineReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/** Reader to read lines. */
+public interface TextLineReader extends Closeable {
+
+    String readLine() throws IOException;
+
+    static TextLineReader create(InputStream inputStream, String delimiter) {
+        byte[] delimiterBytes =
+                delimiter != null && !"\n".equals(delimiter)
+                        ? delimiter.getBytes(StandardCharsets.UTF_8)
+                        : null;
+        if (delimiterBytes == null || delimiterBytes.length == 0) {
+            return new StandardLineReader(inputStream);
+        } else {
+            return new CustomLineReader(inputStream, delimiterBytes);
+        }
+    }
+}
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/text/BaseTextFileReaderTest.java b/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
similarity index 97%
rename from paimon-format/src/test/java/org/apache/paimon/format/text/BaseTextFileReaderTest.java
rename to paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
index efb6cb1e4d..2f87831fd5 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/text/BaseTextFileReaderTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
@@ -40,8 +40,8 @@ import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link BaseTextFileReader}. */
-public class BaseTextFileReaderTest {
+/** Test for {@link TextFileReader}. */
+public class TextFileReaderTest {
 
     @TempDir java.nio.file.Path tempDir;
 
@@ -199,7 +199,7 @@ public class BaseTextFileReaderTest {
     }
 
     /** Concrete implementation of BaseTextFileReader for testing. */
-    private static class TestTextFileReader extends BaseTextFileReader {
+    private static class TestTextFileReader extends TextFileReader {
 
         public TestTextFileReader(
                 FileIO fileIO, Path filePath, RowType rowType, String recordDelimiter)

Reply via email to