This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new fa1021ee [FLINK-39608] Change KafkaDatasetFacet to extend 
DatasetConfigFacet (#246)
fa1021ee is described below

commit fa1021ee09c2594a3087be7983f864574539015c
Author: Swapna Marru <[email protected]>
AuthorDate: Thu Jun 4 00:46:35 2026 -0700

    [FLINK-39608] Change KafkaDatasetFacet to extend DatasetConfigFacet (#246)
    
    This fixes classloader issues
---
 .../kafka/lineage/DefaultKafkaDatasetFacet.java        | 17 +++++++++++++++++
 .../connector/kafka/lineage/KafkaDatasetFacet.java     |  4 ++--
 .../flink/connector/kafka/sink/KafkaSinkTest.java      | 18 ++++++++++++------
 .../flink/connector/kafka/source/KafkaSourceTest.java  |  1 +
 4 files changed, 32 insertions(+), 8 deletions(-)

diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java
index 9d126e4d..f4ed3404 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java
@@ -22,6 +22,9 @@ package org.apache.flink.connector.kafka.lineage;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 
@@ -81,4 +84,18 @@ public class DefaultKafkaDatasetFacet implements 
KafkaDatasetFacet {
     public String name() {
         return KAFKA_FACET_NAME;
     }
+
+    @Override
+    public Map<String, String> config() {
+        if (properties == null) {
+            return Collections.emptyMap();
+        }
+
+        Map<String, String> config = new HashMap<>();
+        for (String key : properties.stringPropertyNames()) {
+            config.put(key, properties.getProperty(key));
+        }
+
+        return config;
+    }
 }
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java
index 42ce28b5..6c445bd1 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java
@@ -20,13 +20,13 @@
 package org.apache.flink.connector.kafka.lineage;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.DatasetConfigFacet;
 
 import java.util.Properties;
 
 /** Facet definition to contain all Kafka specific information on Kafka 
sources and sinks. */
 @PublicEvolving
-public interface KafkaDatasetFacet extends LineageDatasetFacet {
+public interface KafkaDatasetFacet extends DatasetConfigFacet {
     Properties getProperties();
 
     KafkaDatasetIdentifier getTopicIdentifier();
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
index e2a6f29b..37477e0b 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
@@ -32,6 +32,8 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.lineage.DatasetConfigFacet;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
 import org.apache.flink.streaming.api.lineage.LineageVertex;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -106,18 +108,22 @@ public class KafkaSinkTest {
         
assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("kafka://host1");
         assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("topic1");
 
-        assertThat(
-                        lineageVertex
-                                .datasets()
-                                .get(0)
-                                .facets()
-                                
.get(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME))
+        LineageDatasetFacet lineageDatasetFacet =
+                lineageVertex
+                        .datasets()
+                        .get(0)
+                        .facets()
+                        .get(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME);
+        assertThat(lineageDatasetFacet)
                 .hasFieldOrPropertyWithValue("properties", kafkaProperties)
                 .hasFieldOrPropertyWithValue(
                         "topicIdentifier",
                         DefaultKafkaDatasetIdentifier.ofTopics(
                                 Collections.singletonList("topic1")));
 
+        assertThat(((DatasetConfigFacet) lineageDatasetFacet).config())
+                .containsEntry("bootstrap.servers", "host1;host2");
+
         assertThat(
                         lineageVertex
                                 .datasets()
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
index 259668c5..c81d6cd1 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
@@ -161,6 +161,7 @@ public class KafkaSourceTest {
                         
dataset.facets().get(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME);
 
         
assertThat(kafkaFacet.getProperties()).containsEntry("bootstrap.servers", 
"host1;host2");
+        assertThat(kafkaFacet.config()).containsEntry("bootstrap.servers", 
"host1;host2");
 
         
assertThat(dataset.facets()).containsKey(DefaultTypeDatasetFacet.TYPE_FACET_NAME);
         
assertThat(dataset.facets().get(DefaultTypeDatasetFacet.TYPE_FACET_NAME))

Reply via email to