Hi Gyula,

First of all, sorry for the delayed response.
I see the argument for handling metadata from Kafka headers. I hadn't noticed that the schema you are proposing is actually a KafkaDeserializationSchema, which means it works only with Kafka. I still believe it would be really beneficial for the community to have a more general registry schema, but if we want to support the schema being encoded in the record's metadata, it would require a rework of the hierarchy of the (Connector)DeserializationSchemas, which I guess should be discussed separately.

Having said that, I tend to agree with you that it would make sense to add the thin wrapper as an initial version, especially as you are suggesting to hide the implementation details behind a builder.

Some comments on the design:

* I would make it more explicit in the entry point that this works with the Cloudera (Hortonworks) schema registry (maybe something like ClouderaRegistryDeserializationSchema.builder(); see the rough sketch at the very end of this mail).
* I would make it more explicit that it constructs only a *Kafka*(De)serializationSchema.
* We should consider the dependency design. This schema, in contrast to the Confluent one, would pull in the Kafka consumer dependencies. If we later add a schema that can deserialize data from other systems, we should not pull in the Kafka dependencies automatically.

Best,

Dawid

On 06/11/2019 11:32, Gyula Fóra wrote:
> Hi Dawid,
>
> In general I agree that if we can provide a completely unified way of
> handling these registries that would be great, but I wonder if that
> makes sense in the long term. While the Cloudera schema registry only
> supports Avro at the moment, it aims to support other formats in the
> future, and accessing this functionality will probably rely on using
> those specific serializer/deserializer implementations. This might not
> be a valid concern at this point though :)
>
> The reason why we went with wrapping the
> KafkaAvroDeserializer/Serializer directly now is that it was super
> simple to do and the current SchemaCoder approach lacks a lot of
> flexibility/functionality.
>
> The schema itself doesn't always come from the serialized data (I
> believe in this case it is either stored in the serialized data or the
> Kafka record metadata) and we also want to be able to handle Kafka
> message keys. I guess these could be solved by making the
> deserialization logic Kafka specific and exposing the ConsumerRecord,
> but that would completely change the current SchemaCoder related
> interfaces.
>
> Cheers,
> Gyula
>
> On Wed, Nov 6, 2019 at 10:17 AM Dawid Wysakowicz
> <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote:
>
> Hi Gyula,
>
> I did not want to discourage this contribution. I do agree we should
> treat this connector equally to the Confluent schema registry. I just
> wanted to express my uncertainty about the general approach to new
> connector contributions.
>
> As for the second point: do you mean that you are wrapping the
> KafkaAvroDeserializer/Serializer provided by the Cloudera/Hortonworks
> schema registry?
>
> Personally I would very much prefer using the SchemaCoder approach. All
> schemas boil down to two steps: (de)serializing the schema with a
> registry-specific protocol + (de)serializing the record itself. I think
> the approach with SchemaCoder has the benefit that we can optimize the
> instantiation of Avro's readers and writers in a unified way. It's also
> easier to maintain as we have just a single point where the actual
> record (de)serialization happens.
> It also provides a unified way of instantiating the TypeInformation.
> Could you give some explanation why you would prefer not to use this
> approach?
>
> Best,
>
> Dawid
>
> On 05/11/2019 14:48, Gyula Fóra wrote:
> > Thanks Matyas for starting the discussion!
> > I think this would be a very valuable addition to Flink, as many companies
> > are already using the Hortonworks/Cloudera registry and it would enable
> > them to connect to Flink easily.
> >
> > @Dawid:
> > Regarding the implementation, this is a much more lightweight connector than
> > what we have now for the Confluent registry and the PR you linked. This
> > wraps the Cloudera registry directly, providing a very thin wrapper + some
> > enhanced functionality regarding the handling of Kafka message keys.
> >
> > As for the question of main repo vs. outside, I would prefer this to be
> > included in the main repo, similar to the Confluent registry connector.
> > Unless we decide to move all of these connectors out, I would like to take a
> > consistent approach.
> >
> > Cheers,
> > Gyula
> >
> >
> > On Tue, Nov 5, 2019 at 1:44 PM Dawid Wysakowicz <dwysakow...@apache.org <mailto:dwysakow...@apache.org>>
> > wrote:
> >
> >> Hi Matyas,
> >>
> >> I think this would be a valuable addition. You may reuse some of the
> >> already available abstractions for writing an Avro deserialization schema
> >> based on a schema registry (have a look at RegistryDeserializationSchema
> >> and SchemaCoderProvider). There is also an open PR for adding a
> >> similar serialization schema [1].
> >>
> >> The only concern is that I am not 100% sure what the consensus is on
> >> which connectors we want to adopt into the main repository and which
> >> we would prefer to be hosted separately and included in the ecosystem
> >> webpage [2] (that I hope will be published soon).
> >>
> >> Whichever option is preferred, I could help review the code.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> [1] https://github.com/apache/flink/pull/8371
> >>
> >> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html
> >>
> >> On 05/11/2019 12:40, Őrhidi Mátyás wrote:
> >>> Dear Flink Community!
> >>>
> >>> We have noticed a recent request for Hortonworks schema registry support
> >>> (FLINK-14577 <https://issues.apache.org/jira/browse/FLINK-14577>). We have
> >>> an implementation for it already, and we would be happy to contribute it to
> >>> Apache Flink.
> >>>
> >>> You can find the documentation below [1]. Let us know your thoughts!
> >>>
> >>> Best Regards,
> >>> Matyas
> >>>
> >>> [1] Flink Avro Cloudera Registry User Guide
> >>> -----------------------------------------------------------
> >>>
> >>> Add the following dependency to use the schema registry integration:
> >>>
> >>>     <dependency>
> >>>         <groupId>org.apache.flink</groupId>
> >>>         <artifactId>flink-avro-cloudera-registry</artifactId>
> >>>         <version>${flink.version}</version>
> >>>     </dependency>
> >>>
> >>> The schema registry can be plugged directly into the FlinkKafkaConsumer and
> >>> FlinkKafkaProducer using the appropriate schema:
> >>> - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
> >>> - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema
> >>>
> >>> Supported types
> >>> ----------------------
> >>> - Avro Specific Record types
> >>> - Avro Generic Records
> >>> - Basic Java data types: byte[], Byte, Integer, Short, Double, Float, Long,
> >>>   String, Boolean
> >>>
> >>> SchemaRegistrySerializationSchema
> >>> --------------------------------------------------
> >>> The serialization schema can be constructed using the included builder
> >>> object SchemaRegistrySerializationSchema.builder(..).
> >>>
> >>> Required settings:
> >>> - Topic configuration when creating the builder. Can be static or dynamic
> >>>   (extracted from the data)
> >>> - RegistryAddress parameter on the builder to establish the connection
> >>>
> >>> Optional settings:
> >>> - Arbitrary SchemaRegistry client configuration using the setConfig method
> >>> - Key configuration for the produced Kafka messages
> >>>   - By specifying a KeySelector function that extracts the key from each record
> >>>   - Using a Tuple2 stream for (key, value) pairs directly
> >>> - Security configuration
> >>>
> >>> Example:
> >>>
> >>>     KafkaSerializationSchema<ItemTransaction> schema =
> >>>         SchemaRegistrySerializationSchema
> >>>             .<ItemTransaction>builder(topic)
> >>>             .setRegistryAddress(registryAddress)
> >>>             .setKey(ItemTransaction::getItemId)
> >>>             .build();
> >>>
> >>>     FlinkKafkaProducer<ItemTransaction> sink = new FlinkKafkaProducer<>(
> >>>         "dummy", schema, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> >>>
> >>> SchemaRegistryDeserializationSchema
> >>> -----------------------------------------------------
> >>> The deserialization schema can be constructed using the included builder
> >>> object SchemaRegistryDeserializationSchema.builder(..).
> >>> When reading messages (and keys) we always have to specify the expected
> >>> Class<T> or record Schema of the input records so that Flink can do any
> >>> necessary conversion between the data on Kafka and what is expected.
> >>>
> >>> Required settings:
> >>> - Class or Schema of the input messages depending on the data type
> >>> - RegistryAddress parameter on the builder to establish the connection
> >>>
> >>> Optional settings:
> >>> - Arbitrary SchemaRegistry client configuration using the setConfig method
> >>> - Key configuration for the consumed Kafka messages
> >>>   - Should only be specified when we want to read the keys as well into a
> >>>     (key, value) stream
> >>> - Security configuration
> >>>
> >>> Example:
> >>>
> >>>     KafkaDeserializationSchema<ItemTransaction> schema =
> >>>         SchemaRegistryDeserializationSchema
> >>>             .builder(ItemTransaction.class)
> >>>             .setRegistryAddress(registryAddress)
> >>>             .build();
> >>>
> >>>     FlinkKafkaConsumer<ItemTransaction> source =
> >>>         new FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps, groupId);
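
P.S. To make the naming suggestion from my first bullet a bit more concrete, here is a rough sketch of what I have in mind for the entry point. This is purely an illustration: ClouderaRegistryDeserializationSchema is just the name I am proposing (it does not exist), and I am assuming the builder methods stay exactly as in Matyas' guide above.

    // Hypothetical renamed entry point; builder methods assumed unchanged
    // from the guide above. The name makes it explicit that this targets
    // the Cloudera (Hortonworks) schema registry.
    KafkaDeserializationSchema<ItemTransaction> schema =
        ClouderaRegistryDeserializationSchema
            .builder(ItemTransaction.class)
            .setRegistryAddress(registryAddress)
            .build();

    // The result still plugs into the Kafka consumer as before.
    FlinkKafkaConsumer<ItemTransaction> source =
        new FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps);

If we also want to reflect my second bullet, the "Kafka" part could be carried in the class name or the builder method instead, but that is a detail we can settle in the PR.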