This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 38e4e2b  [FLINK-11727][formats] Fixed JSON format issues after 
serialization
38e4e2b is described below

commit 38e4e2b8f9bc63a793a2bddef5a578e3f80b7376
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Tue Mar 5 10:45:20 2019 +0100

    [FLINK-11727][formats] Fixed JSON format issues after serialization
    
    This commit reworks the JSON format to use a runtime converter created
    from the given TypeInformation. Before this commit, the conversion logic
    relied on reference comparison of TypeInformation instances, which no
    longer worked once the format had been serialized.
    
    This also introduces a builder pattern for ensuring future immutability
    of schemas.
    
    This closes #7932.
---
 .../test-scripts/kafka_sql_common.sh               |  19 +-
 .../formats/json/JsonRowDeserializationSchema.java | 394 +++++++++++++++------
 .../flink/formats/json/JsonRowFormatFactory.java   |  13 +-
 .../formats/json/JsonRowSerializationSchema.java   | 332 +++++++++++------
 .../org/apache/flink/formats/json/TimeFormats.java |  46 +++
 .../json/JsonRowDeserializationSchemaTest.java     |  66 ++--
 .../formats/json/JsonRowFormatFactoryTest.java     |  12 +-
 .../json/JsonRowSerializationSchemaTest.java       |  63 ++--
 .../utils/DeserializationSchemaMatcher.java        | 164 +++++++++
 .../formats/utils/SerializationSchemaMatcher.java  | 192 ++++++++++
 10 files changed, 1020 insertions(+), 281 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh 
