This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 91fae29f2f [core] support csv parse mode (#6350)
91fae29f2f is described below
commit 91fae29f2fd1a88054e01abb638faef5e898fde2
Author: jerry <[email protected]>
AuthorDate: Mon Sep 29 23:30:48 2025 +0800
[core] support csv parse mode (#6350)
---
docs/content/concepts/spec/fileformat.md | 12 ++++
.../apache/paimon/format/csv/CsvFileReader.java | 39 +++++++++--
.../org/apache/paimon/format/csv/CsvOptions.java | 47 +++++++++++++
.../paimon/format/csv/CsvFileFormatTest.java | 78 +++++++++++++++++++---
.../paimon/spark/table/PaimonFormatTableTest.scala | 31 ++++++++-
5 files changed, 191 insertions(+), 16 deletions(-)
diff --git a/docs/content/concepts/spec/fileformat.md
b/docs/content/concepts/spec/fileformat.md
index 93debdca10..5c69b494cc 100644
--- a/docs/content/concepts/spec/fileformat.md
+++ b/docs/content/concepts/spec/fileformat.md
@@ -419,6 +419,18 @@ Format Options:
<td>String</td>
<td>Null literal string that is interpreted as a null value (disabled by
default).</td>
</tr>
+ <tr>
+ <td><h5>csv.mode</h5></td>
+ <td style="word-wrap: break-word;"><code>PERMISSIVE</code></td>
+ <td>String</td>
+ <td>Allows a mode for dealing with corrupt records during reading.
Currently supported values are <code>'PERMISSIVE'</code>,
<code>'DROPMALFORMED'</code> and <code>'FAILFAST'</code>:
+ <ul>
+ <li>Option <code>'PERMISSIVE'</code> sets malformed fields to null.</li>
+ <li>Option <code>'DROPMALFORMED'</code> ignores whole corrupted
records.</li>
+ <li>Option <code>'FAILFAST'</code> throws an exception when it
encounters corrupted records.</li>
+ </ul>
+ </td>
+ </tr>
</tbody>
</table>
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index 0e215aa3fb..daa9dc5659 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -44,6 +44,7 @@ public class CsvFileReader extends BaseTextFileReader {
private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
private static final CsvMapper CSV_MAPPER = new CsvMapper();
+ private static final InternalRow DROP_ROW = new GenericRow(1);
// Performance optimization: Cache frequently used cast executors
private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
@@ -98,8 +99,25 @@ public class CsvFileReader extends BaseTextFileReader {
}
private class CsvRecordIterator extends BaseTextRecordIterator {
- // Inherits all functionality from BaseTextRecordIterator
- // No additional CSV-specific iterator logic needed
+ @Override
+ public InternalRow next() throws IOException {
+ while (true) {
+ if (readerClosed) {
+ return null;
+ }
+ String nextLine = bufferedReader.readLine();
+ if (nextLine == null) {
+ end = true;
+ return null;
+ }
+
+ currentPosition++;
+ InternalRow row = parseLine(nextLine);
+ if (row != DROP_ROW) {
+ return row;
+ }
+ }
+ }
}
protected static String[] parseCsvLineToArray(String line, CsvSchema
schema)
@@ -148,8 +166,21 @@ public class CsvFileReader extends BaseTextFileReader {
}
// Optimized field parsing with cached cast executors
- projectedValues[i] =
- parseFieldOptimized(field.trim(),
dataSchemaRowType.getTypeAt(readIndex));
+ try {
+ projectedValues[i] =
+ parseFieldOptimized(
+ field.trim(),
dataSchemaRowType.getTypeAt(readIndex));
+ } catch (Exception e) {
+ switch (formatOptions.mode()) {
+ case PERMISSIVE:
+ projectedValues[i] = null;
+ break;
+ case DROPMALFORMED:
+ return DROP_ROW;
+ case FAILFAST:
+ throw e;
+ }
+ }
} else {
projectedValues[i] = null; // Field not present in the CSV line
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
index 18bff8a474..03d0a382c0 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
@@ -21,6 +21,10 @@ package org.apache.paimon.format.csv;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;
+import org.apache.paimon.options.description.DescribedEnum;
+import org.apache.paimon.options.description.InlineElement;
+
+import static org.apache.paimon.options.description.TextElement.text;
/** Options for csv format. */
public class CsvOptions {
@@ -63,12 +67,21 @@ public class CsvOptions {
.defaultValue("")
.withDescription("The literal for null values in CSV
format");
+ public static final ConfigOption<Mode> MODE =
+ ConfigOptions.key("csv.mode")
+ .enumType(Mode.class)
+ .defaultValue(Mode.PERMISSIVE)
+ .withFallbackKeys("mode")
+ .withDescription(
+ "Allows a mode for dealing with corrupt records
during reading.");
+
private final String fieldDelimiter;
private final String lineDelimiter;
private final String nullLiteral;
private final boolean includeHeader;
private final String quoteCharacter;
private final String escapeCharacter;
+ private final Mode mode;
public CsvOptions(Options options) {
this.fieldDelimiter = options.get(FIELD_DELIMITER);
@@ -77,6 +90,7 @@ public class CsvOptions {
this.includeHeader = options.get(INCLUDE_HEADER);
this.quoteCharacter = options.get(QUOTE_CHARACTER);
this.escapeCharacter = options.get(ESCAPE_CHARACTER);
+ this.mode = options.get(MODE);
}
public String fieldDelimiter() {
@@ -102,4 +116,37 @@ public class CsvOptions {
public String escapeCharacter() {
return escapeCharacter;
}
+
+ public Mode mode() {
+ return mode;
+ }
+
+ /** Mode for dealing with corrupt records during reading. */
+ public enum Mode implements DescribedEnum {
+ PERMISSIVE("permissive", "Sets malformed fields to null."),
+ DROPMALFORMED("dropmalformed", "Ignores the whole corrupted records."),
+ FAILFAST("failfast", "Throws an exception when it meets corrupted
records.");
+
+ private final String value;
+ private final String description;
+
+ Mode(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+
+ public String getValue() {
+ return value;
+ }
+ }
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
index a1ae1a2772..a64f30fcb5 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
@@ -53,6 +53,7 @@ import java.util.concurrent.ThreadLocalRandom;
import static org.apache.paimon.data.BinaryString.fromString;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link CsvFileFormat}. */
public class CsvFileFormatTest extends FormatReadWriteTest {
@@ -444,6 +445,71 @@ public class CsvFileFormatTest extends FormatReadWriteTest
{
assertThat(result.get(3).isNullAt(3)).isTrue();
}
+ @Test
+ public void testCsvModeWriteRead() throws IOException {
+ RowType rowType =
+ DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING(),
DataTypes.DOUBLE());
+
+ // Test PERMISSIVE mode
+ Options permissiveOptions = new Options();
+ permissiveOptions.set(CsvOptions.MODE, CsvOptions.Mode.PERMISSIVE);
+ FileFormat format =
+ new CsvFileFormatFactory().create(new
FormatContext(permissiveOptions, 1024, 1024));
+ Path testFile = new Path(parent, "test_mode_" + UUID.randomUUID() +
".csv");
+
+ fileIO.writeFile(testFile, "1,Alice,aaaa,100.23\n2,Bob,200.75", false);
+ List<InternalRow> permissiveResult = read(format, rowType, rowType,
testFile);
+ assertThat(permissiveResult).hasSize(2);
+ assertThat(permissiveResult.get(0).getInt(0)).isEqualTo(1);
+
assertThat(permissiveResult.get(0).getString(1).toString()).isEqualTo("Alice");
+ assertThat(permissiveResult.get(0).isNullAt(2)).isTrue();
+ assertThat(permissiveResult.get(1).getInt(0)).isEqualTo(2);
+
assertThat(permissiveResult.get(1).getString(1).toString()).isEqualTo("Bob");
+ assertThat(permissiveResult.get(1).getDouble(2)).isEqualTo(200.75);
+
+ // Test DROPMALFORMED mode
+ Options dropMalformedOptions = new Options();
+ dropMalformedOptions.set(CsvOptions.MODE,
CsvOptions.Mode.DROPMALFORMED);
+ format =
+ new CsvFileFormatFactory()
+ .create(new FormatContext(dropMalformedOptions, 1024,
1024));
+ List<InternalRow> dropMalformedResult = read(format, rowType, rowType,
testFile);
+ assertThat(dropMalformedResult).hasSize(1);
+ assertThat(dropMalformedResult.get(0).getInt(0)).isEqualTo(2);
+
assertThat(dropMalformedResult.get(0).getString(1).toString()).isEqualTo("Bob");
+ assertThat(dropMalformedResult.get(0).getDouble(2)).isEqualTo(200.75);
+
+ // Test FAILFAST mode
+ Options failFastOptions = new Options();
+ failFastOptions.set(CsvOptions.MODE, CsvOptions.Mode.FAILFAST);
+ assertThatThrownBy(
+ () -> {
+ read(
+ new CsvFileFormatFactory()
+ .create(new
FormatContext(failFastOptions, 1024, 1024)),
+ rowType,
+ rowType,
+ testFile);
+ })
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ private List<InternalRow> read(
+ FileFormat format, RowType fullRowType, RowType readRowType, Path
testFile)
+ throws IOException {
+ try (RecordReader<InternalRow> reader =
+ format.createReaderFactory(fullRowType, readRowType, new
ArrayList<>())
+ .createReader(
+ new FormatReaderContext(
+ fileIO, testFile,
fileIO.getFileSize(testFile)))) {
+
+ InternalRowSerializer serializer = new
InternalRowSerializer(readRowType);
+ List<InternalRow> result = new ArrayList<>();
+ reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+ return result;
+ }
+ }
+
@Override
protected RowType rowTypeForFullTypesTest() {
RowType.Builder builder =
@@ -581,16 +647,6 @@ public class CsvFileFormatTest extends FormatReadWriteTest
{
writer.addElement(row);
}
}
- try (RecordReader<InternalRow> reader =
- format.createReaderFactory(fullRowType, rowType, new
ArrayList<>())
- .createReader(
- new FormatReaderContext(
- fileIO, testFile,
fileIO.getFileSize(testFile)))) {
-
- InternalRowSerializer serializer = new
InternalRowSerializer(rowType);
- List<InternalRow> result = new ArrayList<>();
- reader.forEachRemaining(row -> result.add(serializer.copy(row)));
- return result;
- }
+ return read(format, fullRowType, rowType, testFile);
}
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index 62d48ac94c..c01898a7c8 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -35,6 +35,34 @@ class PaimonFormatTableTest extends
PaimonSparkTestWithRestCatalogBase {
sql("USE test_db")
}
+ test("PaimonFormatTableRead table: csv mode") {
+ val tableName = "paimon_format_test_csv_malformed"
+ withTable(tableName) {
+ sql(
+ s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV
OPTIONS ('" +
+ s"file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
+ s"'${CoreOptions.FORMAT_TABLE_IMPLEMENTATION
+
.key()}'='${CoreOptions.FormatTableImplementation.PAIMON.toString}')
PARTITIONED BY (`ds` bigint)")
+ val table =
+ paimonCatalog.getTable(Identifier.create("test_db",
tableName)).asInstanceOf[FormatTable]
+ val partition = 20250920
+ val csvFile =
+ new Path(
+ table.location(),
+
s"ds=$partition/part-00000-0a28422e-68ba-4713-8870-2fde2d36ed06-c001.csv")
+ table.fileIO().writeFile(csvFile,
"1|asfasdfsdf|aaaa|10\n2|asfasdfsdf|11", false)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableName"),
+ Seq(Row(1, "asfasdfsdf", null, partition), Row(2, "asfasdfsdf", 11,
partition))
+ )
+ sql(s"Alter table $tableName SET TBLPROPERTIES ('mode'='dropmalformed')")
+ checkAnswer(
+ sql(s"SELECT * FROM $tableName"),
+ Seq(Row(2, "asfasdfsdf", 11, partition))
+ )
+ }
+ }
+
test("PaimonFormatTableRead table: csv with field-delimiter") {
val tableName = "paimon_format_test_csv_options"
withTable(tableName) {
@@ -53,7 +81,8 @@ class PaimonFormatTableTest extends
PaimonSparkTestWithRestCatalogBase {
table.fileIO().writeFile(csvFile, "1|asfasdfsdf\n2|asfasdfsdf", false)
checkAnswer(
sql(s"SELECT * FROM $tableName"),
- Seq(Row(1, "asfasdfsdf", partition), Row(2, "asfasdfsdf", partition)))
+ Seq(Row(1, "asfasdfsdf", partition), Row(2, "asfasdfsdf", partition))
+ )
}
}