This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new fa1021ee [FLINK-39608] Change KafkaDatasetFacet to extend DatasetConfigFacet (#246)
fa1021ee is described below
commit fa1021ee09c2594a3087be7983f864574539015c
Author: Swapna Marru <[email protected]>
AuthorDate: Thu Jun 4 00:46:35 2026 -0700
[FLINK-39608] Change KafkaDatasetFacet to extend DatasetConfigFacet (#246)
This fixes classloader issues
---
.../kafka/lineage/DefaultKafkaDatasetFacet.java | 17 +++++++++++++++++
.../connector/kafka/lineage/KafkaDatasetFacet.java | 4 ++--
.../flink/connector/kafka/sink/KafkaSinkTest.java | 18 ++++++++++++------
.../flink/connector/kafka/source/KafkaSourceTest.java | 1 +
4 files changed, 32 insertions(+), 8 deletions(-)
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java
index 9d126e4d..f4ed3404 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java
@@ -22,6 +22,9 @@ package org.apache.flink.connector.kafka.lineage;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
import java.util.Properties;
@@ -81,4 +84,18 @@ public class DefaultKafkaDatasetFacet implements KafkaDatasetFacet {
public String name() {
return KAFKA_FACET_NAME;
}
+
+ @Override
+ public Map<String, String> config() {
+ if (properties == null) {
+ return Collections.emptyMap();
+ }
+
+ Map<String, String> config = new HashMap<>();
+ for (String key : properties.stringPropertyNames()) {
+ config.put(key, properties.getProperty(key));
+ }
+
+ return config;
+ }
}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java
index 42ce28b5..6c445bd1 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java
@@ -20,13 +20,13 @@
package org.apache.flink.connector.kafka.lineage;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.DatasetConfigFacet;
import java.util.Properties;
/** Facet definition to contain all Kafka specific information on Kafka sources and sinks. */
@PublicEvolving
-public interface KafkaDatasetFacet extends LineageDatasetFacet {
+public interface KafkaDatasetFacet extends DatasetConfigFacet {
Properties getProperties();
KafkaDatasetIdentifier getTopicIdentifier();
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
index e2a6f29b..37477e0b 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
@@ -32,6 +32,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.lineage.DatasetConfigFacet;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -106,18 +108,22 @@ public class KafkaSinkTest {
assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("kafka://host1");
assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("topic1");
- assertThat(
- lineageVertex
- .datasets()
- .get(0)
- .facets()
- .get(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME))
+ LineageDatasetFacet lineageDatasetFacet =
+ lineageVertex
+ .datasets()
+ .get(0)
+ .facets()
+ .get(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME);
+ assertThat(lineageDatasetFacet)
.hasFieldOrPropertyWithValue("properties", kafkaProperties)
.hasFieldOrPropertyWithValue(
"topicIdentifier",
DefaultKafkaDatasetIdentifier.ofTopics(
Collections.singletonList("topic1")));
+ assertThat(((DatasetConfigFacet) lineageDatasetFacet).config())
+ .containsEntry("bootstrap.servers", "host1;host2");
+
assertThat(
lineageVertex
.datasets()
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
index 259668c5..c81d6cd1 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
@@ -161,6 +161,7 @@ public class KafkaSourceTest {
dataset.facets().get(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME);
assertThat(kafkaFacet.getProperties()).containsEntry("bootstrap.servers", "host1;host2");
+ assertThat(kafkaFacet.config()).containsEntry("bootstrap.servers", "host1;host2");
assertThat(dataset.facets()).containsKey(DefaultTypeDatasetFacet.TYPE_FACET_NAME);
assertThat(dataset.facets().get(DefaultTypeDatasetFacet.TYPE_FACET_NAME))