This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 93555a7d55d [FLINK-16627][json] Support ignore null fields when serializing into JSON 93555a7d55d is described below commit 93555a7d55dc27571122cccb7c4d8af2c5db54cb Author: zhouyisha <zhouyi...@bytedance.com> AuthorDate: Fri Mar 1 17:11:32 2024 +0800 [FLINK-16627][json] Support ignore null fields when serializing into JSON Close apache/flink#24430 --- .../docs/connectors/table/formats/debezium.md | 7 ++ .../docs/connectors/table/formats/json.md | 7 ++ .../docs/connectors/table/formats/maxwell.md | 7 ++ .../docs/connectors/table/formats/ogg.md | 9 ++- .../docs/connectors/table/formats/debezium.md | 7 ++ docs/content/docs/connectors/table/formats/json.md | 8 +++ .../docs/connectors/table/formats/maxwell.md | 7 ++ docs/content/docs/connectors/table/formats/ogg.md | 7 ++ .../hive/util/ThriftObjectConversions.java | 2 +- .../flink/formats/json/JsonFormatFactory.java | 7 +- .../flink/formats/json/JsonFormatOptions.java | 7 ++ .../json/JsonRowDataSerializationSchema.java | 21 ++++-- .../formats/json/RowDataToJsonConverters.java | 15 ++-- .../formats/json/canal/CanalJsonFormatFactory.java | 7 +- .../json/canal/CanalJsonSerializationSchema.java | 6 +- .../json/debezium/DebeziumJsonFormatFactory.java | 6 +- .../debezium/DebeziumJsonSerializationSchema.java | 6 +- .../json/maxwell/MaxwellJsonFormatFactory.java | 7 +- .../maxwell/MaxwellJsonSerializationSchema.java | 6 +- .../formats/json/ogg/OggJsonFormatFactory.java | 7 +- .../json/ogg/OggJsonSerializationSchema.java | 6 +- .../flink/formats/json/JsonFormatFactoryTest.java | 2 + .../formats/json/JsonRowDataSerDeSchemaTest.java | 84 +++++++++++++++++++--- .../json/canal/CanalJsonFormatFactoryTest.java | 3 + .../json/canal/CanalJsonSerDeSchemaTest.java | 3 +- .../debezium/DebeziumJsonFormatFactoryTest.java | 2 + .../json/debezium/DebeziumJsonSerDeSchemaTest.java | 3 +- .../json/maxwell/MaxwellJsonFormatFactoryTest.java | 2 + .../json/maxwell/MaxwellJsonSerDerTest.java | 3 +- .../formats/json/ogg/OggJsonFormatFactoryTest.java | 2 + .../formats/json/ogg/OggJsonSerDeSchemaTest.java | 3 +- .../gateway/rest/serde/ResultInfoSerializer.java | 5 +- 32 files changed, 236 insertions(+), 38 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/formats/debezium.md b/docs/content.zh/docs/connectors/table/formats/debezium.md index a6ac486f0f0..7ba62dd408d 100644 --- a/docs/content.zh/docs/connectors/table/formats/debezium.md +++ b/docs/content.zh/docs/connectors/table/formats/debezium.md @@ -424,6 +424,13 @@ Flink 提供了 `debezium-avro-confluent` 和 `debezium-json` 两种 format 来 <td>Boolean</td> <td>将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:<code>0.000000027</code> 默认会表示为 <code>2.7E-8</code>。当此选项设为 true 时,则会表示为 <code>0.000000027</code>。</td> </tr> + <tr> + <td><h5>debezium-json.encode.ignore-null-fields</h5></td> + <td>选填</td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。</td> + </tr> </tbody> </table> diff --git a/docs/content.zh/docs/connectors/table/formats/json.md b/docs/content.zh/docs/connectors/table/formats/json.md index f1acdd7a001..005485a7a0a 100644 --- a/docs/content.zh/docs/connectors/table/formats/json.md +++ b/docs/content.zh/docs/connectors/table/formats/json.md @@ -135,6 +135,13 @@ Format 参数 <td>Boolean</td> <td>将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:<code>0.000000027</code> 默认会表示为 <code>2.7E-8</code>。当此选项设为 true 时,则会表示为 <code>0.000000027</code>。</td> </tr> + <tr> + <td><h5>json.encode.ignore-null-fields</h5></td> + <td>选填</td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。</td> + </tr> <tr> <td><h5>decode.json-parser.enabled</h5></td> <td>选填</td> diff --git a/docs/content.zh/docs/connectors/table/formats/maxwell.md b/docs/content.zh/docs/connectors/table/formats/maxwell.md index a3ac161f231..0bdedeac682 100644 --- a/docs/content.zh/docs/connectors/table/formats/maxwell.md +++ b/docs/content.zh/docs/connectors/table/formats/maxwell.md @@ -251,6 +251,13 @@ Format Options <td>Boolean</td> <td>Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, and will be written as <code>0.000000027</code> if set this option to true.</td> </tr> + <tr> + <td><h5>maxwell-json.encode.ignore-null-fields</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Encode only non-null fields. By default, all fields will be included.</td> + </tr> </tbody> </table> diff --git a/docs/content.zh/docs/connectors/table/formats/ogg.md b/docs/content.zh/docs/connectors/table/formats/ogg.md index c8e8a7a6c6d..61ec97b60fd 100644 --- a/docs/content.zh/docs/connectors/table/formats/ogg.md +++ b/docs/content.zh/docs/connectors/table/formats/ogg.md @@ -216,7 +216,7 @@ Format Options <td>当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。</td> </tr> <tr> - <td><h5>debezium-json.timestamp-format.standard</h5></td> + <td><h5>ogg-json.timestamp-format.standard</h5></td> <td>可选</td> <td style="word-wrap: break-word;"><code>'SQL'</code></td> <td>String</td> @@ -247,6 +247,13 @@ Format Options <td>String</td> <td>当 <code>'ogg-json.map-null-key.mode'</code> 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。</td> </tr> + <tr> + <td><h5>ogg-json.encode.ignore-null-fields</h5></td> + <td>选填</td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。</td> + </tr> </tbody> </table> diff --git a/docs/content/docs/connectors/table/formats/debezium.md b/docs/content/docs/connectors/table/formats/debezium.md index 790196e2588..f69e3dc5d8f 100644 --- a/docs/content/docs/connectors/table/formats/debezium.md +++ b/docs/content/docs/connectors/table/formats/debezium.md @@ -445,6 +445,13 @@ Use format `debezium-avro-confluent` to interpret Debezium Avro messages and for <td>Boolean</td> <td>Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, and will be written as <code>0.000000027</code> if set this option to true.</td> </tr> + <tr> + <td><h5>debezium-json.encode.ignore-null-fields</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Encode only non-null fields. By default, all fields will be included.</td> + </tr> </tbody> </table> diff --git a/docs/content/docs/connectors/table/formats/json.md b/docs/content/docs/connectors/table/formats/json.md index 52345a42ea1..64592ac28be 100644 --- a/docs/content/docs/connectors/table/formats/json.md +++ b/docs/content/docs/connectors/table/formats/json.md @@ -146,6 +146,14 @@ Format Options <td>Boolean</td> <td>Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, and will be written as <code>0.000000027</code> if set this option to true.</td> </tr> + <tr> + <td><h5>json.encode.ignore-null-fields</h5></td> + <td>optional</td> + <td>yes</td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Encode only non-null fields. By default, all fields will be included.</td> + </tr> <tr> <td><h5>decode.json-parser.enabled</h5></td> <td>optional</td> diff --git a/docs/content/docs/connectors/table/formats/maxwell.md b/docs/content/docs/connectors/table/formats/maxwell.md index a7a98270f38..47c87442c73 100644 --- a/docs/content/docs/connectors/table/formats/maxwell.md +++ b/docs/content/docs/connectors/table/formats/maxwell.md @@ -251,6 +251,13 @@ Format Options <td>Boolean</td> <td>Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, and will be written as <code>0.000000027</code> if set this option to true.</td> </tr> + <tr> + <td><h5>maxwell-json.encode.ignore-null-fields</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Encode only non-null fields. By default, all fields will be included.</td> + </tr> </tbody> </table> diff --git a/docs/content/docs/connectors/table/formats/ogg.md b/docs/content/docs/connectors/table/formats/ogg.md index 482273af8ce..3b53916e36d 100644 --- a/docs/content/docs/connectors/table/formats/ogg.md +++ b/docs/content/docs/connectors/table/formats/ogg.md @@ -260,6 +260,13 @@ Format Options <td>String</td> <td>Specify string literal to replace null key when <code>'ogg-json.map-null-key.mode'</code> is LITERAL.</td> </tr> + <tr> + <td><h5>ogg-json.encode.ignore-null-fields</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Encode only non-null fields. By default, all fields will be included.</td> + </tr> </tbody> </table> diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java index 35a40ea187f..fbc7d89d6ad 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java @@ -108,7 +108,7 @@ public class ThriftObjectConversions { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final RowDataToJsonConverters TO_JSON_CONVERTERS = new RowDataToJsonConverters( - TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null"); + TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", false); private static final Map<String, TableKind> TABLE_TYPE_MAPPINGS = buildTableTypeMapping(); // -------------------------------------------------------------------------------------------- diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java index 562b99e6bc8..6ecc9a0f086 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java @@ -46,6 +46,7 @@ import java.util.Set; import static org.apache.flink.formats.json.JsonFormatOptions.DECODE_JSON_PARSER_ENABLED; import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER; +import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS; import static org.apache.flink.formats.json.JsonFormatOptions.FAIL_ON_MISSING_FIELD; import static org.apache.flink.formats.json.JsonFormatOptions.IGNORE_PARSE_ERRORS; import static org.apache.flink.formats.json.JsonFormatOptions.MAP_NULL_KEY_LITERAL; @@ -147,6 +148,7 @@ public class JsonFormatFactory implements DeserializationFormatFactory, Serializ final boolean encodeDecimalAsPlainNumber = formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS); return new EncodingFormat<SerializationSchema<RowData>>() { @Override @@ -158,7 +160,8 @@ public class JsonFormatFactory implements DeserializationFormatFactory, Serializ timestampOption, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } @Override @@ -187,6 +190,7 @@ public class JsonFormatFactory implements DeserializationFormatFactory, Serializ options.add(MAP_NULL_KEY_MODE); options.add(MAP_NULL_KEY_LITERAL); options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + options.add(ENCODE_IGNORE_NULL_FIELDS); return options; } @@ -197,6 +201,7 @@ public class JsonFormatFactory implements DeserializationFormatFactory, Serializ options.add(MAP_NULL_KEY_MODE); options.add(MAP_NULL_KEY_LITERAL); options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + options.add(ENCODE_IGNORE_NULL_FIELDS); return options; } } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java index 5c9e61068ac..cc40b325d91 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java @@ -73,6 +73,13 @@ public class JsonFormatOptions { .withDescription( "Optional flag to specify whether to encode all decimals as plain numbers instead of possible scientific notations, false by default."); + public static final ConfigOption<Boolean> ENCODE_IGNORE_NULL_FIELDS = + ConfigOptions.key("encode.ignore-null-fields") + .booleanType() + .defaultValue(false) + .withDescription( + "Optional flag to specify whether to ignore null fields when encoding, false by default."); + public static final ConfigOption<Boolean> DECODE_JSON_PARSER_ENABLED = ConfigOptions.key("decode.json-parser.enabled") .booleanType() diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java index 376d0d568a3..4b68bb0c2af 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java @@ -68,19 +68,28 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa /** Flag indicating whether to serialize all decimals as plain numbers. */ private final boolean encodeDecimalAsPlainNumber; + /** Flag indicating whether to ignore null fields. */ + private final boolean ignoreNullFields; + public JsonRowDataSerializationSchema( RowType rowType, TimestampFormat timestampFormat, JsonFormatOptions.MapNullKeyMode mapNullKeyMode, String mapNullKeyLiteral, - boolean encodeDecimalAsPlainNumber) { + boolean encodeDecimalAsPlainNumber, + boolean ignoreNullFields) { this.rowType = rowType; this.timestampFormat = timestampFormat; this.mapNullKeyMode = mapNullKeyMode; this.mapNullKeyLiteral = mapNullKeyLiteral; this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber; + this.ignoreNullFields = ignoreNullFields; this.runtimeConverter = - new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral) + new RowDataToJsonConverters( + timestampFormat, + mapNullKeyMode, + mapNullKeyLiteral, + ignoreNullFields) .createConverter(rowType); } @@ -95,7 +104,7 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa @Override public byte[] serialize(RowData row) { - if (node == null) { + if (node == null || ignoreNullFields) { node = mapper.createObjectNode(); } @@ -120,7 +129,8 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa && timestampFormat.equals(that.timestampFormat) && mapNullKeyMode.equals(that.mapNullKeyMode) && mapNullKeyLiteral.equals(that.mapNullKeyLiteral) - && encodeDecimalAsPlainNumber == that.encodeDecimalAsPlainNumber; + && encodeDecimalAsPlainNumber == that.encodeDecimalAsPlainNumber + && ignoreNullFields == that.ignoreNullFields; } @Override @@ -130,6 +140,7 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java index 2a1cd076ecf..7ca5f59d183 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java @@ -68,13 +68,18 @@ public class RowDataToJsonConverters implements Serializable { /** The string literal when handling mode for map null key LITERAL. is */ private final String mapNullKeyLiteral; + /** Flag indicating whether to ignore null fields. */ + private final boolean ignoreNullFields; + public RowDataToJsonConverters( TimestampFormat timestampFormat, JsonFormatOptions.MapNullKeyMode mapNullKeyMode, - String mapNullKeyLiteral) { + String mapNullKeyLiteral, + boolean ignoreNullFields) { this.timestampFormat = timestampFormat; this.mapNullKeyMode = mapNullKeyMode; this.mapNullKeyLiteral = mapNullKeyLiteral; + this.ignoreNullFields = ignoreNullFields; } /** @@ -331,9 +336,11 @@ public class RowDataToJsonConverters implements Serializable { String fieldName = fieldNames[i]; try { Object field = fieldGetters[i].getFieldOrNull(row); - node.set( - fieldName, - fieldConverters[i].convert(mapper, node.get(fieldName), field)); + if (field != null || !ignoreNullFields) { + node.set( + fieldName, + fieldConverters[i].convert(mapper, node.get(fieldName), field)); + } } catch (Throwable t) { throw new RuntimeException( String.format("Fail to serialize at field: %s.", fieldName), t); diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java index 9f3ccad2d01..77dfefa9505 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java @@ -44,6 +44,7 @@ import java.util.HashSet; import java.util.Set; import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER; +import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS; import static org.apache.flink.formats.json.canal.CanalJsonFormatOptions.DATABASE_INCLUDE; import static org.apache.flink.formats.json.canal.CanalJsonFormatOptions.IGNORE_PARSE_ERRORS; import static org.apache.flink.formats.json.canal.CanalJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL; @@ -91,6 +92,8 @@ public class CanalJsonFormatFactory final boolean encodeDecimalAsPlainNumber = formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS); + return new EncodingFormat<SerializationSchema<RowData>>() { @Override public ChangelogMode getChangelogMode() { @@ -111,7 +114,8 @@ public class CanalJsonFormatFactory timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } }; } @@ -136,6 +140,7 @@ public class CanalJsonFormatFactory options.add(JSON_MAP_NULL_KEY_MODE); options.add(JSON_MAP_NULL_KEY_LITERAL); options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + options.add(ENCODE_IGNORE_NULL_FIELDS); return options; } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java index 362b9df6e6a..aaa292ef9df 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java @@ -59,14 +59,16 @@ public class CanalJsonSerializationSchema implements SerializationSchema<RowData TimestampFormat timestampFormat, JsonFormatOptions.MapNullKeyMode mapNullKeyMode, String mapNullKeyLiteral, - boolean encodeDecimalAsPlainNumber) { + boolean encodeDecimalAsPlainNumber, + boolean ignoreNullFields) { jsonSerializer = new JsonRowDataSerializationSchema( createJsonRowType(fromLogicalToDataType(rowType)), timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } @Override diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java index d72fcd23deb..7fec2f43c1e 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java @@ -45,6 +45,7 @@ import java.util.HashSet; import java.util.Set; import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER; +import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS; import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS; import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL; import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_MODE; @@ -92,6 +93,7 @@ public class DebeziumJsonFormatFactory final boolean encodeDecimalAsPlainNumber = formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS); return new EncodingFormat<SerializationSchema<RowData>>() { @@ -114,7 +116,8 @@ public class DebeziumJsonFormatFactory timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } }; } @@ -138,6 +141,7 @@ public class DebeziumJsonFormatFactory options.add(JSON_MAP_NULL_KEY_MODE); options.add(JSON_MAP_NULL_KEY_LITERAL); options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + options.add(ENCODE_IGNORE_NULL_FIELDS); return options; } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java index 0dc9a96b012..7312b30593a 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java @@ -56,14 +56,16 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<RowD TimestampFormat timestampFormat, JsonFormatOptions.MapNullKeyMode mapNullKeyMode, String mapNullKeyLiteral, - boolean encodeDecimalAsPlainNumber) { + boolean encodeDecimalAsPlainNumber, + boolean ignoreNullFields) { jsonSerializer = new JsonRowDataSerializationSchema( createJsonRowType(fromLogicalToDataType(rowType)), timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } @Override diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java index 1bbbec84414..e56966753a2 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java @@ -44,6 +44,7 @@ import java.util.HashSet; import java.util.Set; import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER; +import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS; import static org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions.IGNORE_PARSE_ERRORS; import static org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL; import static org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_MODE; @@ -86,6 +87,8 @@ public class MaxwellJsonFormatFactory final boolean encodeDecimalAsPlainNumber = formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS); + return new EncodingFormat<SerializationSchema<RowData>>() { @Override @@ -107,7 +110,8 @@ public class MaxwellJsonFormatFactory timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } }; } @@ -130,6 +134,7 @@ public class MaxwellJsonFormatFactory options.add(JSON_MAP_NULL_KEY_MODE); options.add(JSON_MAP_NULL_KEY_LITERAL); options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + options.add(ENCODE_IGNORE_NULL_FIELDS); return options; } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java index 1fe567b08c3..ad1accdddd6 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java @@ -56,14 +56,16 @@ public class MaxwellJsonSerializationSchema implements SerializationSchema<RowDa TimestampFormat timestampFormat, JsonFormatOptions.MapNullKeyMode mapNullKeyMode, String mapNullKeyLiteral, - boolean encodeDecimalAsPlainNumber) { + boolean encodeDecimalAsPlainNumber, + boolean ignoreNullFields) { this.jsonSerializer = new JsonRowDataSerializationSchema( createJsonRowType(fromLogicalToDataType(rowType)), timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); this.timestampFormat = timestampFormat; } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java index f853983d43e..11182e93806 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java @@ -44,6 +44,7 @@ import java.util.HashSet; import java.util.Set; import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER; +import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS; import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.IGNORE_PARSE_ERRORS; import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL; import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.JSON_MAP_NULL_KEY_MODE; @@ -99,6 +100,8 @@ public class OggJsonFormatFactory final boolean encodeDecimalAsPlainNumber = formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS); + return new EncodingFormat<SerializationSchema<RowData>>() { @Override @@ -120,7 +123,8 @@ public class OggJsonFormatFactory timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } }; } @@ -143,6 +147,7 @@ public class OggJsonFormatFactory options.add(JSON_MAP_NULL_KEY_MODE); options.add(JSON_MAP_NULL_KEY_LITERAL); options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + options.add(ENCODE_IGNORE_NULL_FIELDS); return options; } } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java index 635ff3dc7e3..f44387a5863 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java @@ -57,14 +57,16 @@ public class OggJsonSerializationSchema implements SerializationSchema<RowData> TimestampFormat timestampFormat, JsonFormatOptions.MapNullKeyMode mapNullKeyMode, String mapNullKeyLiteral, - boolean encodeDecimalAsPlainNumber) { + boolean encodeDecimalAsPlainNumber, + boolean ignoreNullFields) { jsonSerializer = new JsonRowDataSerializationSchema( createJsonRowType(fromLogicalToDataType(rowType)), timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } private static RowType createJsonRowType(DataType databaseSchema) { diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java index 3559e2b2c87..4430203b28e 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java @@ -176,6 +176,7 @@ class JsonFormatFactoryTest { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", + true, true); SerializationSchema<RowData> actualSer = @@ -227,6 +228,7 @@ class JsonFormatFactoryTest { options.put("json.map-null-key.mode", "LITERAL"); options.put("json.map-null-key.literal", "null"); options.put("json.encode.decimal-as-plain-number", "true"); + options.put("json.encode.ignore-null-fields", "true"); return options; } } diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java index ced449e0936..916b04f50f8 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java @@ -216,7 +216,8 @@ public class JsonRowDataSerDeSchemaTest { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); open(serializationSchema); byte[] actualBytes = serializationSchema.serialize(rowData); @@ -300,7 +301,8 @@ public class JsonRowDataSerDeSchemaTest { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); open(serializationSchema); // the first row @@ -381,7 +383,8 @@ public class JsonRowDataSerDeSchemaTest { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); open(serializationSchema); for (int i = 0; i < jsons.length; i++) { @@ -496,7 +499,8 @@ public class JsonRowDataSerDeSchemaTest { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); open(serializationSchema); ObjectNode root = OBJECT_MAPPER.createObjectNode(); @@ -538,7 +542,8 @@ public class JsonRowDataSerDeSchemaTest { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.FAIL, "null", - true); + true, + false); open(serializationSchema1); // expect message for serializationSchema1 String errorMessage1 = @@ -551,7 +556,8 @@ public class JsonRowDataSerDeSchemaTest { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.DROP, "null", - true); + true, + false); open(serializationSchema2); // expect result for serializationSchema2 String expectResult2 = "{\"nestedMap\":{\"no-null key\":{\"no-null key\":1}}}"; @@ -562,7 +568,8 @@ public class JsonRowDataSerDeSchemaTest { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "nullKey", - true); + true, + false); open(serializationSchema3); // expect result for serializationSchema3 String expectResult3 = @@ -601,7 +608,8 @@ public class JsonRowDataSerDeSchemaTest { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); plainDecimalSerializer.open(new DummyInitializationContext()); JsonRowDataSerializationSchema scientificDecimalSerializer = new JsonRowDataSerializationSchema( @@ -609,6 +617,7 @@ public class JsonRowDataSerDeSchemaTest { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", + false, false); scientificDecimalSerializer.open(new DummyInitializationContext()); @@ -626,6 +635,62 @@ public class JsonRowDataSerDeSchemaTest { assertThat(scientificDecimalResult).isEqualTo(scientificDecimalJson); } + @TestTemplate + void testSerDeMultiRowsWithNullValuesIgnored() throws Exception { + String[] jsons = + new String[] { + "{\"ops\":null,\"ids\":null,\"metrics\":{\"k1\":10.01,\"k2\":null}}", + "{\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\", \"svt\":\"2020-02-24T12:58:09.209+0800\"}, " + + "\"ids\":[1, 2, 3]}", + "{\"ops\":{\"id\":null, \"svt\":\"2020-02-24T12:58:09.209+0800\"}, " + + "\"ids\":[1, 2, null]}", + "{\"ops\":{},\"ids\":[],\"metrics\":{}}", + }; + + String[] expected = + new String[] { + "{\"metrics\":{\"k1\":10.01,\"k2\":null}}", + "{\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\",\"svt\":\"2020-02-24T12:58:09.209+0800\"}," + + "\"ids\":[1,2,3]}", + "{\"ops\":{\"svt\":\"2020-02-24T12:58:09.209+0800\"},\"ids\":[1,2,null]}", + "{\"ops\":{},\"ids\":[],\"metrics\":{}}", + }; + + RowType rowType = + (RowType) + ROW( + FIELD( + "ops", + ROW(FIELD("id", STRING()), FIELD("svt", STRING()))), + FIELD("ids", ARRAY(INT())), + FIELD("metrics", MAP(STRING(), DOUBLE()))) + .getLogicalType(); + + JsonRowDataDeserializationSchema deserializationSchema = + new JsonRowDataDeserializationSchema( + rowType, + InternalTypeInfo.of(rowType), + false, + true, + TimestampFormat.ISO_8601); + open(deserializationSchema); + JsonRowDataSerializationSchema serializationSchema = + new JsonRowDataSerializationSchema( + rowType, + TimestampFormat.ISO_8601, + JsonFormatOptions.MapNullKeyMode.LITERAL, + "null", + false, + true); + open(serializationSchema); + for (int i = 0; i < jsons.length; i++) { + String json = jsons[i]; + RowData row = deserializationSchema.deserialize(json.getBytes()); + String result = new String(serializationSchema.serialize(row)); + assertThat(result).isEqualTo(expected[i]); + } + } + @TestTemplate void testJsonParse() throws Exception { for (TestSpec spec : testData) { @@ -648,7 +713,8 @@ public class JsonRowDataSerDeSchemaTest { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.FAIL, "null", - true); + true, + false); open(serializationSchema); String errorMessage = "Fail to serialize at field: f1."; diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java index 00bd5a0625e..bf2b95dac98 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java @@ -74,6 +74,7 @@ class CanalJsonFormatFactoryTest { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.FAIL, "null", + false, false); SerializationSchema<RowData> actualSer = createSerializationSchema(options); assertThat(actualSer).isEqualTo(expectedSer); @@ -89,6 +90,7 @@ class CanalJsonFormatFactoryTest { options.put("canal-json.map-null-key.mode", "LITERAL"); options.put("canal-json.map-null-key.literal", "nullKey"); options.put("canal-json.encode.decimal-as-plain-number", "true"); + options.put("canal-json.encode.ignore-null-fields", "true"); // test Deser CanalJsonDeserializationSchema expectedDeser = @@ -109,6 +111,7 @@ class CanalJsonFormatFactoryTest { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "nullKey", + true, true); SerializationSchema<RowData> actualSer = createSerializationSchema(options); assertThat(actualSer).isEqualTo(expectedSer); diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java index e45bfcc5eee..cf326f2a339 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java @@ -218,7 +218,8 @@ class CanalJsonSerDeSchemaTest { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); serializationSchema.open(new DummyInitializationContext()); List<String> result = new ArrayList<>(); diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java index d000877b2f9..c469e0b2f95 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java @@ -81,6 +81,7 @@ class DebeziumJsonFormatFactoryTest { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", + true, true); final DynamicTableSink actualSink = createTableSink(SCHEMA, options); @@ -200,6 +201,7 @@ class DebeziumJsonFormatFactoryTest { options.put("debezium-json.map-null-key.mode", "LITERAL"); options.put("debezium-json.map-null-key.literal", "null"); options.put("debezium-json.encode.decimal-as-plain-number", "true"); + options.put("debezium-json.encode.ignore-null-fields", "true"); return options; } } diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java index 3b9151f33a9..ffe0007f522 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java @@ -249,7 +249,8 @@ class DebeziumJsonSerDeSchemaTest { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); open(serializationSchema); actual = new ArrayList<>(); diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java index bc47d1e68f0..54fe0804a5b 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java @@ -69,6 +69,7 @@ class MaxwellJsonFormatFactoryTest { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", + true, true); final Map<String, String> options = getAllOptions(); @@ -165,6 +166,7 @@ class MaxwellJsonFormatFactoryTest { options.put("maxwell-json.map-null-key.mode", "LITERAL"); options.put("maxwell-json.map-null-key.literal", "null"); options.put("maxwell-json.encode.decimal-as-plain-number", "true"); + options.put("maxwell-json.encode.ignore-null-fields", "true"); return options; } } diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java index 12d64fd99d0..d17d6a83534 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java @@ -187,7 +187,8 @@ class MaxwellJsonSerDerTest { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); open(serializationSchema); List<String> result = new ArrayList<>(); for (RowData rowData : collector.list) { diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java index c04e991a2de..f840783ca95 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java @@ -55,6 +55,7 @@ class OggJsonFormatFactoryTest { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", + true, true); final DynamicTableSink actualSink = createTableSink(SCHEMA, options); @@ -137,6 +138,7 @@ class OggJsonFormatFactoryTest { options.put("ogg-json.map-null-key.mode", "LITERAL"); options.put("ogg-json.map-null-key.literal", "null"); options.put("ogg-json.encode.decimal-as-plain-number", "true"); + options.put("ogg-json.encode.ignore-null-fields", "true"); return options; } } diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java index 2fa78c89412..76e417d4ac1 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java @@ -216,7 +216,8 @@ class OggJsonSerDeSchemaTest { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); open(serializationSchema); actual = new ArrayList<>(); diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java index b796d32ba3e..fb43f6d62b8 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java @@ -68,7 +68,10 @@ public class ResultInfoSerializer extends StdSerializer<ResultInfo> { private static final RowDataToJsonConverters TO_JSON_CONVERTERS = new RowDataToJsonConverters( - TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null"); + TimestampFormat.ISO_8601, + JsonFormatOptions.MapNullKeyMode.LITERAL, + "null", + false); @Override public void serialize(