This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f76042  [FLINK-17629][json] Implement format factory for JSON serialization and deserialization schema
2f76042 is described below

commit 2f76042f3cc80bf5468fe383b0c57887840efffb
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Fri May 15 09:17:23 2020 +0800

    [FLINK-17629][json] Implement format factory for JSON serialization and deserialization schema
    
    This closes #12140
---
 flink-formats/flink-json/pom.xml                   |   9 ++
 .../flink/formats/json/JsonFormatFactory.java      | 159 +++++++++++++++++++
 .../json/JsonRowDataDeserializationSchema.java     |  20 +++
 .../json/JsonRowDataSerializationSchema.java       |  22 +++
 .../org.apache.flink.table.factories.Factory       |  16 ++
 .../flink/formats/json/JsonFormatFactoryTest.java  | 175 +++++++++++++++++++++
 6 files changed, 401 insertions(+)

diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml
index 19d5045..0d31525 100644
--- a/flink-formats/flink-json/pom.xml
+++ b/flink-formats/flink-json/pom.xml
@@ -100,6 +100,15 @@ under the License.
                        <artifactId>scala-compiler</artifactId>
                        <scope>test</scope>
                </dependency>
+
+               <!-- JSON RowData format factory testing -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
        </dependencies>
 
        <profiles>
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
new file mode 100644
index 0000000..07e6d2d
--- /dev/null
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Table format factory for providing configured instances of JSON to RowData
+ * {@link SerializationSchema} and {@link DeserializationSchema}.
+ */
+public class JsonFormatFactory implements
+               DeserializationFormatFactory,
+               SerializationFormatFactory {
+
+       public static final String IDENTIFIER = "json";
+
+       // 
------------------------------------------------------------------------
+       //  Options
+       // 
------------------------------------------------------------------------
+
+       private static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = 
ConfigOptions
+                       .key("fail-on-missing-field")
+                       .booleanType()
+                       .defaultValue(false)
+                       .withDescription("Optional flag to specify whether to 
fail if a field is missing or not, false by default");
+
+       private static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = 
ConfigOptions
+                       .key("ignore-parse-errors")
+                       .booleanType()
+                       .defaultValue(false)
+                       .withDescription("Optional flag to skip fields and rows 
with parse errors instead of failing;\n"
+                                       + "fields are set to null in case of 
errors, false by default");
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
+                       DynamicTableFactory.Context context,
+                       ReadableConfig formatOptions) {
+               FactoryUtil.validateFactoryOptions(this, formatOptions);
+               validateFormatOptions(formatOptions);
+
+               final boolean failOnMissingField = 
formatOptions.get(FAIL_ON_MISSING_FIELD);
+               final boolean ignoreParseErrors = 
formatOptions.get(IGNORE_PARSE_ERRORS);
+
+               return new ScanFormat<DeserializationSchema<RowData>>() {
+                       @Override
+                       public DeserializationSchema<RowData> createScanFormat(
+                                       ScanTableSource.Context scanContext,
+                                       DataType producedDataType) {
+                               final RowType rowType = (RowType) 
producedDataType.getLogicalType();
+                               final TypeInformation<RowData> rowDataTypeInfo =
+                                               (TypeInformation<RowData>) 
scanContext.createTypeInformation(producedDataType);
+                               return new JsonRowDataDeserializationSchema(
+                                               rowType,
+                                               rowDataTypeInfo,
+                                               failOnMissingField,
+                                               ignoreParseErrors);
+                       }
+
+                       @Override
+                       public ChangelogMode getChangelogMode() {
+                               return ChangelogMode.insertOnly();
+                       }
+               };
+       }
+
+       @Override
+       public SinkFormat<SerializationSchema<RowData>> createSinkFormat(
+                       DynamicTableFactory.Context context,
+                       ReadableConfig formatOptions) {
+               FactoryUtil.validateFactoryOptions(this, formatOptions);
+
+               return new SinkFormat<SerializationSchema<RowData>>() {
+                       @Override
+                       public SerializationSchema<RowData> createSinkFormat(
+                                       DynamicTableSink.Context context,
+                                       DataType consumedDataType) {
+                               final RowType rowType = (RowType) 
consumedDataType.getLogicalType();
+                               return new 
JsonRowDataSerializationSchema(rowType);
+                       }
+
+                       @Override
+                       public ChangelogMode getChangelogMode() {
+                               return ChangelogMode.insertOnly();
+                       }
+               };
+       }
+
+       @Override
+       public String factoryIdentifier() {
+               return IDENTIFIER;
+       }
+
+       @Override
+       public Set<ConfigOption<?>> requiredOptions() {
+               return Collections.emptySet();
+       }
+
+       @Override
+       public Set<ConfigOption<?>> optionalOptions() {
+               Set<ConfigOption<?>> options = new HashSet<>();
+               options.add(FAIL_ON_MISSING_FIELD);
+               options.add(IGNORE_PARSE_ERRORS);
+               return options;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Validation
+       // 
------------------------------------------------------------------------
+
+       private void validateFormatOptions(ReadableConfig tableOptions) {
+               boolean failOnMissingField = 
tableOptions.get(FAIL_ON_MISSING_FIELD);
+               boolean ignoreParseErrors = 
tableOptions.get(IGNORE_PARSE_ERRORS);
+               if (ignoreParseErrors && failOnMissingField) {
+                       throw new 
ValidationException(FAIL_ON_MISSING_FIELD.key()
+                                       + " and "
+                                       + IGNORE_PARSE_ERRORS.key()
+                                       + " shouldn't both be true.");
+               }
+       }
+}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 974b8dd..87dee7f 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -56,6 +56,7 @@ import java.time.temporal.TemporalQueries;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 
 import static java.lang.String.format;
 import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
@@ -130,6 +131,30 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R
                return resultTypeInfo;
        }
 
+       /**
+        * Two deserialization schemas are equal when they share the missing-field and
+        * parse-error flags and produce the same type information.
+        */
+       @Override
+       public boolean equals(Object o) {
+               if (o == this) {
+                       return true;
+               }
+               if (o == null || o.getClass() != getClass()) {
+                       return false;
+               }
+               final JsonRowDataDeserializationSchema other = (JsonRowDataDeserializationSchema) o;
+               return failOnMissingField == other.failOnMissingField
+                               && ignoreParseErrors == other.ignoreParseErrors
+                               && resultTypeInfo.equals(other.resultTypeInfo);
+       }
+
+       /** Hash code over the same three fields that {@link #equals(Object)} compares. */
+       @Override
+       public int hashCode() {
+               return Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo);
+       }
+
        // -------------------------------------------------------------------------------------
        // Runtime Converters
        // -------------------------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 7c17738..263e282 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -42,6 +42,7 @@ import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalTime;
 import java.util.Arrays;
