This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 8c1033d  [FLINK-24282][connectors/kafka] Make topic selector for KafkaSink serializable
8c1033d is described below

commit 8c1033d9dfc3133891ee543f4c6b4ea568789ab0
Author: Fabian Paul <fabianp...@ververica.com>
AuthorDate: Tue Sep 14 16:26:15 2021 +0200

    [FLINK-24282][connectors/kafka] Make topic selector for KafkaSink serializable

    It is possible to calculate the target topic per record. Therefore users
    can provide a lambda when constructing the KafkaSink. Before this commit
    the lambda was not marked as serializable and could not be transferred
    to the workers.
---
 .../KafkaRecordSerializationSchemaBuilder.java     | 43 ++++++++--------------
 .../flink/connector/kafka/sink/TopicSelector.java  | 31 ++++++++++++++++
 .../KafkaRecordSerializationSchemaBuilderTest.java |  2 +-
 .../connector/kafka/sink/KafkaSinkITCase.java      | 34 ++++++++++-------
 4 files changed, 68 insertions(+), 42 deletions(-)
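For illustration, before the full diff below, here is a minimal sketch of the usage this change enables: a per-record topic selector passed as a lambda when building a KafkaSink. The broker address, topic names, and the String record type are illustrative, not part of the commit.

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.connector.kafka.sink.TopicSelector;

    // The lambda's target type is now TopicSelector, which extends
    // Serializable, so the sink can ship it to the workers.
    TopicSelector<String> topicSelector =
            e -> e.startsWith("a") ? "topic-a" : "topic-b";

    KafkaSink<String> sink =
            KafkaSink.<String>builder()
                    .setBootstrapServers("localhost:9092") // illustrative address
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.<String>builder()
                                    .setTopicSelector(topicSelector)
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build())
                    .build();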
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index 30ff2f8..e2d0d29 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -20,16 +20,14 @@ package org.apache.flink.connector.kafka.sink;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 
-import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
-import org.apache.flink.shaded.guava30.com.google.common.cache.CacheLoader;
-import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache;
-
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.serialization.Serializer;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.OptionalInt;
 import java.util.function.Function;
@@ -119,7 +117,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
      * @return {@code this}
      */
     public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setTopicSelector(
-            Function<? super T, String> topicSelector) {
+            TopicSelector<? super T> topicSelector) {
         checkState(this.topicSelector == null, "Topic selector already set.");
         KafkaRecordSerializationSchemaBuilder<T> self = self();
         self.topicSelector = new CachingTopicSelector<>(checkNotNull(topicSelector));
@@ -252,34 +250,25 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
         checkState(keySerializationSchema == null, "Key serializer already set.");
     }
 
-    private static class CachingTopicSelector<IN> implements Function<IN, String> {
-
-        private final LoadingCache<IN, String> cache;
-
-        CachingTopicSelector(Function<IN, String> topicSelector) {
-            this.cache =
-                    CacheBuilder.newBuilder()
-                            .maximumSize(5)
-                            .build(new TopicSelectorCacheLoader<>(topicSelector));
-        }
-
-        @Override
-        public String apply(IN in) {
-            return cache.getUnchecked(in);
-        }
-    }
-
-    private static class TopicSelectorCacheLoader<IN> extends CacheLoader<IN, String> {
+    private static class CachingTopicSelector<IN> implements Function<IN, String>, Serializable {
 
-        private final Function<IN, String> topicSelector;
+        private static final int CACHE_RESET_SIZE = 5;
+        private final Map<IN, String> cache;
+        private final TopicSelector<IN> topicSelector;
 
-        TopicSelectorCacheLoader(Function<IN, String> topicSelector) {
+        CachingTopicSelector(TopicSelector<IN> topicSelector) {
             this.topicSelector = topicSelector;
+            this.cache = new HashMap<>();
         }
 
         @Override
-        public String load(IN in) throws Exception {
-            return topicSelector.apply(in);
+        public String apply(IN in) {
+            final String topic = cache.getOrDefault(in, topicSelector.apply(in));
+            cache.put(in, topic);
+            if (cache.size() == CACHE_RESET_SIZE) {
+                cache.clear();
+            }
+            return topic;
         }
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TopicSelector.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TopicSelector.java
new file mode 100644
index 0000000..2a20754
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TopicSelector.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+/**
+ * Selects a topic for the incoming record.
+ *
+ * @param <IN> type of the incoming record
+ */
+@PublicEvolving
+public interface TopicSelector<IN> extends Function<IN, String>, Serializable {}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
index 3eb6450..80c92d3 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
@@ -104,7 +104,7 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
 
     @Test
     public void testSerializeRecordWithTopicSelector() {
-        final Function<String, String> topicSelector =
+        final TopicSelector<String> topicSelector =
                 (e) -> {
                     if (e.equals("a")) {
                         return "topic-a";
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index fe4ae3c..6fc7511 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -60,7 +61,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -117,6 +117,7 @@ public class KafkaSinkITCase extends TestLogger {
     private static final int ZK_TIMEOUT_MILLIS = 30000;
     private static final short TOPIC_REPLICATION_FACTOR = 1;
     private static final Duration CONSUMER_POLL_DURATION = Duration.ofSeconds(1);
+    private static final RecordSerializer serializer = new RecordSerializer();
     private static AdminClient admin;
 
     private String topic;
@@ -307,7 +308,11 @@ public class KafkaSinkITCase extends TestLogger {
                 new KafkaSinkBuilder<Long>()
                         .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                         .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
-                        .setRecordSerializer(new RecordSerializer(topic));
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setTopic(topic)
+                                        .setValueSerializationSchema(new RecordSerializer())
+                                        .build());
         if (transactionalIdPrefix == null) {
             transactionalIdPrefix = "kafka-sink";
         }
@@ -335,7 +340,11 @@ public class KafkaSinkITCase extends TestLogger {
                 new KafkaSinkBuilder<Long>()
                         .setDeliverGuarantee(guarantee)
                         .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
-                        .setRecordSerializer(new RecordSerializer(topic))
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setTopic(topic)
+                                        .setValueSerializationSchema(new RecordSerializer())
+                                        .build())
                         .setTransactionalIdPrefix("kafka-sink")
                         .build());
         env.execute();
@@ -361,7 +370,11 @@ public class KafkaSinkITCase extends TestLogger {
                 new KafkaSinkBuilder<Long>()
                         .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
                         .setDeliverGuarantee(deliveryGuarantee)
-                        .setRecordSerializer(new RecordSerializer(topic))
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setTopic(topic)
+                                        .setValueSerializationSchema(new RecordSerializer())
+                                        .build())
                         .setTransactionalIdPrefix("kafka-sink")
                         .build());
         env.execute();
@@ -447,20 +460,13 @@ public class KafkaSinkITCase extends TestLogger {
         return collectedRecords;
     }
 
-    private static class RecordSerializer implements KafkaRecordSerializationSchema<Long> {
-
-        private final String topic;
-
-        public RecordSerializer(String topic) {
-            this.topic = topic;
-        }
+    private static class RecordSerializer implements SerializationSchema<Long> {
 
         @Override
-        public ProducerRecord<byte[], byte[]> serialize(
-                Long element, KafkaSinkContext context, Long timestamp) {
+        public byte[] serialize(Long element) {
             final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
             buffer.putLong(element);
-            return new ProducerRecord<>(topic, 0, null, null, buffer.array());
+            return buffer.array();
         }
     }
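As a standalone illustration of what the change buys (this snippet is not part of the commit): because TopicSelector extends Serializable, a lambda assigned to it survives a Java serialization round trip, which is what lets Flink transfer the selector from the client to the workers. A minimal sketch, runnable outside any Flink job:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    import org.apache.flink.connector.kafka.sink.TopicSelector;

    public class TopicSelectorRoundTrip {
        public static void main(String[] args) throws Exception {
            // Serializable because the lambda's target type extends Serializable;
            // a plain java.util.function.Function lambda would fail here.
            TopicSelector<String> selector = e -> e.equals("a") ? "topic-a" : "topic-b";

            // Round-trip through Java serialization, mimicking what happens
            // when the sink is distributed to the task managers.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(selector);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                TopicSelector<String> copy = (TopicSelector<String>) in.readObject();
                System.out.println(copy.apply("a")); // prints "topic-a"
            }
        }
    }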