This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 91fae29f2f [core] support csv parse mode (#6350)
91fae29f2f is described below

commit 91fae29f2fd1a88054e01abb638faef5e898fde2
Author: jerry <[email protected]>
AuthorDate: Mon Sep 29 23:30:48 2025 +0800

    [core] support csv parse mode (#6350)
---
 docs/content/concepts/spec/fileformat.md           | 12 ++++
 .../apache/paimon/format/csv/CsvFileReader.java    | 39 +++++++++--
 .../org/apache/paimon/format/csv/CsvOptions.java   | 47 +++++++++++++
 .../paimon/format/csv/CsvFileFormatTest.java       | 78 +++++++++++++++++++---
 .../paimon/spark/table/PaimonFormatTableTest.scala | 31 ++++++++-
 5 files changed, 191 insertions(+), 16 deletions(-)

diff --git a/docs/content/concepts/spec/fileformat.md 
b/docs/content/concepts/spec/fileformat.md
index 93debdca10..5c69b494cc 100644
--- a/docs/content/concepts/spec/fileformat.md
+++ b/docs/content/concepts/spec/fileformat.md
@@ -419,6 +419,18 @@ Format Options:
       <td>String</td>
       <td>Null literal string that is interpreted as a null value (disabled by 
default).</td>
     </tr>
+    <tr>
+      <td><h5>csv.mode</h5></td>
+      <td style="word-wrap: break-word;"><code>PERMISSIVE</code></td>
+      <td>String</td>
+      <td>Specifies how to handle corrupt records during reading. 
Currently supported values are <code>'PERMISSIVE'</code>, 
<code>'DROPMALFORMED'</code> and <code>'FAILFAST'</code>:
+      <ul>
+      <li>Option <code>'PERMISSIVE'</code> sets malformed fields to null.</li>
+      <li>Option <code>'DROPMALFORMED'</code> drops corrupted 
records entirely.</li>
+      <li>Option <code>'FAILFAST'</code> throws an exception when it 
encounters corrupted records.</li>
+      </ul>
+      </td>
+    </tr>
     </tbody>
 </table>
 
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index 0e215aa3fb..daa9dc5659 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -44,6 +44,7 @@ public class CsvFileReader extends BaseTextFileReader {
 
     private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
     private static final CsvMapper CSV_MAPPER = new CsvMapper();
+    private static final InternalRow DROP_ROW = new GenericRow(1);
 
     // Performance optimization: Cache frequently used cast executors
     private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
@@ -98,8 +99,25 @@ public class CsvFileReader extends BaseTextFileReader {
     }
 
     private class CsvRecordIterator extends BaseTextRecordIterator {
-        // Inherits all functionality from BaseTextRecordIterator
-        // No additional CSV-specific iterator logic needed
+        @Override
+        public InternalRow next() throws IOException {
+            while (true) {
+                if (readerClosed) {
+                    return null;
+                }
+                String nextLine = bufferedReader.readLine();
+                if (nextLine == null) {
+                    end = true;
+                    return null;
+                }
+
+                currentPosition++;
+                InternalRow row = parseLine(nextLine);
+                if (row != DROP_ROW) {
+                    return row;
+                }
+            }
+        }
     }
 
     protected static String[] parseCsvLineToArray(String line, CsvSchema 
schema)
@@ -148,8 +166,21 @@ public class CsvFileReader extends BaseTextFileReader {
                 }
 
                 // Optimized field parsing with cached cast executors
-                projectedValues[i] =
-                        parseFieldOptimized(field.trim(), 
dataSchemaRowType.getTypeAt(readIndex));
+                try {
+                    projectedValues[i] =
+                            parseFieldOptimized(
+                                    field.trim(), 
dataSchemaRowType.getTypeAt(readIndex));
+                } catch (Exception e) {
+                    switch (formatOptions.mode()) {
+                        case PERMISSIVE:
+                            projectedValues[i] = null;
+                            break;
+                        case DROPMALFORMED:
+                            return DROP_ROW;
+                        case FAILFAST:
+                            throw e;
+                    }
+                }
             } else {
                 projectedValues[i] = null; // Field not present in the CSV line
             }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
index 18bff8a474..03d0a382c0 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
@@ -21,6 +21,10 @@ package org.apache.paimon.format.csv;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.ConfigOptions;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.options.description.DescribedEnum;
+import org.apache.paimon.options.description.InlineElement;
+
+import static org.apache.paimon.options.description.TextElement.text;
 
 /** Options for csv format. */
 public class CsvOptions {
@@ -63,12 +67,21 @@ public class CsvOptions {
                     .defaultValue("")
                     .withDescription("The literal for null values in CSV 
format");
 
+    public static final ConfigOption<Mode> MODE =
+            ConfigOptions.key("csv.mode")
+                    .enumType(Mode.class)
+                    .defaultValue(Mode.PERMISSIVE)
+                    .withFallbackKeys("mode")
+                    .withDescription(
+                            "Allows a mode for dealing with corrupt records 
during reading.");
+
     private final String fieldDelimiter;
     private final String lineDelimiter;
     private final String nullLiteral;
     private final boolean includeHeader;
     private final String quoteCharacter;
     private final String escapeCharacter;
+    private final Mode mode;
 
     public CsvOptions(Options options) {
         this.fieldDelimiter = options.get(FIELD_DELIMITER);
@@ -77,6 +90,7 @@ public class CsvOptions {
         this.includeHeader = options.get(INCLUDE_HEADER);
         this.quoteCharacter = options.get(QUOTE_CHARACTER);
         this.escapeCharacter = options.get(ESCAPE_CHARACTER);
+        this.mode = options.get(MODE);
     }
 
     public String fieldDelimiter() {
@@ -102,4 +116,37 @@ public class CsvOptions {
     public String escapeCharacter() {
         return escapeCharacter;
     }
+
+    public Mode mode() {
+        return mode;
+    }
+
+    /** Mode for dealing with corrupt records during reading. */
+    public enum Mode implements DescribedEnum {
+        PERMISSIVE("permissive", "Sets malformed fields to null."),
+        DROPMALFORMED("dropmalformed", "Ignores the whole corrupted records."),
+        FAILFAST("failfast", "Throws an exception when it meets corrupted 
records.");
+
+        private final String value;
+        private final String description;
+
+        Mode(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+
+        public String getValue() {
+            return value;
+        }
+    }
 }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
index a1ae1a2772..a64f30fcb5 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
@@ -53,6 +53,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.paimon.data.BinaryString.fromString;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link CsvFileFormat}. */
 public class CsvFileFormatTest extends FormatReadWriteTest {
@@ -444,6 +445,71 @@ public class CsvFileFormatTest extends FormatReadWriteTest 
{
         assertThat(result.get(3).isNullAt(3)).isTrue();
     }
 
+    @Test
+    public void testCsvModeWriteRead() throws IOException {
+        RowType rowType =
+                DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING(), 
DataTypes.DOUBLE());
+
+        // Test PERMISSIVE mode
+        Options permissiveOptions = new Options();
+        permissiveOptions.set(CsvOptions.MODE, CsvOptions.Mode.PERMISSIVE);
+        FileFormat format =
+                new CsvFileFormatFactory().create(new 
FormatContext(permissiveOptions, 1024, 1024));
+        Path testFile = new Path(parent, "test_mode_" + UUID.randomUUID() + 
".csv");
+
+        fileIO.writeFile(testFile, "1,Alice,aaaa,100.23\n2,Bob,200.75", false);
+        List<InternalRow> permissiveResult = read(format, rowType, rowType, 
testFile);
+        assertThat(permissiveResult).hasSize(2);
+        assertThat(permissiveResult.get(0).getInt(0)).isEqualTo(1);
+        
assertThat(permissiveResult.get(0).getString(1).toString()).isEqualTo("Alice");
+        assertThat(permissiveResult.get(0).isNullAt(2)).isTrue();
+        assertThat(permissiveResult.get(1).getInt(0)).isEqualTo(2);
+        
assertThat(permissiveResult.get(1).getString(1).toString()).isEqualTo("Bob");
+        assertThat(permissiveResult.get(1).getDouble(2)).isEqualTo(200.75);
+
+        // Test DROPMALFORMED mode
+        Options dropMalformedOptions = new Options();
+        dropMalformedOptions.set(CsvOptions.MODE, 
CsvOptions.Mode.DROPMALFORMED);
+        format =
+                new CsvFileFormatFactory()
+                        .create(new FormatContext(dropMalformedOptions, 1024, 
1024));
+        List<InternalRow> dropMalformedResult = read(format, rowType, rowType, 
testFile);
+        assertThat(dropMalformedResult).hasSize(1);
+        assertThat(dropMalformedResult.get(0).getInt(0)).isEqualTo(2);
+        
assertThat(dropMalformedResult.get(0).getString(1).toString()).isEqualTo("Bob");
+        assertThat(dropMalformedResult.get(0).getDouble(2)).isEqualTo(200.75);
+
+        // Test FAILFAST mode
+        Options failFastOptions = new Options();
+        failFastOptions.set(CsvOptions.MODE, CsvOptions.Mode.FAILFAST);
+        assertThatThrownBy(
+                        () -> {
+                            read(
+                                    new CsvFileFormatFactory()
+                                            .create(new 
FormatContext(failFastOptions, 1024, 1024)),
+                                    rowType,
+                                    rowType,
+                                    testFile);
+                        })
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    private List<InternalRow> read(
+            FileFormat format, RowType fullRowType, RowType readRowType, Path 
testFile)
+            throws IOException {
+        try (RecordReader<InternalRow> reader =
+                format.createReaderFactory(fullRowType, readRowType, new 
ArrayList<>())
+                        .createReader(
+                                new FormatReaderContext(
+                                        fileIO, testFile, 
fileIO.getFileSize(testFile)))) {
+
+            InternalRowSerializer serializer = new 
InternalRowSerializer(readRowType);
+            List<InternalRow> result = new ArrayList<>();
+            reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+            return result;
+        }
+    }
+
     @Override
     protected RowType rowTypeForFullTypesTest() {
         RowType.Builder builder =
@@ -581,16 +647,6 @@ public class CsvFileFormatTest extends FormatReadWriteTest 
{
                 writer.addElement(row);
             }
         }
-        try (RecordReader<InternalRow> reader =
-                format.createReaderFactory(fullRowType, rowType, new 
ArrayList<>())
-                        .createReader(
-                                new FormatReaderContext(
-                                        fileIO, testFile, 
fileIO.getFileSize(testFile)))) {
-
-            InternalRowSerializer serializer = new 
InternalRowSerializer(rowType);
-            List<InternalRow> result = new ArrayList<>();
-            reader.forEachRemaining(row -> result.add(serializer.copy(row)));
-            return result;
-        }
+        return read(format, fullRowType, rowType, testFile);
     }
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index 62d48ac94c..c01898a7c8 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -35,6 +35,34 @@ class PaimonFormatTableTest extends 
PaimonSparkTestWithRestCatalogBase {
     sql("USE test_db")
   }
 
+  test("PaimonFormatTableRead table: csv mode") {
+    val tableName = "paimon_format_test_csv_malformed"
+    withTable(tableName) {
+      sql(
+        s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV 
OPTIONS ('" +
+          s"file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
+          s"'${CoreOptions.FORMAT_TABLE_IMPLEMENTATION
+              
.key()}'='${CoreOptions.FormatTableImplementation.PAIMON.toString}') 
PARTITIONED BY (`ds` bigint)")
+      val table =
+        paimonCatalog.getTable(Identifier.create("test_db", 
tableName)).asInstanceOf[FormatTable]
+      val partition = 20250920
+      val csvFile =
+        new Path(
+          table.location(),
+          
s"ds=$partition/part-00000-0a28422e-68ba-4713-8870-2fde2d36ed06-c001.csv")
+      table.fileIO().writeFile(csvFile, 
"1|asfasdfsdf|aaaa|10\n2|asfasdfsdf|11", false)
+      checkAnswer(
+        sql(s"SELECT * FROM $tableName"),
+        Seq(Row(1, "asfasdfsdf", null, partition), Row(2, "asfasdfsdf", 11, 
partition))
+      )
+      sql(s"Alter table $tableName SET TBLPROPERTIES ('mode'='dropmalformed')")
+      checkAnswer(
+        sql(s"SELECT * FROM $tableName"),
+        Seq(Row(2, "asfasdfsdf", 11, partition))
+      )
+    }
+  }
+
   test("PaimonFormatTableRead table: csv with field-delimiter") {
     val tableName = "paimon_format_test_csv_options"
     withTable(tableName) {
@@ -53,7 +81,8 @@ class PaimonFormatTableTest extends 
PaimonSparkTestWithRestCatalogBase {
       table.fileIO().writeFile(csvFile, "1|asfasdfsdf\n2|asfasdfsdf", false)
       checkAnswer(
         sql(s"SELECT * FROM $tableName"),
-        Seq(Row(1, "asfasdfsdf", partition), Row(2, "asfasdfsdf", partition)))
+        Seq(Row(1, "asfasdfsdf", partition), Row(2, "asfasdfsdf", partition))
+      )
     }
   }
 

Reply via email to