b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
index 85dbbae..37744d6 100644
--- a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
+++ b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
@@ -31,14 +31,14 @@ function create_kafka_json_source {
     # put JSON data into Kafka
     echo "Sending messages to Kafka..."
 
-    send_messages_to_kafka '{"timestamp": "2018-03-12 08:00:00", "user": 
"Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' 
$topicName
-    send_messages_to_kafka '{"timestamp": "2018-03-12 08:10:00", "user": 
"Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' 
$topicName
-    send_messages_to_kafka '{"timestamp": "2018-03-12 09:00:00", "user": 
"Bob", "event": { "type": "WARNING", "message": "This is another warning."}}' 
$topicName
-    send_messages_to_kafka '{"timestamp": "2018-03-12 09:10:00", "user": 
"Alice", "event": { "type": "INFO", "message": "This is a info."}}' $topicName
-    send_messages_to_kafka '{"timestamp": "2018-03-12 09:20:00", "user": 
"Steve", "event": { "type": "INFO", "message": "This is another info."}}' 
$topicName
-    send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": 
"Steve", "event": { "type": "INFO", "message": "This is another info."}}' 
$topicName
-    send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": null, 
"event": { "type": "WARNING", "message": "This is a bad message because the 
user is missing."}}' $topicName
-    send_messages_to_kafka '{"timestamp": "2018-03-12 10:40:00", "user": 
"Bob", "event": { "type": "ERROR", "message": "This is an error."}}' $topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T08:00:00Z", "user": 
"Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' 
$topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T08:10:00Z", "user": 
"Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' 
$topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T09:00:00Z", "user": 
"Bob", "event": { "type": "WARNING", "message": "This is another warning."}}' 
$topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T09:10:00Z", "user": 
"Alice", "event": { "type": "INFO", "message": "This is a info."}}' $topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T09:20:00Z", "user": 
"Steve", "event": { "type": "INFO", "message": "This is another info."}}' 
$topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T09:30:00Z", "user": 
"Steve", "event": { "type": "INFO", "message": "This is another info."}}' 
$topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T09:30:00Z", "user": 
null, "event": { "type": "WARNING", "message": "This is a bad message because 
the user is missing."}}' $topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T10:40:00Z", "user": 
"Bob", "event": { "type": "ERROR", "message": "This is an error."}}' $topicName
 }
 
 function get_kafka_json_source_schema {
@@ -79,7 +79,8 @@ function get_kafka_json_source_schema {
           "type": "object",
           "properties": {
             "timestamp": {
-              "type": "string"
+              "type": "string",
+              "format": "date-time"
             },
             "user": {
               "type": ["string", "null"]
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index df1fbc5..4a1ff27 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -27,18 +27,39 @@ import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingRuntimeException;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
 import java.io.IOException;
-import java.lang.reflect.Array;
+import java.io.Serializable;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static java.util.Spliterators.spliterator;
+import static java.util.stream.StreamSupport.stream;
+import static 
org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Deserialization schema from JSON to Flink types.
@@ -54,45 +75,56 @@ public class JsonRowDeserializationSchema implements 
DeserializationSchema<Row>
        private static final long serialVersionUID = -228294330688809195L;
 
        /** Type information describing the result type. */
-       private final TypeInformation<Row> typeInfo;
+       private final RowTypeInfo typeInfo;
+
+       private boolean failOnMissingField;
 
        /** Object mapper for parsing the JSON. */
        private final ObjectMapper objectMapper = new ObjectMapper();
 
-       /** Flag indicating whether to fail on a missing field. */
-       private boolean failOnMissingField;
+       private DeserializationRuntimeConverter runtimeConverter;
+
+       private JsonRowDeserializationSchema(
+                       TypeInformation<Row> typeInfo,
+                       boolean failOnMissingField) {
+               checkNotNull(typeInfo, "Type information");
+               checkArgument(typeInfo instanceof RowTypeInfo, "Only 
RowTypeInfo is supported");
+               this.typeInfo = (RowTypeInfo) typeInfo;
+               this.failOnMissingField = failOnMissingField;
+               this.runtimeConverter = createConverter(this.typeInfo);
+       }
 
        /**
-        * Creates a JSON deserialization schema for the given type information.
-        *
-        * @param typeInfo Type information describing the result type. The 
field names of {@link Row}
-        *                 are used to parse the JSON properties.
+        * @deprecated Use the provided {@link Builder} instead.
         */
+       @Deprecated
        public JsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
-               Preconditions.checkNotNull(typeInfo, "Type information");
-               this.typeInfo = typeInfo;
-
-               if (!(typeInfo instanceof RowTypeInfo)) {
-                       throw new IllegalArgumentException("Row type 
information expected.");
-               }
+               this(typeInfo, false);
        }
 
        /**
-        * Creates a JSON deserialization schema for the given JSON schema.
-        *
-        * @param jsonSchema JSON schema describing the result type
-        *
-        * @see <a href="http://json-schema.org/">http://json-schema.org/</a>
+        * @deprecated Use the provided {@link Builder} instead.
         */
+       @Deprecated
        public JsonRowDeserializationSchema(String jsonSchema) {
-               this(JsonRowSchemaConverter.convert(jsonSchema));
+               this(JsonRowSchemaConverter.convert(checkNotNull(jsonSchema)), 
false);
+       }
+
+       /**
+        * @deprecated Use the provided {@link Builder} instead.
+        */
+       @Deprecated
+       public void setFailOnMissingField(boolean failOnMissingField) {
+               // TODO make this class immutable once we drop this method
+               this.failOnMissingField = failOnMissingField;
+               this.runtimeConverter = createConverter(this.typeInfo);
        }
 
        @Override
        public Row deserialize(byte[] message) throws IOException {
                try {
                        final JsonNode root = objectMapper.readTree(message);
-                       return convertRow(root, (RowTypeInfo) typeInfo);
+                       return (Row) runtimeConverter.convert(objectMapper, 
root);
                } catch (Throwable t) {
                        throw new IOException("Failed to deserialize JSON 
object.", t);
                }
@@ -109,14 +141,48 @@ public class JsonRowDeserializationSchema implements 
DeserializationSchema<Row>
        }
 
        /**
-        * Configures the failure behaviour if a JSON field is missing.
-        *
-        * <p>By default, a missing field is ignored and the field is set to 
null.
-        *
-        * @param failOnMissingField Flag indicating whether to fail or not on 
a missing field.
+        * Builder for {@link JsonRowDeserializationSchema}.
         */
-       public void setFailOnMissingField(boolean failOnMissingField) {
-               this.failOnMissingField = failOnMissingField;
+       public static class Builder {
+
+               private final RowTypeInfo typeInfo;
+               private boolean failOnMissingField = false;
+
+               /**
+                * Creates a JSON deserialization schema for the given type 
information.
+                *
+                * @param typeInfo Type information describing the result type. 
The field names of {@link Row}
+                *                 are used to parse the JSON properties.
+                */
+               public Builder(TypeInformation<Row> typeInfo) {
+                       checkArgument(typeInfo instanceof RowTypeInfo, "Only 
RowTypeInfo is supported");
+                       this.typeInfo = (RowTypeInfo) typeInfo;
+               }
+
+               /**
+                * Creates a JSON deserialization schema for the given JSON 
schema.
+                *
+                * @param jsonSchema JSON schema describing the result type
+                *
+                * @see <a 
href="http://json-schema.org/">http://json-schema.org/</a>
+                */
+               public Builder(String jsonSchema) {
+                       
this(JsonRowSchemaConverter.convert(checkNotNull(jsonSchema)));
+               }
+
+               /**
+                * Configures schema to fail if a JSON field is missing.
+                *
+                * <p>By default, a missing field is ignored and the field is 
set to null.
+                */
+               public Builder failOnMissingField() {
+                       this.failOnMissingField = true;
+                       return this;
+               }
+
+               public JsonRowDeserializationSchema build() {
+                       return new JsonRowDeserializationSchema(typeInfo, 
failOnMissingField);
+               }
        }
 
        @Override
@@ -128,7 +194,8 @@ public class JsonRowDeserializationSchema implements 
DeserializationSchema<Row>
                        return false;
                }
                final JsonRowDeserializationSchema that = 
(JsonRowDeserializationSchema) o;
-               return failOnMissingField == that.failOnMissingField && 
Objects.equals(typeInfo, that.typeInfo);
+               return Objects.equals(typeInfo, that.typeInfo) &&
+                       Objects.equals(failOnMissingField, 
that.failOnMissingField);
        }
 
        @Override
@@ -136,99 +203,214 @@ public class JsonRowDeserializationSchema implements 
DeserializationSchema<Row>
                return Objects.hash(typeInfo, failOnMissingField);
        }
 
-       // 
--------------------------------------------------------------------------------------------
-
-       private Object convert(JsonNode node, TypeInformation<?> info) {
-               if (info == Types.VOID || node.isNull()) {
-                       return null;
-               } else if (info == Types.BOOLEAN) {
-                       return node.asBoolean();
-               } else if (info == Types.STRING) {
-                       return node.asText();
-               } else if (info == Types.BIG_DEC) {
-                       return node.decimalValue();
-               } else if (info == Types.BIG_INT) {
-                       return node.bigIntegerValue();
-               } else if (info == Types.SQL_DATE) {
-                       return Date.valueOf(node.asText());
-               } else if (info == Types.SQL_TIME) {
-                       // according to RFC 3339 every full-time must have a 
timezone;
-                       // until we have full timezone support, we only support 
UTC;
-                       // users can parse their time as string as a workaround
-                       final String time = node.asText();
-                       if (time.indexOf('Z') < 0 || time.indexOf('.') >= 0) {
-                               throw new IllegalStateException(
-                                       "Invalid time format. Only a time in 
UTC timezone without milliseconds is supported yet. " +
-                                               "Format: HH:mm:ss'Z'");
+       /*
+               Runtime converter
+        */
+
+       /**
+        * Runtime converter that maps between {@link JsonNode}s and Java 
objects.
+        */
+       @FunctionalInterface
+       private interface DeserializationRuntimeConverter extends Serializable {
+               Object convert(ObjectMapper mapper, JsonNode jsonNode);
+       }
+
+       private DeserializationRuntimeConverter 
createConverter(TypeInformation<?> typeInfo) {
+               DeserializationRuntimeConverter baseConverter = 
createConverterForSimpleType(typeInfo)
+                       .orElseGet(() ->
+                               createContainerConverter(typeInfo)
+                                       .orElseGet(() -> 
createFallbackConverter(typeInfo.getTypeClass())));
+               return wrapIntoNullableConverter(baseConverter);
+       }
+
+       private DeserializationRuntimeConverter 
wrapIntoNullableConverter(DeserializationRuntimeConverter converter) {
+               return (mapper, jsonNode) -> {
+                       if (jsonNode.isNull()) {
+                               return null;
                        }
-                       return Time.valueOf(time.substring(0, time.length() - 
1));
-               } else if (info == Types.SQL_TIMESTAMP) {
+
+                       return converter.convert(mapper, jsonNode);
+               };
+       }
+
+       private Optional<DeserializationRuntimeConverter> 
createContainerConverter(TypeInformation<?> typeInfo) {
+               if (typeInfo instanceof RowTypeInfo) {
+                       return Optional.of(createRowConverter((RowTypeInfo) 
typeInfo));
+               } else if (typeInfo instanceof ObjectArrayTypeInfo) {
+                       return 
Optional.of(createObjectArrayConverter(((ObjectArrayTypeInfo) 
typeInfo).getComponentInfo()));
+               } else if (typeInfo instanceof BasicArrayTypeInfo) {
+                       return 
Optional.of(createObjectArrayConverter(((BasicArrayTypeInfo) 
typeInfo).getComponentInfo()));
+               } else if (isPrimitiveByteArray(typeInfo)) {
+                       return Optional.of(createByteArrayConverter());
+               } else {
+                       return Optional.empty();
+               }
+       }
+
+       private DeserializationRuntimeConverter createByteArrayConverter() {
+               return (mapper, jsonNode) -> {
+                       try {
+                               return jsonNode.binaryValue();
+                       } catch (IOException e) {
+                               throw new WrappingRuntimeException("Unable to 
deserialize byte array.", e);
+                       }
+               };
+       }
+
+       private boolean isPrimitiveByteArray(TypeInformation<?> typeInfo) {
+               return typeInfo instanceof PrimitiveArrayTypeInfo &&
+                       ((PrimitiveArrayTypeInfo) typeInfo).getComponentType() 
== Types.BYTE;
+       }
+
+       private DeserializationRuntimeConverter 
createObjectArrayConverter(TypeInformation elementTypeInfo) {
+               DeserializationRuntimeConverter elementConverter = 
createConverter(elementTypeInfo);
+               return assembleArrayConverter(elementConverter);
+       }
+
+       private DeserializationRuntimeConverter createRowConverter(RowTypeInfo 
typeInfo) {
+               List<DeserializationRuntimeConverter> fieldConverters = 
Arrays.stream(typeInfo.getFieldTypes())
+                       .map(this::createConverter)
+                       .collect(Collectors.toList());
+
+               return assembleRowConverter(typeInfo.getFieldNames(), 
fieldConverters);
+       }
+
+       private DeserializationRuntimeConverter 
createFallbackConverter(Class<?> valueType) {
+               return (mapper, jsonNode) -> {
+                       // for types that were specified without JSON schema
+                       // e.g. POJOs
+                       try {
+                               return mapper.treeToValue(jsonNode, valueType);
+                       } catch (JsonProcessingException e) {
+                               throw new 
WrappingRuntimeException(format("Could not convert node: %s", jsonNode), e);
+                       }
+               };
+       }
+
+       private Optional<DeserializationRuntimeConverter> 
createConverterForSimpleType(TypeInformation<?> simpleTypeInfo) {
+               if (simpleTypeInfo == Types.VOID) {
+                       return Optional.of((mapper, jsonNode) -> null);
+               } else if (simpleTypeInfo == Types.BOOLEAN) {
+                       return Optional.of((mapper, jsonNode) -> 
jsonNode.asBoolean());
+               } else if (simpleTypeInfo == Types.STRING) {
+                       return Optional.of((mapper, jsonNode) -> 
jsonNode.asText());
+               } else if (simpleTypeInfo == Types.INT) {
+                       return Optional.of((mapper, jsonNode) -> 
jsonNode.asInt());
+               } else if (simpleTypeInfo == Types.LONG) {
+                       return Optional.of((mapper, jsonNode) -> 
jsonNode.asLong());
+               } else if (simpleTypeInfo == Types.DOUBLE) {
+                       return Optional.of((mapper, jsonNode) -> 
jsonNode.asDouble());
+               } else if (simpleTypeInfo == Types.FLOAT) {
+                       return Optional.of((mapper, jsonNode) -> 
Float.parseFloat(jsonNode.asText().trim()));
+               } else if (simpleTypeInfo == Types.SHORT) {
+                       return Optional.of((mapper, jsonNode) -> 
Short.parseShort(jsonNode.asText().trim()));
+               } else if (simpleTypeInfo == Types.BYTE) {
+                       return Optional.of((mapper, jsonNode) -> 
Byte.parseByte(jsonNode.asText().trim()));
+               } else if (simpleTypeInfo == Types.BIG_DEC) {
+                       return Optional.of((mapper, jsonNode) -> 
jsonNode.decimalValue());
+               } else if (simpleTypeInfo == Types.BIG_INT) {
+                       return Optional.of((mapper, jsonNode) -> 
jsonNode.bigIntegerValue());
+               } else if (simpleTypeInfo == Types.SQL_DATE) {
+                       return Optional.of(createDateConverter());
+               } else if (simpleTypeInfo == Types.SQL_TIME) {
+                       return Optional.of(createTimeConverter());
+               } else if (simpleTypeInfo == Types.SQL_TIMESTAMP) {
+                       return Optional.of(createTimestampConverter());
+               } else {
+                       return Optional.empty();
+               }
+       }
+
+       private DeserializationRuntimeConverter createDateConverter() {
+               return (mapper, jsonNode) -> 
Date.valueOf(ISO_LOCAL_DATE.parse(jsonNode.asText())
+                       .query(TemporalQueries.localDate()));
+       }
+
+       private DeserializationRuntimeConverter createTimestampConverter() {
+               return (mapper, jsonNode) -> {
                        // according to RFC 3339 every date-time must have a 
timezone;
                        // until we have full timezone support, we only support 
UTC;
                        // users can parse their time as string as a workaround
-                       final String timestamp = node.asText();
-                       if (timestamp.indexOf('Z') < 0) {
+                       TemporalAccessor parsedTimestamp = 
RFC3339_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+
+                       ZoneOffset zoneOffset = 
parsedTimestamp.query(TemporalQueries.offset());
+
+                       if (zoneOffset != null && zoneOffset.getTotalSeconds() 
!= 0) {
                                throw new IllegalStateException(
                                        "Invalid timestamp format. Only a 
timestamp in UTC timezone is supported yet. " +
                                                "Format: 
yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
                        }
-                       return Timestamp.valueOf(timestamp.substring(0, 
timestamp.length() - 1).replace('T', ' '));
-               } else if (info instanceof RowTypeInfo) {
-                       return convertRow(node, (RowTypeInfo) info);
-               } else if (info instanceof ObjectArrayTypeInfo) {
-                       return convertObjectArray(node, ((ObjectArrayTypeInfo) 
info).getComponentInfo());
-               } else if (info instanceof BasicArrayTypeInfo) {
-                       return convertObjectArray(node, ((BasicArrayTypeInfo) 
info).getComponentInfo());
-               } else if (info instanceof PrimitiveArrayTypeInfo &&
-                               ((PrimitiveArrayTypeInfo) 
info).getComponentType() == Types.BYTE) {
-                       return convertByteArray(node);
-               } else {
-                       // for types that were specified without JSON schema
-                       // e.g. POJOs
-                       try {
-                               return objectMapper.treeToValue(node, 
info.getTypeClass());
-                       } catch (JsonProcessingException e) {
-                               throw new IllegalStateException("Unsupported 
type information '" + info + "' for node: " + node);
+
+                       LocalTime localTime = 
parsedTimestamp.query(TemporalQueries.localTime());
+                       LocalDate localDate = 
parsedTimestamp.query(TemporalQueries.localDate());
+
+                       return Timestamp.valueOf(LocalDateTime.of(localDate, 
localTime));
+               };
+       }
+
+       private DeserializationRuntimeConverter createTimeConverter() {
+               return (mapper, jsonNode) -> {
+
+                       // according to RFC 3339 every full-time must have a 
timezone;
+                       // until we have full timezone support, we only support 
UTC;
+                       // users can parse their time as string as a workaround
+                       TemporalAccessor parsedTime = 
RFC3339_TIME_FORMAT.parse(jsonNode.asText());
+
+                       ZoneOffset zoneOffset = 
parsedTime.query(TemporalQueries.offset());
+                       LocalTime localTime = 
parsedTime.query(TemporalQueries.localTime());
+
+                       if (zoneOffset != null && zoneOffset.getTotalSeconds() 
!= 0 || localTime.getNano() != 0) {
+                               throw new IllegalStateException(
+                                       "Invalid time format. Only a time in 
UTC timezone without milliseconds is supported yet.");
                        }
-               }
+
+                       return Time.valueOf(localTime);
+               };
        }
 
-       private Row convertRow(JsonNode node, RowTypeInfo info) {
-               final String[] names = info.getFieldNames();
-               final TypeInformation<?>[] types = info.getFieldTypes();
-
-               final Row row = new Row(names.length);
-               for (int i = 0; i < names.length; i++) {
-                       final String name = names[i];
-                       final JsonNode subNode = node.get(name);
-                       if (subNode == null) {
-                               if (failOnMissingField) {
-                                       throw new IllegalStateException(
-                                               "Could not find field with name 
'" + name + "'.");
-                               } else {
-                                       row.setField(i, null);
-                               }
-                       } else {
-                               row.setField(i, convert(subNode, types[i]));
+       private DeserializationRuntimeConverter assembleRowConverter(
+               String[] fieldNames,
+               List<DeserializationRuntimeConverter> fieldConverters) {
+               return (mapper, jsonNode) -> {
+                       ObjectNode node = (ObjectNode) jsonNode;
+
+                       int arity = fieldNames.length;
+                       Row row = new Row(arity);
+                       for (int i = 0; i < arity; i++) {
+                               String fieldName = fieldNames[i];
+                               JsonNode field = node.get(fieldName);
+                               Object convertField = convertField(mapper, 
fieldConverters.get(i), fieldName, field);
+                               row.setField(i, convertField);
                        }
-               }
 
-               return row;
+                       return row;
+               };
        }
 
-       private Object convertObjectArray(JsonNode node, TypeInformation<?> 
elementType) {
-               final Object[] array = (Object[]) 
Array.newInstance(elementType.getTypeClass(), node.size());
-               for (int i = 0; i < node.size(); i++) {
-                       array[i] = convert(node.get(i), elementType);
+       private Object convertField(
+               ObjectMapper mapper,
+               DeserializationRuntimeConverter fieldConverter,
+               String fieldName,
+               JsonNode field) {
+               if (field == null) {
+                       if (failOnMissingField) {
+                               throw new IllegalStateException(
+                                       "Could not find field with name '" + 
fieldName + "'.");
+                       } else {
+                               return null;
+                       }
+               } else {
+                       return fieldConverter.convert(mapper, field);
                }
-               return array;
        }
 
-       private Object convertByteArray(JsonNode node) {
-               try {
-                       return node.binaryValue();
-               } catch (IOException e) {
-                       throw new RuntimeException("Unable to deserialize byte 
array.", e);
-               }
+       private DeserializationRuntimeConverter 
assembleArrayConverter(DeserializationRuntimeConverter elementConverter) {
+               return (mapper, jsonNode) -> {
+                       ArrayNode node = (ArrayNode) jsonNode;
+
+                       return stream(spliterator(node.elements(), node.size(), 
0), false)
+                               .map(innerNode -> 
elementConverter.convert(mapper, innerNode))
+                               .toArray();
+               };
        }
 }
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
index 567bef3..af758b6 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
@@ -58,12 +58,17 @@ public class JsonRowFormatFactory extends 
TableFormatFactoryBase<Row>
                final DescriptorProperties descriptorProperties = 
getValidatedProperties(properties);
 
                // create and configure
-               final JsonRowDeserializationSchema schema = new 
JsonRowDeserializationSchema(createTypeInformation(descriptorProperties));
+               final JsonRowDeserializationSchema.Builder schema =
+                       new 
JsonRowDeserializationSchema.Builder(createTypeInformation(descriptorProperties));
 
                
descriptorProperties.getOptionalBoolean(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD)
-                               .ifPresent(schema::setFailOnMissingField);
+                       .ifPresent(flag -> {
+                               if (flag) {
+                                       schema.failOnMissingField();
+                               }
+                       });
 
-               return schema;
+               return schema.build();
        }
 
        @Override
@@ -71,7 +76,7 @@ public class JsonRowFormatFactory extends 
TableFormatFactoryBase<Row>
                final DescriptorProperties descriptorProperties = 
getValidatedProperties(properties);
 
                // create and configure
-               return new 
JsonRowSerializationSchema(createTypeInformation(descriptorProperties));
+               return new 
JsonRowSerializationSchema.Builder(createTypeInformation(descriptorProperties)).build();
        }
 
        private TypeInformation<Row> createTypeInformation(DescriptorProperties 
descriptorProperties) {
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
index d942062..0aa7151 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
@@ -27,19 +27,32 @@ import 
org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingRuntimeException;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
+import java.io.Serializable;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static 
org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Serialization schema that serializes an object of Flink types into a JSON 
bytes.
@@ -55,42 +68,61 @@ public class JsonRowSerializationSchema implements 
SerializationSchema<Row> {
        private static final long serialVersionUID = -2885556750743978636L;
 
        /** Type information describing the input type. */
-       private final TypeInformation<Row> typeInfo;
+       private final RowTypeInfo typeInfo;
 
        /** Object mapper that is used to create output JSON objects. */
        private final ObjectMapper mapper = new ObjectMapper();
 
-       /** Formatter for RFC 3339-compliant string representation of a time 
value (with UTC timezone, without milliseconds). */
-       private SimpleDateFormat timeFormat = new 
SimpleDateFormat("HH:mm:ss'Z'");
-
-       /** Formatter for RFC 3339-compliant string representation of a time 
value (with UTC timezone). */
-       private SimpleDateFormat timeFormatWithMillis = new 
SimpleDateFormat("HH:mm:ss.SSS'Z'");
-
-       /** Formatter for RFC 3339-compliant string representation of a 
timestamp value (with UTC timezone). */
-       private SimpleDateFormat timestampFormat = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+       private final SerializationRuntimeConverter runtimeConverter;
 
        /** Reusable object node. */
        private transient ObjectNode node;
 
        /**
-        * Creates a JSON serialization schema for the given type information.
-        *
-        * @param typeInfo The field names of {@link Row} are used to map to 
JSON properties.
+        * @deprecated Use the provided {@link Builder} instead.
         */
+       @Deprecated
        public JsonRowSerializationSchema(TypeInformation<Row> typeInfo) {
+               // TODO make this constructor private in the future
                Preconditions.checkNotNull(typeInfo, "Type information");
-               this.typeInfo = typeInfo;
+               Preconditions.checkArgument(typeInfo instanceof RowTypeInfo, 
"Only RowTypeInfo is supported");
+               this.typeInfo = (RowTypeInfo) typeInfo;
+               this.runtimeConverter = createConverter(typeInfo);
        }
 
        /**
-        * Creates a JSON serialization schema for the given JSON schema.
-        *
-        * @param jsonSchema JSON schema describing the result type
-        *
-        * @see <a href="http://json-schema.org/">http://json-schema.org/</a>
+        * Builder for {@link JsonRowSerializationSchema}.
         */
-       public JsonRowSerializationSchema(String jsonSchema) {
-               this(JsonRowSchemaConverter.convert(jsonSchema));
+       @PublicEvolving
+       public static class Builder {
+
+               private final RowTypeInfo typeInfo;
+
+               /**
+                * Creates a JSON serialization schema for the given type 
information.
+                *
+                * @param typeInfo Type information describing the input type. 
The field names of {@link Row}
+                *                 are used to map to JSON properties.
+                */
+               public Builder(TypeInformation<Row> typeInfo) {
+                       checkArgument(typeInfo instanceof RowTypeInfo, "Only 
RowTypeInfo is supported");
+                       this.typeInfo = (RowTypeInfo) typeInfo;
+               }
+
+               /**
+                * Creates a JSON serialization schema for the given JSON 
schema.
+                *
+                * @param jsonSchema JSON schema describing the result type
+                *
+                * @see <a 
href="http://json-schema.org/">http://json-schema.org/</a>
+                */
+               public Builder(String jsonSchema) {
+                       
this(JsonRowSchemaConverter.convert(checkNotNull(jsonSchema)));
+               }
+
+               public JsonRowSerializationSchema build() {
+                       return new JsonRowSerializationSchema(typeInfo);
+               }
        }
 
        @Override
@@ -100,7 +132,7 @@ public class JsonRowSerializationSchema implements 
SerializationSchema<Row> {
                }
 
                try {
-                       convertRow(node, (RowTypeInfo) typeInfo, row);
+                       runtimeConverter.convert(mapper, node, row);
                        return mapper.writeValueAsBytes(node);
                } catch (Throwable t) {
                        throw new RuntimeException("Could not serialize row '" 
+ row + "'. " +
@@ -125,102 +157,204 @@ public class JsonRowSerializationSchema implements 
SerializationSchema<Row> {
                return Objects.hash(typeInfo);
        }
 
-       // 
--------------------------------------------------------------------------------------------
+       /*
+               Runtime converters
+        */
 
-       private ObjectNode convertRow(ObjectNode reuse, RowTypeInfo info, Row 
row) {
-               if (reuse == null) {
-                       reuse = mapper.createObjectNode();
-               }
-               final String[] fieldNames = info.getFieldNames();
-               final TypeInformation<?>[] fieldTypes = info.getFieldTypes();
+       /**
+        * Runtime converter that maps between Java objects and corresponding 
{@link JsonNode}s.
+        */
+       @FunctionalInterface
+       private interface SerializationRuntimeConverter extends Serializable {
+               JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object 
object);
+       }
+
+       private SerializationRuntimeConverter 
createConverter(TypeInformation<?> typeInfo) {
+               SerializationRuntimeConverter baseConverter = 
createConverterForSimpleType(typeInfo)
+                       .orElseGet(() ->
+                               createContainerConverter(typeInfo)
+                                       
.orElseGet(this::createFallbackConverter));
+               return wrapIntoNullableConverter(baseConverter);
+       }
+
+       private SerializationRuntimeConverter 
wrapIntoNullableConverter(SerializationRuntimeConverter converter) {
+               return (mapper, reuse, object) -> {
+                       if (object == null) {
+                               return mapper.getNodeFactory().nullNode();
+                       }
 
-               // validate the row
-               if (row.getArity() != fieldNames.length) {
-                       throw new IllegalStateException(String.format(
-                               "Number of elements in the row '%s' is 
different from number of field names: %d", row, fieldNames.length));
+                       return converter.convert(mapper, reuse, object);
+               };
+       }
+
+       private Optional<SerializationRuntimeConverter> 
createContainerConverter(TypeInformation<?> typeInfo) {
+               if (typeInfo instanceof RowTypeInfo) {
+                       return Optional.of(createRowConverter((RowTypeInfo) 
typeInfo));
+               } else if (typeInfo instanceof ObjectArrayTypeInfo) {
+                       return 
Optional.of(createObjectArrayConverter(((ObjectArrayTypeInfo) 
typeInfo).getComponentInfo()));
+               } else if (typeInfo instanceof BasicArrayTypeInfo) {
+                       return 
Optional.of(createObjectArrayConverter(((BasicArrayTypeInfo) 
typeInfo).getComponentInfo()));
+               } else if (isPrimitiveByteArray(typeInfo)) {
+                       return Optional.of((mapper, reuse, object) -> 
mapper.getNodeFactory().binaryNode((byte[]) object));
+               } else {
+                       return Optional.empty();
                }
+       }
+
+       private boolean isPrimitiveByteArray(TypeInformation<?> typeInfo) {
+               return typeInfo instanceof PrimitiveArrayTypeInfo &&
+                       ((PrimitiveArrayTypeInfo) typeInfo).getComponentType() 
== Types.BYTE;
+       }
+
+       private SerializationRuntimeConverter 
createObjectArrayConverter(TypeInformation elementTypeInfo) {
+               SerializationRuntimeConverter elementConverter = 
createConverter(elementTypeInfo);
+               return assembleArrayConverter(elementConverter);
+       }
 
-               for (int i = 0; i < fieldNames.length; i++) {
-                       final String name = fieldNames[i];
+       private SerializationRuntimeConverter createRowConverter(RowTypeInfo 
typeInfo) {
+               List<SerializationRuntimeConverter> fieldConverters = 
Arrays.stream(typeInfo.getFieldTypes())
+                       .map(this::createConverter)
+                       .collect(Collectors.toList());
+
+               return assembleRowConverter(typeInfo.getFieldNames(), 
fieldConverters);
+       }
 
-                       final JsonNode fieldConverted = convert(reuse, 
reuse.get(name), fieldTypes[i], row.getField(i));
-                       reuse.set(name, fieldConverted);
+       private SerializationRuntimeConverter createFallbackConverter() {
+               return (mapper, reuse, object) -> {
+                       // for types that were specified without JSON schema
+                       // e.g. POJOs
+                       try {
+                               return mapper.valueToTree(object);
+                       } catch (IllegalArgumentException e) {
+                               throw new 
WrappingRuntimeException(format("Could not convert object: %s", object), e);
+                       }
+               };
+       }
+
+       private Optional<SerializationRuntimeConverter> 
createConverterForSimpleType(TypeInformation<?> simpleTypeInfo) {
+               if (simpleTypeInfo == Types.VOID) {
+                       return Optional.of((mapper, reuse, object) -> 
mapper.getNodeFactory().nullNode());
+               } else if (simpleTypeInfo == Types.BOOLEAN) {
+                       return Optional.of((mapper, reuse, object) -> 
mapper.getNodeFactory().booleanNode((Boolean) object));
+               } else if (simpleTypeInfo == Types.STRING) {
+                       return Optional.of((mapper, reuse, object) -> 
mapper.getNodeFactory().textNode((String) object));
+               } else if (simpleTypeInfo == Types.INT) {
+                       return Optional.of((mapper, reuse, object) -> 
mapper.getNodeFactory().numberNode((Integer) object));
+               } else if (simpleTypeInfo == Types.LONG) {
+                       return Optional.of((mapper, reuse, object) -> 
mapper.getNodeFactory().numberNode((Long) object));
+               } else if (simpleTypeInfo == Types.DOUBLE) {
+                       return Optional.of((mapper, reuse, object) -> 
mapper.getNodeFactory().numberNode((Double) object));
+               } else if (simpleTypeInfo == Types.FLOAT) {
+                       return Optional.of((mapper, reuse, object) -> 
mapper.getNodeFactory().numberNode((Float) object));
+               } else if (simpleTypeInfo == Types.SHORT) {
+                       return Optional.of((mapper, reuse, object) -> 
mapper.getNodeFactory().numberNode((Short) object));
+               } else if (simpleTypeInfo == Types.BYTE) {
+                       return Optional.of((mapper, reuse, object) -> 
mapper.getNodeFactory().numberNode((Byte) object));
+               } else if (simpleTypeInfo == Types.BIG_DEC) {
+                       return Optional.of(createBigDecimalConverter());
+               } else if (simpleTypeInfo == Types.BIG_INT) {
+                       return Optional.of(createBigIntegerConverter());
+               } else if (simpleTypeInfo == Types.SQL_DATE) {
+                       return Optional.of(createDateConverter());
+               } else if (simpleTypeInfo == Types.SQL_TIME) {
+                       return Optional.of(createTimeConverter());
+               } else if (simpleTypeInfo == Types.SQL_TIMESTAMP) {
+                       return Optional.of(createTimestampConverter());
+               } else {
+                       return Optional.empty();
                }
+       }
+
+       private SerializationRuntimeConverter createDateConverter() {
+               return (mapper, reuse, object) -> {
+                       Date date = (Date) object;
+
+                       return 
mapper.getNodeFactory().textNode(ISO_LOCAL_DATE.format(date.toLocalDate()));
+               };
+       }
+
+       private SerializationRuntimeConverter createTimestampConverter() {
+               return (mapper, reuse, object) -> {
+                       Timestamp timestamp = (Timestamp) object;
 
-               return reuse;
+                       return mapper.getNodeFactory()
+                               
.textNode(RFC3339_TIMESTAMP_FORMAT.format(timestamp.toLocalDateTime()));
+               };
        }
 
-       private JsonNode convert(ContainerNode<?> container, JsonNode reuse, 
TypeInformation<?> info, Object object) {
-               if (info == Types.VOID || object == null) {
-                       return container.nullNode();
-               } else if (info == Types.BOOLEAN) {
-                       return container.booleanNode((Boolean) object);
-               } else if (info == Types.STRING) {
-                       return container.textNode((String) object);
-               } else if (info == Types.BIG_DEC) {
+       private SerializationRuntimeConverter createTimeConverter() {
+               return (mapper, reuse, object) -> {
+                       final Time time = (Time) object;
+
+                       JsonNodeFactory nodeFactory = mapper.getNodeFactory();
+                       return 
nodeFactory.textNode(RFC3339_TIME_FORMAT.format(time.toLocalTime()));
+               };
+       }
+
+       private SerializationRuntimeConverter createBigDecimalConverter() {
+               return (mapper, reuse, object) -> {
                        // convert decimal if necessary
+                       JsonNodeFactory nodeFactory = mapper.getNodeFactory();
                        if (object instanceof BigDecimal) {
-                               return container.numberNode((BigDecimal) 
object);
+                               return nodeFactory.numberNode((BigDecimal) 
object);
                        }
-                       return 
container.numberNode(BigDecimal.valueOf(((Number) object).doubleValue()));
-               } else if (info == Types.BIG_INT) {
-                       // convert integer if necessary
+                       return 
nodeFactory.numberNode(BigDecimal.valueOf(((Number) object).doubleValue()));
+               };
+       }
+
+       private SerializationRuntimeConverter createBigIntegerConverter() {
+               return (mapper, reuse, object) -> {
+                       // convert integer if necessary
+                       JsonNodeFactory nodeFactory = mapper.getNodeFactory();
                        if (object instanceof BigInteger) {
-                               return container.numberNode((BigInteger) 
object);
+                               return nodeFactory.numberNode((BigInteger) 
object);
                        }
-                       return 
container.numberNode(BigInteger.valueOf(((Number) object).longValue()));
-               } else if (info == Types.SQL_DATE) {
-                       return container.textNode(object.toString());
-               } else if (info == Types.SQL_TIME) {
-                       final Time time = (Time) object;
-                       // strip milliseconds if possible
-                       if (time.getTime() % 1000 > 0) {
-                               return 
container.textNode(timeFormatWithMillis.format(time));
-                       }
-                       return container.textNode(timeFormat.format(time));
-               } else if (info == Types.SQL_TIMESTAMP) {
-                       return 
container.textNode(timestampFormat.format((Timestamp) object));
-               } else if (info instanceof RowTypeInfo) {
-                       if (reuse != null && reuse instanceof ObjectNode) {
-                               return convertRow((ObjectNode) reuse, 
(RowTypeInfo) info, (Row) object);
+                       return 
nodeFactory.numberNode(BigInteger.valueOf(((Number) object).longValue()));
+               };
+       }
+
+       private SerializationRuntimeConverter assembleRowConverter(
+                       String[] fieldNames,
+                       List<SerializationRuntimeConverter> fieldConverters) {
+               return (mapper, reuse, object) -> {
+                       ObjectNode node;
+
+                       if (reuse == null) {
+                               node = mapper.createObjectNode();
                        } else {
-                               return convertRow(null, (RowTypeInfo) info, 
(Row) object);
+                               node = (ObjectNode) reuse;
                        }
-               } else if (info instanceof ObjectArrayTypeInfo) {
-                       if (reuse != null && reuse instanceof ArrayNode) {
-                               return convertObjectArray((ArrayNode) reuse, 
((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
-                       } else {
-                               return convertObjectArray(null, 
((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
+
+                       Row row = (Row) object;
+
+                       for (int i = 0; i < fieldNames.length; i++) {
+                               String fieldName = fieldNames[i];
+                               node.set(fieldName,
+                                       fieldConverters.get(i).convert(mapper, 
node.get(fieldNames[i]), row.getField(i)));
                        }
-               } else if (info instanceof BasicArrayTypeInfo) {
-                       if (reuse != null && reuse instanceof ArrayNode) {
-                               return convertObjectArray((ArrayNode) reuse, 
((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
+
+                       return node;
+               };
+       }
+
+       private SerializationRuntimeConverter 
assembleArrayConverter(SerializationRuntimeConverter elementConverter) {
+               return (mapper, reuse, object) -> {
+                       ArrayNode node;
+
+                       if (reuse == null) {
+                               node = mapper.createArrayNode();
                        } else {
-                               return convertObjectArray(null, 
((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
+                               node = (ArrayNode) reuse;
+                               node.removeAll();
                        }
-               } else if (info instanceof PrimitiveArrayTypeInfo && 
((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
-                       return container.binaryNode((byte[]) object);
-               } else {
-                       // for types that were specified without JSON schema
-                       // e.g. POJOs
-                       try {
-                               return mapper.valueToTree(object);
-                       } catch (IllegalArgumentException e) {
-                               throw new IllegalStateException("Unsupported 
type information '" + info + "' for object: " + object, e);
-                       }
-               }
-       }
 
-       private ArrayNode convertObjectArray(ArrayNode reuse, 
TypeInformation<?> info, Object[] array) {
-               if (reuse == null) {
-                       reuse = mapper.createArrayNode();
-               } else {
-                       reuse.removeAll();
-               }
+                       Object[] array = (Object[]) object;
 
-               for (Object object : array) {
-                       reuse.add(convert(reuse, null, info, object));
-               }
-               return reuse;
+                       for (Object element : array) {
+                               node.add(elementConverter.convert(mapper, null, 
element));
+                       }
+
+                       return node;
+               };
        }
 }
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
new file mode 100644
index 0000000..c946c5d
--- /dev/null
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+
+/**
+ * Time formats respecting the RFC3339 specification.
+ */
+class TimeFormats {
+
+       /** Formatter for RFC 3339-compliant string representation of a time 
value. */
+       static final DateTimeFormatter RFC3339_TIME_FORMAT = new 
DateTimeFormatterBuilder()
+               .appendPattern("HH:mm:ss")
+               .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+               .appendPattern("'Z'")
+               .toFormatter();
+
+       /** Formatter for RFC 3339-compliant string representation of a 
timestamp value (with UTC timezone). */
+       static final DateTimeFormatter RFC3339_TIMESTAMP_FORMAT = new 
DateTimeFormatterBuilder()
+               .append(DateTimeFormatter.ISO_LOCAL_DATE)
+               .appendLiteral('T')
+               .append(RFC3339_TIME_FORMAT)
+               .toFormatter();
+
+       private TimeFormats() {
+       }
+}
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
index 5e77b80..0573019 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.json;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.types.Row;
 
@@ -27,15 +28,16 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static 
org.apache.flink.formats.utils.DeserializationSchemaMatcher.whenDeserializedWith;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.internal.matchers.ThrowableCauseMatcher.hasCause;
 
 /**
  * Tests for the {@link JsonRowDeserializationSchema}.
@@ -62,18 +64,18 @@ public class JsonRowDeserializationSchemaTest {
 
                byte[] serializedJson = objectMapper.writeValueAsBytes(root);
 
-               JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema(
+               JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema.Builder(
                        Types.ROW_NAMED(
-                               new String[] { "id", "name", "bytes" },
+                               new String[]{"id", "name", "bytes"},
                                Types.LONG, Types.STRING, 
Types.PRIMITIVE_ARRAY(Types.BYTE))
-               );
+               ).build();
 
-               Row deserialized = 
deserializationSchema.deserialize(serializedJson);
+               Row row = new Row(3);
+               row.setField(0, id);
+               row.setField(1, name);
+               row.setField(2, bytes);
 
-               assertEquals(3, deserialized.getArity());
-               assertEquals(id, deserialized.getField(0));
-               assertEquals(name, deserialized.getField(1));
-               assertArrayEquals(bytes, (byte[]) deserialized.getField(2));
+               assertThat(serializedJson, 
whenDeserializedWith(deserializationSchema).equalsTo(row));
        }
 
        @Test
@@ -103,7 +105,7 @@ public class JsonRowDeserializationSchemaTest {
 
                final byte[] serializedJson = 
objectMapper.writeValueAsBytes(root);
 
-               JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema(
+               JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema.Builder(
                        "{" +
                        "    type: 'object'," +
                        "    properties: {" +
@@ -124,9 +126,7 @@ public class JsonRowDeserializationSchemaTest {
                        "             }" +
                        "         }" +
                        "    }" +
-                       "}");
-
-               final Row deserialized = 
deserializationSchema.deserialize(serializedJson);
+                       "}").build();
 
                final Row expected = new Row(10);
                expected.setField(0, id);
@@ -143,7 +143,7 @@ public class JsonRowDeserializationSchemaTest {
                nestedRow.setField(1, BigDecimal.valueOf(12));
                expected.setField(9, nestedRow);
 
-               assertEquals(expected, deserialized);
+               assertThat(serializedJson, 
whenDeserializedWith(deserializationSchema).equalsTo(expected));
        }
 
        /**
@@ -158,25 +158,25 @@ public class JsonRowDeserializationSchemaTest {
                root.put("id", 123123123);
                byte[] serializedJson = objectMapper.writeValueAsBytes(root);
 
-               JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema(
-                       Types.ROW_NAMED(
-                               new String[] { "name" },
-                               Types.STRING)
-               );
+               TypeInformation<Row> rowTypeInformation = Types.ROW_NAMED(
+                       new String[]{"name"},
+                       Types.STRING);
 
-               Row row = deserializationSchema.deserialize(serializedJson);
+               JsonRowDeserializationSchema deserializationSchema =
+                       new 
JsonRowDeserializationSchema.Builder(rowTypeInformation)
+                               .build();
 
-               assertEquals(1, row.getArity());
-               Assert.assertNull("Missing field not null", row.getField(0));
+               Row row = new Row(1);
+               assertThat(serializedJson,
+                       
whenDeserializedWith(deserializationSchema).equalsTo(row));
 
-               deserializationSchema.setFailOnMissingField(true);
+               deserializationSchema = new 
JsonRowDeserializationSchema.Builder(rowTypeInformation)
+                       .failOnMissingField()
+                       .build();
 
-               try {
-                       deserializationSchema.deserialize(serializedJson);
-                       Assert.fail("Did not throw expected Exception");
-               } catch (IOException e) {
-                       Assert.assertTrue(e.getCause() instanceof 
IllegalStateException);
-               }
+               assertThat(serializedJson,
+                       whenDeserializedWith(deserializationSchema)
+                               
.failsWithException(hasCause(instanceOf(IllegalStateException.class))));
        }
 
        /**
@@ -185,10 +185,10 @@ public class JsonRowDeserializationSchemaTest {
        @Test
        public void testNumberOfFieldNamesAndTypesMismatch() {
                try {
-                       new JsonRowDeserializationSchema(
+                       new JsonRowDeserializationSchema.Builder(
                                Types.ROW_NAMED(
                                        new String[]{"one", "two", "three"},
-                                       Types.LONG));
+                                       Types.LONG)).build();
                        Assert.fail("Did not throw expected Exception");
                } catch (IllegalArgumentException ignored) {
                        // Expected
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java
index caf99f4..47b06c4 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java
@@ -110,8 +110,7 @@ public class JsonRowFormatFactoryTest extends TestLogger {
                final DeserializationSchema<?> actual2 = TableFactoryService
                        .find(DeserializationSchemaFactory.class, properties)
                        .createDeserializationSchema(properties);
-               final JsonRowDeserializationSchema expected2 = new 
JsonRowDeserializationSchema(SCHEMA);
-               expected2.setFailOnMissingField(false);
+               final JsonRowDeserializationSchema expected2 = new 
JsonRowDeserializationSchema.Builder(SCHEMA).build();
                assertEquals(expected2, actual2);
        }
 
@@ -119,7 +118,7 @@ public class JsonRowFormatFactoryTest extends TestLogger {
                final SerializationSchema<?> actual1 = TableFactoryService
                        .find(SerializationSchemaFactory.class, properties)
                        .createSerializationSchema(properties);
-               final SerializationSchema<?> expected1 = new 
JsonRowSerializationSchema(SCHEMA);
+               final SerializationSchema<?> expected1 = new 
JsonRowSerializationSchema.Builder(SCHEMA).build();
                assertEquals(expected1, actual1);
        }
 
@@ -127,8 +126,9 @@ public class JsonRowFormatFactoryTest extends TestLogger {
                final DeserializationSchema<?> actual2 = TableFactoryService
                        .find(DeserializationSchemaFactory.class, properties)
                        .createDeserializationSchema(properties);
-               final JsonRowDeserializationSchema expected2 = new 
JsonRowDeserializationSchema(JSON_SCHEMA);
-               expected2.setFailOnMissingField(true);
+               final JsonRowDeserializationSchema expected2 = new 
JsonRowDeserializationSchema.Builder(JSON_SCHEMA)
+                       .failOnMissingField()
+                       .build();
                assertEquals(expected2, actual2);
        }
 
@@ -136,7 +136,7 @@ public class JsonRowFormatFactoryTest extends TestLogger {
                final SerializationSchema<?> actual1 = TableFactoryService
                        .find(SerializationSchemaFactory.class, properties)
                        .createSerializationSchema(properties);
-               final SerializationSchema<?> expected1 = new 
JsonRowSerializationSchema(JSON_SCHEMA);
+               final SerializationSchema<?> expected1 = new 
JsonRowSerializationSchema.Builder(JSON_SCHEMA).build();
                assertEquals(expected1, actual1);
        }
 
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
index e2410d4..cc1f5bf 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
@@ -30,7 +30,10 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static 
org.apache.flink.formats.utils.SerializationSchemaMatcher.whenSerializedWith;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 /**
  * Tests for the {@link JsonRowSerializationSchema}.
@@ -38,7 +41,7 @@ import static org.junit.Assert.assertEquals;
 public class JsonRowSerializationSchemaTest {
 
        @Test
-       public void testRowSerialization() throws IOException {
+       public void testRowSerialization() {
                final TypeInformation<Row> rowSchema = Types.ROW_NAMED(
                        new String[] {"f1", "f2", "f3"},
                        Types.INT, Types.BOOLEAN, Types.STRING);
@@ -48,8 +51,14 @@ public class JsonRowSerializationSchemaTest {
                row.setField(1, true);
                row.setField(2, "str");
 
-               final Row resultRow = serializeAndDeserialize(rowSchema, row);
-               assertEquals(row, resultRow);
+               final JsonRowSerializationSchema serializationSchema = new 
JsonRowSerializationSchema.Builder(rowSchema)
+                       .build();
+               final JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema.Builder(rowSchema)
+                       .build();
+
+               assertThat(row, whenSerializedWith(serializationSchema)
+                       .andDeserializedWith(deserializationSchema)
+                       .equalsTo(row));
        }
 
        @Test
@@ -63,8 +72,10 @@ public class JsonRowSerializationSchemaTest {
                row1.setField(1, true);
                row1.setField(2, "str");
 
-               final JsonRowSerializationSchema serializationSchema = new 
JsonRowSerializationSchema(rowSchema);
-               final JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema(rowSchema);
+               final JsonRowSerializationSchema serializationSchema = new 
JsonRowSerializationSchema.Builder(rowSchema)
+                       .build();
+               final JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema.Builder(rowSchema)
+                       .build();
 
                byte[] bytes = serializationSchema.serialize(row1);
                assertEquals(row1, deserializationSchema.deserialize(bytes));
@@ -79,7 +90,7 @@ public class JsonRowSerializationSchemaTest {
        }
 
        @Test
-       public void testNestedSchema() throws IOException {
+       public void testNestedSchema() {
                final TypeInformation<Row> rowSchema = Types.ROW_NAMED(
                        new String[] {"f1", "f2", "f3"},
                        Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, 
Types.DOUBLE));
@@ -92,25 +103,32 @@ public class JsonRowSerializationSchemaTest {
                nested.setField(1, 2.3);
                row.setField(2, nested);
 
-               final Row resultRow = serializeAndDeserialize(rowSchema, row);
-               assertEquals(row, resultRow);
+               final JsonRowSerializationSchema serializationSchema = new 
JsonRowSerializationSchema.Builder(rowSchema)
+                       .build();
+               final JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema.Builder(rowSchema)
+                       .build();
+
+               assertThat(row, whenSerializedWith(serializationSchema)
+                       .andDeserializedWith(deserializationSchema)
+                       .equalsTo(row));
        }
 
-       @Test(expected = RuntimeException.class)
+       @Test
        public void testSerializeRowWithInvalidNumberOfFields() {
                final TypeInformation<Row> rowSchema = Types.ROW_NAMED(
-                       new String[] {"f1", "f2", "f3"},
+                       new String[]{"f1", "f2", "f3"},
                        Types.INT, Types.BOOLEAN, Types.STRING);
 
                final Row row = new Row(1);
                row.setField(0, 1);
 
-               final JsonRowSerializationSchema serializationSchema = new 
JsonRowSerializationSchema(rowSchema);
-               serializationSchema.serialize(row);
+               final JsonRowSerializationSchema serializationSchema = new 
JsonRowSerializationSchema.Builder(rowSchema)
+                       .build();
+               assertThat(row, 
whenSerializedWith(serializationSchema).failsWithException(instanceOf(RuntimeException.class)));
        }
 
        @Test
-       public void testSchema() throws IOException {
+       public void testSchema() {
                final TypeInformation<Row> rowSchema = 
JsonRowSchemaConverter.convert(
                        "{" +
                        "    type: 'object'," +
@@ -157,17 +175,14 @@ public class JsonRowSerializationSchemaTest {
                nestedRow.setField(1, BigDecimal.valueOf(12));
                row.setField(10, nestedRow);
 
-               final Row resultRow = serializeAndDeserialize(rowSchema, row);
-               assertEquals(row, resultRow);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
+               final JsonRowSerializationSchema serializationSchema = new 
JsonRowSerializationSchema.Builder(rowSchema)
+                       .build();
+               final JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema.Builder(rowSchema)
+                       .build();
 
-       private Row serializeAndDeserialize(TypeInformation<Row> rowSchema, Row 
row) throws IOException {
-               final JsonRowSerializationSchema serializationSchema = new 
JsonRowSerializationSchema(rowSchema);
-               final JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema(rowSchema);
-
-               final byte[] bytes = serializationSchema.serialize(row);
-               return deserializationSchema.deserialize(bytes);
+               assertThat(row, whenSerializedWith(serializationSchema)
+                       .andDeserializedWith(deserializationSchema)
+                       .equalsTo(row));
        }
+
 }
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/utils/DeserializationSchemaMatcher.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/utils/DeserializationSchemaMatcher.java
new file mode 100644
index 0000000..c5b19e7
--- /dev/null
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/utils/DeserializationSchemaMatcher.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.utils;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.types.Row;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.apache.flink.util.InstantiationUtil.deserializeObject;
+import static org.apache.flink.util.InstantiationUtil.serializeObject;
+
+/**
+ * Matcher that provides a common way for asserting results of {@link 
DeserializationSchema}. It takes into account
+ * e.g. the fact that the deserialization schema might be used at runtime after 
being serialized and shipped over the wire. Usage:
+ *
+ * <ul>
+ * <li>when asserting for result after deserializing a row
+ * <pre>{@code
+ *      byte[] jsonBytes = ...
+ *      Row expectedRow = ...
+ *      final JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema.Builder(rowSchema).build();
+ *
+ *      assertThat(jsonBytes, whenDeserializedWith(deserializationSchema)
+ *          .equalsTo(expectedRow));
+ * }</pre>
+ * </li>
+ *
+ * <li>to check if an exception is thrown during deserialization:
+ * <pre>{@code
+ *      assertThat(serializedJson,
+ *          whenDeserializedWith(deserializationSchema)
+ *              
.failsWithException(hasCause(instanceOf(IllegalStateException.class))));
+ * }</pre>
+ * </li>
+ * </ul>
+ */
+public abstract class DeserializationSchemaMatcher extends 
TypeSafeMatcher<byte[]> {
+
+       final DeserializationSchema<Row> deserializationSchema;
+
+       private DeserializationSchemaMatcher(DeserializationSchema<Row> 
deserializationSchema) {
+               this.deserializationSchema = deserializationSchema;
+       }
+
+       public static DeserializationSchemaMatcherBuilder 
whenDeserializedWith(DeserializationSchema<Row> deserializationSchema) {
+               return new 
DeserializationSchemaMatcherBuilder(deserializationSchema);
+       }
+
+       private static class DeserializationSchemaResultMatcher extends 
DeserializationSchemaMatcher {
+
+               private final Row expected;
+
+               private DeserializationSchemaResultMatcher(
+                       DeserializationSchema<Row> deserializationSchema,
+                       Row expected) {
+                       super(deserializationSchema);
+
+                       this.expected = expected;
+               }
+
+               @Override
+               protected boolean matchesSafely(byte[] item) {
+                       try {
+                               return 
Objects.deepEquals(deserializationSchema.deserialize(item), expected);
+                       } catch (IOException e) {
+                               throw new AssertionError("Could not 
deserialize", e);
+                       }
+               }
+
+               @Override
+               public void describeTo(Description description) {
+                       description.appendValue(expected);
+               }
+       }
+
+       private static class DeserializationSchemaExceptionMatcher extends 
DeserializationSchemaMatcher {
+
+               private final Matcher<? extends Throwable> exceptionMatcher;
+               private Throwable thrownException = null;
+
+               private DeserializationSchemaExceptionMatcher(
+                       DeserializationSchema<Row> deserializationSchema,
+                       Matcher<? extends Throwable> exceptionMatcher) {
+                       super(deserializationSchema);
+                       this.exceptionMatcher = exceptionMatcher;
+               }
+
+               @Override
+               protected boolean matchesSafely(byte[] item) {
+                       try {
+                               deserializationSchema.deserialize(item);
+                       } catch (IOException e) {
+                               thrownException = e;
+                       }
+                       return exceptionMatcher.matches(thrownException);
+               }
+
+               @Override
+               public void describeTo(Description description) {
+                       exceptionMatcher.describeTo(description);
+               }
+
+               @Override
+               protected void describeMismatchSafely(byte[] item, Description 
mismatchDescription) {
+                       exceptionMatcher.describeMismatch(thrownException, 
mismatchDescription);
+               }
+       }
+
+       /**
+        * Builder for {@link DeserializationSchemaMatcher}.
+        */
+       public static class DeserializationSchemaMatcherBuilder {
+
+               private DeserializationSchema<Row> deserializationSchema;
+
+               private 
DeserializationSchemaMatcherBuilder(DeserializationSchema<Row> 
deserializationSchema) {
+                       try {
+                               // we serialize and deserialize the schema to 
test runtime behavior
+                               // when the schema is shipped to the cluster
+                               this.deserializationSchema = deserializeObject(
+                                       serializeObject(deserializationSchema),
+                                       this.getClass().getClassLoader());
+                       } catch (IOException | ClassNotFoundException e) {
+                               throw new RuntimeException(e);
+                       }
+               }
+
+               public DeserializationSchemaMatcher equalsTo(Row row) {
+                       return new DeserializationSchemaResultMatcher(
+                               deserializationSchema,
+                               row
+                       );
+               }
+
+               public DeserializationSchemaMatcher 
failsWithException(Matcher<? extends Throwable> exceptionMatcher) {
+                       return new DeserializationSchemaExceptionMatcher(
+                               deserializationSchema,
+                               exceptionMatcher
+                       );
+               }
+       }
+}
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/utils/SerializationSchemaMatcher.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/utils/SerializationSchemaMatcher.java
new file mode 100644
index 0000000..4cda5dd
--- /dev/null
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/utils/SerializationSchemaMatcher.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.utils;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.apache.flink.util.InstantiationUtil.deserializeObject;
+import static org.apache.flink.util.InstantiationUtil.serializeObject;
+
+/**
+ * Matcher that provides a common way for asserting results of {@link 
SerializationSchema}. It takes into account
+ * e.g. the fact that serialization schema during runtime might be used after 
serializing and deserializing it over
+ * a wire. Usage:
+ *
+ * <ul>
+ * <li>when asserting for result after serializing and deserializing a row
+ * <pre>{@code
+ *      final JsonRowSerializationSchema serializationSchema = new 
JsonRowSerializationSchema.Builder(rowSchema).build();
+ *      final JsonRowDeserializationSchema deserializationSchema = new 
JsonRowDeserializationSchema.Builder(rowSchema).build();
+ *
+ *      assertThat(row, whenSerializedWith(serializationSchema)
+ *          .andDeserializedWith(deserializationSchema)
+ *          .equalsTo(row));
+ * }</pre>
+ * </li>
+ *
+ * <li>to check if an exception is thrown during serialization:
+ * <pre>{@code
+ *      assertThat(row, 
whenSerializedWith(serializationSchema).failsWithException(instanceOf(RuntimeException.class)));
+ * }</pre>
+ * </li>
+ * </ul>
+ */
+public abstract class SerializationSchemaMatcher extends TypeSafeMatcher<Row> {
+
+       final SerializationSchema<Row> serializationSchema;
+
+       private SerializationSchemaMatcher(SerializationSchema<Row> 
serializationSchema) {
+               this.serializationSchema = serializationSchema;
+       }
+
+       public static SerializationSchemaMatcherBuilder 
whenSerializedWith(SerializationSchema<Row> serializationSchema) {
+               return new 
SerializationSchemaMatcherBuilder(serializationSchema);
+       }
+
+       private static class SerializationSchemaResultMatcher extends 
SerializationSchemaMatcher {
+
+               private final Row expected;
+               private final DeserializationSchema<Row> deserializationSchema;
+
+               private SerializationSchemaResultMatcher(
+                               SerializationSchema<Row> serializationSchema,
+                               DeserializationSchema<Row> 
deserializationSchema,
+                               Row expected) {
+                       super(serializationSchema);
+
+                       this.expected = expected;
+                       this.deserializationSchema = deserializationSchema;
+               }
+
+               @Override
+               protected boolean matchesSafely(Row item) {
+                       try {
+                               return Objects.deepEquals(
+                                       
deserializationSchema.deserialize(serializationSchema.serialize(item)),
+                                       expected);
+                       } catch (IOException e) {
+                               throw new AssertionError("Could not 
deserialize", e);
+                       }
+               }
+
+               @Override
+               public void describeTo(Description description) {
+                       description.appendValue(expected);
+               }
+       }
+
+       private static class SerializationSchemaExceptionMatcher extends 
SerializationSchemaMatcher {
+
+               private final Matcher<? extends Throwable> exceptionMatcher;
+               private Throwable thrownException = null;
+
+               private SerializationSchemaExceptionMatcher(
+                               SerializationSchema<Row> serializationSchema,
+                               Matcher<? extends Throwable> exceptionMatcher) {
+                       super(serializationSchema);
+                       this.exceptionMatcher = exceptionMatcher;
+               }
+
+               @Override
+               protected boolean matchesSafely(Row item) {
+                       try {
+                               serializationSchema.serialize(item);
+                       } catch (Exception e) {
+                               thrownException = e;
+                       }
+                       return exceptionMatcher.matches(thrownException);
+               }
+
+               @Override
+               public void describeTo(Description description) {
+                       exceptionMatcher.describeTo(description);
+               }
+
+               @Override
+               protected void describeMismatchSafely(Row item, Description 
mismatchDescription) {
+                       exceptionMatcher.describeMismatch(thrownException, 
mismatchDescription);
+               }
+       }
+
+       /**
+        * Builder for {@link SerializationSchemaMatcher} that can assert 
results after serializing and deserializing a row.
+        */
+       public static class 
SerializationWithDeserializationSchemaMatcherBuilder {
+
+               private SerializationSchema<Row> serializationSchema;
+               private DeserializationSchema<Row> deserializationSchema;
+
+               private SerializationWithDeserializationSchemaMatcherBuilder(
+                       SerializationSchema<Row> serializationSchema,
+                       DeserializationSchema<Row> deserializationSchema) {
+                       try {
+                               // we serialize and deserialize the schema to 
test runtime behavior
+                               // when the schema is shipped to the cluster
+                               this.serializationSchema = deserializeObject(
+                                       serializeObject(serializationSchema),
+                                       this.getClass().getClassLoader());
+                               this.deserializationSchema = deserializeObject(
+                                       serializeObject(deserializationSchema),
+                                       this.getClass().getClassLoader());
+                       } catch (IOException | ClassNotFoundException e) {
+                               throw new RuntimeException(e);
+                       }
+               }
+
+               public SerializationSchemaMatcher equalsTo(Row expected) {
+                       return new SerializationSchemaResultMatcher(
+                               serializationSchema,
+                               deserializationSchema,
+                               expected
+                       );
+               }
+       }
+
+       /**
+        * Builder for {@link SerializationSchemaMatcher}.
+        */
+       public static class SerializationSchemaMatcherBuilder {
+
+               private SerializationSchema<Row> serializationSchema;
+
+               private 
SerializationSchemaMatcherBuilder(SerializationSchema<Row> serializationSchema) 
{
+                       this.serializationSchema = serializationSchema;
+               }
+
+               public SerializationWithDeserializationSchemaMatcherBuilder 
andDeserializedWith(DeserializationSchema<Row> deserializationSchema) {
+                       return new 
SerializationWithDeserializationSchemaMatcherBuilder(serializationSchema, 
deserializationSchema);
+               }
+
+               public SerializationSchemaMatcher failsWithException(Matcher<? 
extends Throwable> exceptionMatcher) {
+                       return new SerializationSchemaExceptionMatcher(
+                               serializationSchema,
+                               exceptionMatcher
+                       );
+               }
+       }
+}

Reply via email to