+import java.util.Objects;
 
 import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
 import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
@@ -59,6 +60,9 @@ import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
 public class JsonRowDataSerializationSchema implements SerializationSchema<RowData> {
        private static final long serialVersionUID = 1L;
 
+       /** Row type this schema was created for; the runtime converter is derived from it. */
+       private final RowType rowType;
+
        /** The converter that converts internal data formats to JsonNode. */
        private final SerializationRuntimeConverter runtimeConverter;
 
@@ -69,6 +73,7 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa
        private transient ObjectNode node;
 
        public JsonRowDataSerializationSchema(RowType rowType) {
+               this.rowType = rowType;
                this.runtimeConverter = createConverter(rowType);
        }
 
@@ -87,6 +92,27 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa
                }
        }
 
+       /**
+        * Equality is based on the row type only; the runtime converter is created from that
+        * row type in the constructor.
+        */
+       @Override
+       public boolean equals(Object o) {
+               if (o == this) {
+                       return true;
+               }
+               if (o == null || o.getClass() != getClass()) {
+                       return false;
+               }
+               return rowType.equals(((JsonRowDataSerializationSchema) o).rowType);
+       }
+
+       /** Hash code derived from the row type, consistent with {@link #equals(Object)}. */
+       @Override
+       public int hashCode() {
+               return Objects.hash(rowType);
+       }
+
        // --------------------------------------------------------------------------------
        // Runtime Converters
        // --------------------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..3a243d0
