pawel-big-lebowski commented on code in PR #130:
URL: https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1818853390
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java:
##########
@@ -369,5 +416,43 @@ public ProducerRecord<byte[], byte[]> serialize(
value,
headerProvider != null ? headerProvider.getHeaders(element) : null);
}
+
+ @Override
+ public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+ if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) {
+ LOG.warn("Cannot identify topics. Not an TopicsIdentifierProvider");
+ return Optional.empty();
+ }
+
+ Optional<KafkaDatasetIdentifier> topicsIdentifier =
+ ((KafkaDatasetIdentifierProvider) (topicSelector)).getDatasetIdentifier();
+
+ if (!topicsIdentifier.isPresent()) {
+ LOG.warn("No topics' identifiers provided");
+ return Optional.empty();
+ }
+
+ TypeInformation typeInformation;
+ if (this.valueSerializationSchema instanceof ResultTypeQueryable) {
+ typeInformation =
+ ((ResultTypeQueryable<?>) this.valueSerializationSchema).getProducedType();
+ } else {
+ // gets type information from serialize method signature
+ typeInformation =
Review Comment:
This is returned within the facet, and a listener (such as
OpenLineageJobListener) then converts it into a dataset schema description. In
OpenLineage, this is called `SchemaDatasetFacet`. I don't think this is
specific to the Kafka connector; there should be a general, schema-like facet
in Flink core. However, I don't think I can achieve that in this PR. Schema
information is valuable for both input and output datasets.
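To make the conversion step more concrete, here is a minimal sketch of how a listener might flatten a record type into the (name, type) pairs that a schema facet such as OpenLineage's `SchemaDatasetFacet` lists. It assumes a POJO-like payload and uses plain Java reflection instead of Flink's `TypeInformation`, so it runs without Flink on the classpath. `SchemaFacetSketch`, `toSchemaFields` and `PageView` are hypothetical names, not part of the connector or of OpenLineage; a real listener would presumably walk a composite `TypeInformation` (for example via `CompositeType#getFieldNames()`) rather than reflect over a class.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

public class SchemaFacetSketch {

    /** Illustrative payload type (hypothetical); stands in for the records a sink writes. */
    public static class PageView {
        public String userId;
        public long timestamp;
        public int durationMs;
    }

    /**
     * Hypothetical helper: maps each instance field of a POJO-like class to a
     * simple type name, roughly the per-field information a schema facet carries.
     */
    public static Map<String, String> toSchemaFields(Class<?> recordClass) {
        Map<String, String> fields = new TreeMap<>(); // sorted by name for deterministic output
        for (Field field : recordClass.getDeclaredFields()) {
            // Skip static and compiler-generated fields; they are not part of the record.
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            fields.put(field.getName(), field.getType().getSimpleName());
        }
        return fields;
    }

    public static void main(String[] args) {
        // Prints each field name with its simple type name, e.g. "userId: String".
        toSchemaFields(PageView.class)
                .forEach((name, type) -> System.out.println(name + ": " + type));
    }
}
```

Sorting by field name (via a `TreeMap`) keeps the output stable, because `Class#getDeclaredFields()` does not guarantee declaration order.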
I hope the `typeInformation` approach will work well for `Avro` and `Protobuf`.
Hopefully, at some point, I will create separate tests within the OpenLineage
job listener to verify this.
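The fallback branch in the hunk is commented as getting type information "from serialize method signature", but its body is cut off above. As a rough illustration of that idea only (not the PR's actual code, which may rely on Flink utilities instead), the sketch below reads the parameter type of a schema's concrete `serialize` method via reflection, skipping the bridge method the compiler generates for generic overrides. `SerializationSchema` here is a simplified local stand-in mirroring the shape of Flink's `SerializationSchema#serialize(T)`; `SerializeSignatureSketch`, `elementTypeOf`, `UpperCaseSchema` and `GenericSchema` are hypothetical names.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

public class SerializeSignatureSketch {

    /** Simplified local stand-in for Flink's SerializationSchema<T> (assumption). */
    public interface SerializationSchema<T> {
        byte[] serialize(T element);
    }

    /** Example schema whose element type is visible in its serialize signature. */
    public static class UpperCaseSchema implements SerializationSchema<String> {
        @Override
        public byte[] serialize(String element) {
            return element.toUpperCase().getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Example schema whose element type is erased to Object. */
    public static class GenericSchema<T> implements SerializationSchema<T> {
        @Override
        public byte[] serialize(T element) {
            return String.valueOf(element).getBytes(StandardCharsets.UTF_8);
        }
    }

    /**
     * Hypothetical helper: returns the parameter type of the first non-bridge
     * serialize(x) method declared by the schema's own class, or Object.class if none.
     */
    public static Class<?> elementTypeOf(SerializationSchema<?> schema) {
        for (Method method : schema.getClass().getDeclaredMethods()) {
            // javac also emits a bridge serialize(Object) for generic overrides; skip it.
            if (method.getName().equals("serialize")
                    && method.getParameterCount() == 1
                    && !method.isBridge()) {
                return method.getParameterTypes()[0];
            }
        }
        return Object.class;
    }

    public static void main(String[] args) {
        System.out.println(elementTypeOf(new UpperCaseSchema()).getSimpleName());
        System.out.println(elementTypeOf(new GenericSchema<Integer>()).getSimpleName());
    }
}
```

When the element type is erased (as in `GenericSchema<T>`), only `Object` is recoverable from the signature, which is presumably why the `ResultTypeQueryable` branch is tried first.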
--
This is an automated message from the Apache Git Service.