This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push: new 4cde4be [FLINK-16220][json] Fix cast exception in JsonRowSerializationSchema when serializing null fields 4cde4be is described below commit 4cde4be46213b73dcb90864b103c61caa0e22666 Author: Benchao Li <libenc...@gmail.com> AuthorDate: Fri Mar 20 14:44:19 2020 +0800 [FLINK-16220][json] Fix cast exception in JsonRowSerializationSchema when serializing null fields This closes #11180 --- .../formats/json/JsonRowSerializationSchema.java | 6 ++-- .../json/JsonRowSerializationSchemaTest.java | 35 ++++++++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java index 5e89b42..b4cb875 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java @@ -331,7 +331,8 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> { return (mapper, reuse, object) -> { ObjectNode node; - if (reuse == null) { + // reuse could be a NullNode if last record is null. + if (reuse == null || reuse.isNull()) { node = mapper.createObjectNode(); } else { node = (ObjectNode) reuse; @@ -353,7 +354,8 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> { return (mapper, reuse, object) -> { ArrayNode node; - if (reuse == null) { + // reuse could be a NullNode if last record is null. + if (reuse == null || reuse.isNull()) { node = mapper.createArrayNode(); } else { node = (ArrayNode) reuse; diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java index 4a0706f..378f92b 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java @@ -92,6 +92,41 @@ public class JsonRowSerializationSchemaTest { } @Test + public void testMultiRowsWithNullValues() throws IOException { + String[] jsons = new String[] { + "{\"svt\":\"2020-02-24T12:58:09.209+0800\"}", + "{\"svt\":\"2020-02-24T12:58:09.209+0800\", \"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"}, " + + "\"ids\":[1, 2, 3]}", + "{\"svt\":\"2020-02-24T12:58:09.209+0800\"}", + }; + + String[] expected = new String[] { + "{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null}", + "{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"}," + + "\"ids\":[1,2,3]}", + "{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null}", + }; + + TypeInformation<Row> schema = Types.ROW_NAMED( + new String[]{"svt", "ops", "ids"}, + Types.STRING, + Types.ROW_NAMED(new String[]{"id"}, Types.STRING), + Types.PRIMITIVE_ARRAY(Types.INT)); + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema.Builder(schema) + .build(); + JsonRowSerializationSchema serializationSchema = JsonRowSerializationSchema.builder() + .withTypeInfo(schema) + .build(); + + for (int i = 0; i < jsons.length; i++) { + String json = jsons[i]; + Row row = deserializationSchema.deserialize(json.getBytes()); + String result = new String(serializationSchema.serialize(row)); + assertEquals(expected[i], result); + } + } + + @Test public void testNestedSchema() { final TypeInformation<Row> rowSchema = Types.ROW_NAMED( new String[] {"f1", "f2", "f3"},