This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch v4.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit c7f75bde7840e93a3aa65e84e4b396a3c4e473a3
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Apr 10 09:16:55 2025 +0200

    [FLINK-37644] Remove guava from prod code
    
    With Java 11, most of the usages can be replaced easily. TypeToken can be
    replaced by Flink's TypeExtractor. The only tricky case was a use of
    Maps.difference, but it is only needed for logging and can be solved
    differently.
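    
    For illustration, a minimal sketch (not part of this commit) of the two
    plain-JDK replacement patterns applied below; the class and method names
    are made up for the example:
    
        import java.util.Map;
        import java.util.Set;
        import java.util.TreeMap;
    
        class GuavaFreeSketch {
            private final String streamId = "example";
    
            // MoreObjects.toStringHelper(...) becomes plain string concatenation.
            @Override
            public String toString() {
                return "GuavaFreeSketch{" + "streamId='" + streamId + '\'' + '}';
            }
    
            // Maps.difference(...) was only used for logging; dumping both maps
            // in sorted order (TreeMap) makes the changes just as easy to spot.
            static void logTopicChange(
                    Map<String, Set<String>> previous, Map<String, Set<String>> current) {
                System.out.printf(
                        "Detected changed cluster topics after metadata refresh:%n"
                                + "Previous: %s%nNew: %s%n",
                        new TreeMap<>(previous), new TreeMap<>(current));
            }
        }
    
    The diff below applies these same two patterns in the connector classes.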
---
 .../util/kafka/SQLClientSchemaRegistryITCase.java  |  4 +---
 flink-connector-kafka/pom.xml                      | 11 +++++-----
 .../kafka/dynamic/metadata/ClusterMetadata.java    |  7 +-----
 .../kafka/dynamic/metadata/KafkaStream.java        | 13 +++++------
 .../kafka/dynamic/source/MetadataUpdateEvent.java  |  4 +---
 .../enumerator/DynamicKafkaSourceEnumerator.java   | 25 ++++++++--------------
 .../subscriber/StreamPatternSubscriber.java        | 10 ++++-----
 .../source/reader/DynamicKafkaSourceReader.java    |  8 +++----
 .../source/split/DynamicKafkaSourceSplit.java      | 13 +++++------
 .../split/DynamicKafkaSourceSplitSerializer.java   |  4 +---
 .../KafkaRecordSerializationSchemaBuilder.java     | 22 ++++++++++---------
 .../DynamicKafkaRecordSerializationSchema.java     | 25 ++++++++++++++--------
 pom.xml                                            |  2 ++
 13 files changed, 72 insertions(+), 76 deletions(-)

diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
index e3b18194..522eff09 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
@@ -65,8 +65,6 @@ public class SQLClientSchemaRegistryITCase {
             ResourceTestUtils.getResource(".*avro-confluent.jar");
     private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.jar");
 
-    private final Path guavaJar = ResourceTestUtils.getResource(".*guava.jar");
-
     @ClassRule public static final Network NETWORK = Network.newNetwork();
 
     @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);
@@ -252,7 +250,7 @@ public class SQLClientSchemaRegistryITCase {
     private void executeSqlStatements(List<String> sqlLines) throws Exception {
         flink.submitSQLJob(
                 new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
-                        .addJars(sqlAvroJar, sqlAvroRegistryJar, sqlConnectorKafkaJar, guavaJar)
+                        .addJars(sqlAvroJar, sqlAvroRegistryJar, sqlConnectorKafkaJar)
                         .build());
     }
 }
diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml
index da9f0bcd..4c4d3e36 100644
--- a/flink-connector-kafka/pom.xml
+++ b/flink-connector-kafka/pom.xml
@@ -82,13 +82,14 @@ under the License.
             <version>${kafka.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-
         <!-- Tests -->
 
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java
index 1ff18930..964e51e5 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java
@@ -20,8 +20,6 @@ package org.apache.flink.connector.kafka.dynamic.metadata;
 
 import org.apache.flink.annotation.Experimental;
 
-import com.google.common.base.MoreObjects;
-
 import java.io.Serializable;
 import java.util.Objects;
 import java.util.Properties;
@@ -67,10 +65,7 @@ public class ClusterMetadata implements Serializable {
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("topics", topics)
-                .add("properties", properties)
-                .toString();
+        return "ClusterMetadata{" + "topics=" + topics + ", properties=" + properties + '}';
     }
 
     @Override
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/KafkaStream.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/KafkaStream.java
index bea9872c..b08df6c5 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/KafkaStream.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/KafkaStream.java
@@ -20,8 +20,6 @@ package org.apache.flink.connector.kafka.dynamic.metadata;
 
 import org.apache.flink.annotation.Experimental;
 
-import com.google.common.base.MoreObjects;
-
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Objects;
@@ -68,10 +66,13 @@ public class KafkaStream implements Serializable {
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("streamId", streamId)
-                .add("clusterMetadataMap", clusterMetadataMap)
-                .toString();
+        return "KafkaStream{"
+                + "streamId='"
+                + streamId
+                + '\''
+                + ", clusterMetadataMap="
+                + clusterMetadataMap
+                + '}';
     }
 
     @Override
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/MetadataUpdateEvent.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/MetadataUpdateEvent.java
index 09b32d03..c7ae368f 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/MetadataUpdateEvent.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/MetadataUpdateEvent.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
 import org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader;
 
-import com.google.common.base.MoreObjects;
-
 import java.util.Objects;
 import java.util.Set;
 
@@ -55,7 +53,7 @@ public class MetadataUpdateEvent implements SourceEvent {
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this).add("kafkaStreams", kafkaStreams).toString();
+        return "MetadataUpdateEvent{" + "kafkaStreams=" + kafkaStreams + '}';
     }
 
     @Override
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
index 48e1c78d..ff7cc21d 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
@@ -41,9 +41,6 @@ import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscr
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.util.Preconditions;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
@@ -61,6 +58,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 /**
@@ -260,17 +258,11 @@ public class DynamicKafkaSourceEnumerator
         }
 
         if (logger.isInfoEnabled()) {
-            MapDifference<String, Set<String>> metadataDifference =
-                    Maps.difference(latestClusterTopicsMap, newClustersTopicsMap);
+            // log the maps in a sorted fashion so it's easy to see the changes
             logger.info(
-                    "Common cluster topics after metadata refresh: {}",
-                    metadataDifference.entriesInCommon());
-            logger.info(
-                    "Removed cluster topics after metadata refresh: {}",
-                    metadataDifference.entriesOnlyOnLeft());
-            logger.info(
-                    "Additional cluster topics after metadata refresh: {}",
-                    metadataDifference.entriesOnlyOnRight());
+                    "Detected changed cluster topics after metadata refresh:\nPrevious: {}\nNew: {}",
+                    new TreeMap<>(latestClusterTopicsMap),
+                    new TreeMap<>(newClustersTopicsMap));
         }
 
         DynamicKafkaSourceEnumState dynamicKafkaSourceEnumState;
@@ -456,10 +448,11 @@ public class DynamicKafkaSourceEnumerator
     public void addSplitsBack(List<DynamicKafkaSourceSplit> splits, int subtaskId) {
         logger.debug("Adding splits back for {}", subtaskId);
         // separate splits by cluster
-        ArrayListMultimap<String, KafkaPartitionSplit> kafkaPartitionSplits =
-                ArrayListMultimap.create();
+        Map<String, List<KafkaPartitionSplit>> kafkaPartitionSplits = new HashMap<>();
         for (DynamicKafkaSourceSplit split : splits) {
-            kafkaPartitionSplits.put(split.getKafkaClusterId(), split.getKafkaPartitionSplit());
+            kafkaPartitionSplits
+                    .computeIfAbsent(split.getKafkaClusterId(), unused -> new ArrayList<>())
+                    .add(split.getKafkaPartitionSplit());
         }
 
         // add splits back and assign pending splits for all enumerators
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/subscriber/StreamPatternSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/subscriber/StreamPatternSubscriber.java
index d95f7e0f..4000a9b7 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/subscriber/StreamPatternSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/subscriber/StreamPatternSubscriber.java
@@ -22,8 +22,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
 
-import com.google.common.collect.ImmutableSet;
-
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -40,14 +40,14 @@ public class StreamPatternSubscriber implements KafkaStreamSubscriber {
     @Override
     public Set<KafkaStream> getSubscribedStreams(KafkaMetadataService kafkaMetadataService) {
         Set<KafkaStream> allStreams = kafkaMetadataService.getAllStreams();
-        ImmutableSet.Builder<KafkaStream> builder = ImmutableSet.builder();
+        List<KafkaStream> matches = new ArrayList<>();
         for (KafkaStream kafkaStream : allStreams) {
             String streamId = kafkaStream.getStreamId();
             if (streamPattern.matcher(streamId).find()) {
-                builder.add(kafkaStream);
+                matches.add(kafkaStream);
             }
         }
 
-        return builder.build();
+        return Set.copyOf(matches);
     }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
index 8220ea14..ca745441 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
@@ -49,7 +49,6 @@ import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.UserCodeClassLoader;
 
-import com.google.common.collect.ArrayListMultimap;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -185,10 +184,11 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
             return;
         }
 
-        ArrayListMultimap<String, KafkaPartitionSplit> clusterSplitsMap =
-                ArrayListMultimap.create();
+        Map<String, List<KafkaPartitionSplit>> clusterSplitsMap = new HashMap<>();
         for (DynamicKafkaSourceSplit split : splits) {
-            clusterSplitsMap.put(split.getKafkaClusterId(), split);
+            clusterSplitsMap
+                    .computeIfAbsent(split.getKafkaClusterId(), unused -> new ArrayList<>())
+                    .add(split);
         }
 
         Set<String> kafkaClusterIds = clusterSplitsMap.keySet();
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplit.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplit.java
index 25ef25b9..b98cc805 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplit.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplit.java
@@ -21,8 +21,6 @@ package org.apache.flink.connector.kafka.dynamic.source.split;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 
-import com.google.common.base.MoreObjects;
-
 import java.util.Objects;
 
 /** Split that wraps {@link KafkaPartitionSplit} with Kafka cluster information. */
@@ -56,10 +54,13 @@ public class DynamicKafkaSourceSplit extends KafkaPartitionSplit {
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("kafkaClusterId", kafkaClusterId)
-                .add("kafkaPartitionSplit", kafkaPartitionSplit)
-                .toString();
+        return "DynamicKafkaSourceSplit{"
+                + "kafkaClusterId='"
+                + kafkaClusterId
+                + '\''
+                + ", kafkaPartitionSplit="
+                + kafkaPartitionSplit
+                + '}';
     }
 
     @Override
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplitSerializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplitSerializer.java
index 852894f8..69648374 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplitSerializer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplitSerializer.java
@@ -23,8 +23,6 @@ import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
-import com.google.common.io.ByteStreams;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -69,7 +67,7 @@ public class DynamicKafkaSourceSplitSerializer
             int kafkaPartitionSplitSerializerVersion = in.readInt();
             KafkaPartitionSplit kafkaPartitionSplit =
                     kafkaPartitionSplitSerializer.deserialize(
-                            kafkaPartitionSplitSerializerVersion, ByteStreams.toByteArray(in));
+                            kafkaPartitionSplitSerializerVersion, in.readAllBytes());
             return new DynamicKafkaSourceSplit(kafkaClusterId, kafkaPartitionSplit);
         }
     }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index cc1f803c..1e24137e 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -19,8 +19,8 @@ package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
 import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
@@ -31,7 +31,6 @@ import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 
-import com.google.common.reflect.TypeToken;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.slf4j.Logger;
@@ -40,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.lang.reflect.Type;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -433,15 +433,17 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
                                         .getProducedType()));
             } else {
                 // gets type information from serialize method signature
-                TypeToken serializationSchemaType =
-                        TypeToken.of(valueSerializationSchema.getClass());
-                Class parameterType =
-                        serializationSchemaType
-                                .resolveType(SerializationSchema.class.getTypeParameters()[0])
-                                .getRawType();
-                if (parameterType != Object.class) {
+                Type type =
+                        TypeExtractor.getParameterType(
+                                SerializationSchema.class, valueSerializationSchema.getClass(), 0);
+                try {
                     return Optional.of(
-                            new DefaultTypeDatasetFacet(TypeInformation.of(parameterType)));
+                            new DefaultTypeDatasetFacet(TypeExtractor.createTypeInfo(type)));
+                } catch (Exception e) {
+                    LOG.info(
+                            "Could not extract type information from {}",
+                            valueSerializationSchema.getClass(),
+                            e);
                 }
             }
             return Optional.empty();
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
index f590cc37..febfa533 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.kafka.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
 import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
@@ -36,11 +36,13 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
-import com.google.common.reflect.TypeToken;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -58,6 +60,8 @@ class DynamicKafkaRecordSerializationSchema
         implements KafkaRecordSerializationSchema<RowData>,
                 KafkaDatasetFacetProvider,
                 TypeDatasetFacetProvider {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicKafkaRecordSerializationSchema.class);
 
     private final Set<String> topics;
     private final Pattern topicPattern;
@@ -208,13 +212,16 @@ class DynamicKafkaRecordSerializationSchema
                             ((ResultTypeQueryable<?>) this.valueSerialization).getProducedType()));
         } else {
             // gets type information from serialize method signature
-            TypeToken serializationSchemaType = TypeToken.of(valueSerialization.getClass());
-            Class parameterType =
-                    serializationSchemaType
-                            .resolveType(SerializationSchema.class.getTypeParameters()[0])
-                            .getRawType();
-            if (parameterType != Object.class) {
-                return Optional.of(new DefaultTypeDatasetFacet(TypeInformation.of(parameterType)));
+            Type type =
+                    TypeExtractor.getParameterType(
+                            SerializationSchema.class, valueSerialization.getClass(), 0);
+            try {
+                return Optional.of(new DefaultTypeDatasetFacet(TypeExtractor.createTypeInfo(type)));
+            } catch (Exception e) {
+                LOG.info(
+                        "Could not extract type information from {}",
+                        valueSerialization.getClass(),
+                        e);
             }
         }
         return Optional.empty();
diff --git a/pom.xml b/pom.xml
index 35e88180..652c197f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -264,6 +264,8 @@ under the License.
                                <groupId>com.google.guava</groupId>
                                <artifactId>guava</artifactId>
                                <version>${guava.version}</version>
+                               <!-- Don't use guava in production code-->
+                               <scope>test</scope>
                        </dependency>
 
             <!-- Flink dependencies -->
