This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ca700a44cd [core] format table: support line delimiter for csv and 
json (#6517)
ca700a44cd is described below

commit ca700a44cdc3ce3ac0cbfc1a57b0ca7a18659338
Author: jerry <[email protected]>
AuthorDate: Wed Nov 5 22:17:49 2025 +0800

    [core] format table: support line delimiter for csv and json (#6517)
---
 .../apache/paimon/format/csv/CsvFileReader.java    |   4 +-
 .../apache/paimon/format/json/JsonFileReader.java  |   2 +-
 .../paimon/format/json/JsonFormatWriter.java       |   4 +-
 .../paimon/format/text/BaseTextFileReader.java     |  82 +++++++-
 .../paimon/format/csv/CsvFileFormatTest.java       |  42 ++++
 .../paimon/format/json/JsonFileFormatTest.java     |   4 +-
 .../paimon/format/text/BaseTextFileReaderTest.java | 217 +++++++++++++++++++++
 .../paimon/spark/table/PaimonFormatTableTest.scala |  19 ++
 8 files changed, 365 insertions(+), 9 deletions(-)

diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index d8c9eef35a..0b7e305740 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -41,7 +41,7 @@ public class CsvFileReader extends BaseTextFileReader {
             RowType projectedRowType,
             CsvOptions options)
             throws IOException {
-        super(fileIO, filePath, projectedRowType);
+        super(fileIO, filePath, projectedRowType, options.lineDelimiter());
         this.includeHeader = options.includeHeader();
         this.csvParser =
                 new CsvParser(
@@ -59,7 +59,7 @@ public class CsvFileReader extends BaseTextFileReader {
     protected void setupReading() throws IOException {
         // Skip header if needed
         if (includeHeader && !headerSkipped) {
-            bufferedReader.readLine();
+            readLine();
             headerSkipped = true;
         }
     }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java 
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
index 497d5eca43..313465280c 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
@@ -55,7 +55,7 @@ public class JsonFileReader extends BaseTextFileReader {
 
     public JsonFileReader(FileIO fileIO, Path filePath, RowType rowType, 
JsonOptions options)
             throws IOException {
-        super(fileIO, filePath, rowType);
+        super(fileIO, filePath, rowType, options.getLineDelimiter());
         this.options = options;
     }
 
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
index 44e53431e2..a145e95419 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
@@ -49,7 +49,7 @@ public class JsonFormatWriter extends BaseTextFileWriter {
 
     private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
 
-    private final char lineDelimiter;
+    private final String lineDelimiter;
 
     public JsonFormatWriter(
             PositionOutputStream outputStream,
@@ -58,7 +58,7 @@ public class JsonFormatWriter extends BaseTextFileWriter {
             String compression)
             throws IOException {
         super(outputStream, rowType, compression);
-        this.lineDelimiter = options.getLineDelimiter().charAt(0);
+        this.lineDelimiter = options.getLineDelimiter();
     }
 
     @Override
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileReader.java
index 87504379a1..22f187ee12 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileReader.java
@@ -28,6 +28,7 @@ import org.apache.paimon.types.RowType;
 import javax.annotation.Nullable;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -36,18 +37,27 @@ import java.nio.charset.StandardCharsets;
 /** Base class for text-based file readers that provides common functionality. 
*/
 public abstract class BaseTextFileReader implements 
FileRecordReader<InternalRow> {
 
+    private static final int MAX_LINE_LENGTH = Integer.MAX_VALUE;
+
     private final Path filePath;
     private final InputStream decompressedStream;
     private final TextRecordIterator reader;
 
     protected final RowType rowType;
     protected final BufferedReader bufferedReader;
+    protected final byte[] recordDelimiterBytes;
 
     protected boolean readerClosed = false;
 
-    protected BaseTextFileReader(FileIO fileIO, Path filePath, RowType 
rowType) throws IOException {
+    protected BaseTextFileReader(
+            FileIO fileIO, Path filePath, RowType rowType, String 
recordDelimiter)
+            throws IOException {
         this.filePath = filePath;
         this.rowType = rowType;
+        this.recordDelimiterBytes =
+                recordDelimiter != null && !"\n".equals(recordDelimiter)
+                        ? recordDelimiter.getBytes(StandardCharsets.UTF_8)
+                        : null;
         this.decompressedStream =
                 HadoopCompressionUtils.createDecompressedInputStream(
                         fileIO.newInputStream(filePath), filePath);
@@ -115,7 +125,7 @@ public abstract class BaseTextFileReader implements 
FileRecordReader<InternalRow
                 if (readerClosed) {
                     return null;
                 }
-                String nextLine = bufferedReader.readLine();
+                String nextLine = readLine();
                 if (nextLine == null) {
                     end = true;
                     return null;
@@ -144,4 +154,72 @@ public abstract class BaseTextFileReader implements 
FileRecordReader<InternalRow
             return Math.max(0, currentPosition - 1);
         }
     }
+
+    /**
+     * Reads a single line from the input stream, using either the default 
line delimiter or a
+     * custom delimiter.
+     *
+     * <p>This method supports multi-character custom delimiters by using a 
simple pattern matching
+     * algorithm. When no custom delimiter is configured (null, empty, or 
"\n"), it delegates to BufferedReader's
+     * readLine() for optimal performance.
+     *
+     * <p>The algorithm maintains a partial match index and accumulates bytes 
until:
+     *
+     * <ul>
+     *   <li>A complete delimiter is found (returns line without delimiter)
+     *   <li>End of stream is reached (returns accumulated data or null if 
empty)
+     *   <li>Maximum line length is exceeded (throws IOException)
+     * </ul>
+     *
+     * @return the next line as a string (without delimiter), or null if end 
of stream
+     * @throws IOException if an I/O error occurs or line exceeds maximum 
length
+     */
+    protected String readLine() throws IOException {
+        // Fast path: use BufferedReader for standard delimiters
+        if (recordDelimiterBytes == null || recordDelimiterBytes.length == 0) {
+            return bufferedReader.readLine();
+        }
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream(1024);
+        int matchIndex = 0;
+
+        while (true) {
+            int b = decompressedStream.read();
+            if (b == -1) {
+                // End of stream: flush any partially matched delimiter bytes 
to output
+                if (matchIndex > 0) {
+                    out.write(recordDelimiterBytes, 0, matchIndex);
+                }
+                // Return null if nothing was read, otherwise return the 
accumulated line
+                return out.size() == 0 ? null : 
out.toString(StandardCharsets.UTF_8.name());
+            }
+
+            // Guard against extremely long lines that could cause memory 
issues
+            if (MAX_LINE_LENGTH - matchIndex < out.size()) {
+                throw new IOException("Line exceeds maximum length: " + 
MAX_LINE_LENGTH);
+            }
+
+            byte current = (byte) b;
+            if (current == recordDelimiterBytes[matchIndex]) {
+                // Current byte matches the next expected delimiter byte
+                matchIndex++;
+                if (matchIndex == recordDelimiterBytes.length) {
+                    // Complete delimiter found, return the line without the 
delimiter
+                    return out.toString(StandardCharsets.UTF_8.name());
+                }
+            } else if (matchIndex > 0) {
+                // Mismatch: flush matched prefix; retry current byte as delimiter start
+                out.write(recordDelimiterBytes, 0, matchIndex);
+                if (current == recordDelimiterBytes[0]) {
+                    matchIndex = 1;
+                } else {
+                    out.write(current);
+                    matchIndex = 0;
+                }
+            } else {
+                // just add the current byte to output
+                out.write(current);
+            }
+        }
+    }
 }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
index a9a0c845ef..a325bdb415 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
@@ -239,6 +239,48 @@ public class CsvFileFormatTest extends FormatReadWriteTest 
{
         }
     }
 
+    @Test
+    public void testCustomLineDelimiter() throws IOException {
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.INT().notNull(),
+                        DataTypes.STRING(),
+                        DataTypes.DOUBLE().notNull());
+
+        String[] customDelimiters = {"|||", "###", "<EOL>", "\t\t", "abc"};
+
+        List<InternalRow> testData =
+                Arrays.asList(
+                        GenericRow.of(1, BinaryString.fromString("ab"), 100.5),
+                        GenericRow.of(2, BinaryString.fromString("Bob"), 
200.75),
+                        GenericRow.of(3, BinaryString.fromString("Charlie"), 
300.25));
+
+        for (String delimiter : customDelimiters) {
+            Options options = new Options();
+            options.set(CsvOptions.LINE_DELIMITER, delimiter);
+
+            List<InternalRow> result =
+                    writeThenRead(
+                            options,
+                            rowType,
+                            rowType,
+                            testData,
+                            "test_custom_line_delim_" + delimiter.hashCode());
+
+            // Verify results
+            assertThat(result).hasSize(3);
+            assertThat(result.get(0).getInt(0)).isEqualTo(1);
+            assertThat(result.get(0).getString(1).toString()).isEqualTo("ab");
+            assertThat(result.get(0).getDouble(2)).isEqualTo(100.5);
+            assertThat(result.get(1).getInt(0)).isEqualTo(2);
+            assertThat(result.get(1).getString(1).toString()).isEqualTo("Bob");
+            assertThat(result.get(1).getDouble(2)).isEqualTo(200.75);
+            assertThat(result.get(2).getInt(0)).isEqualTo(3);
+            
assertThat(result.get(2).getString(1).toString()).isEqualTo("Charlie");
+            assertThat(result.get(2).getDouble(2)).isEqualTo(300.25);
+        }
+    }
+
     @Test
     public void testCsvQuoteCharacterWriteRead() throws IOException {
         RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), 
DataTypes.STRING());
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
index c230b3dabb..92ad9c0098 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
@@ -333,14 +333,14 @@ public class JsonFileFormatTest extends 
FormatReadWriteTest {
     }
 
     @Test
-    public void testJsonWriteReadWithDifferentLineDelimiters() throws 
IOException {
+    public void testWithCustomLineDelimiters() throws IOException {
         RowType rowType =
                 DataTypes.ROW(
                         DataTypes.INT().notNull(),
                         DataTypes.STRING(),
                         DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));
 
-        String[] delimiters = {"\n", "\r", "\r\n"};
+        String[] delimiters = {"\n", "\r", "\r\n", "||", "###", "@@", "\t", 
"::"};
 
         // Create test data once (reused for all delimiters)
         List<InternalRow> testData =
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/text/BaseTextFileReaderTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/text/BaseTextFileReaderTest.java
new file mode 100644
index 0000000000..efb6cb1e4d
--- /dev/null
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/text/BaseTextFileReaderTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link BaseTextFileReader}. */
+public class BaseTextFileReaderTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private FileIO fileIO;
+    private Path testFile;
+    private RowType rowType;
+
+    @BeforeEach
+    public void setUp() {
+        fileIO = new LocalFileIO();
+        testFile = new Path(tempDir.toString(), "test.txt");
+        rowType = DataTypes.ROW(DataTypes.STRING());
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        if (fileIO.exists(testFile)) {
+            fileIO.delete(testFile, false);
+        }
+    }
+
+    @Test
+    public void testReadLineWithDefaultDelimiter() throws IOException {
+        // Write test data with \n delimiter
+        writeFile("line1\nline2\nline3");
+
+        List<String> lines = readAllLines(null);
+
+        assertThat(lines).hasSize(3);
+        assertThat(lines.get(0)).isEqualTo("line1");
+        assertThat(lines.get(1)).isEqualTo("line2");
+        assertThat(lines.get(2)).isEqualTo("line3");
+    }
+
+    @Test
+    public void testReadLineWithCarriageReturn() throws IOException {
+        writeFile("line1\rline2\rline3");
+
+        List<String> lines = readAllLines("\r");
+
+        assertThat(lines).hasSize(3);
+        assertThat(lines.get(0)).isEqualTo("line1");
+        assertThat(lines.get(1)).isEqualTo("line2");
+        assertThat(lines.get(2)).isEqualTo("line3");
+    }
+
+    @Test
+    public void testReadLineWithCRLF() throws IOException {
+        writeFile("line1\r\nline2\r\nline3");
+
+        List<String> lines = readAllLines("\r\n");
+
+        assertThat(lines).hasSize(3);
+        assertThat(lines.get(0)).isEqualTo("line1");
+        assertThat(lines.get(1)).isEqualTo("line2");
+        assertThat(lines.get(2)).isEqualTo("line3");
+    }
+
+    @Test
+    public void testReadLineWithCustomMultiCharDelimiter() throws IOException {
+        writeFile("||ab|aba||||line3||line4||");
+
+        List<String> lines = readAllLines("||");
+
+        assertThat(lines).hasSize(5);
+        assertThat(lines.get(0)).isEqualTo("");
+        assertThat(lines.get(1)).isEqualTo("ab|aba");
+        assertThat(lines.get(2)).isEqualTo("");
+        assertThat(lines.get(3)).isEqualTo("line3");
+        assertThat(lines.get(4)).isEqualTo("line4");
+    }
+
+    @Test
+    public void testReadLineWithEmptyFile() throws IOException {
+        writeFile("");
+
+        List<String> lines = readAllLines("||");
+
+        assertThat(lines).isEmpty();
+    }
+
+    @Test
+    public void test() throws IOException {
+        writeFile("ababcab");
+
+        List<String> lines = readAllLines("abc");
+
+        assertThat(lines).hasSize(2);
+    }
+
+    @Test
+    public void testReadLineWithSingleLine() throws IOException {
+        writeFile("single line");
+
+        List<String> lines = readAllLines("||");
+
+        assertThat(lines).hasSize(1);
+        assertThat(lines.get(0)).isEqualTo("single line");
+    }
+
+    @Test
+    public void testReadLineWithUnicodeDelimiter() throws IOException {
+        writeFile("line1§§line2§§line3");
+
+        List<String> lines = readAllLines("§§");
+
+        assertThat(lines).hasSize(3);
+        assertThat(lines.get(0)).isEqualTo("line1");
+        assertThat(lines.get(1)).isEqualTo("line2");
+        assertThat(lines.get(2)).isEqualTo("line3");
+    }
+
+    @Test
+    public void testReadLineWithOverlappingPattern() throws IOException {
+        // Test delimiter "aba" with content "xababay"
+        // The first "aba" is consumed as the delimiter, leaving "x" and "bay"
+        writeFile("xababay");
+
+        List<String> lines = readAllLines("aba");
+
+        assertThat(lines).hasSize(2);
+        assertThat(lines.get(0)).isEqualTo("x");
+        assertThat(lines.get(1)).isEqualTo("bay");
+    }
+
+    @Test
+    public void testReadLineWithMultiByteUTF8Delimiter() throws IOException {
+        // Emoji delimiter
+        writeFile("line1😀line2😀line3");
+
+        List<String> lines = readAllLines("😀");
+
+        assertThat(lines).hasSize(3);
+        assertThat(lines.get(0)).isEqualTo("line1");
+        assertThat(lines.get(1)).isEqualTo("line2");
+        assertThat(lines.get(2)).isEqualTo("line3");
+    }
+
+    private void writeFile(String content) throws IOException {
+        try (PositionOutputStream out = fileIO.newOutputStream(testFile, 
false)) {
+            out.write(content.getBytes(StandardCharsets.UTF_8));
+        }
+    }
+
+    private List<String> readAllLines(@Nullable String delimiter) throws 
IOException {
+        List<String> lines = new ArrayList<>();
+        try (TestTextFileReader reader =
+                new TestTextFileReader(fileIO, testFile, rowType, delimiter)) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                lines.add(line);
+            }
+        }
+        return lines;
+    }
+
+    /** Concrete implementation of BaseTextFileReader for testing. */
+    private static class TestTextFileReader extends BaseTextFileReader {
+
+        public TestTextFileReader(
+                FileIO fileIO, Path filePath, RowType rowType, String 
recordDelimiter)
+                throws IOException {
+            super(fileIO, filePath, rowType, recordDelimiter);
+        }
+
+        @Nullable
+        @Override
+        protected InternalRow parseLine(String line) throws IOException {
+            // Not used in these tests
+            return null;
+        }
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index 44c6abb297..d2b1104480 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -61,6 +61,25 @@ class PaimonFormatTableTest extends 
PaimonSparkTestWithRestCatalogBase {
     }
   }
 
+  test("PaimonFormatTable table: csv custom line delimiter") {
+    val tableName = "paimon_format_test_csv_custom_lime_delimiter"
+    withTable(tableName) {
+      spark.sql(
+        s"""
+           |CREATE TABLE $tableName (age INT, name STRING)
+           |USING CSV TBLPROPERTIES ('format-table.implementation'='paimon', 
'file.compression'='gzip', 'lineSep'='abc')
+           |""".stripMargin)
+      val table =
+        paimonCatalog.getTable(Identifier.create("test_db", 
tableName)).asInstanceOf[FormatTable]
+      table.fileIO().mkdirs(new Path(table.location()))
+      spark.sql(s"INSERT INTO $tableName  VALUES (5, 'ab'), (7, 'Larry')")
+      checkAnswer(
+        spark.sql(s"SELECT age, name FROM $tableName ORDER BY age"),
+        Row(5, "ab") :: Row(7, "Larry") :: Nil
+      )
+    }
+  }
+
   test("PaimonFormatTable non partition table overwrite: csv") {
     val tableName = "paimon_non_partiiton_overwrite_test"
     withTable(tableName) {

Reply via email to