[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535036#comment-16535036 ]
ASF GitHub Bot commented on FLINK-8558: --------------------------------------- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6264#discussion_r200696183 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowFormatFactory.java --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.table.descriptors.AvroValidator; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.FormatDescriptorValidator; +import org.apache.flink.table.formats.DeserializationSchemaFactory; +import org.apache.flink.table.formats.SerializationSchemaFactory; +import org.apache.flink.types.Row; + +import org.apache.avro.specific.SpecificRecord; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Table format factory for providing configured instances of Avro-to-row {@link SerializationSchema} + * and {@link DeserializationSchema}. + */ +public class AvroRowFormatFactory implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> { + + @Override + public Map<String, String> requiredContext() { + final Map<String, String> context = new HashMap<>(); + context.put(FormatDescriptorValidator.FORMAT_TYPE(), AvroValidator.FORMAT_TYPE_VALUE); + context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION(), "1"); + return context; + } + + @Override + public boolean supportsSchemaDerivation() { + return false; + } + + @Override + public List<String> supportedProperties() { + final List<String> properties = new ArrayList<>(); + properties.add(AvroValidator.FORMAT_RECORD_CLASS); + properties.add(AvroValidator.FORMAT_AVRO_SCHEMA); + return properties; + } + + @Override + public DeserializationSchema<Row> createDeserializationSchema(Map<String, String> properties) { + final DescriptorProperties props = new DescriptorProperties(true); --- End diff -- Validation and creation are independent tasks. 
Validation is an internal task, which is why validators have internal utility classes (`DescriptorProperties`); however, the public API for users who want to implement their own formats uses the plain-old `java.util.Map`. > Add unified format interfaces and format discovery > -------------------------------------------------- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors > Reporter: Timo Walther > Assignee: Timo Walther > Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)