[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480344#comment-16480344 ]
ASF GitHub Bot commented on FLINK-9337: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189197633 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param <T> type of record it produces + */ +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> { + + private static final long serialVersionUID = -6766681879020862312L; + + /** Class to deserialize to. */ + private final Class<T> recordClazz; + + /** Schema in case of GenericRecord for serialization purpose. */ + private final String schemaString; + + /** Reader that deserializes byte array into a record. */ + private transient GenericDatumReader<T> datumReader; + + /** Input stream to read message from. */ + private transient MutableByteArrayInputStream inputStream; + + /** Avro decoder that decodes binary data. */ + private transient Decoder decoder; + + /** Avro schema for the reader. */ + private transient Schema reader; + + /** + * Creates a Avro deserialization schema. + * + * @param recordClazz class to which deserialize. Should be one of: + * {@link org.apache.avro.specific.SpecificRecord}, + * {@link org.apache.avro.generic.GenericRecord}. + * @param reader reader's Avro schema. Should be provided if recordClazz is + * {@link GenericRecord} + */ + AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + this.reader = reader; + if (reader != null) { + this.schemaString = reader.toString(); + } else { + this.schemaString = null; + } + } + + /** + * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema. + * + * @param schema schema of produced records + * @return deserialized record in form of {@link GenericRecord} + */ + public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) { --- End diff -- Minor comment: I found it helps code structure/readability to move static/factory methods either to the top or the bottom of the class. > Implement AvroDeserializationSchema > ----------------------------------- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature > Reporter: Dawid Wysakowicz > Assignee: Dawid Wysakowicz > Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)