This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 93555a7d55d [FLINK-16627][json] Support ignore null fields when serializing into JSON
93555a7d55d is described below

commit 93555a7d55dc27571122cccb7c4d8af2c5db54cb
Author: zhouyisha <zhouyi...@bytedance.com>
AuthorDate: Fri Mar 1 17:11:32 2024 +0800

    [FLINK-16627][json] Support ignore null fields when serializing into JSON
    
    Close apache/flink#24430
---
 .../docs/connectors/table/formats/debezium.md      |  7 ++
 .../docs/connectors/table/formats/json.md          |  7 ++
 .../docs/connectors/table/formats/maxwell.md       |  7 ++
 .../docs/connectors/table/formats/ogg.md           |  9 ++-
 .../docs/connectors/table/formats/debezium.md      |  7 ++
 docs/content/docs/connectors/table/formats/json.md |  8 +++
 .../docs/connectors/table/formats/maxwell.md       |  7 ++
 docs/content/docs/connectors/table/formats/ogg.md  |  7 ++
 .../hive/util/ThriftObjectConversions.java         |  2 +-
 .../flink/formats/json/JsonFormatFactory.java      |  7 +-
 .../flink/formats/json/JsonFormatOptions.java      |  7 ++
 .../json/JsonRowDataSerializationSchema.java       | 21 ++++--
 .../formats/json/RowDataToJsonConverters.java      | 15 ++--
 .../formats/json/canal/CanalJsonFormatFactory.java |  7 +-
 .../json/canal/CanalJsonSerializationSchema.java   |  6 +-
 .../json/debezium/DebeziumJsonFormatFactory.java   |  6 +-
 .../debezium/DebeziumJsonSerializationSchema.java  |  6 +-
 .../json/maxwell/MaxwellJsonFormatFactory.java     |  7 +-
 .../maxwell/MaxwellJsonSerializationSchema.java    |  6 +-
 .../formats/json/ogg/OggJsonFormatFactory.java     |  7 +-
 .../json/ogg/OggJsonSerializationSchema.java       |  6 +-
 .../flink/formats/json/JsonFormatFactoryTest.java  |  2 +
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 84 +++++++++++++++++++---
 .../json/canal/CanalJsonFormatFactoryTest.java     |  3 +
 .../json/canal/CanalJsonSerDeSchemaTest.java       |  3 +-
 .../debezium/DebeziumJsonFormatFactoryTest.java    |  2 +
 .../json/debezium/DebeziumJsonSerDeSchemaTest.java |  3 +-
 .../json/maxwell/MaxwellJsonFormatFactoryTest.java |  2 +
 .../json/maxwell/MaxwellJsonSerDerTest.java        |  3 +-
 .../formats/json/ogg/OggJsonFormatFactoryTest.java |  2 +
 .../formats/json/ogg/OggJsonSerDeSchemaTest.java   |  3 +-
 .../gateway/rest/serde/ResultInfoSerializer.java   |  5 +-
 32 files changed, 236 insertions(+), 38 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/formats/debezium.md 
b/docs/content.zh/docs/connectors/table/formats/debezium.md
index a6ac486f0f0..7ba62dd408d 100644
--- a/docs/content.zh/docs/connectors/table/formats/debezium.md
+++ b/docs/content.zh/docs/connectors/table/formats/debezium.md
@@ -424,6 +424,13 @@ Flink 提供了 `debezium-avro-confluent` 和 `debezium-json` 两种 
format 来
       <td>Boolean</td>
       <td>将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:<code>0.000000027</code> 默认会表示为 
<code>2.7E-8</code>。当此选项设为 true 时,则会表示为 <code>0.000000027</code>。</td>
     </tr>
+    <tr>
+      <td><h5>debezium-json.encode.ignore-null-fields</h5></td>
+      <td>选填</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/content.zh/docs/connectors/table/formats/json.md 
b/docs/content.zh/docs/connectors/table/formats/json.md
index f1acdd7a001..005485a7a0a 100644
--- a/docs/content.zh/docs/connectors/table/formats/json.md
+++ b/docs/content.zh/docs/connectors/table/formats/json.md
@@ -135,6 +135,13 @@ Format 参数
       <td>Boolean</td>
       <td>将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:<code>0.000000027</code> 默认会表示为 
<code>2.7E-8</code>。当此选项设为 true 时,则会表示为 <code>0.000000027</code>。</td>
     </tr>
+    <tr>
+      <td><h5>json.encode.ignore-null-fields</h5></td>
+      <td>选填</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。</td>
+    </tr>
     <tr>
       <td><h5>decode.json-parser.enabled</h5></td>
       <td>选填</td>
