Hi Gyula,

First of all, sorry for the delayed response.

I see the argument for handling metadata from Kafka headers. I hadn't
noticed that the schema you are proposing is actually a
KafkaDeserializationSchema, which means it works only with Kafka.

I still believe it would be really beneficial for the community to have
a more general registry schema, but if we want to support the schema
being encoded in the record's metadata, it would require a rework of the
hierarchy of the (Connector)DeserializationSchemas, which I guess should
be discussed separately.

Having said that, I tend to agree with you that it would make sense to
add the thin wrapper as an initial version, especially as you are
suggesting to hide the implementation details behind a builder. Some
comments on the design:

* I would make it more explicit in the entry point that this works with
the Cloudera (Hortonworks) schema registry (maybe something like
ClouderaRegistryDeserializationSchema.builder())

* I would make it somehow more explicit that it constructs only
*Kafka*(De)serializationSchema.

* We should consider the dependency design. This schema, in contrast to
Confluent's, would pull in Kafka consumer dependencies. If we later add
a schema that can deserialize data from other systems, it should not
pull in the Kafka dependencies automatically.
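
To make the first two points a bit more concrete, here is a rough Java
sketch of what such an entry point could look like. Everything in it is
hypothetical: the class name ClouderaRegistryKafkaDeserializationSchema,
the KafkaRecordDeserializer stand-in (used instead of Flink's
KafkaDeserializationSchema so that the snippet compiles on its own) and
the registry address are placeholders for illustration, not an existing
API.

```java
import java.util.Objects;

public class RegistrySchemaSketch {

    /** Stand-in for Flink's KafkaDeserializationSchema so the sketch compiles without Flink. */
    interface KafkaRecordDeserializer<T> {
        T deserialize(byte[] key, byte[] value) throws Exception;

        Class<T> producedType();
    }

    /** Hypothetical entry point: the name states both the registry vendor and Kafka. */
    static final class ClouderaRegistryKafkaDeserializationSchema {

        private ClouderaRegistryKafkaDeserializationSchema() {}

        static <T> Builder<T> builder(Class<T> recordClass) {
            return new Builder<>(recordClass);
        }

        static final class Builder<T> {
            private final Class<T> recordClass;
            private String registryAddress;

            private Builder(Class<T> recordClass) {
                this.recordClass = Objects.requireNonNull(recordClass, "recordClass");
            }

            Builder<T> setRegistryAddress(String registryAddress) {
                this.registryAddress = registryAddress;
                return this;
            }

            /** The return type is Kafka-specific, so callers cannot mistake it for a generic schema. */
            KafkaRecordDeserializer<T> build() {
                Objects.requireNonNull(registryAddress, "registryAddress is required");
                return new KafkaRecordDeserializer<T>() {
                    @Override
                    public T deserialize(byte[] key, byte[] value) {
                        // A real wrapper would delegate to the registry's KafkaAvroDeserializer here.
                        throw new UnsupportedOperationException("sketch only");
                    }

                    @Override
                    public Class<T> producedType() {
                        return recordClass;
                    }
                };
            }
        }
    }

    public static void main(String[] args) {
        KafkaRecordDeserializer<String> schema =
            ClouderaRegistryKafkaDeserializationSchema
                .builder(String.class)
                .setRegistryAddress("http://localhost:7788/api/v1") // placeholder address
                .build();
        System.out.println(schema.producedType().getSimpleName());
    }
}
```

The builder hides the wrapped deserializer, while the entry point and the
return type both say "Kafka", so a later, connector-independent schema
could live in a separate module without the Kafka dependencies.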

Best,

Dawid

On 06/11/2019 11:32, Gyula Fóra wrote:
> Hi Dawid,
>
> In general I agree that if we can provide a completely unified way of
> handling these registries that would be great, but I wonder if that
> makes sense in the long term. While the Cloudera schema registry only
> supports Avro at the moment, it aims to support other formats in the
> future, and accessing this functionality will probably rely on using
> those specific serializer/deserializer implementations. This might not
> be a valid concern at this point though :)
>
> The reason why we went with wrapping the
> KafkaAvroDeserializer/Serializer directly now, is that it was super
> simple to do and the current SchemaCoder approach lacks a lot of
> flexibility/functionality.
>
> The schema itself doesn't always come from the serialized data (I
> believe in this case it is stored either in the serialized data or in
> the Kafka record metadata), and we also want to be able to handle Kafka
> message keys. I guess these could be solved by making the
> deserialization logic Kafka-specific and exposing the ConsumerRecord,
> but that would completely change the current SchemaCoder-related
> interfaces.
>
> Cheers,
> Gyula
>
> On Wed, Nov 6, 2019 at 10:17 AM Dawid Wysakowicz
> <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote:
>
>     Hi Gyula,
>
>     I did not want to discourage this contribution. I do agree we should
>     treat this connector equally to Confluent's schema registry. I just
>     wanted to express my uncertainty about the general approach to new
>     connector contributions. By no means did I want to discourage this
>     contribution.
>
>     As for the second point: do you mean that you are wrapping the
>     KafkaAvroDeserializer/Serializer provided by the Cloudera/Hortonworks
>     schema registry?
>
>     Personally I would very much prefer using the SchemaCoder approach.
>     All schemas boil down to two steps: (de)serializing the schema with
>     a registry-specific protocol + (de)serializing the record itself. I
>     think the approach with SchemaCoder has the benefit that we can
>     optimize instantiation of Avro's readers and writers in a unified
>     way. It's also easier to maintain, as we have just a single point
>     where the actual record (de)serialization happens. It also provides
>     a unified way of instantiating the TypeInformation. Could you explain
>     why you would prefer not to use this approach?
>
>     Best,
>
>     Dawid
>
>     On 05/11/2019 14:48, Gyula Fóra wrote:
>     > Thanks Matyas for starting the discussion!
>     > I think this would be a very valuable addition to Flink as many
>     > companies are already using the Hortonworks/Cloudera registry and
>     > it would enable them to connect to Flink easily.
>     >
>     > @Dawid:
>     > Regarding the implementation, this is a much more lightweight
>     > connector than what we have now for the Confluent registry and the
>     > PR you linked. This wraps the Cloudera registry directly, providing
>     > a very thin wrapper + some enhanced functionality regarding handling
>     > of Kafka message keys.
>     >
>     > As for the question of main repo vs. outside, I would prefer this
>     > to be included in the main repo, similar to the Confluent registry
>     > connector. Unless we decide to move all of these connectors out, I
>     > would like to take a consistent approach.
>     >
>     > Cheers,
>     > Gyula
>     >
>     >
>     > On Tue, Nov 5, 2019 at 1:44 PM Dawid Wysakowicz
>     > <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote:
>     >
>     >> Hi Matyas,
>     >>
>     >> I think this would be a valuable addition. You may reuse some of
>     >> the already available abstractions for writing an Avro
>     >> deserialization schema based on a schema registry (have a look at
>     >> RegistryDeserializationSchema and SchemaCoderProvider). There is
>     >> also an open PR for adding a similar serialization schema[1].
>     >>
>     >> The only concern is that I am not 100% sure what the consensus is
>     >> on which connectors we want to adopt into the main repository and
>     >> which we would prefer to be hosted separately and included in the
>     >> ecosystem webpage[2] (that I hope will be published soon).
>     >>
>     >> Whichever option is preferred, I could help review the code.
>     >>
>     >> Best,
>     >>
>     >> Dawid
>     >>
>     >> [1] https://github.com/apache/flink/pull/8371
>     >>
>     >> [2]
>     >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html
>     >>
>     >> On 05/11/2019 12:40, Őrhidi Mátyás wrote:
>     >>> Dear Flink Community!
>     >>>
>     >>> We have noticed a recent request for Hortonworks schema registry
>     >>> support (FLINK-14577
>     >>> <https://issues.apache.org/jira/browse/FLINK-14577>). We have an
>     >>> implementation for it already, and we would be happy to contribute
>     >>> it to Apache Flink.
>     >>>
>     >>> You can find the documentation below[1]. Let us know your thoughts!
>     >>>
>     >>> Best Regards,
>     >>> Matyas
>     >>>
>     >>> [1] Flink Avro Cloudera Registry User Guide
>     >>> -----------------------------------------------------------
>     >>>
>     >>> Add the following dependency to use the schema registry integration:
>     >>> <dependency>
>     >>>     <groupId>org.apache.flink</groupId>
>     >>>     <artifactId>flink-avro-cloudera-registry</artifactId>
>     >>>     <version>${flink.version}</version>
>     >>> </dependency>
>     >>>
>     >>>
>     >>> The schema registry can be plugged directly into the
>     >>> FlinkKafkaConsumer and FlinkKafkaProducer using the appropriate
>     >>> schema:
>     >>> - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
>     >>> - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema
>     >>>
>     >>> Supported types
>     >>> ----------------------
>     >>> - Avro Specific Record types
>     >>> - Avro Generic Records
>     >>> - Basic Java data types: byte[], Byte, Integer, Short, Double,
>     >>> Float, Long, String, Boolean
>     >>>
>     >>> SchemaRegistrySerializationSchema
>     >>> --------------------------------------------------
>     >>> The serialization schema can be constructed using the included
>     >>> builder object SchemaRegistrySerializationSchema.builder(..).
>     >>>
>     >>> Required settings:
>     >>> - Topic configuration when creating the builder. Can be static or
>     >>> dynamic (extracted from the data)
>     >>> - RegistryAddress parameter on the builder to establish the
>     >>> connection
>     >>>
>     >>> Optional settings:
>     >>> - Arbitrary SchemaRegistry client configuration using the setConfig
>     >>> method
>     >>> - Key configuration for the produced Kafka messages
>     >>>  - By specifying a KeySelector function that extracts the key from
>     >>> each record
>     >>>  - Using a Tuple2 stream for (key, value) pairs directly
>     >>> - Security configuration
>     >>>
>     >>> Example:
>     >>> KafkaSerializationSchema<ItemTransaction> schema =
>     >>> SchemaRegistrySerializationSchema
>     >>>     .<ItemTransaction>builder(topic)
>     >>>     .setRegistryAddress(registryAddress)
>     >>>     .setKey(ItemTransaction::getItemId)
>     >>>     .build();
>     >>> FlinkKafkaProducer<ItemTransaction> sink = new
>     >>> FlinkKafkaProducer<>("dummy", schema, kafkaProps,
>     >>> FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
>     >>>
>     >>> SchemaRegistryDeserializationSchema
>     >>> -----------------------------------------------------
>     >>> The deserialization schema can be constructed using the included
>     >>> builder object SchemaRegistryDeserializationSchema.builder(..).
>     >>> When reading messages (and keys) we always have to specify the
>     >>> expected Class<T> or record Schema of the input records so that
>     >>> Flink can do any necessary conversion between the data on Kafka
>     >>> and what is expected.
>     >>>
>     >>> Required settings:
>     >>> - Class or Schema of the input messages depending on the data type
>     >>> - RegistryAddress parameter on the builder to establish the
>     >>> connection
>     >>>
>     >>> Optional settings:
>     >>> - Arbitrary SchemaRegistry client configuration using the setConfig
>     >>> method
>     >>> - Key configuration for the consumed Kafka messages
>     >>>  - Should only be specified when we want to read the keys as well
>     >>> into a (key, value) stream
>     >>> - Security configuration
>     >>>
>     >>> Example:
>     >>> KafkaDeserializationSchema<ItemTransaction> schema =
>     >>> SchemaRegistryDeserializationSchema
>     >>>    .builder(ItemTransaction.class)
>     >>>    .setRegistryAddress(registryAddress)
>     >>>    .build();
>     >>> // The consumer group id is set through kafkaProps ("group.id").
>     >>> FlinkKafkaConsumer<ItemTransaction> source = new
>     >>> FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps);
>     >>>
>     >>
>