--- /dev/null
+++ b/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.formats.json.JsonFormatFactory
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
new file mode 100644
index 0000000..7638378
--- /dev/null
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.CoreMatchers.containsCause;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link JsonFormatFactory}.
+ */
+public class JsonFormatFactoryTest extends TestLogger {
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       private static final TableSchema SCHEMA = TableSchema.builder()
+                       .field("field1", DataTypes.BOOLEAN())
+                       .field("field2", DataTypes.INT())
+                       .build();
+
+       private static final RowType ROW_TYPE = (RowType) SCHEMA.toRowDataType().getLogicalType();
+
+       @Test
+       public void testSeDeSchema() {
+               final Map<String, String> tableOptions = getAllOptions();
+
+               testSchemaSerializationSchema(tableOptions);
+
+               testSchemaDeserializationSchema(tableOptions);
+       }
+
+       /** Enabling fail-on-missing-field together with ignore-parse-errors must be rejected. */
+       @Test
+       public void testFailOnMissingField() {
+               final Map<String, String> tableOptions = getModifyOptions(
+                               options -> options.put("json.fail-on-missing-field", "true"));
+
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new ValidationException("fail-on-missing-field and ignore-parse-errors shouldn't both be true.")));
+               testSchemaDeserializationSchema(tableOptions);
+       }
+
+       @Test
+       public void testInvalidOptionForIgnoreParseErrors() {
+               final Map<String, String> tableOptions = getModifyOptions(
+                               options -> options.put("json.ignore-parse-errors", "abc"));
+
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new IllegalArgumentException("Unrecognized option for boolean: abc. Expected either true or false(case insensitive)")));
+               testSchemaDeserializationSchema(tableOptions);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a scan source from the given options and asserts that its value format produces
+        * a deserialization schema equal to the one expected for {@link #getAllOptions()}.
+        */
+       private void testSchemaDeserializationSchema(Map<String, String> options) {
+               final JsonRowDataDeserializationSchema expectedDeser =
+                               new JsonRowDataDeserializationSchema(
+                                               ROW_TYPE,
+                                               new RowDataTypeInfo(ROW_TYPE),
+                                               false,
+                                               true);
+
+               final DynamicTableSource actualSource = createTableSource(options);
+               // JUnit assertion instead of the Java 'assert' keyword, which is a no-op unless -ea is set.
+               assertTrue(actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock);
+               final TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+                               (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+               final DeserializationSchema<RowData> actualDeser = scanSourceMock.sourceValueFormat
+                               .createScanFormat(
+                                               ScanRuntimeProviderContext.INSTANCE,
+                                               SCHEMA.toRowDataType());
+
+               assertEquals(expectedDeser, actualDeser);
+       }
+
+       /**
+        * Creates a sink from the given options and asserts that its value format produces a
+        * serialization schema equal to one built directly from {@link #ROW_TYPE}.
+        */
+       private void testSchemaSerializationSchema(Map<String, String> options) {
+               final JsonRowDataSerializationSchema expectedSer = new JsonRowDataSerializationSchema(ROW_TYPE);
+
+               final DynamicTableSink actualSink = createTableSink(options);
+               assertTrue(actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock);
+               final TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+                               (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+               final SerializationSchema<RowData> actualSer = sinkMock.sinkValueFormat
+                               .createSinkFormat(
+                                               new SinkRuntimeProviderContext(false),
+                                               SCHEMA.toRowDataType());
+
+               assertEquals(expectedSer, actualSer);
+       }
+
+       /**
+        * Returns the full options modified by the given consumer {@code optionModifier}.
+        *
+        * @param optionModifier Consumer to modify the options
+        */
+       private Map<String, String> getModifyOptions(Consumer<Map<String, String>> optionModifier) {
+               Map<String, String> options = getAllOptions();
+               optionModifier.accept(options);
+               return options;
+       }
+
+       /** Returns valid connector and JSON options (fail-on-missing-field=false, ignore-parse-errors=true). */
+       private Map<String, String> getAllOptions() {
+               final Map<String, String> options = new HashMap<>();
+               options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+               options.put("target", "MyTarget");
+               options.put("buffer-size", "1000");
+
+               options.put("format", JsonFormatFactory.IDENTIFIER);
+               options.put("json.fail-on-missing-field", "false");
+               options.put("json.ignore-parse-errors", "true");
+               return options;
+       }
+
+       private static DynamicTableSource createTableSource(Map<String, String> options) {
+               return FactoryUtil.createTableSource(
+                               null,
+                               ObjectIdentifier.of("default", "default", "t1"),
+                               new CatalogTableImpl(SCHEMA, options, "Mock scan table"),
+                               new Configuration(),
+                               JsonFormatFactoryTest.class.getClassLoader());
+       }
+
+       private static DynamicTableSink createTableSink(Map<String, String> options) {
+               return FactoryUtil.createTableSink(
+                               null,
+                               ObjectIdentifier.of("default", "default", "t1"),
+                               new CatalogTableImpl(SCHEMA, options, "Mock sink table"),
+                               new Configuration(),
+                               JsonFormatFactoryTest.class.getClassLoader());
+       }
+}

Reply via email to