This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new b50fc62  [FLINK-19790][json] Clear reused ObjectNode's content for map converter in RowDataToJsonConverters
b50fc62 is described below

commit b50fc62e5b08363932815e0c68fbea3f08f0011f
Author: Benchao Li <libenc...@gmail.com>
AuthorDate: Wed Nov 4 21:33:23 2020 +0800

    [FLINK-19790][json] Clear reused ObjectNode's content for map converter in RowDataToJsonConverters
    
    This closes #13926
---
 .../json/JsonRowDataSerializationSchema.java       |  1 +
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 23 +++++++++++++++++++++-
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 89b3b87..afc8ede 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -268,6 +268,7 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa
                                node = mapper.createObjectNode();
                        } else {
                                node = (ObjectNode) reuse;
+                               node.removeAll();
                        }
 
                        MapData map = (MapData) object;
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index fedfbea..a27de49 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -232,7 +232,12 @@ public class JsonRowDataSerDeSchemaTest {
                RowType rowType = (RowType) ROW(
                        FIELD("f1", INT()),
                        FIELD("f2", BOOLEAN()),
-                       FIELD("f3", STRING())
+                       FIELD("f3", STRING()),
+                       FIELD("f4", MAP(STRING(), STRING())),
+                       FIELD("f5", ARRAY(STRING())),
+                       FIELD("f6", ROW(
+                               FIELD("f1", STRING()),
+                               FIELD("f2", INT())))
                ).getLogicalType();
 
                JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
@@ -247,6 +252,14 @@ public class JsonRowDataSerDeSchemaTest {
                        root.put("f1", 1);
                        root.put("f2", true);
                        root.put("f3", "str");
+                       ObjectNode map = root.putObject("f4");
+                       map.put("hello1", "flink");
+                       ArrayNode array = root.putArray("f5");
+                       array.add("element1");
+                       array.add("element2");
+                       ObjectNode row = root.putObject("f6");
+                       row.put("f1", "this is row1");
+                       row.put("f2", 12);
                        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
                        RowData rowData = deserializationSchema.deserialize(serializedJson);
                        byte[] actual = serializationSchema.serialize(rowData);
@@ -259,6 +272,14 @@ public class JsonRowDataSerDeSchemaTest {
                        root.put("f1", 10);
                        root.put("f2", false);
                        root.put("f3", "newStr");
+                       ObjectNode map = root.putObject("f4");
+                       map.put("hello2", "json");
+                       ArrayNode array = root.putArray("f5");
+                       array.add("element3");
+                       array.add("element4");
+                       ObjectNode row = root.putObject("f6");
+                       row.put("f1", "this is row2");
+                       row.putNull("f2");
                        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
                        RowData rowData = deserializationSchema.deserialize(serializedJson);
                        byte[] actual = serializationSchema.serialize(rowData);

Reply via email to