diff --git a/docs/content.zh/docs/connectors/table/formats/maxwell.md 
b/docs/content.zh/docs/connectors/table/formats/maxwell.md
index a3ac161f231..0bdedeac682 100644
--- a/docs/content.zh/docs/connectors/table/formats/maxwell.md
+++ b/docs/content.zh/docs/connectors/table/formats/maxwell.md
@@ -251,6 +251,13 @@ Format Options
       <td>Boolean</td>
       <td>Encode all decimals as plain numbers instead of possible scientific 
notations. By default, decimals may be written using scientific notation. For 
example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, 
and will be written as <code>0.000000027</code> if set this option to true.</td>
     </tr>
+    <tr>
+      <td><h5>maxwell-json.encode.ignore-null-fields</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Encode only non-null fields. By default, all fields will be included.</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/content.zh/docs/connectors/table/formats/ogg.md 
b/docs/content.zh/docs/connectors/table/formats/ogg.md
index c8e8a7a6c6d..61ec97b60fd 100644
--- a/docs/content.zh/docs/connectors/table/formats/ogg.md
+++ b/docs/content.zh/docs/connectors/table/formats/ogg.md
@@ -216,7 +216,7 @@ Format Options
       <td>当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 
false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。</td>
     </tr>
     <tr>
-      <td><h5>debezium-json.timestamp-format.standard</h5></td>
+      <td><h5>ogg-json.timestamp-format.standard</h5></td>
       <td>可选</td>
       <td style="word-wrap: break-word;"><code>'SQL'</code></td>
       <td>String</td>
@@ -247,6 +247,13 @@ Format Options
       <td>String</td>
       <td>当 <code>'ogg-json.map-null-key.mode'</code> 是 LITERAL 的时候,指定字符串常量替换 
Map 中的空 key 值。</td>
     </tr>
+    <tr>
+      <td><h5>ogg-json.encode.ignore-null-fields</h5></td>
+      <td>选填</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/content/docs/connectors/table/formats/debezium.md 
b/docs/content/docs/connectors/table/formats/debezium.md
index 790196e2588..f69e3dc5d8f 100644
--- a/docs/content/docs/connectors/table/formats/debezium.md
+++ b/docs/content/docs/connectors/table/formats/debezium.md
@@ -445,6 +445,13 @@ Use format `debezium-avro-confluent` to interpret Debezium 
Avro messages and for
       <td>Boolean</td>
       <td>Encode all decimals as plain numbers instead of possible scientific 
notations. By default, decimals may be written using scientific notation. For 
example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, 
and will be written as <code>0.000000027</code> if set this option to true.</td>
     </tr>   
+    <tr>
+      <td><h5>debezium-json.encode.ignore-null-fields</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Encode only non-null fields. By default, all fields will be included.</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/content/docs/connectors/table/formats/json.md 
b/docs/content/docs/connectors/table/formats/json.md
index 52345a42ea1..64592ac28be 100644
--- a/docs/content/docs/connectors/table/formats/json.md
+++ b/docs/content/docs/connectors/table/formats/json.md
@@ -146,6 +146,14 @@ Format Options
       <td>Boolean</td>
       <td>Encode all decimals as plain numbers instead of possible scientific 
notations. By default, decimals may be written using scientific notation. For 
example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, 
and will be written as <code>0.000000027</code> if set this option to true.</td>
     </tr>
+    <tr>
+      <td><h5>json.encode.ignore-null-fields</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Encode only non-null fields. By default, all fields will be included.</td>
+    </tr>
     <tr>
       <td><h5>decode.json-parser.enabled</h5></td>
       <td>optional</td>
diff --git a/docs/content/docs/connectors/table/formats/maxwell.md 
b/docs/content/docs/connectors/table/formats/maxwell.md
index a7a98270f38..47c87442c73 100644
--- a/docs/content/docs/connectors/table/formats/maxwell.md
+++ b/docs/content/docs/connectors/table/formats/maxwell.md
@@ -251,6 +251,13 @@ Format Options
       <td>Boolean</td>
       <td>Encode all decimals as plain numbers instead of possible scientific 
notations. By default, decimals may be written using scientific notation. For 
example, <code>0.000000027</code> is encoded as <code>2.7E-8</code> by default, 
and will be written as <code>0.000000027</code> if set this option to true.</td>
     </tr>
