This is an automated email from the ASF dual-hosted git repository.

junrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fe7660c1e8 [FLINK-39065][Formats (JSON, Avro, Parquet, ORC, SequenceFile)] Support additional CsvParser.Feature options for CSV format deserialization
2fe7660c1e8 is described below

commit 2fe7660c1e8e18400568af5d2eb0f772b6889106
Author: Myracle <[email protected]>
AuthorDate: Wed Feb 11 11:56:56 2026 +0800

    [FLINK-39065][Formats (JSON, Avro, Parquet, ORC, SequenceFile)] Support additional CsvParser.Feature options for CSV format deserialization
---
 .../docs/connectors/table/formats/csv.md           |  59 ++++++++++-
 docs/content/docs/connectors/table/formats/csv.md  |  40 ++++++++
 .../org/apache/flink/formats/csv/CsvCommons.java   |  15 +++
 .../flink/formats/csv/CsvFileFormatFactory.java    |  46 ++++++++-
 .../apache/flink/formats/csv/CsvFormatFactory.java |  23 +++++
 .../apache/flink/formats/csv/CsvFormatOptions.java |  45 +++++++++
 .../csv/CsvRowDataDeserializationSchema.java       |  84 ++++++++++++++--
 .../flink/formats/csv/CsvFormatFactoryTest.java    | 111 ++++++++++++++++++++-
 8 files changed, 409 insertions(+), 14 deletions(-)
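
Note on the ignoreParseErrors change in CsvFileFormatFactory: the removed line derived the flag from getOptional(IGNORE_PARSE_ERRORS).isPresent(), while the added line reads the option value itself. As far as the diff shows, the old form would treat an explicit 'csv.ignore-parse-errors' = 'false' as enabled. The sketch below reproduces that difference with plain JDK types. It is an illustration under assumptions: a Map stands in for Flink's ReadableConfig, and the class and method names (IgnoreParseErrorsSketch, viaIsPresent, viaGet) are hypothetical.

```java
import java.util.Map;
import java.util.Optional;

/** Sketch only: a Map stands in for Flink's ReadableConfig; all helper names are hypothetical. */
final class IgnoreParseErrorsSketch {
    static final String KEY = "csv.ignore-parse-errors";

    /** Mimics an optional lookup: empty when the key is absent, otherwise the parsed value. */
    static Optional<Boolean> getOptional(Map<String, String> options) {
        return Optional.ofNullable(options.get(KEY)).map(Boolean::parseBoolean);
    }

    /** Removed pattern: only asks whether the key was set at all. */
    static boolean viaIsPresent(Map<String, String> options) {
        return getOptional(options).isPresent();
    }

    /** Added pattern: reads the value and falls back to the documented default (false). */
    static boolean viaGet(Map<String, String> options) {
        return getOptional(options).orElse(false);
    }

    public static void main(String[] args) {
        Map<String, String> explicitFalse = Map.of(KEY, "false");
        // The two patterns disagree exactly when the option is explicitly set to false.
        System.out.println("isPresent pattern: " + viaIsPresent(explicitFalse));
        System.out.println("get pattern:       " + viaGet(explicitFalse));
    }
}
```

The two patterns agree when the option is absent or set to 'true', so the difference only surfaces for an explicit 'false'.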

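Note on the Builder#setFeature bookkeeping in CsvRowDataDeserializationSchema: each parser feature ends up in one of three states (explicitly enabled, explicitly disabled, or untouched so the mapper default applies), and a later call for the same feature replaces an earlier one. The sketch below mirrors that logic with a stand-in enum so it runs on the JDK alone. The class name FeatureToggleSketch and its nested Feature enum are hypothetical and are not Jackson's CsvParser.Feature.

```java
import java.util.EnumSet;

/** Sketch only: Feature is a stand-in enum, not Jackson's CsvParser.Feature. */
final class FeatureToggleSketch {
    enum Feature { TRIM_SPACES, ALLOW_TRAILING_COMMA, EMPTY_STRING_AS_NULL }

    private final EnumSet<Feature> enabled = EnumSet.noneOf(Feature.class);
    private final EnumSet<Feature> disabled = EnumSet.noneOf(Feature.class);

    /** Records an explicit choice and keeps the two sets disjoint; the last call wins. */
    FeatureToggleSketch set(Feature feature, boolean on) {
        if (on) {
            enabled.add(feature);
            disabled.remove(feature);
        } else {
            disabled.add(feature);
            enabled.remove(feature);
        }
        return this;
    }

    /** Defensive copies, so callers cannot mutate the builder state. */
    EnumSet<Feature> enabled() {
        return EnumSet.copyOf(enabled);
    }

    EnumSet<Feature> disabled() {
        return EnumSet.copyOf(disabled);
    }

    public static void main(String[] args) {
        FeatureToggleSketch builder =
                new FeatureToggleSketch()
                        .set(Feature.TRIM_SPACES, true)
                        .set(Feature.TRIM_SPACES, false) // overrides the previous call
                        .set(Feature.EMPTY_STRING_AS_NULL, true);
        // ALLOW_TRAILING_COMMA was never mentioned, so it appears in neither set.
        System.out.println("enabled=" + builder.enabled() + " disabled=" + builder.disabled());
    }
}
```

Because both sets take part in equals and hashCode in the commit, two schemas that differ only in an explicit versus untouched feature would compare as different, which is presumably why the factory test sets the same features on the expected schema.
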
diff --git a/docs/content.zh/docs/connectors/table/formats/csv.md b/docs/content.zh/docs/connectors/table/formats/csv.md
index 0fc45fb686f..348c7bd043f 100644
--- a/docs/content.zh/docs/connectors/table/formats/csv.md
+++ b/docs/content.zh/docs/connectors/table/formats/csv.md
@@ -67,16 +67,18 @@ Format 参数
     <thead>
       <tr>
         <th class="text-left" style="width: 25%">参数</th>
-        <th class="text-center" style="width: 10%">是否必选</th>
-        <th class="text-center" style="width: 10%">默认值</th>
+        <th class="text-center" style="width: 8%">是否必选</th>
+        <th class="text-center" style="width: 8%">是否可转发</th>
+        <th class="text-center" style="width: 7%">默认值</th>
         <th class="text-center" style="width: 10%">类型</th>
-        <th class="text-center" style="width: 45%">描述</th>
+        <th class="text-center" style="width: 42%">描述</th>
       </tr>
     </thead>
     <tbody>
     <tr>
       <td><h5>format</h5></td>
       <td>必选</td>
+      <td>否</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>指定要使用的格式,这里应该是 <code>'csv'</code>。</td>
@@ -84,6 +86,7 @@ Format 参数
     <tr>
       <td><h5>csv.field-delimiter</h5></td>
       <td>可选</td>
+      <td>是</td>
       <td style="word-wrap: break-word;"><code>,</code></td>
       <td>String</td>
       <td>字段分隔符 (默认<code>','</code>),必须为单字符。你可以使用反斜杠字符指定一些特殊字符,例如 <code>'\t'</code> 代表制表符。
@@ -93,6 +96,7 @@ Format 参数
     <tr>
       <td><h5>csv.disable-quote-character</h5></td>
       <td>可选</td>
+      <td>是</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>是否禁止对引用的值使用引号 (默认是 false)。 如果禁止,选项 <code>'csv.quote-character'</code> 不能设置。</td>
@@ -100,6 +104,7 @@ Format 参数
     <tr>
       <td><h5>csv.quote-character</h5></td>
       <td>可选</td>
+      <td>是</td>
       <td style="word-wrap: break-word;"><code>"</code></td>
       <td>String</td>
       <td>用于围住字段值的引号字符 (默认<code>"</code>)。</td>
@@ -107,6 +112,7 @@ Format 参数
     <tr>
       <td><h5>csv.allow-comments</h5></td>
       <td>可选</td>
+      <td>是</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>是否允许忽略注释行(默认不允许),注释行以 <code>'#'</code> 作为起始字符。
@@ -116,13 +122,15 @@ Format 参数
     <tr>
       <td><h5>csv.ignore-parse-errors</h5></td>
       <td>可选</td>
+      <td>否</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
-    <td>当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为<code>null</code>。</td>
+      <td>当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为<code>null</code>。</td>
     </tr>
     <tr>
       <td><h5>csv.array-element-delimiter</h5></td>
       <td>可选</td>
+      <td>是</td>
       <td style="word-wrap: break-word;"><code>;</code></td>
       <td>String</td>
       <td>分隔数组和行元素的字符串(默认<code>';'</code>)。</td>
@@ -130,6 +138,7 @@ Format 参数
     <tr>
       <td><h5>csv.escape-character</h5></td>
       <td>可选</td>
+      <td>是</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>转义字符(默认关闭)。</td>
@@ -137,6 +146,7 @@ Format 参数
     <tr>
       <td><h5>csv.null-literal</h5></td>
       <td>可选</td>
+      <td>是</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>指定识别成 null 值的字符串(默认禁用)。在输入端会将该字符串转为 null 值,在输出端会将 null 值转成该字符串。</td>
@@ -144,10 +154,51 @@ Format 参数
     <tr>
       <td><h5>csv.write-bigdecimal-in-scientific-notation</h5></td>
       <td>可选</td>
+      <td>是</td>
       <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>设置将 Bigdecimal 类型的数据表示为科学计数法(默认为true,即需要转为科学计数法),例如一个BigDecimal的值为100000,设置true,结果为 '1E+5';设置为false,结果为 100000。注意:只有当值不等于0且是10的倍数才会转为科学计数法。</td>
     </tr>
+    <tr>
+      <td><h5>csv.trim-spaces</h5></td>
+      <td>可选</td>
+      <td>是</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>可选标志,用于修剪未加引号字段值的前后空格(默认禁用)。仅影响反序列化。</td>
+    </tr>
+    <tr>
+      <td><h5>csv.ignore-trailing-unmappable</h5></td>
+      <td>可选</td>
+      <td>是</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>可选标志,用于忽略末尾无法映射到 schema 的多余字段(默认禁用)。仅影响反序列化。</td>
+    </tr>
+    <tr>
+      <td><h5>csv.allow-trailing-comma</h5></td>
+      <td>可选</td>
+      <td>是</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>可选标志,用于允许最后一个字段值之后的尾部逗号(默认启用)。仅影响反序列化。</td>
+    </tr>
+    <tr>
+      <td><h5>csv.fail-on-missing-columns</h5></td>
+      <td>可选</td>
+      <td>是</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>可选标志,当一行数据的列数少于 schema 期望的列数时将报错失败(默认禁用)。仅影响反序列化。</td>
+    </tr>
+    <tr>
+      <td><h5>csv.empty-string-as-null</h5></td>
+      <td>可选</td>
+      <td>是</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>可选标志,用于将空字符串值视为 null(默认禁用)。仅影响反序列化。</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/content/docs/connectors/table/formats/csv.md b/docs/content/docs/connectors/table/formats/csv.md
index 27389cc528e..7262757459a 100644
--- a/docs/content/docs/connectors/table/formats/csv.md
+++ b/docs/content/docs/connectors/table/formats/csv.md
@@ -160,6 +160,46 @@ Format Options
       <td>Boolean</td>
       <td>Enables representation of BigDecimal data type in scientific notation (default is true). For example, 100000 is encoded as 1E+5 by default, and will be written as 100000 if set this option to false. Note: Only when the value is not 0 and a multiple of 10 is converted to scientific notation.</td>
     </tr>
+    <tr>
+      <td><h5>csv.trim-spaces</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Optional flag to trim leading/trailing spaces from unquoted field values (disabled by default). Only affects deserialization.</td>
+    </tr>
+    <tr>
+      <td><h5>csv.ignore-trailing-unmappable</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Optional flag to ignore extra trailing fields that cannot be mapped to the schema (disabled by default). Only affects deserialization.</td>
+    </tr>
+    <tr>
+      <td><h5>csv.allow-trailing-comma</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Optional flag to allow a trailing comma after the last field value (enabled by default). Only affects deserialization.</td>
+    </tr>
+    <tr>
+      <td><h5>csv.fail-on-missing-columns</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Optional flag to fail when a row has fewer columns than the schema expects (disabled by default). Only affects deserialization.</td>
+    </tr>
+    <tr>
+      <td><h5>csv.empty-string-as-null</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Optional flag to treat empty string values as null (disabled by default). Only affects deserialization.</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java
index 9bf5520574d..64859b5524b 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java
@@ -28,13 +28,18 @@ import java.util.HashSet;
 import java.util.Set;
 
 import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_COMMENTS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_TRAILING_COMMA;
 import static org.apache.flink.formats.csv.CsvFormatOptions.ARRAY_ELEMENT_DELIMITER;
 import static org.apache.flink.formats.csv.CsvFormatOptions.DISABLE_QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.EMPTY_STRING_AS_NULL;
 import static org.apache.flink.formats.csv.CsvFormatOptions.ESCAPE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.FAIL_ON_MISSING_COLUMNS;
 import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
 import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_TRAILING_UNMAPPABLE;
 import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
 import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.TRIM_SPACES;
 import static org.apache.flink.formats.csv.CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION;
 
 /** A class with common CSV format constants and utility methods. */
@@ -102,6 +107,11 @@ class CsvCommons {
         options.add(ESCAPE_CHARACTER);
         options.add(NULL_LITERAL);
         options.add(WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION);
+        options.add(TRIM_SPACES);
+        options.add(IGNORE_TRAILING_UNMAPPABLE);
+        options.add(ALLOW_TRAILING_COMMA);
+        options.add(FAIL_ON_MISSING_COLUMNS);
+        options.add(EMPTY_STRING_AS_NULL);
         return options;
     }
 
@@ -115,6 +125,11 @@ class CsvCommons {
         options.add(ESCAPE_CHARACTER);
         options.add(NULL_LITERAL);
         options.add(WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION);
+        options.add(TRIM_SPACES);
+        options.add(IGNORE_TRAILING_UNMAPPABLE);
+        options.add(ALLOW_TRAILING_COMMA);
+        options.add(FAIL_ON_MISSING_COLUMNS);
+        options.add(EMPTY_STRING_AS_NULL);
         return options;
     }
 }
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
index d3a0d45b358..e58796bdb9c 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
@@ -46,11 +46,13 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.function.SerializableSupplier;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
 
 import org.apache.commons.text.StringEscapeUtils;
@@ -60,13 +62,18 @@ import java.util.List;
 import java.util.Set;
 
 import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_COMMENTS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_TRAILING_COMMA;
 import static org.apache.flink.formats.csv.CsvFormatOptions.ARRAY_ELEMENT_DELIMITER;
 import static org.apache.flink.formats.csv.CsvFormatOptions.DISABLE_QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.EMPTY_STRING_AS_NULL;
 import static org.apache.flink.formats.csv.CsvFormatOptions.ESCAPE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.FAIL_ON_MISSING_COLUMNS;
 import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
 import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_TRAILING_UNMAPPABLE;
 import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
 import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.TRIM_SPACES;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** CSV format factory for file system. */
@@ -125,15 +132,14 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
             final RowType physicalRowType = (RowType) physicalDataType.getLogicalType();
             final CsvSchema schema = buildCsvSchema(physicalRowType, formatOptions);
 
-            final boolean ignoreParseErrors =
-                    formatOptions.getOptional(IGNORE_PARSE_ERRORS).isPresent();
+            final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
             final Converter<JsonNode, RowData, Void> converter =
                     (Converter)
                             new CsvToRowDataConverters(ignoreParseErrors)
                                     .createRowConverter(projectedRowType, true);
             CsvReaderFormat<RowData> csvReaderFormat =
                     new CsvReaderFormat<>(
-                            JacksonMapperFactory::createCsvMapper,
+                            createCsvMapperFactory(formatOptions),
                             ignored -> schema,
                             JsonNode.class,
                             converter,
@@ -217,4 +223,38 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
 
         return csvBuilder.build();
     }
+
+    /**
+     * Creates a {@link SerializableSupplier} for {@link CsvMapper} that applies the configured
+     * {@link CsvParser.Feature} options.
+     */
+    private static SerializableSupplier<CsvMapper> createCsvMapperFactory(
+            ReadableConfig formatOptions) {
+        final boolean trimSpaces = formatOptions.get(TRIM_SPACES);
+        final boolean ignoreTrailingUnmappable = formatOptions.get(IGNORE_TRAILING_UNMAPPABLE);
+        final boolean allowTrailingComma = formatOptions.get(ALLOW_TRAILING_COMMA);
+        final boolean failOnMissingColumns = formatOptions.get(FAIL_ON_MISSING_COLUMNS);
+        final boolean emptyStringAsNull = formatOptions.get(EMPTY_STRING_AS_NULL);
+
+        return () -> {
+            CsvMapper mapper = JacksonMapperFactory.createCsvMapper();
+            configureFeature(mapper, CsvParser.Feature.TRIM_SPACES, trimSpaces);
+            configureFeature(
+                    mapper, CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE, ignoreTrailingUnmappable);
+            configureFeature(mapper, CsvParser.Feature.ALLOW_TRAILING_COMMA, allowTrailingComma);
+            configureFeature(
+                    mapper, CsvParser.Feature.FAIL_ON_MISSING_COLUMNS, failOnMissingColumns);
+            configureFeature(mapper, CsvParser.Feature.EMPTY_STRING_AS_NULL, emptyStringAsNull);
+            return mapper;
+        };
+    }
+
+    private static void configureFeature(
+            CsvMapper mapper, CsvParser.Feature feature, boolean enabled) {
+        if (enabled) {
+            mapper.enable(feature);
+        } else {
+            mapper.disable(feature);
+        }
+    }
 }
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
index a54bd618df2..c3fef6f4036 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
@@ -45,13 +45,18 @@ import java.util.Collections;
 import java.util.Set;
 
 import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_COMMENTS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_TRAILING_COMMA;
 import static org.apache.flink.formats.csv.CsvFormatOptions.ARRAY_ELEMENT_DELIMITER;
 import static org.apache.flink.formats.csv.CsvFormatOptions.DISABLE_QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.EMPTY_STRING_AS_NULL;
 import static org.apache.flink.formats.csv.CsvFormatOptions.ESCAPE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.FAIL_ON_MISSING_COLUMNS;
 import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
 import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_TRAILING_UNMAPPABLE;
 import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
 import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.TRIM_SPACES;
 import static org.apache.flink.formats.csv.CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION;
 
 /**
@@ -179,6 +184,24 @@ public final class CsvFormatFactory
                 .ifPresent(schemaBuilder::setEscapeCharacter);
 
         formatOptions.getOptional(NULL_LITERAL).ifPresent(schemaBuilder::setNullLiteral);
+
+        formatOptions.getOptional(TRIM_SPACES).ifPresent(schemaBuilder::setTrimSpaces);
+
+        formatOptions
+                .getOptional(IGNORE_TRAILING_UNMAPPABLE)
+                .ifPresent(schemaBuilder::setIgnoreTrailingUnmappable);
+
+        formatOptions
+                .getOptional(ALLOW_TRAILING_COMMA)
+                .ifPresent(schemaBuilder::setAllowTrailingComma);
+
+        formatOptions
+                .getOptional(FAIL_ON_MISSING_COLUMNS)
+                .ifPresent(schemaBuilder::setFailOnMissingColumns);
+
+        formatOptions
+                .getOptional(EMPTY_STRING_AS_NULL)
+                .ifPresent(schemaBuilder::setEmptyStringAsNull);
     }
 
     private static void configureSerializationSchema(
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java
index c9beae1ca7e..ee624a48e0b 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java
@@ -94,5 +94,50 @@ public class CsvFormatOptions {
                     .withDescription(
                             "Enables representation of BigDecimal data type in scientific notation (default is true). For example, 100000 is encoded as 1E+5 by default, and will be written as 100000 if set this option to false. Note: Only when the value is not 0 and a multiple of 10 is converted to scientific notation.");
 
+    public static final ConfigOption<Boolean> TRIM_SPACES =
+            ConfigOptions.key("trim-spaces")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional flag to trim leading/trailing spaces from "
+                                    + "unquoted field values (disabled by default). "
+                                    + "Only affects deserialization.");
+
+    public static final ConfigOption<Boolean> IGNORE_TRAILING_UNMAPPABLE =
+            ConfigOptions.key("ignore-trailing-unmappable")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional flag to ignore extra trailing fields that "
+                                    + "cannot be mapped to the schema (disabled by default). "
+                                    + "Only affects deserialization.");
+
+    public static final ConfigOption<Boolean> ALLOW_TRAILING_COMMA =
+            ConfigOptions.key("allow-trailing-comma")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Optional flag to allow a trailing comma after the "
+                                    + "last field value (enabled by default). "
+                                    + "Only affects deserialization.");
+
+    public static final ConfigOption<Boolean> FAIL_ON_MISSING_COLUMNS =
+            ConfigOptions.key("fail-on-missing-columns")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional flag to fail when a row has fewer columns "
+                                    + "than the schema expects (disabled by default). "
+                                    + "Only affects deserialization.");
+
+    public static final ConfigOption<Boolean> EMPTY_STRING_AS_NULL =
+            ConfigOptions.key("empty-string-as-null")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional flag to treat empty string values as null "
+                                    + "(disabled by default). "
+                                    + "Only affects deserialization.");
+
     private CsvFormatOptions() {}
 }
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
index b77312d41b8..97829689642 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
@@ -28,13 +28,17 @@ import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * Deserialization schema from CSV to Flink Table & SQL internal data structures.
@@ -64,21 +68,37 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
     /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
     private final boolean ignoreParseErrors;
 
+    /** Set of CsvParser.Feature to explicitly enable. */
+    private final Set<CsvParser.Feature> enabledFeatures;
+
+    /** Set of CsvParser.Feature to explicitly disable. */
+    private final Set<CsvParser.Feature> disabledFeatures;
+
     private CsvRowDataDeserializationSchema(
             TypeInformation<RowData> resultTypeInfo,
             CsvSchema csvSchema,
             CsvToRowDataConverters.CsvToRowDataConverter runtimeConverter,
-            boolean ignoreParseErrors) {
+            boolean ignoreParseErrors,
+            Set<CsvParser.Feature> enabledFeatures,
+            Set<CsvParser.Feature> disabledFeatures) {
         this.resultTypeInfo = resultTypeInfo;
         this.runtimeConverter = runtimeConverter;
         this.csvSchema = csvSchema;
         this.ignoreParseErrors = ignoreParseErrors;
+        this.enabledFeatures = enabledFeatures;
+        this.disabledFeatures = disabledFeatures;
     }
 
     @Override
     public void open(InitializationContext context) {
-        this.objectReader =
-                JacksonMapperFactory.createCsvMapper().readerFor(JsonNode.class).with(csvSchema);
+        CsvMapper csvMapper = JacksonMapperFactory.createCsvMapper();
+        for (CsvParser.Feature feature : enabledFeatures) {
+            csvMapper.enable(feature);
+        }
+        for (CsvParser.Feature feature : disabledFeatures) {
+            csvMapper.disable(feature);
+        }
+        this.objectReader = csvMapper.readerFor(JsonNode.class).with(csvSchema);
     }
 
     /** A builder for creating a {@link CsvRowDataDeserializationSchema}. */
@@ -89,6 +109,10 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
         private final TypeInformation<RowData> resultTypeInfo;
         private CsvSchema csvSchema;
         private boolean ignoreParseErrors;
+        private final Set<CsvParser.Feature> enabledFeatures =
+                EnumSet.noneOf(CsvParser.Feature.class);
+        private final Set<CsvParser.Feature> disabledFeatures =
+                EnumSet.noneOf(CsvParser.Feature.class);
 
         /**
          * Creates a CSV deserialization schema for the given {@link TypeInformation} with optional
@@ -165,12 +189,56 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
             return this;
         }
 
+        public Builder setTrimSpaces(boolean trimSpaces) {
+            setFeature(CsvParser.Feature.TRIM_SPACES, trimSpaces);
+            return this;
+        }
+
+        public Builder setIgnoreTrailingUnmappable(boolean ignore) {
+            setFeature(CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE, ignore);
+            return this;
+        }
+
+        public Builder setAllowTrailingComma(boolean allow) {
+            setFeature(CsvParser.Feature.ALLOW_TRAILING_COMMA, allow);
+            return this;
+        }
+
+        public Builder setFailOnMissingColumns(boolean fail) {
+            setFeature(CsvParser.Feature.FAIL_ON_MISSING_COLUMNS, fail);
+            return this;
+        }
+
+        public Builder setEmptyStringAsNull(boolean emptyAsNull) {
+            setFeature(CsvParser.Feature.EMPTY_STRING_AS_NULL, emptyAsNull);
+            return this;
+        }
+
+        private void setFeature(CsvParser.Feature feature, boolean enabled) {
+            if (enabled) {
+                enabledFeatures.add(feature);
+                disabledFeatures.remove(feature);
+            } else {
+                disabledFeatures.add(feature);
+                enabledFeatures.remove(feature);
+            }
+        }
+
         public CsvRowDataDeserializationSchema build() {
             CsvToRowDataConverters.CsvToRowDataConverter runtimeConverter =
                     new CsvToRowDataConverters(ignoreParseErrors)
                             .createRowConverter(rowResultType, true);
             return new CsvRowDataDeserializationSchema(
-                    resultTypeInfo, csvSchema, runtimeConverter, ignoreParseErrors);
+                    resultTypeInfo,
+                    csvSchema,
+                    runtimeConverter,
+                    ignoreParseErrors,
+                    enabledFeatures.isEmpty()
+                            ? EnumSet.noneOf(CsvParser.Feature.class)
+                            : EnumSet.copyOf(enabledFeatures),
+                    disabledFeatures.isEmpty()
+                            ? EnumSet.noneOf(CsvParser.Feature.class)
+                            : EnumSet.copyOf(disabledFeatures));
         }
     }
 
@@ -221,7 +289,9 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
                         .equals(otherSchema.getArrayElementSeparator())
                 && csvSchema.getQuoteChar() == otherSchema.getQuoteChar()
                 && csvSchema.getEscapeChar() == otherSchema.getEscapeChar()
-                && Arrays.equals(csvSchema.getNullValue(), otherSchema.getNullValue());
+                && Arrays.equals(csvSchema.getNullValue(), otherSchema.getNullValue())
+                && enabledFeatures.equals(that.enabledFeatures)
+                && disabledFeatures.equals(that.disabledFeatures);
     }
 
     @Override
@@ -234,6 +304,8 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
                 csvSchema.getArrayElementSeparator(),
                 csvSchema.getQuoteChar(),
                 csvSchema.getEscapeChar(),
-                Arrays.hashCode(csvSchema.getNullValue()));
+                Arrays.hashCode(csvSchema.getNullValue()),
+                enabledFeatures,
+                disabledFeatures);
     }
 }
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
index d4db9a4d3de..fbe1c993c74 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
@@ -20,6 +20,9 @@ package org.apache.flink.formats.csv;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
@@ -71,9 +74,16 @@ class CsvFormatFactoryTest {
                         .setArrayElementDelimiter("|")
                         .setEscapeCharacter('\\')
                         .setNullLiteral("n/a")
+                        .setTrimSpaces(true)
+                        .setEmptyStringAsNull(true)
                         .build();
         open(expectedDeser);
-        final Map<String, String> options = getAllOptions();
+        final Map<String, String> options =
+                getModifiedOptions(
+                        opts -> {
+                            opts.put("csv.trim-spaces", "true");
+                            opts.put("csv.empty-string-as-null", "true");
+                        });
         DeserializationSchema<RowData> actualDeser = createDeserializationSchema(options);
         assertThat(actualDeser).isEqualTo(expectedDeser);
 
@@ -347,6 +357,105 @@ class CsvFormatFactoryTest {
         assertThat(deserialized).isEqualTo(expected);
     }
 
+    @Test
+    void testTrimSpaces() throws IOException {
+        final Map<String, String> options =
+                getModifiedOptions(opts -> opts.put("csv.trim-spaces", "true"));
+        DeserializationSchema<RowData> deserializationSchema = createDeserializationSchema(options);
+        RowData actual = deserializationSchema.deserialize(" abc ;123;false".getBytes());
+        GenericRowData expected = GenericRowData.of(fromString("abc"), 123, false);
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    void testEmptyStringAsNull() throws IOException {
+        final Map<String, String> options =
+                getModifiedOptions(opts -> opts.put("csv.empty-string-as-null", "true"));
+        DeserializationSchema<RowData> deserializationSchema = createDeserializationSchema(options);
+        RowData actual = deserializationSchema.deserialize(";123;false".getBytes());
+        assertThat(actual.isNullAt(0)).isTrue();
+        assertThat(actual.getInt(1)).isEqualTo(123);
+        assertThat(actual.getBoolean(2)).isFalse();
+    }
+
+    @Test
+    void testAllowTrailingComma() throws IOException {
+        final Map<String, String> options =
+                getModifiedOptions(opts -> opts.put("csv.allow-trailing-comma", "true"));
+        DeserializationSchema<RowData> deserializationSchema = createDeserializationSchema(options);
+        RowData actual = deserializationSchema.deserialize("abc;123;false;".getBytes());
+        GenericRowData expected = GenericRowData.of(fromString("abc"), 123, false);
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    void testIgnoreTrailingUnmappable() throws IOException {
+        final Map<String, String> options =
+                getModifiedOptions(opts -> opts.put("csv.ignore-trailing-unmappable", "true"));
+        DeserializationSchema<RowData> deserializationSchema = createDeserializationSchema(options);
+        RowData actual = deserializationSchema.deserialize("abc;123;false;extra".getBytes());
+        GenericRowData expected = GenericRowData.of(fromString("abc"), 123, false);
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    void testFailOnMissingColumns() {
+        final Map<String, String> options =
+                getModifiedOptions(
+                        opts -> {
+                            opts.put("csv.fail-on-missing-columns", "true");
+                            opts.remove("csv.ignore-parse-errors");
+                        });
+        DeserializationSchema<RowData> deserializationSchema = createDeserializationSchema(options);
+        assertThatThrownBy(() -> deserializationSchema.deserialize("abc;123".getBytes()))
+                .isInstanceOf(IOException.class);
+    }
+
+    @Test
+    void testDeserializationSchemaEqualityWithFeatures() {
+        final CsvRowDataDeserializationSchema schema1 =
+                new CsvRowDataDeserializationSchema.Builder(
+                                PHYSICAL_TYPE, InternalTypeInfo.of(PHYSICAL_TYPE))
+                        .setTrimSpaces(true)
+                        .setEmptyStringAsNull(true)
+                        .build();
+        final CsvRowDataDeserializationSchema schema2 =
+                new CsvRowDataDeserializationSchema.Builder(
+                                PHYSICAL_TYPE, InternalTypeInfo.of(PHYSICAL_TYPE))
+                        .setTrimSpaces(true)
+                        .setEmptyStringAsNull(true)
+                        .build();
+        final CsvRowDataDeserializationSchema schema3 =
+                new CsvRowDataDeserializationSchema.Builder(
+                                PHYSICAL_TYPE, InternalTypeInfo.of(PHYSICAL_TYPE))
+                        .setTrimSpaces(false)
+                        .build();
+
+        assertThat(schema1).isEqualTo(schema2);
+        assertThat(schema1).isNotEqualTo(schema3);
+    }
+
+    @Test
+    void testBulkFormatWithParserFeatures() {
+        final Configuration formatOptions = new Configuration();
+        formatOptions.set(CsvFormatOptions.FIELD_DELIMITER, ";");
+        formatOptions.set(CsvFormatOptions.TRIM_SPACES, true);
+        formatOptions.set(CsvFormatOptions.IGNORE_TRAILING_UNMAPPABLE, true);
+        formatOptions.set(CsvFormatOptions.ALLOW_TRAILING_COMMA, true);
+        formatOptions.set(CsvFormatOptions.EMPTY_STRING_AS_NULL, true);
+
+        CsvFileFormatFactory.CsvBulkDecodingFormat bulkDecodingFormat =
+                new CsvFileFormatFactory.CsvBulkDecodingFormat(formatOptions);
+
+        // Verify the bulk format can be created without errors
+        BulkFormat<RowData, FileSourceSplit> bulkFormat =
+                bulkDecodingFormat.createRuntimeDecoder(
+                        ScanRuntimeProviderContext.INSTANCE,
+                        PHYSICAL_DATA_TYPE,
+                        Projection.all(PHYSICAL_DATA_TYPE).toNestedIndexes());
+        assertThat(bulkFormat).isNotNull();
+    }
+
     // ------------------------------------------------------------------------
     //  Utilities
     // ------------------------------------------------------------------------

