This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 09747f999e5 [FLINK-26270][Formats][CSV] Flink SQL write data to kafka by CSV format , whether decimal type was converted to scientific notation. This closes #20127 09747f999e5 is described below commit 09747f999e54b7921e8c12c944b941b0777be48f Author: fengjiankun <fengjian...@360.cn> AuthorDate: Fri Jul 1 17:12:20 2022 +0800 [FLINK-26270][Formats][CSV] Flink SQL write data to kafka by CSV format , whether decimal type was converted to scientific notation. This closes #20127 --- .../docs/connectors/table/formats/csv.md | 7 +++ docs/content/docs/connectors/table/formats/csv.md | 8 +++ .../org/apache/flink/formats/csv/CsvCommons.java | 3 ++ .../apache/flink/formats/csv/CsvFormatFactory.java | 5 ++ .../apache/flink/formats/csv/CsvFormatOptions.java | 7 +++ .../formats/csv/CsvRowDataSerializationSchema.java | 15 ++++-- .../flink/formats/csv/CsvFormatFactoryTest.java | 58 ++++++++++++++++++++++ 7 files changed, 100 insertions(+), 3 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/formats/csv.md b/docs/content.zh/docs/connectors/table/formats/csv.md index 417af014a47..0c93e8e188b 100644 --- a/docs/content.zh/docs/connectors/table/formats/csv.md +++ b/docs/content.zh/docs/connectors/table/formats/csv.md @@ -141,6 +141,13 @@ Format 参数 <td>String</td> <td>是否将 "null" 字符串转化为 null 值。</td> </tr> + <tr> + <td><h5>csv.write-bigdecimal-in-scientific-notation</h5></td> + <td>可选</td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>设置将 Bigdecimal 类型的数据表示为科学计数法(默认为true,即需要转为科学计数法),例如一个BigDecimal的值为100000,设置true,结果为 '1E+5';设置为false,结果为 100000。注意:只有当值不等于0且是10的倍数才会转为科学计数法</td> + </tr> </tbody> </table> diff --git a/docs/content/docs/connectors/table/formats/csv.md b/docs/content/docs/connectors/table/formats/csv.md index e7cc54999a8..27389cc528e 100644 --- a/docs/content/docs/connectors/table/formats/csv.md +++ b/docs/content/docs/connectors/table/formats/csv.md @@ -152,6 
+152,14 @@ Format Options <td>String</td> <td>Null literal string that is interpreted as a null value (disabled by default).</td> </tr> + <tr> + <td><h5>csv.write-bigdecimal-in-scientific-notation</h5></td> + <td>optional</td> + <td>yes</td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Enables representation of BigDecimal data type in scientific notation (default is true). For example, 100000 is encoded as 1E+5 by default, and is written as 100000 if this option is set to false. Note: only non-zero values that are multiples of 10 are converted to scientific notation.</td> + </tr> </tbody> </table> diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java index 48825748f82..7d15bfb985b 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java @@ -35,6 +35,7 @@ import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER; import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS; import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL; import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER; +import static org.apache.flink.formats.csv.CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION; /** A class with common CSV format constants and utility methods.
*/ class CsvCommons { @@ -100,6 +101,7 @@ class CsvCommons { options.add(ARRAY_ELEMENT_DELIMITER); options.add(ESCAPE_CHARACTER); options.add(NULL_LITERAL); + options.add(WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION); return options; } @@ -112,6 +114,7 @@ class CsvCommons { options.add(ARRAY_ELEMENT_DELIMITER); options.add(ESCAPE_CHARACTER); options.add(NULL_LITERAL); + options.add(WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION); return options; } } diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java index 49b16eda125..2db1a9f695c 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java @@ -52,6 +52,7 @@ import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER; import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS; import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL; import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER; +import static org.apache.flink.formats.csv.CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION; /** * Format factory for providing configured instances of CSV to RowData {@link SerializationSchema} @@ -206,5 +207,9 @@ public final class CsvFormatFactory .ifPresent(schemaBuilder::setEscapeCharacter); formatOptions.getOptional(NULL_LITERAL).ifPresent(schemaBuilder::setNullLiteral); + + formatOptions + .getOptional(WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION) + .ifPresent(schemaBuilder::setWriteBigDecimalInScientificNotation); } } diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java index 459e7feef4f..c9beae1ca7e 100644 --- 
a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java @@ -87,5 +87,12 @@ public class CsvFormatOptions { "Optional null literal string that is interpreted as a\n" + "null value (disabled by default)"); + public static final ConfigOption<Boolean> WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION = + ConfigOptions.key("write-bigdecimal-in-scientific-notation") + .booleanType() + .defaultValue(true) + .withDescription( + "Enables representation of BigDecimal data type in scientific notation (default is true). For example, 100000 is encoded as 1E+5 by default, and will be written as 100000 if set this option to false. Note: Only when the value is not 0 and a multiple of 10 is converted to scientific notation."); + private CsvFormatOptions() {} } diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java index 16d7c966a52..25673fcc3d8 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java @@ -24,6 +24,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; @@ -70,10 +71,11 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema< .RowDataToCsvFormatConverterContext converterContext; - 
private CsvRowDataSerializationSchema(RowType rowType, CsvSchema csvSchema) { + private CsvRowDataSerializationSchema( + RowType rowType, CsvSchema csvSchema, CsvMapper csvMapper) { this.rowType = rowType; this.runtimeConverter = RowDataToCsvConverters.createRowConverter(rowType); - this.csvMapper = new CsvMapper(); + this.csvMapper = csvMapper; this.csvSchema = csvSchema.withLineSeparator(""); this.objectWriter = csvMapper.writer(this.csvSchema); } @@ -84,6 +86,7 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema< private final RowType rowType; private CsvSchema csvSchema; + private CsvMapper csvMapper; /** * Creates a {@link CsvRowDataSerializationSchema} expecting the given {@link RowType}. @@ -95,6 +98,7 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema< this.rowType = rowType; this.csvSchema = CsvRowSchemaConverter.convert(rowType); + this.csvMapper = new CsvMapper(); } public Builder setFieldDelimiter(char c) { @@ -128,8 +132,13 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema< return this; } + public void setWriteBigDecimalInScientificNotation(boolean isScientificNotation) { + this.csvMapper.configure( + JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, !isScientificNotation); + } + public CsvRowDataSerializationSchema build() { - return new CsvRowDataSerializationSchema(rowType, csvSchema); + return new CsvRowDataSerializationSchema(rowType, csvSchema, csvMapper); } } diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java index 6edde34a947..468cc8ff959 100644 --- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java @@ -20,7 +20,10 @@ package org.apache.flink.formats.csv; import 
org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.Projection; import org.apache.flink.table.connector.format.ProjectableDecodingFormat; import org.apache.flink.table.connector.sink.DynamicTableSink; @@ -37,6 +40,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import java.io.IOException; +import java.math.BigDecimal; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -45,6 +49,7 @@ import java.util.Map; import java.util.function.Consumer; import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.table.data.DecimalData.fromBigDecimal; import static org.apache.flink.table.data.StringData.fromString; import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_DATA_TYPE; import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_TYPE; @@ -230,6 +235,58 @@ public class CsvFormatFactoryTest extends TestLogger { createTableSink(SCHEMA, options); } + @Test + public void testSerializationWithWriteBigDecimalInScientificNotation() { + final Map<String, String> options = + getModifiedOptions( + opts -> opts.put("csv.write-bigdecimal-in-scientific-notation", "true")); + + ResolvedSchema schema = + ResolvedSchema.of( + Column.physical("a", DataTypes.STRING()), + Column.physical("b", DataTypes.DECIMAL(10, 3)), + Column.physical("c", DataTypes.BOOLEAN())); + final DynamicTableSink actualSink = createTableSink(schema, options); + assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class); + TestDynamicTableFactory.DynamicTableSinkMock sinkMock = + (TestDynamicTableFactory.DynamicTableSinkMock) 
actualSink; + + SerializationSchema<RowData> runtimeEncoder = + sinkMock.valueFormat.createRuntimeEncoder(null, schema.toPhysicalRowDataType()); + + RowData rowData = + GenericRowData.of( + fromString("abc"), fromBigDecimal(new BigDecimal("100000"), 10, 3), false); + byte[] bytes = runtimeEncoder.serialize(rowData); + assertThat(new String(bytes)).isEqualTo("abc;'1E+5';false"); + } + + @Test + public void testSerializationWithNotWriteBigDecimalInScientificNotation() { + final Map<String, String> options = + getModifiedOptions( + opts -> opts.put("csv.write-bigdecimal-in-scientific-notation", "false")); + + ResolvedSchema schema = + ResolvedSchema.of( + Column.physical("a", DataTypes.STRING()), + Column.physical("b", DataTypes.DECIMAL(10, 3)), + Column.physical("c", DataTypes.BOOLEAN())); + final DynamicTableSink actualSink = createTableSink(schema, options); + assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class); + TestDynamicTableFactory.DynamicTableSinkMock sinkMock = + (TestDynamicTableFactory.DynamicTableSinkMock) actualSink; + + SerializationSchema<RowData> runtimeEncoder = + sinkMock.valueFormat.createRuntimeEncoder(null, schema.toPhysicalRowDataType()); + + RowData rowData = + GenericRowData.of( + fromString("abc"), fromBigDecimal(new BigDecimal("100000"), 10, 3), false); + byte[] bytes = runtimeEncoder.serialize(rowData); + assertThat(new String(bytes)).isEqualTo("abc;'100000';false"); + } + @Test public void testProjectionPushdown() throws IOException { final Map<String, String> options = getAllOptions(); @@ -311,6 +368,7 @@ public class CsvFormatFactoryTest extends TestLogger { options.put("csv.array-element-delimiter", "|"); options.put("csv.escape-character", "\\"); options.put("csv.null-literal", "n/a"); + options.put("csv.write-bigdecimal-in-scientific-notation", "true"); return options; }