[
https://issues.apache.org/jira/browse/BEAM-7310?focusedWorklogId=376876&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-376876
]
ASF GitHub Bot logged work on BEAM-7310:
----------------------------------------
Author: ASF GitHub Bot
Created on: 24/Jan/20 15:57
Start Date: 24/Jan/20 15:57
Worklog Time Spent: 10m
Work Description: aromanenko-dev commented on pull request #10563:
[BEAM-7310] Add support of Confluent Schema Registry for KafkaIO
URL: https://github.com/apache/beam/pull/10563#discussion_r370708266
##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -165,6 +175,30 @@
* <br>
* 3. Start from <em>latest</em> offset by default;
*
+ * <h3>Use Avro schema with Confluent Schema Registry</h3>
+ *
+ * <p>If the message keys or/and values in a topic were serialised by Avro schema stored in
+ * Confluent Schema Registry, KafkaIO source can fetch this schema from specified Schema Registry
+ * URL and use it for deserialization further. As an output, it will return a {@link PCollection} of
+ * {@link KafkaRecord}s where key or/and value will be typed as {@link
+ * org.apache.avro.generic.GenericRecord}. In this case, users don't need to specify key or/and
+ * value deserializers and coders since they will be set to {@link KafkaAvroDeserializer} and {@link
+ * AvroCoder} by default accordingly.
+ *
+ * <p>For example, below topic values are serialized with Avro schema stored in Schema Registry,
+ * keys are typed as {@link Long}:
+ *
+ * <pre>{@code
+ * PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
+ * .apply(KafkaIO.<Long, GenericRecord>read()
+ * .withBootstrapServers("broker_1:9092,broker_2:9092")
+ * .withTopic("my_topic")
+ * .withSchemaRegistry("http://localhost:8081") // Confluent Schema Registry URL
Review comment:
@reuvenlax In the multiple-topics case we expect all topics to follow the same
schema structure, which is stored in Confluent Schema Registry. This is like
what we do with deserialisers (and coders) in that case: we expect that all
messages from the different topics can be deserialised by the same deserialiser.
I was thinking of creating a kind of map for topic->schema relations and
deserialising accordingly, but there are two objections against that:
- we would need to write a wrapper around `KafkaAvroDeserializer` to support
multiple topics with multiple schemas, which is probably not a big deal, but
having an output PCollection of `GenericRecord`s created from different
schemas seems quite complex.
- afaik, we can set only a single Beam schema for a PCollection. So what
should we do in the multiple-schemas case?

For now, I think we need to clearly explain this in the Javadoc and leave it as
the user's responsibility to choose a schema that all required input topic(s)
satisfy.
Please let me know if you have other ideas or examples about that; I'd be
happy to discuss them to find the best solution.
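
To make the single-schema expectation concrete, here is a minimal sketch of a pre-flight check a user could run before reading several topics. It is not part of this PR or of KafkaIO: the class `SharedSchemaCheck`, its methods, and the in-memory `schemasBySubject` map (a stand-in for a real Schema Registry lookup) are all hypothetical. It assumes value subjects follow Confluent's default `<topic>-value` naming and compares schemas as plain strings, which is stricter than real Avro compatibility.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper, not part of KafkaIO: checks that all topics share one value schema. */
public class SharedSchemaCheck {

  /** Subject name for record values, assuming the default "<topic>-value" naming. */
  public static String valueSubject(String topic) {
    return topic + "-value";
  }

  /**
   * Returns the one schema shared by all topics. Throws if a subject has no schema
   * or if the topics resolve to different schemas.
   *
   * @param schemasBySubject in-memory stand-in for a Schema Registry lookup (assumption)
   */
  public static String requireSharedValueSchema(
      List<String> topics, Map<String, String> schemasBySubject) {
    if (topics.isEmpty()) {
      throw new IllegalArgumentException("No topics given");
    }
    // Collect the distinct schema strings seen across all topics.
    Set<String> distinct = new LinkedHashSet<>();
    for (String topic : topics) {
      String subject = valueSubject(topic);
      String schema = schemasBySubject.get(subject);
      if (schema == null) {
        throw new IllegalArgumentException("No schema registered for subject " + subject);
      }
      distinct.add(schema);
    }
    if (distinct.size() != 1) {
      throw new IllegalStateException(
          "Topics " + topics + " resolve to " + distinct.size() + " different value schemas");
    }
    return distinct.iterator().next();
  }

  public static void main(String[] args) {
    // Toy registry contents; the schema strings are placeholders, not real Avro JSON.
    Map<String, String> registry = new LinkedHashMap<>();
    registry.put("orders-value", "schema-A");
    registry.put("orders_retry-value", "schema-A");

    String shared =
        requireSharedValueSchema(Arrays.asList("orders", "orders_retry"), registry);
    System.out.println("Shared value schema: " + shared);
  }
}
```

Failing fast like this keeps the contract simple: one deserialiser, one coder and one schema for the whole output PCollection, with the responsibility for choosing compatible topics left to the user.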
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 376876)
Time Spent: 9h 10m (was: 9h)
> Confluent Schema Registry support in KafkaIO
> --------------------------------------------
>
> Key: BEAM-7310
> URL: https://issues.apache.org/jira/browse/BEAM-7310
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kafka
> Affects Versions: 2.12.0
> Reporter: Yohei Shimomae
> Assignee: Alexey Romanenko
> Priority: Minor
> Time Spent: 9h 10m
> Remaining Estimate: 0h
>
> Confluent Schema Registry is useful when we manage Avro Schema but KafkaIO
> does not support Confluent Schema Registry as discussed here.
> https://stackoverflow.com/questions/56035121/unable-to-connect-from-dataflow-job-to-schema-registry-when-schema-registry-requ
> https://lists.apache.org/thread.html/7695fccddebd08733b80ae1e43b79b636b63cd5fe583a2bdeecda6c4@%3Cuser.beam.apache.org%3E