This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ca700a44cd [core] format table: support line delimiter for csv and
json (#6517)
ca700a44cd is described below
commit ca700a44cdc3ce3ac0cbfc1a57b0ca7a18659338
Author: jerry <[email protected]>
AuthorDate: Wed Nov 5 22:17:49 2025 +0800
[core] format table: support line delimiter for csv and json (#6517)
---
.../apache/paimon/format/csv/CsvFileReader.java | 4 +-
.../apache/paimon/format/json/JsonFileReader.java | 2 +-
.../paimon/format/json/JsonFormatWriter.java | 4 +-
.../paimon/format/text/BaseTextFileReader.java | 82 +++++++-
.../paimon/format/csv/CsvFileFormatTest.java | 42 ++++
.../paimon/format/json/JsonFileFormatTest.java | 4 +-
.../paimon/format/text/BaseTextFileReaderTest.java | 217 +++++++++++++++++++++
.../paimon/spark/table/PaimonFormatTableTest.scala | 19 ++
8 files changed, 365 insertions(+), 9 deletions(-)
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index d8c9eef35a..0b7e305740 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -41,7 +41,7 @@ public class CsvFileReader extends BaseTextFileReader {
RowType projectedRowType,
CsvOptions options)
throws IOException {
- super(fileIO, filePath, projectedRowType);
+ super(fileIO, filePath, projectedRowType, options.lineDelimiter());
this.includeHeader = options.includeHeader();
this.csvParser =
new CsvParser(
@@ -59,7 +59,7 @@ public class CsvFileReader extends BaseTextFileReader {
protected void setupReading() throws IOException {
// Skip header if needed
if (includeHeader && !headerSkipped) {
- bufferedReader.readLine();
+ readLine();
headerSkipped = true;
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
index 497d5eca43..313465280c 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
@@ -55,7 +55,7 @@ public class JsonFileReader extends BaseTextFileReader {
public JsonFileReader(FileIO fileIO, Path filePath, RowType rowType,
JsonOptions options)
throws IOException {
- super(fileIO, filePath, rowType);
+ super(fileIO, filePath, rowType, options.getLineDelimiter());
this.options = options;
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
index 44e53431e2..a145e95419 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
@@ -49,7 +49,7 @@ public class JsonFormatWriter extends BaseTextFileWriter {
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
- private final char lineDelimiter;
+ private final String lineDelimiter;
public JsonFormatWriter(
PositionOutputStream outputStream,
@@ -58,7 +58,7 @@ public class JsonFormatWriter extends BaseTextFileWriter {
String compression)
throws IOException {
super(outputStream, rowType, compression);
- this.lineDelimiter = options.getLineDelimiter().charAt(0);
+ this.lineDelimiter = options.getLineDelimiter();
}
@Override
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileReader.java
index 87504379a1..22f187ee12 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/text/BaseTextFileReader.java
@@ -28,6 +28,7 @@ import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -36,18 +37,27 @@ import java.nio.charset.StandardCharsets;
/** Base class for text-based file readers that provides common functionality.
*/
public abstract class BaseTextFileReader implements
FileRecordReader<InternalRow> {
+ private static final int MAX_LINE_LENGTH = Integer.MAX_VALUE;
+
private final Path filePath;
private final InputStream decompressedStream;
private final TextRecordIterator reader;
protected final RowType rowType;
protected final BufferedReader bufferedReader;
+ protected final byte[] recordDelimiterBytes;
protected boolean readerClosed = false;
- protected BaseTextFileReader(FileIO fileIO, Path filePath, RowType
rowType) throws IOException {
+ protected BaseTextFileReader(
+ FileIO fileIO, Path filePath, RowType rowType, String
recordDelimiter)
+ throws IOException {
this.filePath = filePath;
this.rowType = rowType;
+ this.recordDelimiterBytes =
+ recordDelimiter != null && !"\n".equals(recordDelimiter)
+ ? recordDelimiter.getBytes(StandardCharsets.UTF_8)
+ : null;
this.decompressedStream =
HadoopCompressionUtils.createDecompressedInputStream(
fileIO.newInputStream(filePath), filePath);
@@ -115,7 +125,7 @@ public abstract class BaseTextFileReader implements
FileRecordReader<InternalRow
if (readerClosed) {
return null;
}
- String nextLine = bufferedReader.readLine();
+ String nextLine = readLine();
if (nextLine == null) {
end = true;
return null;
@@ -144,4 +154,97 @@ public abstract class BaseTextFileReader implements FileRecordReader<InternalRow
return Math.max(0, currentPosition - 1);
}
}
+
+    /**
+     * Reads a single line from the input stream, using either the default line delimiter or a
+     * custom delimiter.
+     *
+     * <p>When no custom delimiter is configured ({@code recordDelimiterBytes} is null or empty),
+     * this delegates to {@link BufferedReader#readLine()}, which treats {@code "\n"},
+     * {@code "\r"} and {@code "\r\n"} as line terminators.
+     *
+     * <p>Otherwise the delimiter's UTF-8 bytes are matched with the Knuth-Morris-Pratt
+     * algorithm: when a partial match fails, matching resumes from the longest delimiter prefix
+     * that is still a suffix of the bytes read so far. This finds delimiters whose prefix recurs
+     * inside them (e.g. {@code "aab"} in {@code "aaab"}), which a naive restart would miss.
+     * Bytes are accumulated until:
+     *
+     * <ul>
+     *   <li>A complete delimiter is found (returns the line without the delimiter)
+     *   <li>End of stream is reached (returns the accumulated data, or null if nothing was read)
+     *   <li>The maximum line length is exceeded (throws IOException)
+     * </ul>
+     *
+     * <p>NOTE(review): the custom path reads one byte at a time from {@code decompressedStream};
+     * whether that stream is buffered depends on HadoopCompressionUtils; confirm.
+     *
+     * @return the next line (without delimiter), or null at end of stream
+     * @throws IOException if an I/O error occurs or the line exceeds the maximum length
+     */
+    protected String readLine() throws IOException {
+        // Fast path: no custom delimiter, so BufferedReader handles line splitting.
+        if (recordDelimiterBytes == null || recordDelimiterBytes.length == 0) {
+            return bufferedReader.readLine();
+        }
+
+        int[] failure = buildFailureTable(recordDelimiterBytes);
+        ByteArrayOutputStream out = new ByteArrayOutputStream(1024);
+        // The last matchIndex bytes read equal the delimiter's first matchIndex bytes.
+        int matchIndex = 0;
+
+        while (true) {
+            int b = decompressedStream.read();
+            if (b == -1) {
+                // End of stream: a pending partial delimiter is ordinary line content.
+                if (matchIndex > 0) {
+                    out.write(recordDelimiterBytes, 0, matchIndex);
+                }
+                return out.size() == 0 ? null : out.toString(StandardCharsets.UTF_8.name());
+            }
+
+            // Guard against extremely long lines; the subtraction form cannot overflow.
+            if (MAX_LINE_LENGTH - matchIndex < out.size()) {
+                throw new IOException("Line exceeds maximum length: " + MAX_LINE_LENGTH);
+            }
+
+            byte current = (byte) b;
+            // On mismatch, shrink the partial match to the longest delimiter prefix that is
+            // still a suffix of the pending bytes; the bytes dropped from the front are content.
+            while (matchIndex > 0 && current != recordDelimiterBytes[matchIndex]) {
+                int fallback = failure[matchIndex - 1];
+                out.write(recordDelimiterBytes, 0, matchIndex - fallback);
+                matchIndex = fallback;
+            }
+
+            if (current == recordDelimiterBytes[matchIndex]) {
+                matchIndex++;
+                if (matchIndex == recordDelimiterBytes.length) {
+                    // Complete delimiter found: return the line without the delimiter.
+                    return out.toString(StandardCharsets.UTF_8.name());
+                }
+            } else {
+                // No partial match survives, so the byte is ordinary line content.
+                out.write(current);
+            }
+        }
+    }
+
+    /**
+     * Builds the KMP failure table for {@code pattern}: entry {@code i} is the length of the
+     * longest proper prefix of {@code pattern[0..i]} that is also a suffix of it.
+     */
+    private static int[] buildFailureTable(byte[] pattern) {
+        int[] failure = new int[pattern.length];
+        int border = 0;
+        for (int i = 1; i < pattern.length; i++) {
+            while (border > 0 && pattern[i] != pattern[border]) {
+                border = failure[border - 1];
+            }
+            if (pattern[i] == pattern[border]) {
+                border++;
+            }
+            failure[i] = border;
+        }
+        return failure;
+    }
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
index a9a0c845ef..a325bdb415 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
@@ -239,6 +239,45 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
}
}
+    @Test
+    public void testCustomLineDelimiter() throws IOException {
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.INT().notNull(),
+                        DataTypes.STRING(),
+                        DataTypes.DOUBLE().notNull());
+
+        // "abc" shares its first two bytes with the value "ab", so a partial delimiter match
+        // inside a row has to be recovered as ordinary content.
+        String[] customDelimiters = {"|||", "###", "<EOL>", "\t\t", "abc"};
+
+        List<InternalRow> testData =
+                Arrays.asList(
+                        GenericRow.of(1, BinaryString.fromString("ab"), 100.5),
+                        GenericRow.of(2, BinaryString.fromString("Bob"), 200.75),
+                        GenericRow.of(3, BinaryString.fromString("Charlie"), 300.25));
+
+        for (String delimiter : customDelimiters) {
+            Options options = new Options();
+            options.set(CsvOptions.LINE_DELIMITER, delimiter);
+            String fileName = "test_custom_line_delim_" + delimiter.hashCode();
+
+            List<InternalRow> result =
+                    writeThenRead(options, rowType, rowType, testData, fileName);
+
+            // Each row must round-trip field by field, whatever the delimiter.
+            assertThat(result).hasSize(testData.size());
+            for (int i = 0; i < testData.size(); i++) {
+                InternalRow expected = testData.get(i);
+                InternalRow actual = result.get(i);
+                assertThat(actual.getInt(0)).isEqualTo(expected.getInt(0));
+                assertThat(actual.getString(1).toString())
+                        .isEqualTo(expected.getString(1).toString());
+                assertThat(actual.getDouble(2)).isEqualTo(expected.getDouble(2));
+            }
+        }
+    }
+
@Test
public void testCsvQuoteCharacterWriteRead() throws IOException {
RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(),
DataTypes.STRING());
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
index c230b3dabb..92ad9c0098 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
@@ -333,14 +333,14 @@ public class JsonFileFormatTest extends
FormatReadWriteTest {
}
@Test
- public void testJsonWriteReadWithDifferentLineDelimiters() throws
IOException {
+ public void testWithCustomLineDelimiters() throws IOException {
RowType rowType =
DataTypes.ROW(
DataTypes.INT().notNull(),
DataTypes.STRING(),
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));
- String[] delimiters = {"\n", "\r", "\r\n"};
+ String[] delimiters = {"\n", "\r", "\r\n", "||", "###", "@@", "\t",
"::"};
// Create test data once (reused for all delimiters)
List<InternalRow> testData =
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/text/BaseTextFileReaderTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/text/BaseTextFileReaderTest.java
new file mode 100644
index 0000000000..efb6cb1e4d
--- /dev/null
+++
b/paimon-format/src/test/java/org/apache/paimon/format/text/BaseTextFileReaderTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BaseTextFileReader#readLine()} with default and custom delimiters. */
+public class BaseTextFileReaderTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private FileIO fileIO;
+    private Path testFile;
+    private RowType rowType;
+
+    @BeforeEach
+    public void setUp() {
+        fileIO = new LocalFileIO();
+        testFile = new Path(tempDir.toString(), "test.txt");
+        rowType = DataTypes.ROW(DataTypes.STRING());
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        if (fileIO.exists(testFile)) {
+            fileIO.delete(testFile, false);
+        }
+    }
+
+    @Test
+    public void testReadLineWithDefaultDelimiter() throws IOException {
+        // A null delimiter takes the BufferedReader fast path; data uses \n separators.
+        writeFile("line1\nline2\nline3");
+
+        List<String> lines = readAllLines(null);
+
+        assertThat(lines).hasSize(3);
+        assertThat(lines.get(0)).isEqualTo("line1");
+        assertThat(lines.get(1)).isEqualTo("line2");
+        assertThat(lines.get(2)).isEqualTo("line3");
+    }
+
+    @Test
+    public void testReadLineWithCarriageReturn() throws IOException {
+        writeFile("line1\rline2\rline3");
+
+        List<String> lines = readAllLines("\r");
+
+        assertThat(lines).hasSize(3);
+        assertThat(lines.get(0)).isEqualTo("line1");
+        assertThat(lines.get(1)).isEqualTo("line2");
+        assertThat(lines.get(2)).isEqualTo("line3");
+    }
+
+    @Test
+    public void testReadLineWithCRLF() throws IOException {
+        writeFile("line1\r\nline2\r\nline3");
+
+        List<String> lines = readAllLines("\r\n");
+
+        assertThat(lines).hasSize(3);
+        assertThat(lines.get(0)).isEqualTo("line1");
+        assertThat(lines.get(1)).isEqualTo("line2");
+        assertThat(lines.get(2)).isEqualTo("line3");
+    }
+
+    @Test
+    public void testReadLineWithCustomMultiCharDelimiter() throws IOException {
+        writeFile("||ab|aba||||line3||line4||");
+
+        List<String> lines = readAllLines("||");
+
+        assertThat(lines).hasSize(5);
+        assertThat(lines.get(0)).isEqualTo("");
+        assertThat(lines.get(1)).isEqualTo("ab|aba");
+        assertThat(lines.get(2)).isEqualTo("");
+        assertThat(lines.get(3)).isEqualTo("line3");
+        assertThat(lines.get(4)).isEqualTo("line4");
+    }
+
+    @Test
+    public void testReadLineWithEmptyFile() throws IOException {
+        writeFile("");
+
+        List<String> lines = readAllLines("||");
+
+        assertThat(lines).isEmpty();
+    }
+
+    @Test
+    public void testReadLineWithPartialDelimiterPrefix() throws IOException {
+        writeFile("ababcab");
+        // "aba" breaks a partial match of "abc"; the unterminated trailing "ab" is kept at EOF.
+        List<String> lines = readAllLines("abc");
+
+        assertThat(lines).containsExactly("ab", "ab");
+    }
+
+    @Test
+    public void testReadLineWithSingleLine() throws IOException {
+        writeFile("single line");
+
+        List<String> lines = readAllLines("||");
+
+        assertThat(lines).hasSize(1);
+        assertThat(lines.get(0)).isEqualTo("single line");
+    }
+
+    @Test
+    public void testReadLineWithUnicodeDelimiter() throws IOException {
+        writeFile("line1§§line2§§line3");
+
+        List<String> lines = readAllLines("§§");
+
+        assertThat(lines).hasSize(3);
+        assertThat(lines.get(0)).isEqualTo("line1");
+        assertThat(lines.get(1)).isEqualTo("line2");
+        assertThat(lines.get(2)).isEqualTo("line3");
+    }
+
+    @Test
+    public void testReadLineWithOverlappingPattern() throws IOException {
+        // Test delimiter "aba" with content "xababay":
+        // the first "aba" ends line "x"; the remaining "bay" holds no complete delimiter.
+        writeFile("xababay");
+
+        List<String> lines = readAllLines("aba");
+
+        assertThat(lines).hasSize(2);
+        assertThat(lines.get(0)).isEqualTo("x");
+        assertThat(lines.get(1)).isEqualTo("bay");
+    }
+
+    @Test
+    public void testReadLineWithMultiByteUTF8Delimiter() throws IOException {
+        // Emoji delimiter (4 bytes in UTF-8)
+        writeFile("line1😀line2😀line3");
+
+        List<String> lines = readAllLines("😀");
+
+        assertThat(lines).hasSize(3);
+        assertThat(lines.get(0)).isEqualTo("line1");
+        assertThat(lines.get(1)).isEqualTo("line2");
+        assertThat(lines.get(2)).isEqualTo("line3");
+    }
+
+    private void writeFile(String content) throws IOException {
+        try (PositionOutputStream out = fileIO.newOutputStream(testFile, false)) {
+            out.write(content.getBytes(StandardCharsets.UTF_8));
+        }
+    }
+
+    private List<String> readAllLines(@Nullable String delimiter) throws IOException {
+        List<String> lines = new ArrayList<>();
+        try (TestTextFileReader reader =
+                new TestTextFileReader(fileIO, testFile, rowType, delimiter)) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                lines.add(line);
+            }
+        }
+        return lines;
+    }
+
+    /** Concrete implementation of BaseTextFileReader for testing. */
+    private static class TestTextFileReader extends BaseTextFileReader {
+
+        public TestTextFileReader(
+                FileIO fileIO, Path filePath, RowType rowType, String recordDelimiter)
+                throws IOException {
+            super(fileIO, filePath, rowType, recordDelimiter);
+        }
+
+        @Nullable
+        @Override
+        protected InternalRow parseLine(String line) throws IOException {
+            // Not used in these tests
+            return null;
+        }
+    }
+}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index 44c6abb297..d2b1104480 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -61,6 +61,25 @@ class PaimonFormatTableTest extends
PaimonSparkTestWithRestCatalogBase {
}
}
+  test("PaimonFormatTable table: csv custom line delimiter") {
+    val tableName = "paimon_format_test_csv_custom_line_delimiter"
+    withTable(tableName) {
+      spark.sql(
+        s"""
+           |CREATE TABLE $tableName (age INT, name STRING)
+           |USING CSV TBLPROPERTIES ('format-table.implementation'='paimon', 'file.compression'='gzip', 'lineSep'='abc')
+           |""".stripMargin)
+      val table =
+        paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
+      table.fileIO().mkdirs(new Path(table.location()))
+      spark.sql(s"INSERT INTO $tableName VALUES (5, 'ab'), (7, 'Larry')")
+      checkAnswer(
+        spark.sql(s"SELECT age, name FROM $tableName ORDER BY age"),
+        Row(5, "ab") :: Row(7, "Larry") :: Nil
+      )
+    }
+  }
+
test("PaimonFormatTable non partition table overwrite: csv") {
val tableName = "paimon_non_partiiton_overwrite_test"
withTable(tableName) {