+    <tr>
+      <td><h5>maxwell-json.encode.ignore-null-fields</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Encode only non-null fields. By default, all fields will be included.</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/content/docs/connectors/table/formats/ogg.md 
b/docs/content/docs/connectors/table/formats/ogg.md
index 482273af8ce..3b53916e36d 100644
--- a/docs/content/docs/connectors/table/formats/ogg.md
+++ b/docs/content/docs/connectors/table/formats/ogg.md
@@ -260,6 +260,13 @@ Format Options
       <td>String</td>
       <td>Specify string literal to replace null key when 
<code>'ogg-json.map-null-key.mode'</code> is LITERAL.</td>
     </tr>
+    <tr>
+      <td><h5>ogg-json.encode.ignore-null-fields</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Encode only non-null fields. By default, all fields will be included.</td>
+    </tr>
     </tbody>
 </table>
 
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
index 35a40ea187f..fbc7d89d6ad 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java
@@ -108,7 +108,7 @@ public class ThriftObjectConversions {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final RowDataToJsonConverters TO_JSON_CONVERTERS =
             new RowDataToJsonConverters(
-                    TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null");
+                    TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", false);
     private static final Map<String, TableKind> TABLE_TYPE_MAPPINGS = 
buildTableTypeMapping();
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
index 562b99e6bc8..6ecc9a0f086 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
@@ -46,6 +46,7 @@ import java.util.Set;
 
 import static 
org.apache.flink.formats.json.JsonFormatOptions.DECODE_JSON_PARSER_ENABLED;
 import static 
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
 import static 
org.apache.flink.formats.json.JsonFormatOptions.FAIL_ON_MISSING_FIELD;
 import static 
org.apache.flink.formats.json.JsonFormatOptions.IGNORE_PARSE_ERRORS;
 import static 
org.apache.flink.formats.json.JsonFormatOptions.MAP_NULL_KEY_LITERAL;
@@ -147,6 +148,7 @@ public class JsonFormatFactory implements 
DeserializationFormatFactory, Serializ
 
         final boolean encodeDecimalAsPlainNumber =
                 formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+        final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);
 
         return new EncodingFormat<SerializationSchema<RowData>>() {
             @Override
@@ -158,7 +160,8 @@ public class JsonFormatFactory implements 
DeserializationFormatFactory, Serializ
                         timestampOption,
                         mapNullKeyMode,
                         mapNullKeyLiteral,
-                        encodeDecimalAsPlainNumber);
+                        encodeDecimalAsPlainNumber,
+                        ignoreNullFields);
             }
 
             @Override
@@ -187,6 +190,7 @@ public class JsonFormatFactory implements 
DeserializationFormatFactory, Serializ
         options.add(MAP_NULL_KEY_MODE);
         options.add(MAP_NULL_KEY_LITERAL);
         options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+        options.add(ENCODE_IGNORE_NULL_FIELDS);
         return options;
     }
 
