[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529329#comment-16529329 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user cricket007 commented on the issue:

https://github.com/apache/flink/pull/5995

What about implementing a `KeyedDeserializationSchema` for Avro?

> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
> Issue Type: New Feature
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
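cricket007's question could be explored with a wrapper that composes two value-only deserializers, one for the Kafka record key and one for the value. The sketch below is JDK-only and every type in it (`ValueDeserializer`, `KeyedDeserializer`, `KeyValue`, `KeyValueDeserializer`) is hypothetical; it only loosely mirrors the parameter list of Flink's `KeyedDeserializationSchema` (key bytes, value bytes, topic, partition, offset), whose real interface also requires `isEndOfStream` and `getProducedType`.

```java
import java.io.IOException;

// Hypothetical stand-in for a value-only schema such as AvroDeserializationSchema.
interface ValueDeserializer<T> {
    T deserialize(byte[] message) throws IOException;
}

// Hypothetical keyed contract: it sees the key bytes, the value bytes and the
// Kafka coordinates of the record (loosely modelled on KeyedDeserializationSchema).
interface KeyedDeserializer<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;
}

// Minimal immutable key/value pair (hypothetical).
final class KeyValue<K, V> {
    final K key;
    final V value;

    KeyValue(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

// Composes two value-only deserializers into a keyed one.
final class KeyValueDeserializer<K, V> implements KeyedDeserializer<KeyValue<K, V>> {
    private final ValueDeserializer<K> keyDeserializer;
    private final ValueDeserializer<V> valueDeserializer;

    KeyValueDeserializer(ValueDeserializer<K> keyDeserializer, ValueDeserializer<V> valueDeserializer) {
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
    }

    @Override
    public KeyValue<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        // Kafka record keys are optional, so a missing key stays null instead of being decoded.
        K key = messageKey == null ? null : keyDeserializer.deserialize(messageKey);
        V value = valueDeserializer.deserialize(message);
        return new KeyValue<>(key, value);
    }
}
```

In an actual job the two inner deserializers would presumably be Avro schemas for the key and the value; that pairing is an assumption, not something settled in this thread.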
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523770#comment-16523770 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r198156227

--- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param <T> type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema<T> extends RegistryAvroDeserializationSchema<T> {
+
+	private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+	private static final long serialVersionUID = -1671641202177852775L;
+
+	/**
+	 * Creates a Avro deserialization schema.
+	 *
+	 * @param recordClazz         class to which deserialize. Should be either
+	 *                            {@link SpecificRecord} or {@link GenericRecord}.
+	 * @param reader              reader's Avro schema. Should be provided if recordClazz is
+	 *                            {@link GenericRecord}
+	 * @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry
+	 */
+	private ConfluentRegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
+			SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+		super(recordClazz, reader, schemaCoderProvider);
+	}
+
+	/**
+	 * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+	 * using provided reader schema and looks up writer schema in Confluent Schema Registry.
+	 *
+	 * @param schema schema of produced records
+	 * @param url    url of schema registry to connect
+	 * @return deserialized record in form of {@link GenericRecord}
+	 */
+	public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url) {
+		return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+	}
+
+	/**
+	 * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+	 * using provided reader schema and looks up writer schema in Confluent Schema Registry.
+	 *
+	 * @param schema              schema of produced records
+	 * @param url                 url of schema registry to connect
+	 * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
+	 * @return deserialized record in form of {@link GenericRecord}
+	 */
+	public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url,
--- End diff --

End user is supposed to use only this or `forSpecific` method and no other one. Therefore it must be public.
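dawidwys's answer describes a common pattern: a private constructor plus public static factories that are the only supported entry points, with the short overload delegating to the long one using a default. A minimal, dependency-free sketch of that pattern, assuming a `String` in place of Avro's `Schema`; `RegistrySchemaSketch` and its argument validation are hypothetical:

```java
import java.util.Objects;

// Hypothetical analogue of the factory pattern in the diff: the constructor is private,
// so the public static factories are the only entry points for users.
final class RegistrySchemaSketch {

    static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;

    private final String readerSchema;   // stands in for an Avro Schema
    private final String registryUrl;
    private final int identityMapCapacity;

    private RegistrySchemaSketch(String readerSchema, String registryUrl, int identityMapCapacity) {
        this.readerSchema = Objects.requireNonNull(readerSchema, "reader schema must not be null");
        this.registryUrl = Objects.requireNonNull(registryUrl, "registry url must not be null");
        if (identityMapCapacity <= 0) {
            throw new IllegalArgumentException("identityMapCapacity must be positive");
        }
        this.identityMapCapacity = identityMapCapacity;
    }

    /** Convenience overload that falls back to the default cache capacity. */
    public static RegistrySchemaSketch forGeneric(String readerSchema, String registryUrl) {
        return forGeneric(readerSchema, registryUrl, DEFAULT_IDENTITY_MAP_CAPACITY);
    }

    /** Full overload; if it were private, users could never choose a non-default capacity. */
    public static RegistrySchemaSketch forGeneric(String readerSchema, String registryUrl, int identityMapCapacity) {
        return new RegistrySchemaSketch(readerSchema, registryUrl, identityMapCapacity);
    }

    String readerSchema() { return readerSchema; }
    String registryUrl() { return registryUrl; }
    int identityMapCapacity() { return identityMapCapacity; }
}
```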
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523767#comment-16523767 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r198150153

--- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java ---
@@ -0,0 +1,136 @@
...
+	/**
+	 * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+	 * using provided reader schema and looks up writer schema in Confluent Schema Registry.
+	 *
+	 * @param schema              schema of produced records
+	 * @param url                 url of schema registry to connect
+	 * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
+	 * @return deserialized record in form of {@link GenericRecord}
+	 */
+	public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url,
--- End diff --

@dawidwys couldn't this be `private`?
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523766#comment-16523766 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r198150284

--- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java ---
@@ -0,0 +1,136 @@
...
+	/**
+	 * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+	 * using provided reader schema and looks up writer schema in Confluent Schema Registry.
+	 *
+	 * @param schema              schema of produced records
+	 * @param url                 url of schema registry to connect
+	 * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
+	 * @return deserialized record in form of {@link GenericRecord}
+	 */
+	public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url,
+			int identityMapCapacity) {
+		return new ConfluentRegistryAvroDeserializationSchema<>(
+			GenericRecord.class,
+			schema,
+			new CachedSchemaCoderProvider(url, identityMapCapacity));
+	}
+
+	/**
+	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro
+
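The three-argument `forGeneric` passes `identityMapCapacity`, documented as the maximum number of cached schema versions, to `CachedSchemaCoderProvider`. The sketch below shows one way to bound a schema-id-to-schema cache, assuming least-recently-used eviction built on `LinkedHashMap`; `BoundedSchemaCache` is hypothetical, and Confluent's `CachedSchemaRegistryClient` is not claimed to behave this way when its limit is reached.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.IntFunction;

// Hypothetical bounded cache from schema id to parsed schema.
// LRU eviction is an assumption made for illustration only.
final class BoundedSchemaCache<S> {
    private final IntFunction<S> fetch;   // e.g. a registry lookup by schema id
    private final Map<Integer, S> cache;
    private int fetches;                  // how many times the (simulated) registry was queried

    BoundedSchemaCache(int capacity, IntFunction<S> fetch) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.fetch = fetch;
        // accessOrder = true makes iteration order least-recently-used first.
        this.cache = new LinkedHashMap<Integer, S>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, S> eldest) {
                return size() > capacity;
            }
        };
    }

    S get(int schemaId) {
        S schema = cache.get(schemaId);
        if (schema == null) {
            schema = fetch.apply(schemaId);
            fetches++;
            cache.put(schemaId, schema);
        }
        return schema;
    }

    int fetches() { return fetches; }

    int size() { return cache.size(); }
}
```

Whether evicting, refusing, or failing is the right response to a full cache is a design decision; the sketch only shows that the capacity caps memory while repeated ids avoid registry round trips.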
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487362#comment-16487362 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5995
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487166#comment-16487166 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5995

Looks good, thanks!
+1 to merge this
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16483578#comment-16483578 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5995

I've addressed your comments, @StephanEwen. If you don't have any more, I will merge it today.
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480350#comment-16480350 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5995

Added a few more comments, most importantly around exception wrapping. Otherwise, looking good...
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480344#comment-16480344 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189197633

--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+	private static final long serialVersionUID = -6766681879020862312L;
+
+	/** Class to deserialize to. */
+	private final Class<T> recordClazz;
+
+	/** Schema in case of GenericRecord for serialization purpose. */
+	private final String schemaString;
+
+	/** Reader that deserializes byte array into a record. */
+	private transient GenericDatumReader<T> datumReader;
+
+	/** Input stream to read message from. */
+	private transient MutableByteArrayInputStream inputStream;
+
+	/** Avro decoder that decodes binary data. */
+	private transient Decoder decoder;
+
+	/** Avro schema for the reader. */
+	private transient Schema reader;
+
+	/**
+	 * Creates a Avro deserialization schema.
+	 *
+	 * @param recordClazz class to which deserialize. Should be one of:
+	 *                    {@link org.apache.avro.specific.SpecificRecord},
+	 *                    {@link org.apache.avro.generic.GenericRecord}.
+	 * @param reader      reader's Avro schema. Should be provided if recordClazz is
+	 *                    {@link GenericRecord}
+	 */
+	AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+		this.recordClazz = recordClazz;
+		this.inputStream = new MutableByteArrayInputStream();
+		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+		this.reader = reader;
+		if (reader != null) {
+			this.schemaString = reader.toString();
+		} else {
+			this.schemaString = null;
+		}
+	}
+
+	/**
+	 * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
+	 *
+	 * @param schema schema of produced records
+	 * @return deserialized record in form of {@link GenericRecord}
+	 */
+	public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
--- End diff --

Minor comment: I found it helps code structure/readability to move static/factory methods either to the top or the bottom of the class.
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480345#comment-16480345 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189197766

--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
@@ -0,0 +1,176 @@
...
+	/**
+	 * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
+	 *
+	 * @param schema schema of produced records
+	 * @return deserialized record in form of {@link GenericRecord}
+	 */
+	public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
+		return new AvroDeserializationSchema<>(GenericRecord.class, schema);
+	}
+
+	/**
+	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480333#comment-16480333 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189195014

--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
+ *
+ * @param <T> type of record it produces
+ */
+public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
+
+	private static final long serialVersionUID = -884738268437806062L;
+
+	/** Provider for schema coder. Used for initializing in each task. */
+	private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+
+	/** Coder used for reading schema from incoming stream. */
+	private transient SchemaCoder schemaCoder;
+
+	/**
+	 * Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}.
+	 *
+	 * @param recordClazz         class to which deserialize. Should be either
+	 *                            {@link SpecificRecord} or {@link GenericRecord}.
+	 * @param reader              reader's Avro schema. Should be provided if recordClazz is
+	 *                            {@link GenericRecord}
+	 * @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for
+	 *                            schema reading
+	 */
+	protected RegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
+			SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+		super(recordClazz, reader);
+		this.schemaCoderProvider = schemaCoderProvider;
+		this.schemaCoder = schemaCoderProvider.get();
+	}
+
+	@Override
+	public T deserialize(byte[] message) {
+		// read record
+		try {
+			checkAvroInitialized();
+			getInputStream().setBuffer(message);
+			Schema writerSchema = schemaCoder.readSchema(getInputStream());
+			Schema readerSchema = getReaderSchema();
+
+			GenericDatumReader<T> datumReader = getDatumReader();
+
+			datumReader.setSchema(writerSchema);
+			datumReader.setExpected(readerSchema);
+
+			return datumReader.read(null, getDecoder());
+		} catch (Exception e) {
+			throw new RuntimeException("Failed to deserialize Row.", e);
--- End diff --

The method `deserialize()` can throw an IOException. That got dropped from the signature, and exceptions are now wrapped into a RuntimeException. That makes exception stack traces more complicated, and hides the fact that "there is a possible exceptional case to handle" from the consumers of that code.

I think that this makes a general rule: Whenever using `RuntimeException`, take a step back and look at the exception structure and signatures, and see if something is not declared well.
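A minimal sketch of the signature StephanEwen asks for, assuming the Confluent wire format (one magic byte `0`, then a 4-byte big-endian schema id, then the Avro payload) that a registry-backed `SchemaCoder` would need to parse. `WireFormatHeader` and its methods are hypothetical; the point is that malformed input surfaces as a declared `IOException` that callers can see in the signature, rather than a `RuntimeException` wrapper.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper: failures propagate as checked IOExceptions instead of being wrapped.
final class WireFormatHeader {
    private static final byte MAGIC_BYTE = 0x0;

    private WireFormatHeader() {}

    /**
     * Reads the header (1 magic byte + 4-byte big-endian schema id). DataInputStream does not
     * buffer, so the underlying stream is left positioned at the start of the Avro payload.
     */
    static int readSchemaId(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        byte magic = data.readByte(); // EOFException (an IOException) on empty input
        if (magic != MAGIC_BYTE) {
            throw new IOException("Unknown magic byte: " + magic);
        }
        return data.readInt();
    }

    /** Deserialize-style entry point: the checked exception stays in the signature. */
    static int deserializeSchemaId(byte[] message) throws IOException {
        return readSchemaId(new ByteArrayInputStream(message));
    }
}
```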
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480293#comment-16480293 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189185420 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + private static final long serialVersionUID = -6766681879020862312L; + + /** Class to deserialize to. */ + private final Class recordClazz; + + /** Schema in case of GenericRecord for serialization purpose. */ + private final String schemaString; + + /** Reader that deserializes byte array into a record. */ + private transient GenericDatumReader datumReader; + + /** Input stream to read message from. */ + private transient MutableByteArrayInputStream inputStream; + + /** Avro decoder that decodes binary data. */ + private transient Decoder decoder; + + /** Avro schema for the reader. */ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. 
Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.inputStream = new MutableByteArrayInputStream(); --- End diff -- I would skip the initialization in the constructor if you already have the initialization in `checkAvroInitialized()`. It is simpler, and it avoids having two places that do the initialization and have to be kept in sync.
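A minimal JDK-only sketch of the single-initialization-point pattern suggested above. `LazySchema`, `ParsedSchema` and `checkInitialized()` are hypothetical stand-ins, not Flink's or Avro's API; `ParsedSchema` plays the role of the transient Avro reader state. The constructor only stores serializable configuration, and all transient state is created in one method, so it is rebuilt the same way after the object has gone through Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class LazyInitSketch {

    /** Hypothetical non-serializable runtime helper (stand-in for an Avro reader and decoder). */
    static final class ParsedSchema {
        final String definition;

        ParsedSchema(String definition) {
            this.definition = definition;
        }
    }

    static final class LazySchema implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Serializable configuration that is shipped with the job. */
        private final String schemaString;

        /** Runtime state: not serialized and not touched by the constructor. */
        private transient ParsedSchema reader;

        /** Demo-only counter of how often initialization actually ran. */
        transient int initCount;

        LazySchema(String schemaString) {
            this.schemaString = schemaString; // no transient-field setup here
        }

        /** The only place where transient state is created, before and after serialization. */
        private void checkInitialized() {
            if (reader == null) {
                reader = new ParsedSchema(schemaString);
                initCount++;
            }
        }

        String deserialize(byte[] message) {
            checkInitialized();
            return reader.definition + ":" + message.length;
        }
    }

    /** Java serialization round trip, roughly what happens when the schema object is shipped to a task. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because `reader` is `transient`, it is `null` on the deserialized copy until the first `deserialize()` call, which is exactly the case `checkInitialized()` already handles; neither a `readObject()` hook nor constructor code is needed.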
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480292#comment-16480292 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189185186 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; + + /** +* Reader that deserializes byte array into a record. +*/ + private transient GenericDatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private transient MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data. +*/ + private transient Decoder decoder; + + /** +* Avro schema for the reader. +*/ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. 
Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; +
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480248#comment-16480248 ] ASF GitHub Bot commented on FLINK-9337: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/5995 @StephanEwen could you have another look?
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477008#comment-16477008 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5995 I would actually keep the package name for now. It makes sense, because the connection to the registry is Avro-specific at the moment...
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476320#comment-16476320 ] ASF GitHub Bot commented on FLINK-9337: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/5995 Also, as for the package name or where to put it, I don't feel competent to suggest a place, so I will be happy to apply your suggestion.
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476314#comment-16476314 ] ASF GitHub Bot commented on FLINK-9337: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/5995 As for the snapshot binary data, I understand that it should be created with the appropriate Flink version (in this case, in theory, Flink 1.3). I tried really hard to do so, until I found out that this test is incompatible with 1.3 and the data could not be generated with Flink 1.3. Later I found the comment on the test class that states the same: "Important: Since Avro itself broke class compatibility between 1.7.7 (used in Flink 1.3) and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken through Avro already. This test only tests that the Avro serializer change (switching from Pojo to Avro for Avro types) works properly." Also, the commented code does not compile with Flink 1.3 (but this is a minor thing). Data serialized with the Avro version used in Flink 1.3 (1.7.7) is not binary compatible with Avro 1.8.2 (Flink 1.4+), due to changes in how SpecificFixed is constructed. Therefore, I regenerated this snapshot data by running the commented code on the current branch. That is also why I changed a few descriptions in that test: it tests compatibility of `PojoSerializer` with `AvroSerializer` rather than binary backwards compatibility. Nevertheless, I am more than happy to hear any comments on that.
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476299#comment-16476299 ] ASF GitHub Bot commented on FLINK-9337: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188386920 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -275,9 +304,19 @@ else if (configSnapshot instanceof AvroSerializerConfigSnapshot) { // Utilities // + private static boolean isGenericRecord(Class type) { + return !SpecificRecord.class.isAssignableFrom(type) && + GenericRecord.class.isAssignableFrom(type); + } + @Override public TypeSerializer duplicate() { - return new AvroSerializer<>(type); + if (schemaString != null) { + return new AvroSerializer<>(type, new Schema.Parser().parse(schemaString)); --- End diff -- I didn't think this through. I thought we needed to create a deep copy of the schema, but since it is stateless I think we can just pass the schema. My mistake; correct me if I am wrong.
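The `isGenericRecord()` helper in the diff rules out `SpecificRecord` first because classes generated by the Avro compiler typically also implement `GenericRecord` (through `SpecificRecordBase`), so a plain `GenericRecord.class.isAssignableFrom(type)` check would match them too. A JDK-only sketch of that check; the nested interfaces are hypothetical stand-ins that mirror the Avro hierarchy, not the real `org.apache.avro` types.

```java
class RecordKindSketch {

    // Hypothetical stand-ins mirroring the Avro type hierarchy (not the real Avro interfaces).
    interface IndexedRecord {}

    interface GenericRecord extends IndexedRecord {}

    interface SpecificRecord extends IndexedRecord {}

    /** Stand-in for a generated class: implements both SpecificRecord and GenericRecord. */
    static final class GeneratedUser implements SpecificRecord, GenericRecord {}

    /** Stand-in for a generic record implementation such as GenericData.Record. */
    static final class PlainRecord implements GenericRecord {}

    /** Same logic as isGenericRecord() in the diff. */
    static boolean isGenericRecord(Class<?> type) {
        // A generated class would also pass the GenericRecord check, so rule it out first.
        return !SpecificRecord.class.isAssignableFrom(type)
                && GenericRecord.class.isAssignableFrom(type);
    }
}
```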
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476291#comment-16476291 ] ASF GitHub Bot commented on FLINK-9337: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188386286 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; + + /** +* Reader that deserializes byte array into a record. +*/ + private transient GenericDatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private transient MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data. +*/ + private transient Decoder decoder; + + /** +* Avro schema for the reader. +*/ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. 
Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; +
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476283#comment-16476283 ] ASF GitHub Bot commented on FLINK-9337: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188385527 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; + + /** +* Reader that deserializes byte array into a record. +*/ + private transient GenericDatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private transient MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data. +*/ + private transient Decoder decoder; + + /** +* Avro schema for the reader. +*/ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. 
Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; +
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476118#comment-16476118 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188321914 --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent; + +import org.apache.flink.formats.avro.AvroDeserializationSchema; +import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema; +import org.apache.flink.formats.avro.SchemaCoder; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses + * Confluent Schema Registry. 
+ * + * @param type of record it produces + */ +public class ConfluentRegistryAvroDeserializationSchema + extends RegistryAvroDeserializationSchema { + + private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be either +*{@link SpecificRecord} or {@link GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +* @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry +*/ + private ConfluentRegistryAvroDeserializationSchema( + Class recordClazz, + @Nullable Schema reader, + SchemaCoder.SchemaCoderProvider schemaCoderProvider) { + super(recordClazz, reader, schemaCoderProvider); + } + + /** +* Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord} +* using provided reader schema and looks up writer schema in Confluent Schema Registry. +* +* @param schema schema of produced records +* @param url url of schema registry to connect +* @return deserialized record in form of {@link GenericRecord} +*/ + public static ConfluentRegistryAvroDeserializationSchema forGeneric( + Schema schema, String url) { + return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY); + } + + /** +* Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord} +* using provided reader schema and looks up writer schema in Confluent Schema Registry.
+* +* @param schema schema of produced records +* @param url url of schema registry to connect +* @param identityMapCapacity maximum number of cached schema versions (default: 1000) +* @return deserialized record in form of {@link GenericRecord} +*/ + public static ConfluentRegistryAvroDeserializationSchema forGeneric( + Schema schema, String url, int identityMapCapacity) { + return new ConfluentRegistryAvroDeserializationSchema<>( + GenericRecord.class, + schema, + new CachedSchemaCoderProvider(url, identityMapCapacity)); + } + + /** +* Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro +* schema and looks up writer schema
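The factory methods above delegate the writer-schema lookup to a `SchemaCoder` backed by Confluent Schema Registry. A JDK-only sketch of what such a coder has to do before Avro decoding starts, assuming the Confluent wire format: one magic byte `0`, then a 4-byte big-endian schema id, then the Avro-encoded payload. `WireFormatSketch` and its `IntFunction` lookup are hypothetical stand-ins for the registry client; the diff bounds the client-side cache with `identityMapCapacity`, while this sketch uses an unbounded map for brevity.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntFunction;

class WireFormatSketch {

    /** First byte of every message in the assumed Confluent wire format. */
    private static final byte MAGIC_BYTE = 0;

    /** Hypothetical registry lookup: schema id to schema definition. */
    private final IntFunction<String> registryLookup;

    /** Writer schemas already fetched, keyed by id (unbounded here, unlike the real client). */
    private final Map<Integer, String> cache = new HashMap<>();

    WireFormatSketch(IntFunction<String> registryLookup) {
        this.registryLookup = registryLookup;
    }

    /**
     * Reads the header and returns the writer schema. Afterwards the stream is positioned
     * at the start of the Avro payload, ready to be handed to a decoder.
     */
    String readSchema(InputStream in) throws IOException {
        // DataInputStream does not buffer, so it consumes exactly the 5 header bytes.
        DataInputStream data = new DataInputStream(in);
        if (data.readByte() != MAGIC_BYTE) {
            throw new IOException("Unknown data format: magic byte does not match");
        }
        int schemaId = data.readInt(); // big-endian 4-byte id
        return cache.computeIfAbsent(schemaId, registryLookup::apply);
    }
}
```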
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476133#comment-16476133 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188350196 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}. 
+ * + * @param type of record it produces + */ +public class RegistryAvroDeserializationSchema extends AvroDeserializationSchema { + private final SchemaCoder.SchemaCoderProvider schemaCoderProvider; + private transient SchemaCoder schemaCoder; + + /** +* Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}. +* +* @param recordClazz class to which deserialize. Should be either +*{@link SpecificRecord} or {@link GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +* @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for +*schema reading +*/ + protected RegistryAvroDeserializationSchema( + Class recordClazz, + @Nullable Schema reader, + SchemaCoder.SchemaCoderProvider schemaCoderProvider) { + super(recordClazz, reader); + this.schemaCoderProvider = schemaCoderProvider; + this.schemaCoder = schemaCoderProvider.get(); + } + + @Override + public T deserialize(byte[] message) { + // read record + try { + getInputStream().setBuffer(message); + Schema writerSchema = schemaCoder.readSchema(getInputStream()); + Schema readerSchema = getReaderSchema(); + + GenericDatumReader datumReader = getDatumReader(); + + datumReader.setSchema(writerSchema); + datumReader.setExpected(readerSchema); + + return datumReader.read(null, getDecoder()); + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize Row.", e); + } + } + + private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { --- End diff -- See above; I would suggest avoiding `readObject()`.
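One way to drop `readObject()` from `RegistryAvroDeserializationSchema`, along the lines suggested in this thread: keep only the serializable provider as a field and create the coder lazily on first use, instead of calling `schemaCoderProvider.get()` in the constructor. A JDK-only sketch; `Coder`, `CoderProvider`, `PrefixCoderProvider` and `RegistrySchema` are hypothetical stand-ins for `SchemaCoder`, `SchemaCoder.SchemaCoderProvider` and the deserialization schema, not Flink's actual classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class ProviderSketch {

    /** Stand-in for SchemaCoder: may hold non-serializable resources such as a client. */
    interface Coder {
        String readSchema(byte[] message);
    }

    /** Stand-in for SchemaCoder.SchemaCoderProvider: the only part that is shipped. */
    interface CoderProvider extends Serializable {
        Coder get();
    }

    /** Hypothetical provider whose coder just tags the message length. */
    static final class PrefixCoderProvider implements CoderProvider {
        private static final long serialVersionUID = 1L;
        private final String prefix;

        PrefixCoderProvider(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Coder get() {
            return message -> prefix + ":" + message.length;
        }
    }

    static final class RegistrySchema implements Serializable {
        private static final long serialVersionUID = 1L;
        private final CoderProvider provider;
        private transient Coder coder; // never serialized, created on first use

        RegistrySchema(CoderProvider provider) {
            this.provider = provider; // no provider.get() here, unlike the diff
        }

        boolean isCoderInitialized() {
            return coder != null;
        }

        String deserialize(byte[] message) {
            if (coder == null) {
                coder = provider.get(); // takes over the job of the readObject() hook
            }
            return coder.readSchema(message);
        }
    }

    /** Java serialization round trip, roughly what happens when the schema object is shipped. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The coder itself never has to be `Serializable`; only the provider travels with the job, and the null check in `deserialize()` is the single initialization point.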
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476124#comment-16476124 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188328437 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* --- End diff -- Double Apache header
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476137#comment-16476137 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188349853 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}. 
+ * + * @param type of record it produces + */ +public class RegistryAvroDeserializationSchema extends AvroDeserializationSchema { + private final SchemaCoder.SchemaCoderProvider schemaCoderProvider; + private transient SchemaCoder schemaCoder; + + /** +* Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}. +* +* @param recordClazz class to which deserialize. Should be either +*{@link SpecificRecord} or {@link GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +* @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for +*schema reading +*/ + protected RegistryAvroDeserializationSchema( + Class recordClazz, + @Nullable Schema reader, + SchemaCoder.SchemaCoderProvider schemaCoderProvider) { + super(recordClazz, reader); --- End diff -- Empty line / indentation
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476127#comment-16476127 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188347289 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* --- End diff -- This class uses a mixture of eager initialization of transient members (in `readObject()`) and lazy initialization (in `getDatumReader()`). I would suggest doing it all one way or the other. My suggestion would be to avoid `readObject()` whenever possible. If you encounter an exception during schema parsing (and it may be something weird from Jackson, like a missing manifest due to a shading issue), you will get the most unhelpful exception stack trace ever, in the weirdest place (like Flink's RPC message decoder). In my experience, users who see such a stack trace are rarely able to diagnose it. In the best case they show up on the mailing list; in the worst case they give up.
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476126#comment-16476126 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188348636 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* --- End diff -- That would mean initializing it all lazily, in `getDatumReader()` or in a `checkAvroInitialized()` method.
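A minimal sketch of the fully lazy approach, kept to the JDK so it is self-contained. `LazySchemaHolder`, `checkInitialized()` and `parseSchema()` are hypothetical names, and the placeholder `String` stands in for Avro's `Schema`, `GenericDatumReader` and `Decoder`. Because nothing is built in `readObject()`, a schema parsing failure would surface on the first `deserialize()` call with an ordinary stack trace, rather than while the object itself is being deserialized.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/** Hypothetical schema holder: config is serialized, runtime state is built lazily. */
class LazySchemaHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Configuration, shipped with the object. */
    private final String schemaString;

    /** Runtime state, never serialized; stays null until first use. */
    private transient String parsedSchema;

    LazySchemaHolder(String schemaString) {
        this.schemaString = schemaString;
    }

    /** Single place that builds all transient members; no custom readObject() needed. */
    private void checkInitialized() throws IOException {
        if (parsedSchema == null) {
            parsedSchema = parseSchema(schemaString);
        }
    }

    /** Placeholder for real parsing such as Avro's Schema.Parser. */
    private static String parseSchema(String s) throws IOException {
        if (s == null || s.trim().isEmpty()) {
            throw new IOException("Could not parse reader schema: empty schema string");
        }
        return s.trim();
    }

    String deserialize(byte[] message) throws IOException {
        checkInitialized();
        return parsedSchema + ":" + new String(message, StandardCharsets.UTF_8);
    }

    boolean isInitialized() {
        return parsedSchema != null;
    }

    /** Java-serializes and restores the object, similar to shipping it to a task. */
    static LazySchemaHolder roundTrip(LazySchemaHolder in) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazySchemaHolder) oin.readObject();
        }
    }
}
```

The cost of this design is one null check per call, which should be negligible next to the actual decoding work.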
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476135#comment-16476135 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188355390 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -99,9 +108,29 @@ /** * Creates a new AvroSerializer for the type indicated by the given class. +* This constructor is intended to be used with {@link SpecificRecord} or reflection serializer. +* For serializing {@link GenericData.Record} use {@link AvroSerializer#AvroSerializer(Class, Schema)} */ public AvroSerializer(Class<T> type) { + Preconditions.checkArgument(!isGenericRecord(type), --- End diff -- Minor: Other precondition checks in this class use statically imported methods. While this is not consistent across the code base, I would suggest keeping it consistent within a class as much as possible.
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476129#comment-16476129 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188338897 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param <T> type of record it produces + */ +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> { + + /** +* Class to deserialize to. +*/ + private Class<T> recordClazz; + + private String schemaString = null; --- End diff -- You can make this variable final as well.
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476130#comment-16476130 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188328926 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param <T> type of record it produces + */ +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> { + + /** +* Class to deserialize to. +*/ + private Class<T> recordClazz; + + private String schemaString = null; --- End diff -- I would avoid the null initialization; it is redundant. It does nothing (fields are null anyway), but it still exists as bytecode and hence costs CPU cycles.
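Both remarks on `schemaString` can be applied together: declare the field `final` and assign it exactly once in the constructor, where `null` simply means that no reader schema was given. A sketch with a hypothetical `SchemaConfig` class; the JDK's `Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull`, and a plain `String` stands in for Avro's `Schema`.

```java
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical config holder mirroring the fields discussed in the review. */
class SchemaConfig<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Class to deserialize to. */
    private final Class<T> recordClazz;

    /** Reader schema as a string; null if none was given. No "= null" initializer. */
    private final String schemaString;

    SchemaConfig(Class<T> recordClazz, String readerSchemaOrNull) {
        this.recordClazz = Objects.requireNonNull(recordClazz, "Avro record class must not be null.");
        // A final field must be assigned exactly once on every constructor path.
        this.schemaString = readerSchemaOrNull;
    }

    Class<T> getRecordClazz() {
        return recordClazz;
    }

    String getSchemaString() {
        return schemaString;
    }
}
```

Making the field `final` also rules out the redundant `= null` initializer: a final field that already has an initializer cannot be assigned again in the constructor, so the compiler would reject that combination.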
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476120#comment-16476120 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188325819 --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent; + +import org.apache.flink.formats.avro.SchemaCoder; + +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import org.apache.avro.Schema; + +import java.io.DataInputStream; +import java.io.InputStream; + +/** + * Reads schema using Confluent Schema Registry protocol. + */ +public class ConfluentSchemaRegistryCoder implements SchemaCoder { + + private final SchemaRegistryClient schemaRegistryClient; + + /** +* Creates {@link SchemaCoder} that uses provided {@link SchemaRegistryClient} to connect to +* schema registry. 
+* +* @param schemaRegistryClient client to connect schema registry +*/ + public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) { + this.schemaRegistryClient = schemaRegistryClient; + } + + @Override + public Schema readSchema(InputStream in) throws Exception { + DataInputStream dataInputStream = new DataInputStream(in); + + if (dataInputStream.readByte() != 0) { + throw new RuntimeException("Unknown data format. Magic number does not match"); --- End diff -- RuntimeExceptions (unchecked exceptions) are usually used to indicate programming errors, or (as a workaround) when the scope does not allow throwing a checked exception. In my opinion, this is a case for a checked exception, such as an `IOException` or a `FlinkException`.
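A sketch of the magic-byte check reporting corrupt input with a checked exception, assuming the Confluent wire format (a leading `0` magic byte followed by a 4-byte big-endian schema ID). `ConfluentHeader.readSchemaId` is a hypothetical helper; the registry lookup through `SchemaRegistryClient` is left out to keep the example dependency-free.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical helper that parses the header of a Confluent-framed Avro message. */
final class ConfluentHeader {
    private static final byte MAGIC_BYTE = 0;

    private ConfluentHeader() {}

    /**
     * Reads the magic byte and the schema ID. Corrupt input is a data problem, not a
     * programming error, so it is reported with a checked IOException.
     */
    static int readSchemaId(InputStream in) throws IOException {
        DataInputStream dataInputStream = new DataInputStream(in);
        byte magic = dataInputStream.readByte(); // throws EOFException on empty input
        if (magic != MAGIC_BYTE) {
            throw new IOException("Unknown data format. Magic number does not match: expected "
                    + MAGIC_BYTE + ", got " + magic);
        }
        return dataInputStream.readInt(); // 4-byte big-endian schema ID
    }
}
```

Since the `readSchema` signature in the diff already declares `throws Exception`, an `IOException` can be thrown there without changing the interface.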
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476136#comment-16476136 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188353074 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -79,15 +87,16 @@ // runtime fields, non-serializable, lazily initialized --- - private transient SpecificDatumWriter<T> writer; - private transient SpecificDatumReader<T> reader; + private transient GenericDatumWriter<T> writer; + private transient GenericDatumReader<T> reader; private transient DataOutputEncoder encoder; private transient DataInputDecoder decoder; - private transient SpecificData avroData; + private transient GenericData avroData; private transient Schema schema; + private final String schemaString; --- End diff -- As the section comments indicate, the existing code orders config fields before runtime fields. Can you place `schemaString` to match that pattern?
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476121#comment-16476121 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188337378 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param <T> type of record it produces + */ +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> { --- End diff -- This class should declare a serial version UID. You can activate the corresponding IntelliJ inspection to warn about such issues.
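A minimal illustration of the serial version UID point. Without an explicit `serialVersionUID`, Java derives one from the class structure, so otherwise compatible changes can make previously serialized instances unreadable. `VersionedSchema` is a hypothetical class; `ObjectStreamClass.lookup(...).getSerialVersionUID()` is the JDK call that reports the UID serialization will actually use.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical serializable schema class with an explicit serial version UID. */
class VersionedSchema implements Serializable {
    // Fixed UID: stays stable across recompilations and compatible code changes.
    private static final long serialVersionUID = 1L;

    private final String schemaString;

    VersionedSchema(String schemaString) {
        this.schemaString = schemaString;
    }

    String getSchemaString() {
        return schemaString;
    }

    /** Returns the UID that Java serialization will use for this class. */
    static long effectiveSerialVersionUid() {
        return ObjectStreamClass.lookup(VersionedSchema.class).getSerialVersionUID();
    }
}
```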
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476128#comment-16476128 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188340240 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param <T> type of record it produces + */ +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> { + + /** +* Class to deserialize to. +*/ + private Class<T> recordClazz; + + private String schemaString = null; + + /** +* Reader that deserializes byte array into a record. +*/ + private transient GenericDatumReader<T> datumReader; + + /** +* Input stream to read message from. +*/ + private transient MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data. +*/ + private transient Decoder decoder; + + /** +* Avro schema for the reader. +*/ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize.
Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; +
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476119#comment-16476119 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188316643 --- Diff: flink-formats/flink-avro-confluent-registry/pom.xml --- @@ -0,0 +1,94 @@ + + +<project xmlns="http://maven.apache.org/POM/4.0.0" +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-formats</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.6-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-avro-confluent-registry</artifactId> + + <repositories> + <repository> + <id>confluent</id> + <url>http://packages.confluent.io/maven/</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>io.confluent</groupId> + <artifactId>kafka-schema-registry-client</artifactId> + <version>3.3.1</version> + <exclusions> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <relocations> + <relocation> + 
<pattern>com.fasterxml.jackson.core</pattern> + <shadedPattern>org.apache.flink.shaded.com.fasterxml.jackson.core</shadedPattern> --- End diff -- We may need to qualify this further with this project's name, because we already use that relocation pattern elsewhere, potentially for different Jackson versions.
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476122#comment-16476122 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188329273 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param <T> type of record it produces + */ +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> { + + /** --- End diff -- Minor comment: Many newer classes adopt a style where field JavaDocs fit on a single line, to make the fields section a bit more compact: ``` /** Class to deserialize to. */ private Class<T> recordClazz; ```
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476131#comment-16476131 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188349118 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param <T> type of record it produces + */ +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> { + + /** +* Class to deserialize to. +*/ + private Class<T> recordClazz; + + private String schemaString = null; + + /** +* Reader that deserializes byte array into a record. +*/ + private transient GenericDatumReader<T> datumReader; + + /** +* Input stream to read message from. +*/ + private transient MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data. +*/ + private transient Decoder decoder; + + /** +* Avro schema for the reader. +*/ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize.
Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; +
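The Javadoc above says the reader schema "should be provided if recordClazz is GenericRecord", and the constructor begins with a null check on the class. One way to enforce both rules is sketched below. It is a hypothetical, dependency-free illustration, not the PR's code: `GenericRecordLike` and `SpecificRecordLike` are made-up stand-ins for Avro's `GenericRecord` and `SpecificRecord`, the reader schema is simplified to a `String`, and `java.util.Objects.requireNonNull` replaces Flink's `Preconditions.checkNotNull`.

```java
import java.util.Objects;

/** Hypothetical sketch of the constructor checks described in the Javadoc above. */
class ReaderSchemaCheckSketch {

	/** Stand-in for org.apache.avro.generic.GenericRecord (assumption). */
	interface GenericRecordLike {}

	/** Stand-in for org.apache.avro.specific.SpecificRecord (assumption). */
	interface SpecificRecordLike {}

	static final class Config<T> {

		final Class<T> recordClazz;

		/** Reader schema in string form; may be null for specific records. */
		final String readerSchema;

		Config(Class<T> recordClazz, String readerSchema) {
			// Mirrors Preconditions.checkNotNull(recordClazz, ...) from the diff.
			this.recordClazz = Objects.requireNonNull(recordClazz, "Avro record class must not be null.");
			// A generic record has no compiled-in schema, so the caller must supply one.
			boolean generic = !SpecificRecordLike.class.isAssignableFrom(recordClazz)
				&& GenericRecordLike.class.isAssignableFrom(recordClazz);
			if (generic && readerSchema == null) {
				throw new IllegalArgumentException("Reader schema must be provided for generic records.");
			}
			this.readerSchema = readerSchema;
		}
	}
}
```

Whether the PR rejects a missing schema eagerly or fails later is not visible in the truncated diff; failing in the constructor simply surfaces the mistake when the job graph is built.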
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476116#comment-16476116 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188319278 --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent; + +import org.apache.flink.formats.avro.AvroDeserializationSchema; +import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema; +import org.apache.flink.formats.avro.SchemaCoder; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses + * Confluent Schema Registry. 
+ * + * @param type of record it produces + */ +public class ConfluentRegistryAvroDeserializationSchema + extends RegistryAvroDeserializationSchema { + + private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be either +*{@link SpecificRecord} or {@link GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +* @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry +*/ + private ConfluentRegistryAvroDeserializationSchema( + Class recordClazz, + @Nullable Schema reader, + SchemaCoder.SchemaCoderProvider schemaCoderProvider) { + super(recordClazz, reader, schemaCoderProvider); --- End diff -- For such situations, the code in the method body and the parameter list should use different indentation, or be separated by an empty line. Otherwise it is hard to parse.
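The layout the review asks for can be sketched on a hypothetical class (`FormattingSketch` and its members are illustrative, not from the PR). Option 1 gives the wrapped parameters a double indent so they do not line up with the body; option 2 keeps a single indent but separates the parameters from the body with an empty line. Flink's Java code style indents with tabs.

```java
/** Hypothetical class illustrating the two layouts suggested in the review. */
class FormattingSketch {

	private final Class<?> recordClazz;
	private final String readerSchema;
	private final int cacheCapacity;

	// Option 1: wrapped parameters get a double indent, the body a single one.
	FormattingSketch(
			Class<?> recordClazz,
			String readerSchema,
			int cacheCapacity) {
		this.recordClazz = recordClazz;
		this.readerSchema = readerSchema;
		this.cacheCapacity = cacheCapacity;
	}

	// Option 2: same indent for parameters and body, separated by an empty line.
	static FormattingSketch create(
		Class<?> recordClazz,
		String readerSchema) {

		return new FormattingSketch(recordClazz, readerSchema, 1000);
	}

	int cacheCapacity() {
		return cacheCapacity;
	}
}
```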
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476134#comment-16476134 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188355756 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -275,9 +304,19 @@ else if (configSnapshot instanceof AvroSerializerConfigSnapshot) { // Utilities // + private static boolean isGenericRecord(Class type) { + return !SpecificRecord.class.isAssignableFrom(type) && + GenericRecord.class.isAssignableFrom(type); + } + @Override public TypeSerializer duplicate() { - return new AvroSerializer<>(type); + if (schemaString != null) { + return new AvroSerializer<>(type, new Schema.Parser().parse(schemaString)); --- End diff -- Duplication happens frequently, so it would be good to avoid parsing the schema here. You can add a private copy constructor that takes the class and the schema string.
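A minimal sketch of the suggested private copy constructor, assuming the schema is kept in string form and parsed lazily on first use. `SchemaSerializerSketch` is hypothetical: parsing is simulated and counted through `PARSE_COUNT` in place of `new Schema.Parser().parse(...)`, so the saving is observable without Avro on the classpath.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a serializer that holds an Avro schema. */
class SchemaSerializerSketch<T> {

	/** Counts simulated parses (stand-in for new Schema.Parser().parse(...)). */
	static final AtomicInteger PARSE_COUNT = new AtomicInteger();

	private final Class<T> type;

	/** Serializable string form of the schema; copied as-is when duplicating. */
	private final String schemaString;

	/** Parsed schema, created lazily (stand-in for org.apache.avro.Schema). */
	private transient Object schema;

	/** Private copy constructor: takes the class and the string, does not parse. */
	private SchemaSerializerSketch(Class<T> type, String schemaString) {
		this.type = type;
		this.schemaString = schemaString;
	}

	static <T> SchemaSerializerSketch<T> of(Class<T> type, String schemaString) {
		SchemaSerializerSketch<T> serializer = new SchemaSerializerSketch<>(type, schemaString);
		serializer.getSchema(); // parse once up front, e.g. to validate the schema
		return serializer;
	}

	/** Duplicates without re-parsing; the copy parses only when it is first used. */
	SchemaSerializerSketch<T> duplicate() {
		return new SchemaSerializerSketch<>(type, schemaString);
	}

	Object getSchema() {
		if (schema == null) {
			schema = parse(schemaString);
		}
		return schema;
	}

	private static Object parse(String schemaString) {
		PARSE_COUNT.incrementAndGet();
		return schemaString.trim(); // placeholder for real schema parsing
	}
}
```

With this shape, `duplicate()` costs only a field copy, and each copy parses at most once, on first use.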
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476125#comment-16476125 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188328236 --- Diff: flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent; + +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link ConfluentSchemaRegistryCoder}. + */ +public class ConfluentSchemaRegistryCoderTest { --- End diff -- Do we want to test the magic byte verification? 
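The magic byte in question comes from Confluent's wire format: each message starts with a magic byte `0`, followed by a 4-byte big-endian schema ID and then the Avro-encoded payload. Below is a dependency-free sketch of the check, plus a helper that builds framed test input. `ConfluentHeaderSketch`, `readSchemaId`, and `frame` are hypothetical names, not the PR's `ConfluentSchemaRegistryCoder` API.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical sketch of the Confluent wire-format header check. */
final class ConfluentHeaderSketch {

	static final byte MAGIC_BYTE = 0;

	private ConfluentHeaderSketch() {}

	/** Reads the header and returns the schema ID; fails if the magic byte does not match. */
	static int readSchemaId(InputStream in) throws IOException {
		DataInputStream dataIn = new DataInputStream(in);
		byte magic = dataIn.readByte();
		if (magic != MAGIC_BYTE) {
			throw new IOException("Unknown data format: magic byte " + magic + " does not match");
		}
		return dataIn.readInt(); // big-endian, matching DataOutputStream.writeInt
	}

	/** Builds a framed message: magic byte, schema ID, payload. Useful as test input. */
	static byte[] frame(byte magic, int schemaId, byte[] payload) {
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		try (DataOutputStream out = new DataOutputStream(bytes)) {
			out.writeByte(magic);
			out.writeInt(schemaId);
			out.write(payload);
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
		return bytes.toByteArray();
	}
}
```

A test of the verification would frame a message with a non-zero first byte and assert that reading it fails, alongside the usual round trip of a valid header.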
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476123#comment-16476123 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188336725 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; --- End diff -- All fields should be final whenever possible; immutability should be the default choice. This both documents the writer's intention and makes the code future-proof.
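Applied to a class like the one in the diff, the rule means configuration fields such as the record class and the schema string become `final`, while transient runtime helpers stay non-final because they have to be rebuilt after Java deserialization. The following is a hypothetical, dependency-free sketch: `FinalFieldsSketch` is made up, and its `scratch` buffer stands in for fields like `datumReader`. The round trip shows that Java serialization restores `final` fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical class: configuration is final, only transient helpers are mutable. */
class FinalFieldsSketch implements Serializable {

	private static final long serialVersionUID = 1L;

	/** Class to deserialize to; fixed at construction. */
	private final Class<?> recordClazz;

	/** Reader schema in string form; may be null, but is still final. */
	private final String schemaString;

	/** Runtime helper; cannot be final because it is recreated lazily after deserialization. */
	private transient StringBuilder scratch;

	FinalFieldsSketch(Class<?> recordClazz, String schemaString) {
		this.recordClazz = recordClazz;
		this.schemaString = schemaString;
	}

	Class<?> recordClazz() {
		return recordClazz;
	}

	String schemaString() {
		return schemaString;
	}

	StringBuilder scratch() {
		if (scratch == null) {
			scratch = new StringBuilder(); // rebuilt on first use, including after deserialization
		}
		return scratch;
	}

	/** Serializes and deserializes an instance, wrapping checked exceptions. */
	static FinalFieldsSketch roundTrip(FinalFieldsSketch original) {
		try {
			ByteArrayOutputStream bytes = new ByteArrayOutputStream();
			try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
				out.writeObject(original);
			}
			try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
				return (FinalFieldsSketch) in.readObject();
			}
		} catch (IOException | ClassNotFoundException e) {
			throw new IllegalStateException(e);
		}
	}
}
```

The diff also imports `ObjectInputStream`, which suggests a `readObject` hook; that is an alternative place to recreate the transient fields instead of a lazy getter.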
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476132#comment-16476132 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188349785 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}. + * + * @param type of record it produces + */ +public class RegistryAvroDeserializationSchema extends AvroDeserializationSchema { + private final SchemaCoder.SchemaCoderProvider schemaCoderProvider; --- End diff -- We typically put an empty line between the class declaration and its members.
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476115#comment-16476115 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188319368 --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent; + +import org.apache.flink.formats.avro.AvroDeserializationSchema; +import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema; +import org.apache.flink.formats.avro.SchemaCoder; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses + * Confluent Schema Registry. 
+ * + * @param type of record it produces + */ +public class ConfluentRegistryAvroDeserializationSchema + extends RegistryAvroDeserializationSchema { + + private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be either +*{@link SpecificRecord} or {@link GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +* @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry +*/ + private ConfluentRegistryAvroDeserializationSchema( + Class recordClazz, + @Nullable Schema reader, + SchemaCoder.SchemaCoderProvider schemaCoderProvider) { + super(recordClazz, reader, schemaCoderProvider); + } + + /** +* Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord} +* using provided reader schema and looks up writer schema in Confluent Schema Registry. +* +* @param schema schema of produced records +* @param url url of schema registry to connect +* @return deserialized record in form of {@link GenericRecord} +*/ + public static ConfluentRegistryAvroDeserializationSchema forGeneric( + Schema schema, String url) { + return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY); --- End diff -- Same as above.
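"Same as above" presumably points back to the earlier comment on this file about wrapped parameter lists sharing the body's indentation. The sketch below lays out the two `forGeneric` overloads from the diff in that style. The short overload delegates with `DEFAULT_IDENTITY_MAP_CAPACITY` (in the diff this value is presumably handed to the imported `CachedSchemaRegistryClient`), and the full overload's double-indented parameters stay visually separate from its body. `RegistrySchemaFactorySketch` is hypothetical: the schema is simplified to a `String`, and nothing contacts a registry.

```java
import java.util.Objects;

/** Hypothetical factory pair mirroring the forGeneric overloads in the diff. */
final class RegistrySchemaFactorySketch {

	static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;

	final String readerSchema;
	final String registryUrl;
	final int identityMapCapacity;

	private RegistrySchemaFactorySketch(
			String readerSchema,
			String registryUrl,
			int identityMapCapacity) {
		this.readerSchema = Objects.requireNonNull(readerSchema, "reader schema");
		this.registryUrl = Objects.requireNonNull(registryUrl, "registry url");
		this.identityMapCapacity = identityMapCapacity;
	}

	/** Short overload: delegates with the default identity map capacity. */
	static RegistrySchemaFactorySketch forGeneric(String readerSchema, String url) {
		return forGeneric(readerSchema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
	}

	/** Full overload: parameters double-indented, body single-indented. */
	static RegistrySchemaFactorySketch forGeneric(
			String readerSchema,
			String url,
			int identityMapCapacity) {
		return new RegistrySchemaFactorySketch(readerSchema, url, identityMapCapacity);
	}
}
```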
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16475936#comment-16475936 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188316052 --- Diff: flink-formats/flink-avro-confluent-registry/pom.xml --- @@ -0,0 +1,94 @@ + + +http://maven.apache.org/POM/4.0.0; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + flink-formats + org.apache.flink + 1.6-SNAPSHOT + + 4.0.0 + + flink-avro-confluent-registry --- End diff -- That is a good question. Maybe eventually. We could leave it in `flink-formats` for now, until we have a case for creating `flink-catalogs`. This is also not full-blown catalog support as in the Table API, but something much simpler: just multiple Avro schemas.
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474345#comment-16474345 ] ASF GitHub Bot commented on FLINK-9337: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r187992680 --- Diff: flink-formats/flink-avro-confluent-registry/pom.xml --- @@ -0,0 +1,94 @@ + + +http://maven.apache.org/POM/4.0.0; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + flink-formats + org.apache.flink + 1.6-SNAPSHOT + + 4.0.0 + + flink-avro-confluent-registry --- End diff -- Do we really want the Confluent Schema Registry code to be in `flink-formats`? Shouldn't this be in something like `flink-catalogs` in the future?
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474026#comment-16474026 ] ASF GitHub Bot commented on FLINK-9337: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/5995 Right, sorry for that. I changed the data generator a bit, so it produced different results than before with the same seed. I've recreated the serialized data with the updated `TestDataGenerator`. It took me a while, though, to figure out that the data should be created with the current code rather than the 1.3 branch, so I updated the comment accordingly. I also reworded some other names, as `BackwardsCompatibleAvroSerializerTest` does not test compatibility with 1.3, only with `PojoSerializer`.
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16473860#comment-16473860 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5995 Thanks, the main code looks good! Unfortunately, this seems to either break compatibility with prior savepoints (where Avro types were implicitly handled through Kryo and are now bridged through the `BackwardsCompatibleAvroSerializer`), or that test needs to be adjusted. There are also some license header issues that cause the build to fail.
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472578#comment-16472578 ] ASF GitHub Bot commented on FLINK-9337: --- GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/5995 [FLINK-9337] Implemented AvroDeserializationSchema ## What is the purpose of the change Provides an implementation of AvroDeserializationSchema that reads records serialized as Avro, and also provides a version that uses Confluent Schema Registry to look up the writer schema. ## Brief change log - Implemented AvroDeserializationSchema / RegistryAvroDeserializationSchema / ConfluentRegistryAvroDeserializationSchema - Extended AvroSerializer to handle GenericRecords - Added GenericRecordTypeInformation ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**yes** / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (**yes** / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented?
(not applicable / docs / **JavaDocs** / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dawidwys/flink avro-deserializer2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5995.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5995 commit 885c31daa5ad9db03924311a6f443b72795d6ca3 Author: Dawid Wysakowicz Date: 2018-05-11T16:55:10Z [FLINK-9337] Implemented AvroDeserializationSchema commit 4f1b398837d83d1a8be9a697a686a2ff54c5b22c Author: Dawid Wysakowicz Date: 2018-05-11T16:57:26Z [FLINK-9338] Implemented RegistryAvroDeserializationSchema & provided implementation for Confluent Schema Registry
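The change log mentions extending `AvroSerializer` to handle GenericRecords, and the `isGenericRecord` helper in the `AvroSerializer` diff rules out `SpecificRecord` classes before it checks for `GenericRecord`. That order matters because Avro's generated specific classes extend `SpecificRecordBase`, which also implements `GenericRecord`. The dependency-free sketch below shows the effect using made-up stand-in types (`GenericRecordLike`, `SpecificRecordLike`, and the two record classes); it is not Avro's or Flink's API.

```java
/** Hypothetical illustration of the isGenericRecord check on stand-in types. */
final class RecordKindSketch {

	/** Stand-in for org.apache.avro.generic.GenericRecord. */
	interface GenericRecordLike {}

	/** Stand-in for org.apache.avro.specific.SpecificRecord. */
	interface SpecificRecordLike {}

	/** Mimics a generated specific record, whose base class implements both interfaces. */
	static final class GeneratedRecordLike implements SpecificRecordLike, GenericRecordLike {}

	/** Mimics a plain generic record implementation such as GenericData.Record. */
	static final class PlainGenericRecordLike implements GenericRecordLike {}

	private RecordKindSketch() {}

	/** Same logic as isGenericRecord in the AvroSerializer diff, applied to the stand-ins. */
	static boolean isGenericRecord(Class<?> type) {
		return !SpecificRecordLike.class.isAssignableFrom(type)
			&& GenericRecordLike.class.isAssignableFrom(type);
	}
}
```

Checking only `GenericRecordLike.class.isAssignableFrom(type)` would misclassify `GeneratedRecordLike` as generic, which is why the specific check comes first.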