This is an automated email from the ASF dual-hosted git repository.
junrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2fe7660c1e8 [FLINK-39065][Formats (JSON, Avro, Parquet, ORC, SequenceFile)] Support additional CsvParser.Feature options for CSV format deserialization
2fe7660c1e8 is described below
commit 2fe7660c1e8e18400568af5d2eb0f772b6889106
Author: Myracle
AuthorDate: Wed Feb 11 11:56:56 2026 +0800
[FLINK-39065][Formats (JSON, Avro, Parquet, ORC, SequenceFile)] Support additional CsvParser.Feature options for CSV format deserialization
---
.../docs/connectors/table/formats/csv.md | 59 ++++++++++-
docs/content/docs/connectors/table/formats/csv.md | 40 ++++++++
.../org/apache/flink/formats/csv/CsvCommons.java | 15 +++
.../flink/formats/csv/CsvFileFormatFactory.java | 46 ++++++++-
.../apache/flink/formats/csv/CsvFormatFactory.java | 23 +++++
.../apache/flink/formats/csv/CsvFormatOptions.java | 45 +++++++++
.../csv/CsvRowDataDeserializationSchema.java | 84 ++++++++++++++--
.../flink/formats/csv/CsvFormatFactoryTest.java | 111 ++++++++++++++++++++-
8 files changed, 409 insertions(+), 14 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/formats/csv.md b/docs/content.zh/docs/connectors/table/formats/csv.md
index 0fc45fb686f..348c7bd043f 100644
--- a/docs/content.zh/docs/connectors/table/formats/csv.md
+++ b/docs/content.zh/docs/connectors/table/formats/csv.md
@@ -67,16 +67,18 @@ Format Options
<thead>
<tr>
<th class="text-left" style="width: 25%">Option</th>
- <th class="text-center" style="width: 10%">Required</th>
- <th class="text-center" style="width: 10%">Default</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 8%">Forwarded</th>
+ <th class="text-center" style="width: 7%">Default</th>
<th class="text-center" style="width: 10%">Type</th>
- <th class="text-center" style="width: 45%">Description</th>
+ <th class="text-center" style="width: 42%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>format</h5></td>
<td>required</td>
+ <td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specifies the format to use; here it should be <code>'csv'</code>.</td>
@@ -84,6 +86,7 @@ Format 参数
<tr>
<td><h5>csv.field-delimiter</h5></td>
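<td>optional</td>
+ <td>yes</td>
<td style="word-wrap: break-word;"><code>,</code></td>
<td>String</td>
<td>Field delimiter character (<code>','</code> by default); must be a single character. You can use a backslash to specify special characters, e.g. <code>'\t'</code> represents the tab character.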
@@ -93,6 +96,7 @@ Format 参数
<tr>
<td><h5>csv.disable-quote-character</h5></td>
<td>optional</td>
+ <td>yes</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to disable the quote character for enclosing field values (false by default). If disabled, the option <code>'csv.quote-character'</code> cannot be set.</td>
@@ -100,6 +104,7 @@ Format 参数
<tr>
<td><h5>csv.quote-character</h5></td>
<td>optional</td>
+ <td>yes</td>
<td style="word-wrap: break-word;"><code>"</code></td>
<td>String</td>
<td>Quote character for enclosing field values (<code>"</code> by default).</td>
@@ -107,6 +112,7 @@ Format 参数
<tr>
<td><h5>csv.allow-comments</h5></td>
<td>optional</td>
+ <td>yes</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to ignore comment lines (disabled by default); comment lines start with <code>'#'</code>.
@@ -116,13 +122,15 @@ Format 参数
<tr>
<td><h5>csv.ignore-parse-errors</h5></td>
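<td>optional</td>
+ <td>no</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
- <td>Whether to skip the current field or row on a parse error instead of failing (false by default, i.e. an error is thrown). If a field's parse error is ignored, that field is set to <code>null</code>.</td>
+ <td>Whether to skip the current field or row on a parse error instead of failing (false by default, i.e. an error is thrown). If a field's parse error is ignored, that field is set to <code>null</code>.</td>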
</tr>
<tr>
<td><h5>csv.array-element-delimiter</h5></td>
<td>optional</td>
+ <td>yes</td>
<td style="word-wrap: break-word;"><code>;</code></td>
<td>String</td>
<td>String that separates array and row element values (<code>';'</code> by default).</td>
@@ -130,6 +138,7 @@ Format 参数
<tr>
<td><h5>csv.escape-character</h5></td>
<td>optional</td>
+ <td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Escape character (disabled by default).</td>
@@ -137,6 +146,7 @@ Format 参数
<tr>
<td><h5>csv.null-literal</h5></td>
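<td>optional</td>
+ <td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>String that is recognized as a null value (disabled by default). On input, this string is converted to null; on output, null values are written as this string.</td>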
@@ -144,10 +154,51 @@ Format Options
<tr>
<td><h5>csv.write-bigdecimal-in-scientific-notation</h5></td>
<td>optional</td>
+ <td>yes</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to represent BigDecimal values in scientific notation (true by default). For example, for a BigDecimal value of 100000, setting true yields '1E+5' and setting false yields 100000. Note: only values that are non-zero and multiples of 10 are converted to scientific notation.</td>
</tr>
+ <tr>
+ <td><h5>csv.trim-spaces</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Optional flag to trim leading and trailing spaces from unquoted field values (disabled by default). Only affects deserialization.</td>
+ </tr>
+ <tr>
+ <td><h5>csv.ignore-trailing-unmappable</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Optional flag to ignore extra trailing fields that cannot be mapped to the schema (disabled by default). Only affects deserialization.</td>
+ </tr>
+ <tr>
+ <td><h5>csv.allow-trailing-comma</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Optional flag to allow a trailing comma after the last field value (enabled by default). Only affects deserialization.</td>
+ </tr>
+ <tr>
+ <td><h5>csv.fail-on-missing-columns</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Optional flag to fail when a row has fewer columns than the schema expects (disabled by default). Only affects deserialization.</td>
+ </tr>
+ <tr>
+ <td><h5>csv.empty-string-as-null</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Optional flag to treat empty string values as null (disabled by default). Only affects deserialization.</td>
+ </tr>
</tbody>
</table>
diff --git a/docs/content/docs/connectors/table/formats/csv.md b/docs/content/docs/connectors/table/formats/csv.md
index 27389cc528e..7262757459a 100644
--- a/docs/content/docs/connectors/table/formats/csv.md
+++ b/docs/content/docs/connectors/table/formats/csv.md
@@ -160,6 +160,46 @@ Format Options
<td>Boolean</td>
<td>Enables representation of BigDecimal data type in scientific notation (default is true). For example, 100000 is encoded as 1E+5 by default, and will be written as 100000 if set this option to false. Note: Only when the value is not 0 and a multiple of 10 is converted to scientific notation.</td>
</tr>
+ <tr>
+ <td><h5>csv.trim-spaces</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Optional flag to trim leading/trailing spaces from unquoted field values (disabled by default). Only affects deserialization.</td>
+ </tr>
+ <tr>
+ <td><h5>csv.ignore-trailing-unmappable</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Optional flag to ignore extra trailing fields that cannot be mapped to the schema (disabled by default). Only affects deserialization.</td>
+ </tr>
+ <tr>
+ <td><h5>csv.allow-trailing-comma</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Optional flag to allow a trailing comma after the last field value (enabled by default). Only affects deserialization.</td>
+ </tr>
+ <tr>
+ <td><h5>csv.fail-on-missing-columns</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Optional flag to fail when a row has fewer columns than the schema expects (disabled by default). Only affects deserialization.</td>
+ </tr>
+ <tr>
+ <td><h5>csv.empty-string-as-null</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Optional flag to treat empty string values as null (disabled by default). Only affects deserialization.</td>
+ </tr>
</tbody>
</table>
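To make the five new deserialization-only flags concrete, here is a deliberately simplified Java model of how they change the fields produced for one line. It is a hypothetical sketch (`ToyCsvLineParser` is not a Flink or Jackson class): it ignores quoting, escaping and comments, and its trimming uses `String.trim()`, which may not match Jackson's exact rules. The inputs mirror the new tests in `CsvFormatFactoryTest`, with `;` as the field delimiter and a three-column schema.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Hypothetical, simplified model of the csv.* deserialization flags. It does not handle
 * quoting, escaping or comments and is not Jackson's CsvParser.
 */
final class ToyCsvLineParser {
    boolean trimSpaces;                // csv.trim-spaces (default false)
    boolean ignoreTrailingUnmappable;  // csv.ignore-trailing-unmappable (default false)
    boolean allowTrailingComma = true; // csv.allow-trailing-comma (default true)
    boolean failOnMissingColumns;      // csv.fail-on-missing-columns (default false)
    boolean emptyStringAsNull;         // csv.empty-string-as-null (default false)

    List<String> parse(String line, char delimiter, int expectedColumns) {
        // A negative limit keeps trailing empty tokens: "a;b;" -> ["a", "b", ""].
        List<String> fields = new ArrayList<>(
                Arrays.asList(line.split(Pattern.quote(String.valueOf(delimiter)), -1)));

        // A single extra, empty column (a trailing delimiter) is dropped when allowed.
        if (allowTrailingComma
                && fields.size() == expectedColumns + 1
                && fields.get(expectedColumns).isEmpty()) {
            fields.remove(expectedColumns);
        }

        // Remaining extra columns either fail or are cut off.
        if (fields.size() > expectedColumns) {
            if (!ignoreTrailingUnmappable) {
                throw new IllegalArgumentException("Too many columns: " + fields.size());
            }
            fields = new ArrayList<>(fields.subList(0, expectedColumns));
        }

        // Short rows fail only when requested; otherwise the missing columns stay absent.
        if (fields.size() < expectedColumns && failOnMissingColumns) {
            throw new IllegalArgumentException(
                    "Expected " + expectedColumns + " columns but got " + fields.size());
        }

        for (int i = 0; i < fields.size(); i++) {
            String value = fields.get(i);
            if (trimSpaces) {
                value = value.trim(); // simplification: also strips tabs and control characters
            }
            if (emptyStringAsNull && value.isEmpty()) {
                value = null;
            }
            fields.set(i, value);
        }
        return fields;
    }
}
```

Missing columns are left absent in this model rather than padded with nulls; how Flink's row converter then treats a short row is not modeled here.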
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java
index 9bf5520574d..64859b5524b 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java
@@ -28,13 +28,18 @@ import java.util.HashSet;
import java.util.Set;
import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_COMMENTS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_TRAILING_COMMA;
import static org.apache.flink.formats.csv.CsvFormatOptions.ARRAY_ELEMENT_DELIMITER;
import static org.apache.flink.formats.csv.CsvFormatOptions.DISABLE_QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.EMPTY_STRING_AS_NULL;
import static org.apache.flink.formats.csv.CsvFormatOptions.ESCAPE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.FAIL_ON_MISSING_COLUMNS;
import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_TRAILING_UNMAPPABLE;
import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.TRIM_SPACES;
import static org.apache.flink.formats.csv.CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION;
/** A class with common CSV format constants and utility methods. */
@@ -102,6 +107,11 @@ class CsvCommons {
options.add(ESCAPE_CHARACTER);
options.add(NULL_LITERAL);
options.add(WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION);
+ options.add(TRIM_SPACES);
+ options.add(IGNORE_TRAILING_UNMAPPABLE);
+ options.add(ALLOW_TRAILING_COMMA);
+ options.add(FAIL_ON_MISSING_COLUMNS);
+ options.add(EMPTY_STRING_AS_NULL);
return options;
}
@@ -115,6 +125,11 @@ class CsvCommons {
options.add(ESCAPE_CHARACTER);
options.add(NULL_LITERAL);
options.add(WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION);
+ options.add(TRIM_SPACES);
+ options.add(IGNORE_TRAILING_UNMAPPABLE);
+ options.add(ALLOW_TRAILING_COMMA);
+ options.add(FAIL_ON_MISSING_COLUMNS);
+ options.add(EMPTY_STRING_AS_NULL);
return options;
}
}
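Adding each new option to both sets in `CsvCommons` is presumably what lets the table factories accept `csv.trim-spaces` and the other new keys: assuming Flink's usual factory validation, a format option that the factory does not declare is rejected as unsupported. The sketch below is a hypothetical, stripped-down model of that check (`OptionValidationSketch` is invented for illustration and is far simpler than Flink's factory utilities). It assumes format options arrive prefixed with `csv.` and are declared without the prefix, as the `ConfigOption` keys such as `trim-spaces` suggest.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, simplified model of validating format options against declared keys. */
final class OptionValidationSketch {
    private static final String FORMAT_PREFIX = "csv.";

    /** Returns the prefixed keys the user set that the format does not declare (sorted). */
    static Set<String> unsupportedKeys(Map<String, String> tableOptions, Set<String> declaredKeys) {
        Set<String> unsupported = new TreeSet<>();
        for (String key : tableOptions.keySet()) {
            if (!key.startsWith(FORMAT_PREFIX)) {
                continue; // connector options such as 'connector' or 'path' are not checked here
            }
            String bareKey = key.substring(FORMAT_PREFIX.length());
            if (!declaredKeys.contains(bareKey)) {
                unsupported.add(key);
            }
        }
        return unsupported;
    }
}
```

In this model, forgetting to register `trim-spaces` would surface as `csv.trim-spaces` in the returned set, which is the kind of failure the `options.add(...)` lines above avoid.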
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
index d3a0d45b358..e58796bdb9c 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
@@ -46,11 +46,13 @@ import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.function.SerializableSupplier;
import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.commons.text.StringEscapeUtils;
@@ -60,13 +62,18 @@ import java.util.List;
import java.util.Set;
import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_COMMENTS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_TRAILING_COMMA;
import static org.apache.flink.formats.csv.CsvFormatOptions.ARRAY_ELEMENT_DELIMITER;
import static org.apache.flink.formats.csv.CsvFormatOptions.DISABLE_QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.EMPTY_STRING_AS_NULL;
import static org.apache.flink.formats.csv.CsvFormatOptions.ESCAPE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.FAIL_ON_MISSING_COLUMNS;
import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_TRAILING_UNMAPPABLE;
import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.TRIM_SPACES;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** CSV format factory for file system. */
@@ -125,15 +132,14 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
final RowType physicalRowType = (RowType) physicalDataType.getLogicalType();
final CsvSchema schema = buildCsvSchema(physicalRowType, formatOptions);
- final boolean ignoreParseErrors =
- formatOptions.getOptional(IGNORE_PARSE_ERRORS).isPresent();
+ final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
final Converter<JsonNode, RowData, Void> converter =
(Converter)
new CsvToRowDataConverters(ignoreParseErrors)
.createRowConverter(projectedRowType, true);
CsvReaderFormat<RowData> csvReaderFormat =
new CsvReaderFormat<>(
- JacksonMapperFactory::createCsvMapper,
+ createCsvMapperFactory(formatOptions),
ignored -> schema,
JsonNode.class,
converter,
@@ -217,4 +223,38 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
return csvBuilder.build();
}
+
+ /**
+ * Creates a {@link SerializableSupplier} for {@link CsvMapper} that applies the configured
+ * {@link CsvParser.Feature} options.
+ */
+ private static SerializableSupplier<CsvMapper> createCsvMapperFactory(
+ ReadableConfig formatOptions) {
+ final boolean trimSpaces = formatOptions.get(TRIM_SPACES);
+ final boolean ignoreTrailingUnmappable = formatOptions.get(IGNORE_TRAILING_UNMAPPABLE);
+ final boolean allowTrailingComma = formatOptions.get(ALLOW_TRAILING_COMMA);
+ final boolean failOnMissingColumns = formatOptions.get(FAIL_ON_MISSING_COLUMNS);
+ final boolean emptyStringAsNull = formatOptions.get(EMPTY_STRING_AS_NULL);
+
+ return () -> {
+ CsvMapper mapper = JacksonMapperFactory.createCsvMapper();
+ configureFeature(mapper, CsvParser.Feature.TRIM_SPACES, trimSpaces);
+ configureFeature(
+ mapper, CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE, ignoreTrailingUnmappable);
+ configureFeature(mapper, CsvParser.Feature.ALLOW_TRAILING_COMMA, allowTrailingComma);
+ configureFeature(
+ mapper, CsvParser.Feature.FAIL_ON_MISSING_COLUMNS, failOnMissingColumns);
+ configureFeature(mapper, CsvParser.Feature.EMPTY_STRING_AS_NULL, emptyStringAsNull);
+ return mapper;
+ };
+ }
+
+ private static void configureFeature(
+ CsvMapper mapper, CsvParser.Feature feature, boolean enabled) {
+ if (enabled) {
+ mapper.enable(feature);
+ } else {
+ mapper.disable(feature);
+ }
+ }
}
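The new `createCsvMapperFactory` copies the five flags into plain `boolean` locals before building the lambda, so the `SerializableSupplier<CsvMapper>` captures only primitives and a fresh, configured mapper is created each time the supplier is invoked. A plain-JDK sketch of that pattern follows. `SerializableSupplierSketch`, `ParserFeature` and `ToyMapper` are hypothetical stand-ins for Flink's `SerializableSupplier`, Jackson's `CsvParser.Feature` and `CsvMapper`, and the Java serialization round trip stands in for however the reader format is actually shipped to tasks.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical stand-in for a supplier that can be serialized with the job. */
interface SerializableSupplierSketch<T> extends Supplier<T>, Serializable {}

/** Hypothetical subset of parser features, standing in for Jackson's CsvParser.Feature. */
enum ParserFeature { TRIM_SPACES, ALLOW_TRAILING_COMMA, EMPTY_STRING_AS_NULL }

/** Toy mapper that only records enabled features; it is NOT Jackson's CsvMapper. */
final class ToyMapper {
    // ALLOW_TRAILING_COMMA starts enabled, mirroring the documented default of that flag.
    final Set<ParserFeature> enabled = EnumSet.of(ParserFeature.ALLOW_TRAILING_COMMA);

    void configure(ParserFeature feature, boolean on) {
        if (on) {
            enabled.add(feature);
        } else {
            enabled.remove(feature);
        }
    }
}

final class MapperFactorySketch {
    /** Captures only primitive flags, so the returned lambda is serializable. */
    static SerializableSupplierSketch<ToyMapper> createMapperFactory(
            boolean trimSpaces, boolean allowTrailingComma, boolean emptyStringAsNull) {
        return () -> {
            ToyMapper mapper = new ToyMapper(); // a fresh mapper per call
            mapper.configure(ParserFeature.TRIM_SPACES, trimSpaces);
            mapper.configure(ParserFeature.ALLOW_TRAILING_COMMA, allowTrailingComma);
            mapper.configure(ParserFeature.EMPTY_STRING_AS_NULL, emptyStringAsNull);
            return mapper;
        };
    }

    /** Serializes and deserializes a value, simulating shipping it to another process. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```

Capturing only primitives keeps the lambda serializable regardless of the concrete `ReadableConfig` implementation. Jackson's `CsvMapper` also has a `configure(CsvParser.Feature, boolean)` method, which could take the place of the `configureFeature` helper.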
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
index a54bd618df2..c3fef6f4036 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
@@ -45,13 +45,18 @@ import java.util.Collections;
import java.util.Set;
import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_COMMENTS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_TRAILING_COMMA;
import static org.apache.flink.formats.csv.CsvFormatOptions.ARRAY_ELEMENT_DELIMITER;
import static org.apache.flink.formats.csv.CsvFormatOptions.DISABLE_QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.EMPTY_STRING_AS_NULL;
import static org.apache.flink.formats.csv.CsvFormatOptions.ESCAPE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.FAIL_ON_MISSING_COLUMNS;
import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_TRAILING_UNMAPPABLE;
import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.TRIM_SPACES;
import static org.apache.flink.formats.csv.CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION;
/**
@@ -179,6 +184,24 @@ public final class CsvFormatFactory
.ifPresent(schemaBuilder::setEscapeCharacter);
formatOptions.getOptional(NULL_LITERAL).ifPresent(schemaBuilder::setNullLiteral);
+
+ formatOptions.getOptional(TRIM_SPACES).ifPresent(schemaBuilder::setTrimSpaces);
+
+ formatOptions
+ .getOptional(IGNORE_TRAILING_UNMAPPABLE)
+ .ifPresent(schemaBuilder::setIgnoreTrailingUnmappable);
+
+ formatOptions
+ .getOptional(ALLOW_TRAILING_COMMA)
+ .ifPresent(schemaBuilder::setAllowTrailingComma);
+
+ formatOptions
+ .getOptional(FAIL_ON_MISSING_COLUMNS)
+ .ifPresent(schemaBuilder::setFailOnMissingColumns);
+
+ formatOptions
+ .getOptional(EMPTY_STRING_AS_NULL)
+ .ifPresent(schemaBuilder::setEmptyStringAsNull);
}
private static void configureSerializationSchema(
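This factory forwards a flag to the builder only when the user set it (`getOptional(...).ifPresent(...)`), while `CsvFileFormatFactory` above always applies `formatOptions.get(...)`, that is, the user value or the option default. The two paths should reach the same effective parser configuration as long as each option default equals the parser's own default. The sketch below checks that reasoning with a hypothetical `Flag` enum whose defaults are assumed to mirror Jackson 2.x `CsvParser.Feature` defaults (only `ALLOW_TRAILING_COMMA` on), which also matches the `ConfigOption` defaults in this commit.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical flags; parserDefault is assumed to match both Jackson and the ConfigOption default. */
enum Flag {
    TRIM_SPACES(false),
    IGNORE_TRAILING_UNMAPPABLE(false),
    ALLOW_TRAILING_COMMA(true),
    FAIL_ON_MISSING_COLUMNS(false),
    EMPTY_STRING_AS_NULL(false);

    final boolean parserDefault;

    Flag(boolean parserDefault) {
        this.parserDefault = parserDefault;
    }
}

final class ForwardingStrategiesSketch {
    private static Set<Flag> parserDefaults() {
        Set<Flag> enabled = EnumSet.noneOf(Flag.class);
        for (Flag flag : Flag.values()) {
            if (flag.parserDefault) {
                enabled.add(flag);
            }
        }
        return enabled;
    }

    private static void apply(Set<Flag> enabled, Flag flag, boolean on) {
        if (on) {
            enabled.add(flag);
        } else {
            enabled.remove(flag);
        }
    }

    /** Row-data path: touch only the flags the user set explicitly. */
    static Set<Flag> onlyIfPresent(Map<Flag, Boolean> userSettings) {
        Set<Flag> enabled = parserDefaults();
        for (Flag flag : Flag.values()) {
            Optional.ofNullable(userSettings.get(flag)).ifPresent(on -> apply(enabled, flag, on));
        }
        return enabled;
    }

    /** Bulk-file path: always apply the resolved value (user value or option default). */
    static Set<Flag> alwaysApply(Map<Flag, Boolean> userSettings) {
        Set<Flag> enabled = parserDefaults();
        for (Flag flag : Flag.values()) {
            apply(enabled, flag, userSettings.getOrDefault(flag, flag.parserDefault));
        }
        return enabled;
    }
}
```

If a future option default ever diverged from the parser default, the two strategies would disagree for users who leave that option unset.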
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java
index c9beae1ca7e..ee624a48e0b 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java
@@ -94,5 +94,50 @@ public class CsvFormatOptions {
.withDescription(
"Enables representation of BigDecimal data type in scientific notation (default is true). For example, 100000 is encoded as 1E+5 by default, and will be written as 100000 if set this option to false. Note: Only when the value is not 0 and a multiple of 10 is converted to scientific notation.");
+ public static final ConfigOption<Boolean> TRIM_SPACES =
+ ConfigOptions.key("trim-spaces")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag to trim leading/trailing spaces from "
+ + "unquoted field values (disabled by default). "
+ + "Only affects deserialization.");
+
+ public static final ConfigOption<Boolean> IGNORE_TRAILING_UNMAPPABLE =
+ ConfigOptions.key("ignore-trailing-unmappable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag to ignore extra trailing fields that "
+ + "cannot be mapped to the schema (disabled by default). "
+ + "Only affects deserialization.");
+
+ public static final ConfigOption<Boolean> ALLOW_TRAILING_COMMA =
+ ConfigOptions.key("allow-trailing-comma")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Optional flag to allow a trailing comma after the "
+ + "last field value (enabled by default). "
+ + "Only affects deserialization.");
+
+ public static final ConfigOption<Boolean> FAIL_ON_MISSING_COLUMNS =
+ ConfigOptions.key("fail-on-missing-columns")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag to fail when a row has fewer columns "
+ + "than the schema expects (disabled by default). "
+ + "Only affects deserialization.");
+
+ public static final ConfigOption<Boolean> EMPTY_STRING_AS_NULL =
+ ConfigOptions.key("empty-string-as-null")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Optional flag to treat empty string values as null "
+ + "(disabled by default). "
+ + "Only affects deserialization.");
+
private CsvFormatOptions() {}
}
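The `ConfigOption` keys above are declared without a prefix, while the documentation tables list them as `csv.trim-spaces` and so on, because format options in a table's `WITH` clause carry the format identifier as a prefix. The following hypothetical sketch (`CsvFeatureDefaultsSketch` is not a Flink class) resolves the five new flags from such prefixed table options, falling back to the defaults declared in this file. Its strict `true`/`false` parsing is an assumption for illustration, not a description of Flink's own boolean parsing.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical resolver for the five new CSV parser flags. */
final class CsvFeatureDefaultsSketch {
    /** Keys and defaults copied from the ConfigOption definitions in CsvFormatOptions. */
    static final Map<String, Boolean> DEFAULTS = new LinkedHashMap<>();

    static {
        DEFAULTS.put("trim-spaces", false);
        DEFAULTS.put("ignore-trailing-unmappable", false);
        DEFAULTS.put("allow-trailing-comma", true);
        DEFAULTS.put("fail-on-missing-columns", false);
        DEFAULTS.put("empty-string-as-null", false);
    }

    /** Looks up each flag as "csv." + key, using the declared default when it is absent. */
    static Map<String, Boolean> resolve(Map<String, String> tableOptions) {
        Map<String, Boolean> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, Boolean> entry : DEFAULTS.entrySet()) {
            String raw = tableOptions.get("csv." + entry.getKey());
            resolved.put(entry.getKey(), raw == null ? entry.getValue() : parseBoolean(raw));
        }
        return resolved;
    }

    /** Strict, case-insensitive boolean parsing (an assumption of this sketch). */
    private static boolean parseBoolean(String raw) {
        switch (raw.trim().toLowerCase(Locale.ROOT)) {
            case "true":
                return true;
            case "false":
                return false;
            default:
                throw new IllegalArgumentException("Not a boolean: " + raw);
        }
    }
}
```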
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
index b77312d41b8..97829689642 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
@@ -28,13 +28,17 @@ import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
+import java.util.EnumSet;
import java.util.Objects;
+import java.util.Set;
/**
* Deserialization schema from CSV to Flink Table & SQL internal data structures.
@@ -64,21 +68,37 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
/** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
private final boolean ignoreParseErrors;
+ /** Set of CsvParser.Feature to explicitly enable. */
+ private final Set<CsvParser.Feature> enabledFeatures;
+
+ /** Set of CsvParser.Feature to explicitly disable. */
+ private final Set<CsvParser.Feature> disabledFeatures;
+
private CsvRowDataDeserializationSchema(
TypeInformation<RowData> resultTypeInfo,
CsvSchema csvSchema,
CsvToRowDataConverters.CsvToRowDataConverter runtimeConverter,
- boolean ignoreParseErrors) {
+ boolean ignoreParseErrors,
+ Set<CsvParser.Feature> enabledFeatures,
+ Set<CsvParser.Feature> disabledFeatures) {
this.resultTypeInfo = resultTypeInfo;
this.runtimeConverter = runtimeConverter;
this.csvSchema = csvSchema;
this.ignoreParseErrors = ignoreParseErrors;
+ this.enabledFeatures = enabledFeatures;
+ this.disabledFeatures = disabledFeatures;
}
@Override
public void open(InitializationContext context) {
- this.objectReader =
- JacksonMapperFactory.createCsvMapper().readerFor(JsonNode.class).with(csvSchema);
+ CsvMapper csvMapper = JacksonMapperFactory.createCsvMapper();
+ for (CsvParser.Feature feature : enabledFeatures) {
+ csvMapper.enable(feature);
+ }
+ for (CsvParser.Feature feature : disabledFeatures) {
+ csvMapper.disable(feature);
+ }
+ this.objectReader = csvMapper.readerFor(JsonNode.class).with(csvSchema);
}
/** A builder for creating a {@link CsvRowDataDeserializationSchema}. */
@@ -89,6 +109,10 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
private final TypeInformation<RowData> resultTypeInfo;
private CsvSchema csvSchema;
private boolean ignoreParseErrors;
+ private final Set<CsvParser.Feature> enabledFeatures =
+ EnumSet.noneOf(CsvParser.Feature.class);
+ private final Set<CsvParser.Feature> disabledFeatures =
+ EnumSet.noneOf(CsvParser.Feature.class);
/**
* Creates a CSV deserialization schema for the given {@link TypeInformation} with optional
@@ -165,12 +189,56 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
return this;
}
+ public Builder setTrimSpaces(boolean trimSpaces) {
+ setFeature(CsvParser.Feature.TRIM_SPACES, trimSpaces);
+ return this;
+ }
+
+ public Builder setIgnoreTrailingUnmappable(boolean ignore) {
+ setFeature(CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE, ignore);
+ return this;
+ }
+
+ public Builder setAllowTrailingComma(boolean allow) {
+ setFeature(CsvParser.Feature.ALLOW_TRAILING_COMMA, allow);
+ return this;
+ }
+
+ public Builder setFailOnMissingColumns(boolean fail) {
+ setFeature(CsvParser.Feature.FAIL_ON_MISSING_COLUMNS, fail);
+ return this;
+ }
+
+ public Builder setEmptyStringAsNull(boolean emptyAsNull) {
+ setFeature(CsvParser.Feature.EMPTY_STRING_AS_NULL, emptyAsNull);
+ return this;
+ }
+
+ private void setFeature(CsvParser.Feature feature, boolean enabled) {
+ if (enabled) {
+ enabledFeatures.add(feature);
+ disabledFeatures.remove(feature);
+ } else {
+ disabledFeatures.add(feature);
+ enabledFeatures.remove(feature);
+ }
+ }
+
public CsvRowDataDeserializationSchema build() {
CsvToRowDataConverters.CsvToRowDataConverter runtimeConverter =
new CsvToRowDataConverters(ignoreParseErrors)
.createRowConverter(rowResultType, true);
return new CsvRowDataDeserializationSchema(
- resultTypeInfo, csvSchema, runtimeConverter, ignoreParseErrors);
+ resultTypeInfo,
+ csvSchema,
+ runtimeConverter,
+ ignoreParseErrors,
+ enabledFeatures.isEmpty()
+ ? EnumSet.noneOf(CsvParser.Feature.class)
+ : EnumSet.copyOf(enabledFeatures),
+ disabledFeatures.isEmpty()
+ ? EnumSet.noneOf(CsvParser.Feature.class)
+ : EnumSet.copyOf(disabledFeatures));
}
}
@@ -221,7 +289,9 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
.equals(otherSchema.getArrayElementSeparator())
&& csvSchema.getQuoteChar() == otherSchema.getQuoteChar()
&& csvSchema.getEscapeChar() == otherSchema.getEscapeChar()
- && Arrays.equals(csvSchema.getNullValue(), otherSchema.getNullValue());
+ && Arrays.equals(csvSchema.getNullValue(), otherSchema.getNullValue())
+ && enabledFeatures.equals(that.enabledFeatures)
+ && disabledFeatures.equals(that.disabledFeatures);
}
@Override
@@ -234,6 +304,8 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
csvSchema.getArrayElementSeparator(),
csvSchema.getQuoteChar(),
csvSchema.getEscapeChar(),
- Arrays.hashCode(csvSchema.getNullValue()));
+ Arrays.hashCode(csvSchema.getNullValue()),
+ enabledFeatures,
+ disabledFeatures);
}
}
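The builder keeps two `EnumSet`s so that each flag is recorded as explicitly enabled, explicitly disabled, or untouched (leaving the parser default), and `build()` hands copies to the schema, which then compares them in `equals`/`hashCode`. The plain-JDK sketch below mirrors that bookkeeping; `SketchFeature` and `FeatureConfig` are hypothetical names, and the enum is a stand-in for Jackson's `CsvParser.Feature`.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical stand-in for Jackson's CsvParser.Feature. */
enum SketchFeature { TRIM_SPACES, ALLOW_TRAILING_COMMA, EMPTY_STRING_AS_NULL }

/** Immutable, value-comparable record of explicitly enabled and disabled features. */
final class FeatureConfig {
    final Set<SketchFeature> enabled;
    final Set<SketchFeature> disabled;

    private FeatureConfig(Set<SketchFeature> enabled, Set<SketchFeature> disabled) {
        // Defensive copies: later builder calls must not leak into an already built instance.
        this.enabled = Collections.unmodifiableSet(copy(enabled));
        this.disabled = Collections.unmodifiableSet(copy(disabled));
    }

    private static Set<SketchFeature> copy(Set<SketchFeature> features) {
        return features.isEmpty()
                ? EnumSet.noneOf(SketchFeature.class)
                : EnumSet.copyOf(features);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof FeatureConfig)) {
            return false;
        }
        FeatureConfig that = (FeatureConfig) o;
        return enabled.equals(that.enabled) && disabled.equals(that.disabled);
    }

    @Override
    public int hashCode() {
        return Objects.hash(enabled, disabled);
    }

    static final class Builder {
        private final Set<SketchFeature> enabled = EnumSet.noneOf(SketchFeature.class);
        private final Set<SketchFeature> disabled = EnumSet.noneOf(SketchFeature.class);

        /** A feature sits in at most one of the two sets; the last call wins. */
        Builder set(SketchFeature feature, boolean on) {
            if (on) {
                enabled.add(feature);
                disabled.remove(feature);
            } else {
                disabled.add(feature);
                enabled.remove(feature);
            }
            return this;
        }

        FeatureConfig build() {
            return new FeatureConfig(enabled, disabled);
        }
    }
}
```

One consequence: equality is based on the explicit sets rather than on effective parser behavior, so a schema built with `setTrimSpaces(false)` would not equal one built without that call, even though both leave trimming off. Also, `EnumSet.copyOf(Collection)` throws `IllegalArgumentException` for an empty collection that is not itself an `EnumSet`; because the builder's fields are `EnumSet`s at runtime, the `isEmpty()` guard in `build()` appears to be defensive rather than required.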
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
index d4db9a4d3de..fbe1c993c74 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
@@ -20,6 +20,9 @@ package org.apache.flink.formats.csv;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Column;
@@ -71,9 +74,16 @@ class CsvFormatFactoryTest {
.setArrayElementDelimiter("|")
.setEscapeCharacter('\\')
.setNullLiteral("n/a")
+ .setTrimSpaces(true)
+ .setEmptyStringAsNull(true)
.build();
open(expectedDeser);
- final Map<String, String> options = getAllOptions();
+ final Map<String, String> options =
+ getModifiedOptions(
+ opts -> {
+ opts.put("csv.trim-spaces", "true");
+ opts.put("csv.empty-string-as-null", "true");
+ });
DeserializationSchema<RowData> actualDeser =
createDeserializationSchema(options);
assertThat(actualDeser).isEqualTo(expectedDeser);
@@ -347,6 +357,105 @@ class CsvFormatFactoryTest {
assertThat(deserialized).isEqualTo(expected);
}
+ @Test
+ void testTrimSpaces() throws IOException {
+ final Map<String, String> options =
+ getModifiedOptions(opts -> opts.put("csv.trim-spaces", "true"));
+ DeserializationSchema<RowData> deserializationSchema = createDeserializationSchema(options);
+ RowData actual = deserializationSchema.deserialize(" abc ;123;false".getBytes());
+ GenericRowData expected = GenericRowData.of(fromString("abc"), 123, false);
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ @Test
+ void testEmptyStringAsNull() throws IOException {
+ final Map<String, String> options =
+ getModifiedOptions(opts -> opts.put("csv.empty-string-as-null", "true"));
+ DeserializationSchema<RowData> deserializationSchema = createDeserializationSchema(options);
+ RowData actual = deserializationSchema.deserialize(";123;false".getBytes());
+ assertThat(actual.isNullAt(0)).isTrue();
+ assertThat(actual.getInt(1)).isEqualTo(123);
+ assertThat(actual.getBoolean(2)).isFalse();
+ }
+
+ @Test
+ void testAllowTrailingComma() throws IOException {
+ final Map<String, String> options =
+ getModifiedOptions(opts -> opts.put("csv.allow-trailing-comma", "true"));
+ DeserializationSchema<RowData> deserializationSchema = createDeserializationSchema(options);
+ RowData actual = deserializationSchema.deserialize("abc;123;false;".getBytes());
+ GenericRowData expected = GenericRowData.of(fromString("abc"), 123, false);
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ @Test
+ void testIgnoreTrailingUnmappable() throws IOException {
+ final Map<String, String> options =
+ getModifiedOptions(opts -> opts.put("csv.ignore-trailing-unmappable", "true"));
+ DeserializationSchema<RowData> deserializationSchema = createDeserializationSchema(options);
+ RowData actual = deserializationSchema.deserialize("abc;123;false;extra".getBytes());
+ GenericRowData expected = GenericRowData.of(fromString("abc"), 123, false);
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ @Test
+ void testFailOnMissingColumns() {
+ final Map<String, String> options =
+ getModifiedOptions(
+ opts -> {
+ opts.put("csv.fail-on-missing-columns", "true");
+ opts.remove("csv.ignore-parse-errors");
+ });
+ DeserializationSchema<RowData> deserializationSchema = createDeserializationSchema(options);
+ assertThatThrownBy(() -> deserializationSchema.deserialize("abc;123".getBytes()))
+ .isInstanceOf(IOException.class);
+ }
+
+ @Test
+ void testDeserializationSchemaEqualityWithFeatures() {
+ final CsvRowDataDeserializationSchema schema1 =
+ new CsvRowDataDeserializationSchema.Builder(
+ PHYSICAL_TYPE, InternalTypeInfo.of(PHYSICAL_TYPE))
+ .setTrimSpaces(true)
+ .setEmptyStringAsNull(true)
+ .build();
+ final CsvRowDataDeserializationSchema schema2 =
+ new CsvRowDataDeserializationSchema.Builder(
+ PHYSICAL_TYPE, InternalTypeInfo.of(PHYSICAL_TYPE))
+ .setTrimSpaces(true)
+ .setEmptyStringAsNull(true)
+ .build();
+ final CsvRowDataDeserializationSchema schema3 =
+ new CsvRowDataDeserializationSchema.Builder(
+ PHYSICAL_TYPE, InternalTypeInfo.of(PHYSICAL_TYPE))
+ .setTrimSpaces(false)
+ .build();
+
+ assertThat(schema1).isEqualTo(schema2);
+ assertThat(schema1).isNotEqualTo(schema3);
+ }
+
+ @Test
+ void testBulkFormatWithParserFeatures() {
+ final Configuration formatOptions = new Configuration();
+ formatOptions.set(CsvFormatOptions.FIELD_DELIMITER, ";");
+ formatOptions.set(CsvFormatOptions.TRIM_SPACES, true);
+ formatOptions.set(CsvFormatOptions.IGNORE_TRAILING_UNMAPPABLE, true);
+ formatOptions.set(CsvFormatOptions.ALLOW_TRAILING_COMMA, true);
+ formatOptions.set(CsvFormatOptions.EMPTY_STRING_AS_NULL, true);
+
+ CsvFileFormatFactory.CsvBulkDecodingFormat bulkDecodingFormat =
+ new CsvFileFormatFactory.CsvBulkDecodingFormat(formatOptions);
+
+ // Verify the bulk format can be created without errors
+ BulkFormat<RowData, FileSourceSplit> bulkFormat =
+ bulkDecodingFormat.createRuntimeDecoder(
+ ScanRuntimeProviderContext.INSTANCE,
+ PHYSICAL_DATA_TYPE,
+ Projection.all(PHYSICAL_DATA_TYPE).toNestedIndexes());
+ assertThat(bulkFormat).isNotNull();
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------