[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-07-01 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529329#comment-16529329 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user cricket007 commented on the issue:

https://github.com/apache/flink/pull/5995
  
What about implementing a `KeyedDeserializationSchema` for Avro?
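Here is a hedged sketch of one way this could look. A keyed schema decodes the Kafka message key and the message value with two independent value decoders. The types below (`ValueDecoder`, `KeyedDecoder`, `KeyValue`, `KeyValueDecoder`) are hypothetical. They are defined locally so the example compiles without Flink or Avro. `KeyedDecoder.deserialize` only mirrors the parameter list of Flink's `KeyedDeserializationSchema#deserialize(messageKey, message, topic, partition, offset)`. In a real implementation, each decoder would wrap an Avro schema.

```java
import java.io.IOException;

/** Hypothetical single-payload decoder, playing the role of a DeserializationSchema. */
interface ValueDecoder<T> {
    T decode(byte[] bytes) throws IOException;
}

/** Hypothetical keyed decoder, shaped like KeyedDeserializationSchema#deserialize. */
interface KeyedDecoder<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;
}

/** Simple key/value holder produced by the keyed decoder. */
final class KeyValue<K, V> {
    final K key;
    final V value;

    KeyValue(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

/** Combines a key decoder and a value decoder into one keyed decoder. */
final class KeyValueDecoder<K, V> implements KeyedDecoder<KeyValue<K, V>> {
    private final ValueDecoder<K> keyDecoder;
    private final ValueDecoder<V> valueDecoder;

    KeyValueDecoder(ValueDecoder<K> keyDecoder, ValueDecoder<V> valueDecoder) {
        this.keyDecoder = keyDecoder;
        this.valueDecoder = valueDecoder;
    }

    @Override
    public KeyValue<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        // Kafka message keys are optional, so a missing key is passed through as null.
        K key = messageKey == null ? null : keyDecoder.decode(messageKey);
        V value = valueDecoder.decode(message);
        return new KeyValue<>(key, value);
    }
}
```

Composing two single-value decoders keeps the key and value schemas independent. This fits the common case where keys and values use different schemas.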


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523770#comment-16523770 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r198156227
  
--- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param <T> type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema<T> extends RegistryAvroDeserializationSchema<T> {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   private static final long serialVersionUID = -1671641202177852775L;
+
+   /**
+    * Creates a Avro deserialization schema.
+    *
+    * @param recordClazz         class to which deserialize. Should be either
+    *                            {@link SpecificRecord} or {@link GenericRecord}.
+    * @param reader              reader's Avro schema. Should be provided if recordClazz is
+    *                            {@link GenericRecord}
+    * @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry
+    */
+   private ConfluentRegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
+           SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+       super(recordClazz, reader, schemaCoderProvider);
+   }
+
+   /**
+    * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+    * using provided reader schema and looks up writer schema in Confluent Schema Registry.
+    *
+    * @param schema schema of produced records
+    * @param url    url of schema registry to connect
+    * @return deserialized record in form of {@link GenericRecord}
+    */
+   public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url) {
+       return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+   }
+
+   /**
+    * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+    * using provided reader schema and looks up writer schema in Confluent Schema Registry.
+    *
+    * @param schema              schema of produced records
+    * @param url                 url of schema registry to connect
+    * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
+    * @return deserialized record in form of {@link GenericRecord}
+    */
+   public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url,
--- End diff --

End users are supposed to use only this method or `forSpecific`, nothing else. Therefore it must be public.
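Here is a minimal, self-contained sketch of the pattern under discussion. The constructor stays `private`, so callers cannot pass an inconsistent combination of arguments. The public static factories are the only supported entry points. `RecordSchema`, `GenericRecordStub`, and the string-typed schema are hypothetical stand-ins for the Flink and Avro types in the diff.

```java
import java.util.Objects;

/** Hypothetical placeholder for Avro's GenericRecord. */
final class GenericRecordStub {
}

/** Hypothetical stand-in for a registry-backed deserialization schema. */
final class RecordSchema<T> {

    private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;

    private final Class<T> recordClazz;
    private final String readerSchema; // null for generated classes, which carry their own schema
    private final String registryUrl;
    private final int identityMapCapacity;

    // Private: the factories below decide which argument combinations are valid.
    private RecordSchema(Class<T> recordClazz, String readerSchema, String registryUrl, int identityMapCapacity) {
        this.recordClazz = Objects.requireNonNull(recordClazz, "record class must not be null");
        this.readerSchema = readerSchema;
        this.registryUrl = Objects.requireNonNull(registryUrl, "registry url must not be null");
        this.identityMapCapacity = identityMapCapacity;
    }

    // Public: end users call this directly, so it cannot be private.
    public static RecordSchema<GenericRecordStub> forGeneric(String readerSchema, String registryUrl) {
        return forGeneric(readerSchema, registryUrl, DEFAULT_IDENTITY_MAP_CAPACITY);
    }

    // Public overload that also lets users tune the schema cache size.
    public static RecordSchema<GenericRecordStub> forGeneric(String readerSchema, String registryUrl,
            int identityMapCapacity) {
        // Generic records have no generated class, so the reader schema is mandatory.
        Objects.requireNonNull(readerSchema, "generic records need a reader schema");
        return new RecordSchema<>(GenericRecordStub.class, readerSchema, registryUrl, identityMapCapacity);
    }

    // Public: generated classes provide their own schema, so none is passed in.
    public static <T> RecordSchema<T> forSpecific(Class<T> recordClazz, String registryUrl) {
        return new RecordSchema<>(recordClazz, null, registryUrl, DEFAULT_IDENTITY_MAP_CAPACITY);
    }

    Class<T> recordClazz() { return recordClazz; }
    String readerSchema() { return readerSchema; }
    String registryUrl() { return registryUrl; }
    int identityMapCapacity() { return identityMapCapacity; }
}
```

In this shape, the factories carry the public visibility and the constructor does not. This is why both `forGeneric` overloads must stay public.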



[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523767#comment-16523767 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r198150153
  
--- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java ---
@@ -0,0 +1,136 @@
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param <T> type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema<T> extends RegistryAvroDeserializationSchema<T> {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   private static final long serialVersionUID = -1671641202177852775L;
+
+   /**
+    * Creates a Avro deserialization schema.
+    *
+    * @param recordClazz         class to which deserialize. Should be either
+    *                            {@link SpecificRecord} or {@link GenericRecord}.
+    * @param reader              reader's Avro schema. Should be provided if recordClazz is
+    *                            {@link GenericRecord}
+    * @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry
+    */
+   private ConfluentRegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
+           SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+       super(recordClazz, reader, schemaCoderProvider);
+   }
+
+   /**
+    * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+    * using provided reader schema and looks up writer schema in Confluent Schema Registry.
+    *
+    * @param schema schema of produced records
+    * @param url    url of schema registry to connect
+    * @return deserialized record in form of {@link GenericRecord}
+    */
+   public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url) {
+       return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+   }
+
+   /**
+    * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+    * using provided reader schema and looks up writer schema in Confluent Schema Registry.
+    *
+    * @param schema              schema of produced records
+    * @param url                 url of schema registry to connect
+    * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
+    * @return deserialized record in form of {@link GenericRecord}
+    */
+   public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url,
--- End diff --

@dawidwys couldn't this be `private`?



[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523766#comment-16523766 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r198150284
  
--- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java ---
@@ -0,0 +1,136 @@
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param <T> type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema<T> extends RegistryAvroDeserializationSchema<T> {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   private static final long serialVersionUID = -1671641202177852775L;
+
+   /**
+    * Creates a Avro deserialization schema.
+    *
+    * @param recordClazz         class to which deserialize. Should be either
+    *                            {@link SpecificRecord} or {@link GenericRecord}.
+    * @param reader              reader's Avro schema. Should be provided if recordClazz is
+    *                            {@link GenericRecord}
+    * @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry
+    */
+   private ConfluentRegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
+           SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+       super(recordClazz, reader, schemaCoderProvider);
+   }
+
+   /**
+    * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+    * using provided reader schema and looks up writer schema in Confluent Schema Registry.
+    *
+    * @param schema schema of produced records
+    * @param url    url of schema registry to connect
+    * @return deserialized record in form of {@link GenericRecord}
+    */
+   public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url) {
+       return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+   }
+
+   /**
+    * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+    * using provided reader schema and looks up writer schema in Confluent Schema Registry.
+    *
+    * @param schema              schema of produced records
+    * @param url                 url of schema registry to connect
+    * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
+    * @return deserialized record in form of {@link GenericRecord}
+    */
+   public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url,
+           int identityMapCapacity) {
+       return new ConfluentRegistryAvroDeserializationSchema<>(
+           GenericRecord.class,
+           schema,
+           new CachedSchemaCoderProvider(url, identityMapCapacity));
+   }
+
+   /**
+    * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro
+

[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487362#comment-16487362 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5995




[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487166#comment-16487166 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5995
  
Looks good, thanks!

+1 to merge this




[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16483578#comment-16483578 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5995
  
I've addressed your comments, @StephanEwen. If you don't have any more, I will merge it today.




[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480350#comment-16480350 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5995
  
Added a few more comments, most importantly around exception wrapping.
Otherwise, looking good...




[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480344#comment-16480344 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189197633
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
@@ -0,0 +1,176 @@
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = -6766681879020862312L;
+
+   /** Class to deserialize to. */
+   private final Class<T> recordClazz;
+
+   /** Schema in case of GenericRecord for serialization purpose. */
+   private final String schemaString;
+
+   /** Reader that deserializes byte array into a record. */
+   private transient GenericDatumReader<T> datumReader;
+
+   /** Input stream to read message from. */
+   private transient MutableByteArrayInputStream inputStream;
+
+   /** Avro decoder that decodes binary data. */
+   private transient Decoder decoder;
+
+   /** Avro schema for the reader. */
+   private transient Schema reader;
+
+   /**
+    * Creates a Avro deserialization schema.
+    *
+    * @param recordClazz class to which deserialize. Should be one of:
+    *                    {@link org.apache.avro.specific.SpecificRecord},
+    *                    {@link org.apache.avro.generic.GenericRecord}.
+    * @param reader      reader's Avro schema. Should be provided if recordClazz is
+    *                    {@link GenericRecord}
+    */
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+       Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+       this.recordClazz = recordClazz;
+       this.inputStream = new MutableByteArrayInputStream();
+       this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+       this.reader = reader;
+       if (reader != null) {
+           this.schemaString = reader.toString();
+       } else {
+           this.schemaString = null;
+       }
+   }
+
+   /**
+    * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
+    *
+    * @param schema schema of produced records
+    * @return deserialized record in form of {@link GenericRecord}
+    */
+   public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
--- End diff --

Minor comment: I found it helps code structure/readability to move static/factory methods either to the top or the bottom of the class.



[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480345#comment-16480345 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189197766
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
@@ -0,0 +1,176 @@
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = -6766681879020862312L;
+
+   /** Class to deserialize to. */
+   private final Class<T> recordClazz;
+
+   /** Schema in case of GenericRecord for serialization purpose. */
+   private final String schemaString;
+
+   /** Reader that deserializes byte array into a record. */
+   private transient GenericDatumReader<T> datumReader;
+
+   /** Input stream to read message from. */
+   private transient MutableByteArrayInputStream inputStream;
+
+   /** Avro decoder that decodes binary data. */
+   private transient Decoder decoder;
+
+   /** Avro schema for the reader. */
+   private transient Schema reader;
+
+   /**
+    * Creates a Avro deserialization schema.
+    *
+    * @param recordClazz class to which deserialize. Should be one of:
+    *                    {@link org.apache.avro.specific.SpecificRecord},
+    *                    {@link org.apache.avro.generic.GenericRecord}.
+    * @param reader      reader's Avro schema. Should be provided if recordClazz is
+    *                    {@link GenericRecord}
+    */
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+       Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+       this.recordClazz = recordClazz;
+       this.inputStream = new MutableByteArrayInputStream();
+       this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+       this.reader = reader;
+       if (reader != null) {
+           this.schemaString = reader.toString();
+       } else {
+           this.schemaString = null;
+       }
+   }
+
+   /**
+    * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
+    *
+    * @param schema schema of produced records
+    * @return deserialized record in form of {@link GenericRecord}
+    */
+   public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
+       return new AvroDeserializationSchema<>(GenericRecord.class, schema);
+   }
+
+   /**
+    * Creates {@link AvroDeserializationSchema} that produces classes that were generated from

[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480333#comment-16480333 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189195014
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java ---
@@ -0,0 +1,87 @@
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
+ *
+ * @param <T> type of record it produces
+ */
+public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
+
+   private static final long serialVersionUID = -884738268437806062L;
+
+   /** Provider for schema coder. Used for initializing in each task. */
+   private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+
+   /** Coder used for reading schema from incoming stream. */
+   private transient SchemaCoder schemaCoder;
+
+   /**
+    * Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}.
+    *
+    * @param recordClazz         class to which deserialize. Should be either
+    *                            {@link SpecificRecord} or {@link GenericRecord}.
+    * @param reader              reader's Avro schema. Should be provided if recordClazz is
+    *                            {@link GenericRecord}
+    * @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for
+    *                            schema reading
+    */
+   protected RegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
+           SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+       super(recordClazz, reader);
+       this.schemaCoderProvider = schemaCoderProvider;
+       this.schemaCoder = schemaCoderProvider.get();
+   }
+
+   @Override
+   public T deserialize(byte[] message) {
+       // read record
+       try {
+           checkAvroInitialized();
+           getInputStream().setBuffer(message);
+           Schema writerSchema = schemaCoder.readSchema(getInputStream());
+           Schema readerSchema = getReaderSchema();
+
+           GenericDatumReader<T> datumReader = getDatumReader();
+
+           datumReader.setSchema(writerSchema);
+           datumReader.setExpected(readerSchema);
+
+           return datumReader.read(null, getDecoder());
+       } catch (Exception e) {
+           throw new RuntimeException("Failed to deserialize Row.", e);
--- End diff --

The method `deserialize()` can throw an `IOException`. That got dropped from the signature, and exceptions are now wrapped into a `RuntimeException` instead. That makes exception stack traces more complicated, and it hides from the consumers of that code the fact that there is a possible exceptional case to handle.

I think this makes a general rule: whenever you reach for `RuntimeException`, take a step back, look at the exception structure and signatures, and check whether something is not declared well.
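Here is a minimal sketch of the alternative suggested here. `deserialize` keeps `IOException` in its signature and lets malformed input surface as a checked exception instead of a wrapped `RuntimeException`. The sketch assumes the Confluent wire format: a magic byte `0`, a 4-byte big-endian schema id, then the Avro payload. `HeaderReader`, `ConfluentHeader`, and the `schemaLookup` map are hypothetical stand-ins for the `SchemaCoder` and registry client in the diff. The Avro payload itself is not decoded.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Map;

/** Hypothetical result of parsing a Confluent wire-format header. */
final class ConfluentHeader {
    final int schemaId;
    final String writerSchema;
    final int payloadOffset;

    ConfluentHeader(int schemaId, String writerSchema, int payloadOffset) {
        this.schemaId = schemaId;
        this.writerSchema = writerSchema;
        this.payloadOffset = payloadOffset;
    }
}

/** Hypothetical stand-in for a registry-backed SchemaCoder. */
final class HeaderReader {
    private static final byte MAGIC_BYTE = 0;
    private static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema id

    // Stand-in for a schema registry client: schema id -> writer schema.
    private final Map<Integer, String> schemaLookup;

    HeaderReader(Map<Integer, String> schemaLookup) {
        this.schemaLookup = schemaLookup;
    }

    // IOException stays in the signature, so callers can see (and must handle)
    // a corrupt or unknown message instead of meeting an unchecked surprise.
    ConfluentHeader deserialize(byte[] message) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(message));
        // readByte/readInt throw EOFException (an IOException) on truncated input.
        if (in.readByte() != MAGIC_BYTE) {
            throw new IOException("Unknown magic byte");
        }
        int schemaId = in.readInt(); // DataInputStream reads big-endian
        String writerSchema = schemaLookup.get(schemaId);
        if (writerSchema == null) {
            throw new IOException("No writer schema registered for id " + schemaId);
        }
        return new ConfluentHeader(schemaId, writerSchema, HEADER_LENGTH);
    }
}
```

Each failure case (bad magic byte, unknown schema id, truncated header) arrives as an `IOException` with no extra wrapper frame in the stack trace.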



[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480293#comment-16480293 ]

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189185420
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
@@ -0,0 +1,176 @@
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = -6766681879020862312L;
+
+   /** Class to deserialize to. */
+   private final Class<T> recordClazz;
+
+   /** Schema in case of GenericRecord for serialization purpose. */
+   private final String schemaString;
+
+   /** Reader that deserializes byte array into a record. */
+   private transient GenericDatumReader<T> datumReader;
+
+   /** Input stream to read message from. */
+   private transient MutableByteArrayInputStream inputStream;
+
+   /** Avro decoder that decodes binary data. */
+   private transient Decoder decoder;
+
+   /** Avro schema for the reader. */
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
--- End diff --

I would skip the initialization in the constructor if you have the initialization in `checkAvroInitialized()`. It is simpler, and it avoids having two places that do the initialization, which have to be kept in sync.
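The following is a minimal sketch of the single-initialization pattern suggested here. It uses only the JDK so that it runs without Flink or Avro. `LazyInitDeserializer`, `checkInitialized()` and `LazyInitDemo` are hypothetical names. The transient `StringBuilder` stands in for the transient Avro reader, decoder and input stream. The constructor stores only serializable configuration. Every transient helper is created in one method that each `deserialize()` call goes through, so a freshly constructed instance and one restored by Java serialization follow the same path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a deserialization schema (not Flink's class).
class LazyInitDeserializer implements Serializable {
    private static final long serialVersionUID = 1L;

    // Serializable configuration: the only thing the constructor sets.
    private final String charsetName;

    // Transient helper (stand-in for reader/decoder/input stream).
    // It is null after construction and after Java deserialization.
    private transient StringBuilder buffer;

    LazyInitDeserializer(String charsetName) {
        this.charsetName = charsetName; // no helper initialization here
    }

    // The single place that creates transient state.
    private void checkInitialized() {
        if (buffer == null) {
            buffer = new StringBuilder();
        }
    }

    String deserialize(byte[] message) {
        checkInitialized();
        buffer.setLength(0); // reuse the helper across calls
        buffer.append(new String(message, Charset.forName(charsetName)));
        return buffer.toString();
    }

    boolean isInitialized() {
        return buffer != null;
    }
}

class LazyInitDemo {
    // Java-serializes and deserializes a value, roughly what happens when a job is shipped to workers.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LazyInitDeserializer shipped = roundTrip(new LazyInitDeserializer("UTF-8"));
        System.out.println(shipped.isInitialized()); // false: transient field was not serialized
        System.out.println(shipped.deserialize("hello".getBytes(StandardCharsets.UTF_8))); // hello
        System.out.println(shipped.isInitialized()); // true: created lazily on first use
    }
}
```

With this shape, no `readObject()` override is needed. The lazy check costs one null comparison per record.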


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480292#comment-16480292
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189185186
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class<T> recordClazz;
+
+   private String schemaString = null;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private transient GenericDatumReader<T> datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private transient MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private transient Decoder decoder;
+
+   /**
+* Avro schema for the reader.
+*/
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+   this.recordClazz = recordClazz;
+ 

[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480248#comment-16480248
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5995
  
@StephanEwen could you have another look?


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477008#comment-16477008
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5995
  
I would actually keep the package name for now. It makes sense, because the 
connection to the registry is avro-specific at the moment...


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476320#comment-16476320
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5995
  
Also, as for the package name or where to put it, I don't feel competent to suggest a place, so I will be happy to apply your suggestion.


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476314#comment-16476314
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5995
  
As for the snapshot binary data, I do understand that it should be created with the appropriate Flink version (in this case, in theory, Flink 1.3). I tried really hard to do so until I found out that this test is incompatible with 1.3 and the data could not be generated with Flink 1.3. Later I found the comment on the test class that also states this:

> Important: Since Avro itself broke class compatibility between 1.7.7 (used in Flink 1.3)
> and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken through Avro already.
> This test only tests that the Avro serializer change (switching from Pojo to Avro for Avro types)
> works properly.

Also, the commented-out code does not compile with Flink 1.3 (but this is a minor thing).

Data serialized with the Avro version used in Flink 1.3 (1.7.7) is not binary compatible with Avro 1.8.2 (used in Flink 1.4+), due to changes in how `SpecificFixed` is constructed.

Therefore, I regenerated this snapshot data by running the commented-out code on the current branch. That is also why I changed a few descriptions in that test: it tests compatibility of `PojoSerializer` with `AvroSerializer` rather than binary backwards compatibility.

Nevertheless, I am more than happy to hear any comments on that.


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476299#comment-16476299
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188386920
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -275,9 +304,19 @@ else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
//  Utilities
// 

 
+   private static boolean isGenericRecord(Class<?> type) {
+   return !SpecificRecord.class.isAssignableFrom(type) &&
+   GenericRecord.class.isAssignableFrom(type);
+   }
+
@Override
public TypeSerializer<T> duplicate() {
-   return new AvroSerializer<>(type);
+   if (schemaString != null) {
+   return new AvroSerializer<>(type, new Schema.Parser().parse(schemaString));
--- End diff --

I didn't think it through. I thought we needed to create a deep copy of the schema, but as it is stateless, I think we can just pass the schema. My mistake. Correct me if I am wrong.
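The following sketch illustrates the point about not deep-copying. It assumes, as the comment does, that the schema is not mutated after construction. Avro's `Schema` javadoc describes schema objects as logically immutable, but verify this for the Avro version in use. `ImmutableSchema` and `SketchSerializer` are hypothetical JDK-only stand-ins, not Avro or Flink classes. `duplicate()` still returns a new serializer, because per-instance mutable state must not be shared between copies. The immutable schema reference is passed through as-is instead of being re-parsed from `schemaString`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical immutable schema: final fields and an unmodifiable field list.
final class ImmutableSchema {
    private final String name;
    private final List<String> fields;

    ImmutableSchema(String name, List<String> fields) {
        this.name = name;
        this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
    }

    String name() { return name; }

    List<String> fields() { return fields; }
}

// Hypothetical serializer: the schema is shareable, the scratch buffer is not.
final class SketchSerializer {
    private final ImmutableSchema schema;
    private final StringBuilder scratch = new StringBuilder(); // per-instance mutable state

    SketchSerializer(ImmutableSchema schema) {
        this.schema = schema;
    }

    // New instance for fresh mutable state; the immutable schema is reused, not re-parsed.
    SketchSerializer duplicate() {
        return new SketchSerializer(schema);
    }

    // Renders values against the schema's field names, e.g. User{id=1, name=alice}.
    String render(List<String> values) {
        scratch.setLength(0);
        scratch.append(schema.name()).append('{');
        for (int i = 0; i < schema.fields().size(); i++) {
            if (i > 0) {
                scratch.append(", ");
            }
            scratch.append(schema.fields().get(i)).append('=').append(values.get(i));
        }
        return scratch.append('}').toString();
    }

    ImmutableSchema schema() { return schema; }

    StringBuilder scratch() { return scratch; }
}
```

Sharing is safe only as long as nothing calls a mutating method on the shared schema. If that cannot be guaranteed, re-parsing (as in the diff above) is the conservative choice.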


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476291#comment-16476291
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188386286
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class<T> recordClazz;
+
+   private String schemaString = null;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private transient GenericDatumReader<T> datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private transient MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private transient Decoder decoder;
+
+   /**
+* Avro schema for the reader.
+*/
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+   this.recordClazz = recordClazz;
+

[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476283#comment-16476283
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188385527
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class<T> recordClazz;
+
+   private String schemaString = null;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private transient GenericDatumReader<T> datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private transient MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private transient Decoder decoder;
+
+   /**
+* Avro schema for the reader.
+*/
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+   this.recordClazz = recordClazz;
+

[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476118#comment-16476118
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188321914
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param <T> type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema<T>
+   extends RegistryAvroDeserializationSchema<T> {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be either
+*{@link SpecificRecord} or {@link GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry
+*/
+   private ConfluentRegistryAvroDeserializationSchema(
+   Class<T> recordClazz,
+   @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader, schemaCoderProvider);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent Schema Registry.
+*
+* @param schema schema of produced records
+* @param url    url of schema registry to connect
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
+   Schema schema, String url) {
+   return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent Schema Registry.
+*
+* @param schema  schema of produced records
+* @param url url of schema registry to connect
+* @param identityMapCapacity maximum number of cached schema versions (default: 1000)
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
+   Schema schema, String url, int identityMapCapacity) {
+   return new ConfluentRegistryAvroDeserializationSchema<>(
+   GenericRecord.class,
+   schema,
+   new CachedSchemaCoderProvider(url, identityMapCapacity));
+   }
+
+   /**
+* Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro
+* schema and looks up writer schema 

[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476133#comment-16476133
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188350196
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
+ *
+ * @param <T> type of record it produces
+ */
+public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
+   private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+   private transient SchemaCoder schemaCoder;
+
+   /**
+* Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}.
+*
+* @param recordClazz class to which deserialize. Should be either
+*{@link SpecificRecord} or {@link GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for
+*schema reading
+*/
+   protected RegistryAvroDeserializationSchema(
+   Class<T> recordClazz,
+   @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader);
+   this.schemaCoderProvider = schemaCoderProvider;
+   this.schemaCoder = schemaCoderProvider.get();
+   }
+
+   @Override
+   public T deserialize(byte[] message) {
+   // read record
+   try {
+   getInputStream().setBuffer(message);
+   Schema writerSchema = schemaCoder.readSchema(getInputStream());
+   Schema readerSchema = getReaderSchema();
+
+   GenericDatumReader<T> datumReader = getDatumReader();
+
+   datumReader.setSchema(writerSchema);
+   datumReader.setExpected(readerSchema);
+
+   return datumReader.read(null, getDecoder());
+   } catch (Exception e) {
+   throw new RuntimeException("Failed to deserialize Row.", e);
+   }
+   }
+
+   private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
--- End diff --

See above; I would suggest avoiding `readObject()`.
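The following is a minimal sketch of that alternative. It is JDK-only and uses hypothetical names (`SerializableSupplier`, `Coder`, `UrlCoderProvider`, `LazyCoderSchema`, `LazyCoderDemo`) and a made-up registry URL. The serializable provider is the only related state that travels with the object. The non-serializable coder, standing in for a `SchemaCoder` backed by a registry client, is created from the provider on first use. As a result, neither the constructor nor a custom `readObject()` has to build it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical provider type: a Supplier that can be shipped along with the schema object.
interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

// Hypothetical non-serializable coder, standing in for one that holds a registry client.
final class Coder {
    private final String url;

    Coder(String url) {
        this.url = url;
    }

    String describe() {
        return "coder for " + url;
    }
}

// A named provider class (rather than a lambda) so its serialized form is plain and predictable.
final class UrlCoderProvider implements SerializableSupplier<Coder> {
    private static final long serialVersionUID = 1L;
    private final String url;

    UrlCoderProvider(String url) {
        this.url = url;
    }

    @Override
    public Coder get() {
        return new Coder(url);
    }
}

class LazyCoderSchema implements Serializable {
    private static final long serialVersionUID = 1L;

    // Only the provider is serialized; the coder is rebuilt from it where it is needed.
    private final SerializableSupplier<Coder> coderProvider;
    private transient Coder coder; // no constructor initialization, no readObject() override

    LazyCoderSchema(SerializableSupplier<Coder> coderProvider) {
        this.coderProvider = coderProvider;
    }

    private Coder getCoder() {
        if (coder == null) {
            coder = coderProvider.get();
        }
        return coder;
    }

    String deserialize(byte[] message) {
        return getCoder().describe() + ": " + message.length + " bytes";
    }
}

class LazyCoderDemo {
    // Java-serializes and deserializes a value, similar to shipping the schema to a worker.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LazyCoderSchema shipped = roundTrip(new LazyCoderSchema(new UrlCoderProvider("http://localhost:8081")));
        System.out.println(shipped.deserialize(new byte[3])); // coder for http://localhost:8081: 3 bytes
    }
}
```

Because the coder field is transient, serializing an instance whose coder has already been created also works. The coder is skipped and rebuilt on the receiving side.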


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476124#comment-16476124
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188328437
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
--- End diff --

Double Apache header


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476137#comment-16476137
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188349853
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
+ *
+ * @param <T> type of record it produces
+ */
+public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
+   private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+   private transient SchemaCoder schemaCoder;
+
+   /**
+* Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}.
+*
+* @param recordClazz class to which deserialize. Should be either
+*{@link SpecificRecord} or {@link GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for
+*schema reading
+*/
+   protected RegistryAvroDeserializationSchema(
+   Class<T> recordClazz,
+   @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader);
--- End diff --

Empty line / indentation


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476127#comment-16476127
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188347289
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
--- End diff --

This class uses a mixture of eager initialization of transient members (in
`readObject()`) and lazy initialization (in `getDatumReader()`).

I would suggest doing it all one way or the other.

My suggestion would be to avoid `readObject()` whenever possible. If you
encounter an exception during schema parsing (and it may be something weird
from Jackson, like a missing manifest due to a shading issue), you will get the
most unhelpful exception stack trace ever, in the weirdest place (like Flink's
RPC message decoder).

In my experience, when a user sees such a stack trace, they are rarely able
to diagnose it. Best case, they show up on the mailing list; worst case, they
give up.


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476126#comment-16476126
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188348636
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
--- End diff --

That would mean initializing it all lazily, in `getDatumReader()` or in a 
`checkAvroInitialized()` method.
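A minimal sketch of the all-lazy approach, in plain Java with no Flink or Avro dependency. `LazySchemaHolder`, `ParsedSchema`, `checkInitialized()` and `roundTrip()` are hypothetical names, and `ParsedSchema.parse()` only stands in for Avro schema parsing. The serializable configuration is a `String`, every transient member is built in one place on first use, and `readObject()` is not overridden, so a parsing failure is thrown from the code path that first uses the object rather than from inside Java deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical holder: serializable config plus lazily built runtime state. */
class LazySchemaHolder implements Serializable {

	private static final long serialVersionUID = 1L;

	/** Serialized configuration (stands in for the Avro schema string). */
	private final String schemaString;

	/** Runtime state; not serialized, built on first use. */
	private transient ParsedSchema parsed;

	LazySchemaHolder(String schemaString) {
		this.schemaString = schemaString;
	}

	/** The single place where all transient members are initialized. */
	private void checkInitialized() {
		if (parsed == null) {
			// A parsing failure is thrown here, from the caller's code path,
			// instead of from inside Java deserialization.
			parsed = ParsedSchema.parse(schemaString);
		}
	}

	String name() {
		checkInitialized();
		return parsed.name;
	}

	boolean isInitialized() {
		return parsed != null;
	}

	/** Java-serializes and deserializes the holder, like shipping it to a task. */
	static LazySchemaHolder roundTrip(LazySchemaHolder holder) {
		try {
			ByteArrayOutputStream bytes = new ByteArrayOutputStream();
			try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
				out.writeObject(holder);
			}
			try (ObjectInputStream in =
					new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
				return (LazySchemaHolder) in.readObject();
			}
		} catch (IOException | ClassNotFoundException e) {
			throw new IllegalStateException("Serialization round trip failed", e);
		}
	}
}

/** Hypothetical parsed form; real code would use Avro's schema parser. */
final class ParsedSchema {

	final String name;

	private ParsedSchema(String name) {
		this.name = name;
	}

	static ParsedSchema parse(String schemaString) {
		if (schemaString == null || schemaString.trim().isEmpty()) {
			throw new IllegalArgumentException("Schema string must not be empty");
		}
		return new ParsedSchema(schemaString.trim());
	}
}
```

After a serialization round trip the transient field stays `null` until the first call to `name()`, which is where any initialization error would surface.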


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476135#comment-16476135
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188355390
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -99,9 +108,29 @@
 
/**
 * Creates a new AvroSerializer for the type indicated by the given class.
+* This constructor is intended to be used with {@link SpecificRecord} or reflection serializer.
+* For serializing {@link GenericData.Record} use {@link AvroSerializer#AvroSerializer(Class, Schema)}
 */
public AvroSerializer(Class<T> type) {
+   Preconditions.checkArgument(!isGenericRecord(type),
--- End diff --

Minor: Other precondition checks in this class use statically imported
methods. While this is not consistent across the code base, I would suggest
keeping it consistent within a class as much as possible.


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476129#comment-16476129
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188338897
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class<T> recordClazz;
+
+   private String schemaString = null;
--- End diff --

You can make this variable final as well.


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476130#comment-16476130
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188328926
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class<T> recordClazz;
+
+   private String schemaString = null;
--- End diff --

I would avoid null initialization; it is redundant. It does nothing (fields
are null by default) but still exists as bytecode, and hence costs CPU cycles.
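A small sketch, using a hypothetical class `FieldDefaults`, of how this comment combines with the earlier suggestion to make the field final: reference fields start out as `null` without any initializer, and a `final` field that is assigned in the constructor cannot carry an `= null` initializer at all, because javac rejects a second assignment to a final field.

```java
/** Hypothetical class showing both field suggestions. */
class FieldDefaults {

	/** Final and assigned once in the constructor; may be null. */
	private final String schemaString;

	/** No "= null" needed: reference fields default to null. */
	private String description;

	FieldDefaults(String schemaStringOrNull) {
		this.schemaString = schemaStringOrNull;
	}

	String schemaString() {
		return schemaString;
	}

	String description() {
		return description;
	}
}
```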


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476120#comment-16476120
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188325819
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import org.apache.avro.Schema;
+
+import java.io.DataInputStream;
+import java.io.InputStream;
+
+/**
+ * Reads schema using Confluent Schema Registry protocol.
+ */
+public class ConfluentSchemaRegistryCoder implements SchemaCoder {
+
+   private final SchemaRegistryClient schemaRegistryClient;
+
+   /**
+* Creates {@link SchemaCoder} that uses provided {@link SchemaRegistryClient} to connect to
+* schema registry.
+*
+* @param schemaRegistryClient client to connect schema registry
+*/
+   public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) {
+   this.schemaRegistryClient = schemaRegistryClient;
+   }
+
+   @Override
+   public Schema readSchema(InputStream in) throws Exception {
+   DataInputStream dataInputStream = new DataInputStream(in);
+
+   if (dataInputStream.readByte() != 0) {
+   throw new RuntimeException("Unknown data format. Magic number does not match");
--- End diff --

RuntimeExceptions (unchecked exceptions) are usually used to indicate 
programming errors, or (as a workaround) if the scope does not allow throwing 
any exception.

In my opinion, this is a case for a checked exception, such as an
`IOException` or a `FlinkException`.
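A sketch of the magic-byte check reporting a checked `IOException`, assuming the standard Confluent wire format (a zero magic byte followed by a 4-byte big-endian schema ID). `ConfluentHeaderReader` and `readSchemaId()` are hypothetical names; in the class under review, the returned ID would presumably be handed to the `SchemaRegistryClient` to look up the writer schema.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical helper that validates the Confluent wire-format header. */
final class ConfluentHeaderReader {

	private static final byte MAGIC_BYTE = 0;

	private ConfluentHeaderReader() {
	}

	/** Returns the schema ID; a bad header is a data error, so it is a checked exception. */
	static int readSchemaId(InputStream in) throws IOException {
		DataInputStream dataInputStream = new DataInputStream(in);
		byte magic = dataInputStream.readByte();
		if (magic != MAGIC_BYTE) {
			throw new IOException("Unknown data format. Magic number does not match: " + magic);
		}
		// readInt() consumes four bytes in big-endian order (EOFException if truncated).
		return dataInputStream.readInt();
	}
}
```

Since Flink's `DeserializationSchema#deserialize(byte[])` already declares `IOException`, a checked exception here can propagate without wrapping.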


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476136#comment-16476136
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188353074
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -79,15 +87,16 @@
 
//  runtime fields, non-serializable, lazily initialized 
---
 
-   private transient SpecificDatumWriter<T> writer;
-   private transient SpecificDatumReader<T> reader;
+   private transient GenericDatumWriter<T> writer;
+   private transient GenericDatumReader<T> reader;
 
private transient DataOutputEncoder encoder;
private transient DataInputDecoder decoder;
 
-   private transient SpecificData avroData;
+   private transient GenericData avroData;
 
private transient Schema schema;
+   private final String schemaString;
--- End diff --

As the section comments indicate, the existing code places config fields before
runtime fields. Can you move `schemaString` to match that pattern?


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476121#comment-16476121
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188337378
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
--- End diff --

This should have a `serialVersionUID`.
You can activate the respective inspections in IntelliJ to warn about such 
issues.
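A minimal sketch of declaring the UID on a hypothetical serializable class; Flink's `DeserializationSchema` extends `Serializable`, which is why it matters here. `ObjectStreamClass.lookup(...)` is used only to show which value Java serialization will actually write.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical serializable schema class with an explicit serialVersionUID. */
class ExampleDeserializationSchema implements Serializable {

	// An explicit UID keeps previously serialized instances readable after
	// compatible changes; otherwise Java derives one from the class structure.
	private static final long serialVersionUID = 1L;

	private final String schemaString;

	ExampleDeserializationSchema(String schemaString) {
		this.schemaString = schemaString;
	}

	String getSchemaString() {
		return schemaString;
	}

	/** Returns the UID that Java serialization will write for this class. */
	static long effectiveSerialVersionUid() {
		return ObjectStreamClass.lookup(ExampleDeserializationSchema.class).getSerialVersionUID();
	}
}
```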


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476128#comment-16476128
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188340240
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class<T> recordClazz;
+
+   private String schemaString = null;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private transient GenericDatumReader<T> datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private transient MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private transient Decoder decoder;
+
+   /**
+* Avro schema for the reader.
+*/
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+   this.recordClazz = recordClazz;
+ 

[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476119#comment-16476119
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188316643
  
--- Diff: flink-formats/flink-avro-confluent-registry/pom.xml ---
@@ -0,0 +1,94 @@
+
+
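+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-formats</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-avro-confluent-registry</artifactId>
+
+	<repositories>
+		<repository>
+			<id>confluent</id>
+			<url>http://packages.confluent.io/maven/</url>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-schema-registry-client</artifactId>
+			<version>3.3.1</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.avro</groupId>
+					<artifactId>avro</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<relocations>
+								<relocation>
+									<pattern>com.fasterxml.jackson.core</pattern>
+									<shadedPattern>org.apache.flink.shaded.com.fasterxml.jackson.core</shadedPattern>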
--- End diff --

We may need to qualify this shaded pattern further with this module's name,
because we already use that relocation pattern in other places, potentially
for different Jackson versions.


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476122#comment-16476122
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188329273
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
--- End diff --

Minor comment: Many newer classes pick up a style where the JavaDocs of 
fields are in one line, to make the fields section a bit more compact:
```
/** Class to deserialize to. */
private Class<T> recordClazz;
```


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476131#comment-16476131
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188349118
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class<T> recordClazz;
+
+   private String schemaString = null;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private transient GenericDatumReader<T> datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private transient MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private transient Decoder decoder;
+
+   /**
+* Avro schema for the reader.
+*/
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+   this.recordClazz = recordClazz;
+ 

[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476116#comment-16476116
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188319278
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param <T> type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema<T>
+   extends RegistryAvroDeserializationSchema<T> {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be either
+*{@link SpecificRecord} or {@link GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry
+*/
+   private ConfluentRegistryAvroDeserializationSchema(
+   Class<T> recordClazz,
+   @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader, schemaCoderProvider);
--- End diff --

For such situations, the method body and the parameter list should use 
different indentation, or be separated by an empty line. Otherwise the code is 
hard to parse.
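A minimal sketch of that convention, assuming tab indentation as in the surrounding code: the wrapped parameters get two extra levels and the body one, so the last parameter and the first statement do not line up. `ExampleSchema` and its accessors are hypothetical and only illustrate the layout.

```java
import java.util.Objects;

/** Hypothetical class used only to illustrate constructor formatting. */
class ExampleSchema<T> {

	/** Class to deserialize to. */
	private final Class<T> recordClazz;

	/** Reader schema in string form; may be null. */
	private final String readerSchema;

	// The wrapped parameter list is indented two levels deeper than the signature...
	ExampleSchema(
			Class<T> recordClazz,
			String readerSchema) {
		// ...while the body is indented one level, so it does not blend into the parameters.
		this.recordClazz = Objects.requireNonNull(recordClazz, "Record class must not be null.");
		this.readerSchema = readerSchema;
	}

	Class<T> getRecordClazz() {
		return recordClazz;
	}

	String getReaderSchema() {
		return readerSchema;
	}
}
```

The other option from the comment, an empty line between the signature and the first statement, works as well when the parameters keep the body's indentation.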


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476134#comment-16476134
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188355756
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -275,9 +304,19 @@ else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
//  Utilities
// 

 
+   private static boolean isGenericRecord(Class<?> type) {
+   return !SpecificRecord.class.isAssignableFrom(type) &&
+   GenericRecord.class.isAssignableFrom(type);
+   }
+
@Override
public TypeSerializer<T> duplicate() {
-   return new AvroSerializer<>(type);
+   if (schemaString != null) {
+   return new AvroSerializer<>(type, new Schema.Parser().parse(schemaString));
--- End diff --

Duplication happens frequently, so it would be good to avoid parsing the 
schema every time. You can add a private copy constructor that takes the class 
and the schema string.
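A minimal sketch of the suggestion, assuming the serializer keeps the schema's string form and only needs the parsed form lazily. `ParsedSchema` is a hypothetical stand-in for Avro's `Schema`, with `parse` playing the role of `new Schema.Parser().parse(...)`; `SchemaHoldingSerializer` is an illustration, not Flink's `AvroSerializer`.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a parsed schema; parse() represents an expensive operation. */
final class ParsedSchema {
	/** Counts parse calls so the effect of the copy constructor is observable. */
	static final AtomicInteger PARSE_CALLS = new AtomicInteger();

	private final String json;

	private ParsedSchema(String json) {
		this.json = json;
	}

	static ParsedSchema parse(String json) {
		PARSE_CALLS.incrementAndGet();
		return new ParsedSchema(json);
	}

	@Override
	public String toString() {
		return json;
	}
}

/** Hypothetical serializer that keeps the schema as a String and parses it at most once per instance. */
final class SchemaHoldingSerializer<T> {

	private final Class<T> type;

	private final String schemaString;

	/** Parsed form; per instance, created on first use. */
	private ParsedSchema schema;

	/** Public constructor: takes an already parsed schema and remembers its string form. */
	SchemaHoldingSerializer(Class<T> type, ParsedSchema schema) {
		this(type, schema.toString());
		this.schema = schema;
	}

	/** Private copy constructor: copies the class and the string only, so nothing is parsed here. */
	private SchemaHoldingSerializer(Class<T> type, String schemaString) {
		this.type = Objects.requireNonNull(type);
		this.schemaString = Objects.requireNonNull(schemaString);
	}

	SchemaHoldingSerializer<T> duplicate() {
		return new SchemaHoldingSerializer<>(type, schemaString);
	}

	/** Parses the schema the first time this instance needs it. */
	ParsedSchema getSchema() {
		if (schema == null) {
			schema = ParsedSchema.parse(schemaString);
		}
		return schema;
	}
}
```

Duplicates share only the immutable string, and each copy pays the parsing cost once, and only if it actually uses the schema.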


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476125#comment-16476125
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188328236
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ConfluentSchemaRegistryCoder}.
+ */
+public class ConfluentSchemaRegistryCoderTest {
--- End diff --

Do we want to test the magic byte verification?
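For reference, the Confluent wire format prefixes each message with a magic byte `0` followed by the writer schema's ID as a 4-byte big-endian integer. A minimal sketch of the check such a test would exercise, using only `java.io`; `WireFormatHeader` and its methods are hypothetical helpers, not the API of `ConfluentSchemaRegistryCoder`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical helpers for the header: magic byte 0, then a 4-byte big-endian schema ID. */
final class WireFormatHeader {

	static final byte MAGIC_BYTE = 0;

	private WireFormatHeader() {}

	/** Writes the 5-byte header for the given schema ID. */
	static byte[] header(int schemaId) {
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		try (DataOutputStream out = new DataOutputStream(bytes)) {
			out.writeByte(MAGIC_BYTE);
			out.writeInt(schemaId); // DataOutputStream writes big-endian
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
		return bytes.toByteArray();
	}

	/** Reads the schema ID, rejecting input whose first byte is not the magic byte. */
	static int readSchemaId(InputStream in) throws IOException {
		DataInputStream data = new DataInputStream(in);
		byte magic = data.readByte();
		if (magic != MAGIC_BYTE) {
			throw new IOException("Unknown magic byte: " + magic);
		}
		return data.readInt();
	}
}
```

A test against the real coder could follow the same pattern: feed it bytes whose first byte is not `0` and assert that reading fails, assuming the coder reports this as an `IOException`.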


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476123#comment-16476123
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188336725
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class<T> recordClazz;
--- End diff --

All fields should be final whenever possible, with immutability as the default 
choice.
That both documents the writer's intention and makes the code future-proof.
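For a Java-serializable schema class, the advice has one caveat: a field that is both `final` and `transient` comes back as `null` after deserialization and cannot be reassigned in `readObject`, so runtime helpers such as readers, decoders and buffers stay non-final. A minimal sketch under those assumptions; the class, its fields and the buffer size are hypothetical, and the field JavaDocs use the one-line style suggested earlier in the review.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Objects;

/** Hypothetical deserializer: configuration is final, runtime state is transient. */
class ExampleDeserializer<T> implements Serializable {

	private static final long serialVersionUID = 1L;

	/** Class to deserialize to. */
	private final Class<T> recordClazz;

	/** Schema in string form; may be null. */
	private final String schemaString;

	/** Reusable scratch buffer; not final because it is rebuilt after deserialization. */
	private transient byte[] buffer;

	ExampleDeserializer(Class<T> recordClazz, String schemaString) {
		this.recordClazz = Objects.requireNonNull(recordClazz, "Record class must not be null.");
		this.schemaString = schemaString;
		this.buffer = new byte[1024];
	}

	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
		in.defaultReadObject(); // restores the final, non-transient fields
		this.buffer = new byte[1024]; // transient fields arrive as null and are recreated here
	}

	Class<T> getRecordClazz() { return recordClazz; }

	String getSchemaString() { return schemaString; }

	boolean hasBuffer() { return buffer != null; }

	/** Serializes and deserializes a value with plain Java serialization. */
	@SuppressWarnings("unchecked")
	static <S extends Serializable> S roundTrip(S value) {
		try {
			ByteArrayOutputStream bytes = new ByteArrayOutputStream();
			try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
				out.writeObject(value);
			}
			try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
				return (S) in.readObject();
			}
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		} catch (ClassNotFoundException e) {
			throw new IllegalStateException(e);
		}
	}
}
```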


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476132#comment-16476132
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188349785
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
+ *
+ * @param <T> type of record it produces
+ */
+public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
+   private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
--- End diff --

We typically put an empty line between the class declaration and its members.


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476115#comment-16476115
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188319368
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param <T> type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema<T>
+   extends RegistryAvroDeserializationSchema<T> {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be either
+*{@link SpecificRecord} or {@link GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry
+   private ConfluentRegistryAvroDeserializationSchema(
+   Class<T> recordClazz,
+   @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader, schemaCoderProvider);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent Schema Registry.
+*
+* @param schema schema of produced records
+* @param url    url of schema registry to connect
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
+   Schema schema, String url) {
+   return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
--- End diff --

Same as above


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16475936#comment-16475936
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188316052
  
--- Diff: flink-formats/flink-avro-confluent-registry/pom.xml ---
@@ -0,0 +1,94 @@
+
+
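+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   <parent>
+   <artifactId>flink-formats</artifactId>
+   <groupId>org.apache.flink</groupId>
+   <version>1.6-SNAPSHOT</version>
+   </parent>
+   <modelVersion>4.0.0</modelVersion>
+
+   <artifactId>flink-avro-confluent-registry</artifactId>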
--- End diff --

That is a good question; maybe eventually. We could leave it in 
`flink-formats` for now, until we have a case for creating `flink-catalogs`.

This is also not full-blown catalog support, as in the Table API, but 
something much simpler: just multiple Avro schemas.


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474345#comment-16474345
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r187992680
  
--- Diff: flink-formats/flink-avro-confluent-registry/pom.xml ---
@@ -0,0 +1,94 @@
+
+
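+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   <parent>
+   <artifactId>flink-formats</artifactId>
+   <groupId>org.apache.flink</groupId>
+   <version>1.6-SNAPSHOT</version>
+   </parent>
+   <modelVersion>4.0.0</modelVersion>
+
+   <artifactId>flink-avro-confluent-registry</artifactId>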
--- End diff --

Do we really want the Confluent Schema Registry code to be in 
`flink-formats`? Shouldn't this be in something like `flink-catalogs` in the 
future?


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474026#comment-16474026
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5995
  
Right, sorry about that. I changed the data generator a bit, so it produced 
different results than before for the same seed. I recreated the serialized 
data with the updated `TestDataGenerator`. It took me a while to figure out 
that the data must be created with the current code rather than the 1.3 
branch, so I updated the comment accordingly.

I also reworded some other names, as 
`BackwardsCompatibleAvroSerializerTest` does not test compatibility with 1.3, 
only with `PojoSerializer`.


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16473860#comment-16473860
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5995
  
Thanks, the main code looks good!

Unfortunately, this seems to either break compatibility with prior 
savepoints (where Avro types were implicitly handled through Kryo, and are now 
bridged through the `BackwardsCompatibleAvroSerializer`), or that test needs to 
be adjusted.

There are also some license header issues that cause the build to fail.


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>






[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472578#comment-16472578
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/5995

 [FLINK-9337] Implemented AvroDeserializationSchema

## What is the purpose of the change
Provides an implementation of `AvroDeserializationSchema` that reads records 
serialized as Avro, and also a version that uses the Confluent Schema 
Registry to look up the writer schema.
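The registry lookup can be sketched without Flink or Confluent dependencies, assuming each message carries an integer schema ID and that the writer schema for a given ID never changes, so it can be cached indefinitely. `CachingSchemaLookup` and the `IntFunction` standing in for the registry client are hypothetical; the PR itself relies on Confluent's `CachedSchemaRegistryClient`, which appears in its imports.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

/**
 * Hypothetical writer-schema lookup: the schema for an ID is fetched from the
 * registry once and then served from a local cache.
 */
final class CachingSchemaLookup {

	/** Stand-in for a remote registry call: schema ID to schema JSON. */
	private final IntFunction<String> registry;

	private final Map<Integer, String> cache = new ConcurrentHashMap<>();

	/** Number of calls that actually reached the registry. */
	final AtomicInteger remoteCalls = new AtomicInteger();

	CachingSchemaLookup(IntFunction<String> registry) {
		this.registry = registry;
	}

	String writerSchema(int schemaId) {
		return cache.computeIfAbsent(schemaId, id -> {
			remoteCalls.incrementAndGet();
			return registry.apply(id);
		});
	}
}
```

The fetched writer schema would then be combined with the reader schema when decoding, so records written with an older schema can still be read.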


## Brief change log

  - Implemented AvroDeserializationSchema / 
RegistryAvroDeserializationSchema / ConfluentRegistryAvroDeserializationSchema
  - Extended AvroSerializer to handle GenericRecords
  - Added GenericRecordTypeInformation

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (**yes** / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (**yes** / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink avro-deserializer2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5995.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5995


commit 885c31daa5ad9db03924311a6f443b72795d6ca3
Author: Dawid Wysakowicz 
Date:   2018-05-11T16:55:10Z

[FLINK-9337] Implemented AvroDeserializationSchema

commit 4f1b398837d83d1a8be9a697a686a2ff54c5b22c
Author: Dawid Wysakowicz 
Date:   2018-05-11T16:57:26Z

[FLINK-9338] Implemented RegistryAvroDeserializationSchema & provided 
implementation for Confluent Schema Registry




> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>



