This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch v4.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit c7f75bde7840e93a3aa65e84e4b396a3c4e473a3
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Apr 10 09:16:55 2025 +0200

    [FLINK-37644] Remove guava from prod code

    With Java 11 most of the usages can be easily replaced. TypeToken can be
    replaced by Flink's TypeExtractor. The sole hard thing was a use of
    Map.difference but it's only needed for logging and we can solve this in
    a different way.
---
 .../util/kafka/SQLClientSchemaRegistryITCase.java |  4 +---
 flink-connector-kafka/pom.xml                     | 11 +++++-----
 .../kafka/dynamic/metadata/ClusterMetadata.java   |  7 +-----
 .../kafka/dynamic/metadata/KafkaStream.java       | 13 +++++------
 .../kafka/dynamic/source/MetadataUpdateEvent.java |  4 +---
 .../enumerator/DynamicKafkaSourceEnumerator.java  | 25 ++++++++--------------
 .../subscriber/StreamPatternSubscriber.java       | 10 ++++-----
 .../source/reader/DynamicKafkaSourceReader.java   |  8 +++----
 .../source/split/DynamicKafkaSourceSplit.java     | 13 +++++------
 .../split/DynamicKafkaSourceSplitSerializer.java  |  4 +---
 .../KafkaRecordSerializationSchemaBuilder.java    | 22 ++++++++++---------
 .../DynamicKafkaRecordSerializationSchema.java    | 25 ++++++++++++++--------
 pom.xml                                           |  2 ++
 13 files changed, 72 insertions(+), 76 deletions(-)

diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
index e3b18194..522eff09 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
@@ -65,8 +65,6 @@ public class SQLClientSchemaRegistryITCase {
             ResourceTestUtils.getResource(".*avro-confluent.jar");
     private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.jar");
 
-    private final Path guavaJar = ResourceTestUtils.getResource(".*guava.jar");
-
     @ClassRule public static final Network NETWORK = Network.newNetwork();
 
     @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);
@@ -252,7 +250,7 @@ public class SQLClientSchemaRegistryITCase {
     private void executeSqlStatements(List<String> sqlLines) throws Exception {
         flink.submitSQLJob(
                 new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
-                        .addJars(sqlAvroJar, sqlAvroRegistryJar, sqlConnectorKafkaJar, guavaJar)
+                        .addJars(sqlAvroJar, sqlAvroRegistryJar, sqlConnectorKafkaJar)
                         .build());
     }
 }
diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml
index da9f0bcd..4c4d3e36 100644
--- a/flink-connector-kafka/pom.xml
+++ b/flink-connector-kafka/pom.xml
@@ -82,13 +82,14 @@ under the License.
             <version>${kafka.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-
         <!-- Tests -->
 
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java
index 1ff18930..964e51e5 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java
@@ -20,8 +20,6 @@ package org.apache.flink.connector.kafka.dynamic.metadata;
 
 import org.apache.flink.annotation.Experimental;
 
-import com.google.common.base.MoreObjects;
-
 import java.io.Serializable;
 import java.util.Objects;
 import java.util.Properties;
@@ -67,10 +65,7 @@ public class ClusterMetadata implements Serializable {
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("topics", topics)
-                .add("properties", properties)
-                .toString();
+        return "ClusterMetadata{" + "topics=" + topics + ", properties=" + properties + '}';
     }
 
     @Override
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/KafkaStream.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/KafkaStream.java
index bea9872c..b08df6c5 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/KafkaStream.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/KafkaStream.java
@@ -20,8 +20,6 @@ package org.apache.flink.connector.kafka.dynamic.metadata;
 
 import org.apache.flink.annotation.Experimental;
 
-import com.google.common.base.MoreObjects;
-
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Objects;
@@ -68,10 +66,13 @@ public class KafkaStream implements Serializable {
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("streamId", streamId)
-                .add("clusterMetadataMap", clusterMetadataMap)
-                .toString();
+        return "KafkaStream{"
+                + "streamId='"
+                + streamId
+                + '\''
+                + ", clusterMetadataMap="
+                + clusterMetadataMap
+                + '}';
     }
 
     @Override
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/MetadataUpdateEvent.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/MetadataUpdateEvent.java
index 09b32d03..c7ae368f 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/MetadataUpdateEvent.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/MetadataUpdateEvent.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
 import org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader;
 
-import com.google.common.base.MoreObjects;
-
 import java.util.Objects;
 import java.util.Set;
 
@@ -55,7 +53,7 @@ public class MetadataUpdateEvent implements SourceEvent {
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this).add("kafkaStreams", kafkaStreams).toString();
+        return "MetadataUpdateEvent{" + "kafkaStreams=" + kafkaStreams + '}';
     }
 
     @Override
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
index 48e1c78d..ff7cc21d 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
@@ -41,9 +41,6 @@ import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscr
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.util.Preconditions;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
@@ -61,6 +58,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 /**
@@ -260,17 +258,11 @@ public class DynamicKafkaSourceEnumerator
         }
 
         if (logger.isInfoEnabled()) {
-            MapDifference<String, Set<String>> metadataDifference =
-                    Maps.difference(latestClusterTopicsMap, newClustersTopicsMap);
+            // log the maps in a sorted fashion so it's easy to see the changes
             logger.info(
-                    "Common cluster topics after metadata refresh: {}",
-                    metadataDifference.entriesInCommon());
-            logger.info(
-                    "Removed cluster topics after metadata refresh: {}",
-                    metadataDifference.entriesOnlyOnLeft());
-            logger.info(
-                    "Additional cluster topics after metadata refresh: {}",
-                    metadataDifference.entriesOnlyOnRight());
+                    "Detected changed cluster topics after metadata refresh:\nPrevious: {}\nNew: {}",
+                    new TreeMap<>(latestClusterTopicsMap),
+                    new TreeMap<>(newClustersTopicsMap));
         }
 
         DynamicKafkaSourceEnumState dynamicKafkaSourceEnumState;
@@ -456,10 +448,11 @@ public class DynamicKafkaSourceEnumerator
     public void addSplitsBack(List<DynamicKafkaSourceSplit> splits, int subtaskId) {
         logger.debug("Adding splits back for {}", subtaskId);
         // separate splits by cluster
-        ArrayListMultimap<String, KafkaPartitionSplit> kafkaPartitionSplits =
-                ArrayListMultimap.create();
+        Map<String, List<KafkaPartitionSplit>> kafkaPartitionSplits = new HashMap<>();
         for (DynamicKafkaSourceSplit split : splits) {
-            kafkaPartitionSplits.put(split.getKafkaClusterId(), split.getKafkaPartitionSplit());
+            kafkaPartitionSplits
+                    .computeIfAbsent(split.getKafkaClusterId(), unused -> new ArrayList<>())
+                    .add(split.getKafkaPartitionSplit());
         }
 
         // add splits back and assign pending splits for all enumerators
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/subscriber/StreamPatternSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/subscriber/StreamPatternSubscriber.java
index d95f7e0f..4000a9b7 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/subscriber/StreamPatternSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/subscriber/StreamPatternSubscriber.java
@@ -22,8 +22,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
 
-import com.google.common.collect.ImmutableSet;
-
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -40,14 +40,14 @@ public class StreamPatternSubscriber implements KafkaStreamSubscriber {
 
     @Override
     public Set<KafkaStream> getSubscribedStreams(KafkaMetadataService kafkaMetadataService) {
         Set<KafkaStream> allStreams = kafkaMetadataService.getAllStreams();
-        ImmutableSet.Builder<KafkaStream> builder = ImmutableSet.builder();
+        List<KafkaStream> matches = new ArrayList<>();
         for (KafkaStream kafkaStream : allStreams) {
             String streamId = kafkaStream.getStreamId();
             if (streamPattern.matcher(streamId).find()) {
-                builder.add(kafkaStream);
+                matches.add(kafkaStream);
             }
         }
-        return builder.build();
+        return Set.copyOf(matches);
     }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
index 8220ea14..ca745441 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
@@ -49,7 +49,6 @@ import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.UserCodeClassLoader;
 
-import com.google.common.collect.ArrayListMultimap;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -185,10 +184,11 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
             return;
         }
 
-        ArrayListMultimap<String, KafkaPartitionSplit> clusterSplitsMap =
-                ArrayListMultimap.create();
+        Map<String, List<KafkaPartitionSplit>> clusterSplitsMap = new HashMap<>();
         for (DynamicKafkaSourceSplit split : splits) {
-            clusterSplitsMap.put(split.getKafkaClusterId(), split);
+            clusterSplitsMap
+                    .computeIfAbsent(split.getKafkaClusterId(), unused -> new ArrayList<>())
+                    .add(split);
         }
 
         Set<String> kafkaClusterIds = clusterSplitsMap.keySet();
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplit.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplit.java
index 25ef25b9..b98cc805 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplit.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplit.java
@@ -21,8 +21,6 @@ package org.apache.flink.connector.kafka.dynamic.source.split;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 
-import com.google.common.base.MoreObjects;
-
 import java.util.Objects;
 
 /** Split that wraps {@link KafkaPartitionSplit} with Kafka cluster information. */
@@ -56,10 +54,13 @@ public class DynamicKafkaSourceSplit extends KafkaPartitionSplit {
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("kafkaClusterId", kafkaClusterId)
-                .add("kafkaPartitionSplit", kafkaPartitionSplit)
-                .toString();
+        return "DynamicKafkaSourceSplit{"
+                + "kafkaClusterId='"
+                + kafkaClusterId
+                + '\''
+                + ", kafkaPartitionSplit="
+                + kafkaPartitionSplit
+                + '}';
     }
 
     @Override
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplitSerializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplitSerializer.java
index 852894f8..69648374 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplitSerializer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplitSerializer.java
@@ -23,8 +23,6 @@ import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
-import com.google.common.io.ByteStreams;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -69,7 +67,7 @@ public class DynamicKafkaSourceSplitSerializer
             int kafkaPartitionSplitSerializerVersion = in.readInt();
             KafkaPartitionSplit kafkaPartitionSplit =
                     kafkaPartitionSplitSerializer.deserialize(
-                            kafkaPartitionSplitSerializerVersion, ByteStreams.toByteArray(in));
+                            kafkaPartitionSplitSerializerVersion, in.readAllBytes());
             return new DynamicKafkaSourceSplit(kafkaClusterId, kafkaPartitionSplit);
         }
     }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index cc1f803c..1e24137e 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -19,8 +19,8 @@ package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
 import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
@@ -31,7 +31,6 @@ import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 
-import com.google.common.reflect.TypeToken;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.slf4j.Logger;
@@ -40,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.lang.reflect.Type;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -433,15 +433,17 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
                                     .getProducedType()));
         } else {
             // gets type information from serialize method signature
-            TypeToken serializationSchemaType =
-                    TypeToken.of(valueSerializationSchema.getClass());
-            Class parameterType =
-                    serializationSchemaType
-                            .resolveType(SerializationSchema.class.getTypeParameters()[0])
-                            .getRawType();
-            if (parameterType != Object.class) {
+            Type type =
+                    TypeExtractor.getParameterType(
+                            SerializationSchema.class, valueSerializationSchema.getClass(), 0);
+            try {
                 return Optional.of(
-                        new DefaultTypeDatasetFacet(TypeInformation.of(parameterType)));
+                        new DefaultTypeDatasetFacet(TypeExtractor.createTypeInfo(type)));
+            } catch (Exception e) {
+                LOG.info(
+                        "Could not extract type information from {}",
+                        valueSerializationSchema.getClass(),
+                        e);
             }
         }
         return Optional.empty();
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
index f590cc37..febfa533 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.kafka.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
 import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
@@ -36,11 +36,13 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
-import com.google.common.reflect.TypeToken;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -58,6 +60,8 @@ class DynamicKafkaRecordSerializationSchema
         implements KafkaRecordSerializationSchema<RowData>,
                 KafkaDatasetFacetProvider,
                 TypeDatasetFacetProvider {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicKafkaRecordSerializationSchema.class);
 
     private final Set<String> topics;
     private final Pattern topicPattern;
@@ -208,13 +212,16 @@ class DynamicKafkaRecordSerializationSchema
                             ((ResultTypeQueryable<?>) this.valueSerialization).getProducedType()));
         } else {
             // gets type information from serialize method signature
-            TypeToken serializationSchemaType = TypeToken.of(valueSerialization.getClass());
-            Class parameterType =
-                    serializationSchemaType
-                            .resolveType(SerializationSchema.class.getTypeParameters()[0])
-                            .getRawType();
-            if (parameterType != Object.class) {
-                return Optional.of(new DefaultTypeDatasetFacet(TypeInformation.of(parameterType)));
+            Type type =
+                    TypeExtractor.getParameterType(
+                            SerializationSchema.class, valueSerialization.getClass(), 0);
+            try {
+                return Optional.of(new DefaultTypeDatasetFacet(TypeExtractor.createTypeInfo(type)));
+            } catch (Exception e) {
+                LOG.info(
+                        "Could not extract type information from {}",
+                        valueSerialization.getClass(),
+                        e);
             }
         }
         return Optional.empty();
diff --git a/pom.xml b/pom.xml
index 35e88180..652c197f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -264,6 +264,8 @@ under the License.
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>${guava.version}</version>
+               <!-- Don't use guava in production code-->
+               <scope>test</scope>
            </dependency>
 
            <!-- Flink dependencies -->
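
As a footnote for reviewers: all of the ArrayListMultimap removals above follow the same JDK-only grouping pattern. A minimal standalone sketch of that pattern (class and variable names here are illustrative, not taken from the connector):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByClusterSketch {
    public static void main(String[] args) {
        // before: ArrayListMultimap<String, String> splitsByCluster = ArrayListMultimap.create();
        Map<String, List<String>> splitsByCluster = new HashMap<>();
        // before: splitsByCluster.put(clusterId, splitId);
        splitsByCluster.computeIfAbsent("cluster-a", unused -> new ArrayList<>()).add("split-0");
        splitsByCluster.computeIfAbsent("cluster-a", unused -> new ArrayList<>()).add("split-1");
        splitsByCluster.computeIfAbsent("cluster-b", unused -> new ArrayList<>()).add("split-2");
        // prints something like {cluster-a=[split-0, split-1], cluster-b=[split-2]}
        System.out.println(splitsByCluster);
    }
}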