@@ -197,6 +201,7 @@ public class JsonFormatFactory implements 
DeserializationFormatFactory, Serializ
         options.add(MAP_NULL_KEY_MODE);
         options.add(MAP_NULL_KEY_LITERAL);
         options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+        options.add(ENCODE_IGNORE_NULL_FIELDS);
         return options;
     }
 }
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
index 5c9e61068ac..cc40b325d91 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java
@@ -73,6 +73,13 @@ public class JsonFormatOptions {
                     .withDescription(
                             "Optional flag to specify whether to encode all 
decimals as plain numbers instead of possible scientific notations, false by 
default.");
 
+    public static final ConfigOption<Boolean> ENCODE_IGNORE_NULL_FIELDS =
+            ConfigOptions.key("encode.ignore-null-fields")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional flag to specify whether to ignore null fields when encoding, false by default.");
+
     public static final ConfigOption<Boolean> DECODE_JSON_PARSER_ENABLED =
             ConfigOptions.key("decode.json-parser.enabled")
                     .booleanType()
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 376d0d568a3..4b68bb0c2af 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -68,19 +68,28 @@ public class JsonRowDataSerializationSchema implements 
SerializationSchema<RowDa
     /** Flag indicating whether to serialize all decimals as plain numbers. */
     private final boolean encodeDecimalAsPlainNumber;
 
+    /** Flag indicating whether to ignore null fields. */
+    private final boolean ignoreNullFields;
+
     public JsonRowDataSerializationSchema(
             RowType rowType,
             TimestampFormat timestampFormat,
             JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
             String mapNullKeyLiteral,
-            boolean encodeDecimalAsPlainNumber) {
+            boolean encodeDecimalAsPlainNumber,
+            boolean ignoreNullFields) {
         this.rowType = rowType;
         this.timestampFormat = timestampFormat;
         this.mapNullKeyMode = mapNullKeyMode;
         this.mapNullKeyLiteral = mapNullKeyLiteral;
         this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+        this.ignoreNullFields = ignoreNullFields;
         this.runtimeConverter =
-                new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral)
+                new RowDataToJsonConverters(
+                                timestampFormat,
+                                mapNullKeyMode,
+                                mapNullKeyLiteral,
+                                ignoreNullFields)
                         .createConverter(rowType);
     }
 
@@ -95,7 +104,7 @@ public class JsonRowDataSerializationSchema implements 
SerializationSchema<RowDa
 
     @Override
     public byte[] serialize(RowData row) {
-        if (node == null) {
+        if (node == null || ignoreNullFields) {
             node = mapper.createObjectNode();
         }
 
@@ -120,7 +129,8 @@ public class JsonRowDataSerializationSchema implements 
SerializationSchema<RowDa
                 && timestampFormat.equals(that.timestampFormat)
                 && mapNullKeyMode.equals(that.mapNullKeyMode)
                 && mapNullKeyLiteral.equals(that.mapNullKeyLiteral)
-                && encodeDecimalAsPlainNumber == that.encodeDecimalAsPlainNumber;
+                && encodeDecimalAsPlainNumber == that.encodeDecimalAsPlainNumber
+                && ignoreNullFields == that.ignoreNullFields;
     }
 
     @Override
@@ -130,6 +140,7 @@ public class JsonRowDataSerializationSchema implements 
SerializationSchema<RowDa
                 timestampFormat,
                 mapNullKeyMode,
                 mapNullKeyLiteral,
-                encodeDecimalAsPlainNumber);
+                encodeDecimalAsPlainNumber,
+                ignoreNullFields);
     }
 }
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
index 2a1cd076ecf..7ca5f59d183 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
@@ -68,13 +68,18 @@ public class RowDataToJsonConverters implements 
Serializable {
     /** The string literal when handling mode for map null key LITERAL. is */
     private final String mapNullKeyLiteral;
 
+    /** Flag indicating whether to ignore null fields. */
+    private final boolean ignoreNullFields;
+
     public RowDataToJsonConverters(
             TimestampFormat timestampFormat,
             JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
-            String mapNullKeyLiteral) {
+            String mapNullKeyLiteral,
+            boolean ignoreNullFields) {
         this.timestampFormat = timestampFormat;
         this.mapNullKeyMode = mapNullKeyMode;
         this.mapNullKeyLiteral = mapNullKeyLiteral;
+        this.ignoreNullFields = ignoreNullFields;
     }
 
     /**
@@ -331,9 +336,11 @@ public class RowDataToJsonConverters implements 
Serializable {
                 String fieldName = fieldNames[i];
                 try {
                     Object field = fieldGetters[i].getFieldOrNull(row);
-                    node.set(
-                            fieldName,
-                            fieldConverters[i].convert(mapper, node.get(fieldName), field));
+                    if (field != null || !ignoreNullFields) {
+                        node.set(
+                                fieldName,
+                                fieldConverters[i].convert(mapper, node.get(fieldName), field));
+                    }
                 } catch (Throwable t) {
                     throw new RuntimeException(
                             String.format("Fail to serialize at field: %s.", 
fieldName), t);
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
index 9f3ccad2d01..77dfefa9505 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
@@ -44,6 +44,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import static 
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
 import static 
org.apache.flink.formats.json.canal.CanalJsonFormatOptions.DATABASE_INCLUDE;
 import static 
org.apache.flink.formats.json.canal.CanalJsonFormatOptions.IGNORE_PARSE_ERRORS;
 import static 
org.apache.flink.formats.json.canal.CanalJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
@@ -91,6 +92,8 @@ public class CanalJsonFormatFactory
         final boolean encodeDecimalAsPlainNumber =
                 formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
 
+        final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);
+
         return new EncodingFormat<SerializationSchema<RowData>>() {
             @Override
             public ChangelogMode getChangelogMode() {
@@ -111,7 +114,8 @@ public class CanalJsonFormatFactory
                         timestampFormat,
                         mapNullKeyMode,
                         mapNullKeyLiteral,
-                        encodeDecimalAsPlainNumber);
+                        encodeDecimalAsPlainNumber,
+                        ignoreNullFields);
             }
         };
     }
@@ -136,6 +140,7 @@ public class CanalJsonFormatFactory
         options.add(JSON_MAP_NULL_KEY_MODE);
         options.add(JSON_MAP_NULL_KEY_LITERAL);
         options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+        options.add(ENCODE_IGNORE_NULL_FIELDS);
         return options;
     }
 
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java
index 362b9df6e6a..aaa292ef9df 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java
@@ -59,14 +59,16 @@ public class CanalJsonSerializationSchema implements 
SerializationSchema<RowData
             TimestampFormat timestampFormat,
             JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
             String mapNullKeyLiteral,
-            boolean encodeDecimalAsPlainNumber) {
+            boolean encodeDecimalAsPlainNumber,
+            boolean ignoreNullFields) {
         jsonSerializer =
                 new JsonRowDataSerializationSchema(
                         createJsonRowType(fromLogicalToDataType(rowType)),
                         timestampFormat,
                         mapNullKeyMode,
                         mapNullKeyLiteral,
-                        encodeDecimalAsPlainNumber);
+                        encodeDecimalAsPlainNumber,
+                        ignoreNullFields);
     }
 
     @Override
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
index d72fcd23deb..7fec2f43c1e 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
@@ -45,6 +45,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import static 
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
 import static 
org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS;
 import static 
org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
 import static 
org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_MODE;
@@ -92,6 +93,7 @@ public class DebeziumJsonFormatFactory
 
         final boolean encodeDecimalAsPlainNumber =
                 formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+        final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);
 
         return new EncodingFormat<SerializationSchema<RowData>>() {
 
@@ -114,7 +116,8 @@ public class DebeziumJsonFormatFactory
                         timestampFormat,
                         mapNullKeyMode,
                         mapNullKeyLiteral,
-                        encodeDecimalAsPlainNumber);
+                        encodeDecimalAsPlainNumber,
+                        ignoreNullFields);
             }
         };
     }
@@ -138,6 +141,7 @@ public class DebeziumJsonFormatFactory
         options.add(JSON_MAP_NULL_KEY_MODE);
         options.add(JSON_MAP_NULL_KEY_LITERAL);
         options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+        options.add(ENCODE_IGNORE_NULL_FIELDS);
         return options;
     }
 
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
index 0dc9a96b012..7312b30593a 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
@@ -56,14 +56,16 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema<RowD
             TimestampFormat timestampFormat,
             JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
             String mapNullKeyLiteral,
-            boolean encodeDecimalAsPlainNumber) {
+            boolean encodeDecimalAsPlainNumber,
+            boolean ignoreNullFields) {
         jsonSerializer =
                 new JsonRowDataSerializationSchema(
                         createJsonRowType(fromLogicalToDataType(rowType)),
                         timestampFormat,
                         mapNullKeyMode,
                         mapNullKeyLiteral,
-                        encodeDecimalAsPlainNumber);
+                        encodeDecimalAsPlainNumber,
+                        ignoreNullFields);
     }
 
     @Override
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
index 1bbbec84414..e56966753a2 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
@@ -44,6 +44,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import static 
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
 import static 
org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions.IGNORE_PARSE_ERRORS;
 import static 
org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
 import static 
org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_MODE;
@@ -86,6 +87,8 @@ public class MaxwellJsonFormatFactory
         final boolean encodeDecimalAsPlainNumber =
                 formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
 
+        final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);
+
         return new EncodingFormat<SerializationSchema<RowData>>() {
 
             @Override
@@ -107,7 +110,8 @@ public class MaxwellJsonFormatFactory
                         timestampFormat,
                         mapNullKeyMode,
                         mapNullKeyLiteral,
-                        encodeDecimalAsPlainNumber);
+                        encodeDecimalAsPlainNumber,
+                        ignoreNullFields);
             }
         };
     }
@@ -130,6 +134,7 @@ public class MaxwellJsonFormatFactory
         options.add(JSON_MAP_NULL_KEY_MODE);
         options.add(JSON_MAP_NULL_KEY_LITERAL);
         options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+        options.add(ENCODE_IGNORE_NULL_FIELDS);
         return options;
     }
 
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java
index 1fe567b08c3..ad1accdddd6 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java
@@ -56,14 +56,16 @@ public class MaxwellJsonSerializationSchema implements 
SerializationSchema<RowDa
             TimestampFormat timestampFormat,
             JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
             String mapNullKeyLiteral,
-            boolean encodeDecimalAsPlainNumber) {
+            boolean encodeDecimalAsPlainNumber,
+            boolean ignoreNullFields) {
         this.jsonSerializer =
                 new JsonRowDataSerializationSchema(
                         createJsonRowType(fromLogicalToDataType(rowType)),
                         timestampFormat,
                         mapNullKeyMode,
                         mapNullKeyLiteral,
-                        encodeDecimalAsPlainNumber);
+                        encodeDecimalAsPlainNumber,
+                        ignoreNullFields);
         this.timestampFormat = timestampFormat;
     }
 
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java
index f853983d43e..11182e93806 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java
@@ -44,6 +44,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import static 
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS;
 import static 
org.apache.flink.formats.json.ogg.OggJsonFormatOptions.IGNORE_PARSE_ERRORS;
 import static 
org.apache.flink.formats.json.ogg.OggJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
 import static 
org.apache.flink.formats.json.ogg.OggJsonFormatOptions.JSON_MAP_NULL_KEY_MODE;
@@ -99,6 +100,8 @@ public class OggJsonFormatFactory
         final boolean encodeDecimalAsPlainNumber =
                 formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
 
+        final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS);
+
         return new EncodingFormat<SerializationSchema<RowData>>() {
 
             @Override
@@ -120,7 +123,8 @@ public class OggJsonFormatFactory
                         timestampFormat,
                         mapNullKeyMode,
                         mapNullKeyLiteral,
-                        encodeDecimalAsPlainNumber);
+                        encodeDecimalAsPlainNumber,
+                        ignoreNullFields);
             }
         };
     }
@@ -143,6 +147,7 @@ public class OggJsonFormatFactory
         options.add(JSON_MAP_NULL_KEY_MODE);
         options.add(JSON_MAP_NULL_KEY_LITERAL);
         options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+        options.add(ENCODE_IGNORE_NULL_FIELDS);
         return options;
     }
 }
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java
index 635ff3dc7e3..f44387a5863 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java
@@ -57,14 +57,16 @@ public class OggJsonSerializationSchema implements SerializationSchema<RowData>
             TimestampFormat timestampFormat,
             JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
             String mapNullKeyLiteral,
-            boolean encodeDecimalAsPlainNumber) {
+            boolean encodeDecimalAsPlainNumber,
+            boolean ignoreNullFields) {
         jsonSerializer =
                 new JsonRowDataSerializationSchema(
                         createJsonRowType(fromLogicalToDataType(rowType)),
                         timestampFormat,
                         mapNullKeyMode,
                         mapNullKeyLiteral,
-                        encodeDecimalAsPlainNumber);
+                        encodeDecimalAsPlainNumber,
+                        ignoreNullFields);
     }
 
     private static RowType createJsonRowType(DataType databaseSchema) {
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
index 3559e2b2c87..4430203b28e 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
@@ -176,6 +176,7 @@ class JsonFormatFactoryTest {
                         TimestampFormat.ISO_8601,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "null",
+                        true,
                         true);
 
         SerializationSchema<RowData> actualSer =
@@ -227,6 +228,7 @@ class JsonFormatFactoryTest {
         options.put("json.map-null-key.mode", "LITERAL");
         options.put("json.map-null-key.literal", "null");
         options.put("json.encode.decimal-as-plain-number", "true");
+        options.put("json.encode.ignore-null-fields", "true");
         return options;
     }
 }
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index ced449e0936..916b04f50f8 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -216,7 +216,8 @@ public class JsonRowDataSerDeSchemaTest {
                         TimestampFormat.ISO_8601,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "null",
-                        true);
+                        true,
+                        false);
         open(serializationSchema);
 
         byte[] actualBytes = serializationSchema.serialize(rowData);
@@ -300,7 +301,8 @@ public class JsonRowDataSerDeSchemaTest {
                         TimestampFormat.ISO_8601,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "null",
-                        true);
+                        true,
+                        false);
         open(serializationSchema);
 
         // the first row
@@ -381,7 +383,8 @@ public class JsonRowDataSerDeSchemaTest {
                         TimestampFormat.ISO_8601,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "null",
-                        true);
+                        true,
+                        false);
         open(serializationSchema);
 
         for (int i = 0; i < jsons.length; i++) {
@@ -496,7 +499,8 @@ public class JsonRowDataSerDeSchemaTest {
                         TimestampFormat.SQL,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "null",
-                        true);
+                        true,
+                        false);
         open(serializationSchema);
 
         ObjectNode root = OBJECT_MAPPER.createObjectNode();
@@ -538,7 +542,8 @@ public class JsonRowDataSerDeSchemaTest {
                         TimestampFormat.SQL,
                         JsonFormatOptions.MapNullKeyMode.FAIL,
                         "null",
-                        true);
+                        true,
+                        false);
         open(serializationSchema1);
         // expect message for serializationSchema1
         String errorMessage1 =
@@ -551,7 +556,8 @@ public class JsonRowDataSerDeSchemaTest {
                         TimestampFormat.SQL,
                         JsonFormatOptions.MapNullKeyMode.DROP,
                         "null",
-                        true);
+                        true,
+                        false);
         open(serializationSchema2);
         // expect result for serializationSchema2
         String expectResult2 = "{\"nestedMap\":{\"no-null key\":{\"no-null key\":1}}}";
@@ -562,7 +568,8 @@ public class JsonRowDataSerDeSchemaTest {
                         TimestampFormat.SQL,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "nullKey",
-                        true);
+                        true,
+                        false);
         open(serializationSchema3);
         // expect result for serializationSchema3
         String expectResult3 =
@@ -601,7 +608,8 @@ public class JsonRowDataSerDeSchemaTest {
                         TimestampFormat.ISO_8601,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "null",
-                        true);
+                        true,
+                        false);
         plainDecimalSerializer.open(new DummyInitializationContext());
         JsonRowDataSerializationSchema scientificDecimalSerializer =
                 new JsonRowDataSerializationSchema(
@@ -609,6 +617,7 @@ public class JsonRowDataSerDeSchemaTest {
                         TimestampFormat.ISO_8601,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "null",
+                        false,
                         false);
         scientificDecimalSerializer.open(new DummyInitializationContext());
 
@@ -626,6 +635,62 @@ public class JsonRowDataSerDeSchemaTest {
         assertThat(scientificDecimalResult).isEqualTo(scientificDecimalJson);
     }
 
+    @TestTemplate
+    void testSerDeMultiRowsWithNullValuesIgnored() throws Exception {
+        String[] jsons =
+                new String[] {
+                    "{\"ops\":null,\"ids\":null,\"metrics\":{\"k1\":10.01,\"k2\":null}}",
+                    "{\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\", \"svt\":\"2020-02-24T12:58:09.209+0800\"}, "
+                            + "\"ids\":[1, 2, 3]}",
+                    "{\"ops\":{\"id\":null, \"svt\":\"2020-02-24T12:58:09.209+0800\"}, "
+                            + "\"ids\":[1, 2, null]}",
+                    "{\"ops\":{},\"ids\":[],\"metrics\":{}}",
+                };
+
+        String[] expected =
+                new String[] {
+                    "{\"metrics\":{\"k1\":10.01,\"k2\":null}}",
+                    "{\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\",\"svt\":\"2020-02-24T12:58:09.209+0800\"},"
+                            + "\"ids\":[1,2,3]}",
+                    "{\"ops\":{\"svt\":\"2020-02-24T12:58:09.209+0800\"},\"ids\":[1,2,null]}",
+                    "{\"ops\":{},\"ids\":[],\"metrics\":{}}",
+                };
+
+        RowType rowType =
+                (RowType)
+                        ROW(
+                                        FIELD(
+                                                "ops",
+                                                ROW(FIELD("id", STRING()), FIELD("svt", STRING()))),
+                                        FIELD("ids", ARRAY(INT())),
+                                        FIELD("metrics", MAP(STRING(), DOUBLE())))
+                                .getLogicalType();
+
+        JsonRowDataDeserializationSchema deserializationSchema =
+                new JsonRowDataDeserializationSchema(
+                        rowType,
+                        InternalTypeInfo.of(rowType),
+                        false,
+                        true,
+                        TimestampFormat.ISO_8601);
+        open(deserializationSchema);
+        JsonRowDataSerializationSchema serializationSchema =
+                new JsonRowDataSerializationSchema(
+                        rowType,
+                        TimestampFormat.ISO_8601,
+                        JsonFormatOptions.MapNullKeyMode.LITERAL,
+                        "null",
+                        false,
+                        true);
+        open(serializationSchema);
+        for (int i = 0; i < jsons.length; i++) {
+            String json = jsons[i];
+            RowData row = deserializationSchema.deserialize(json.getBytes());
+            String result = new String(serializationSchema.serialize(row));
+            assertThat(result).isEqualTo(expected[i]);
+        }
+    }
+
     @TestTemplate
     void testJsonParse() throws Exception {
         for (TestSpec spec : testData) {
@@ -648,7 +713,8 @@ public class JsonRowDataSerDeSchemaTest {
                         TimestampFormat.SQL,
                         JsonFormatOptions.MapNullKeyMode.FAIL,
                         "null",
-                        true);
+                        true,
+                        false);
         open(serializationSchema);
         String errorMessage = "Fail to serialize at field: f1.";
 
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
index 00bd5a0625e..bf2b95dac98 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
@@ -74,6 +74,7 @@ class CanalJsonFormatFactoryTest {
                         TimestampFormat.SQL,
                         JsonFormatOptions.MapNullKeyMode.FAIL,
                         "null",
+                        false,
                         false);
         SerializationSchema<RowData> actualSer = createSerializationSchema(options);
         assertThat(actualSer).isEqualTo(expectedSer);
@@ -89,6 +90,7 @@ class CanalJsonFormatFactoryTest {
         options.put("canal-json.map-null-key.mode", "LITERAL");
         options.put("canal-json.map-null-key.literal", "nullKey");
         options.put("canal-json.encode.decimal-as-plain-number", "true");
+        options.put("canal-json.encode.ignore-null-fields", "true");
 
         // test Deser
         CanalJsonDeserializationSchema expectedDeser =
@@ -109,6 +111,7 @@ class CanalJsonFormatFactoryTest {
                         TimestampFormat.ISO_8601,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "nullKey",
+                        true,
                         true);
         SerializationSchema<RowData> actualSer = createSerializationSchema(options);
         assertThat(actualSer).isEqualTo(expectedSer);
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
index e45bfcc5eee..cf326f2a339 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
@@ -218,7 +218,8 @@ class CanalJsonSerDeSchemaTest {
                         TimestampFormat.ISO_8601,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "null",
-                        true);
+                        true,
+                        false);
         serializationSchema.open(new DummyInitializationContext());
 
         List<String> result = new ArrayList<>();
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
index d000877b2f9..c469e0b2f95 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
@@ -81,6 +81,7 @@ class DebeziumJsonFormatFactoryTest {
                         TimestampFormat.ISO_8601,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "null",
+                        true,
                         true);
 
         final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
@@ -200,6 +201,7 @@ class DebeziumJsonFormatFactoryTest {
         options.put("debezium-json.map-null-key.mode", "LITERAL");
         options.put("debezium-json.map-null-key.literal", "null");
         options.put("debezium-json.encode.decimal-as-plain-number", "true");
+        options.put("debezium-json.encode.ignore-null-fields", "true");
         return options;
     }
 }
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
index 3b9151f33a9..ffe0007f522 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
@@ -249,7 +249,8 @@ class DebeziumJsonSerDeSchemaTest {
                         TimestampFormat.SQL,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "null",
-                        true);
+                        true,
+                        false);
 
         open(serializationSchema);
         actual = new ArrayList<>();
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java
index bc47d1e68f0..54fe0804a5b 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java
@@ -69,6 +69,7 @@ class MaxwellJsonFormatFactoryTest {
                         TimestampFormat.ISO_8601,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "null",
+                        true,
                         true);
 
         final Map<String, String> options = getAllOptions();
@@ -165,6 +166,7 @@ class MaxwellJsonFormatFactoryTest {
         options.put("maxwell-json.map-null-key.mode", "LITERAL");
         options.put("maxwell-json.map-null-key.literal", "null");
         options.put("maxwell-json.encode.decimal-as-plain-number", "true");
+        options.put("maxwell-json.encode.ignore-null-fields", "true");
         return options;
     }
 }
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
index 12d64fd99d0..d17d6a83534 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
@@ -187,7 +187,8 @@ class MaxwellJsonSerDerTest {
                         TimestampFormat.SQL,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "null",
-                        true);
+                        true,
+                        false);
         open(serializationSchema);
         List<String> result = new ArrayList<>();
         for (RowData rowData : collector.list) {
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java
index c04e991a2de..f840783ca95 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java
@@ -55,6 +55,7 @@ class OggJsonFormatFactoryTest {
                         TimestampFormat.ISO_8601,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "null",
+                        true,
                         true);
 
         final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
@@ -137,6 +138,7 @@ class OggJsonFormatFactoryTest {
         options.put("ogg-json.map-null-key.mode", "LITERAL");
         options.put("ogg-json.map-null-key.literal", "null");
         options.put("ogg-json.encode.decimal-as-plain-number", "true");
+        options.put("ogg-json.encode.ignore-null-fields", "true");
         return options;
     }
 }
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
index 2fa78c89412..76e417d4ac1 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java
@@ -216,7 +216,8 @@ class OggJsonSerDeSchemaTest {
                         TimestampFormat.SQL,
                         JsonFormatOptions.MapNullKeyMode.LITERAL,
                         "null",
-                        true);
+                        true,
+                        false);
 
         open(serializationSchema);
         actual = new ArrayList<>();
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java
index b796d32ba3e..fb43f6d62b8 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java
@@ -68,7 +68,10 @@ public class ResultInfoSerializer extends StdSerializer<ResultInfo> {
 
     private static final RowDataToJsonConverters TO_JSON_CONVERTERS =
             new RowDataToJsonConverters(
-                    TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null");
+                    TimestampFormat.ISO_8601,
+                    JsonFormatOptions.MapNullKeyMode.LITERAL,
+                    "null",
+                    false);
 
     @Override
     public void serialize(

Reply via email to