This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push: new a7785630 [FLINK-31049] [flink-connector-kafka] Add support for Kafka record headers to KafkaSink a7785630 is described below commit a7785630e714af303b224c38d9a6caa89a551265 Author: Alex Gout <alex.g...@shopify.com> AuthorDate: Mon Mar 27 10:07:33 2023 -0400 [FLINK-31049] [flink-connector-kafka] Add support for Kafka record headers to KafkaSink Co-Authored-By: Tzu-Li (Gordon) Tai <tzuli...@apache.org> This closes #18. --- .../flink/connector/kafka/sink/HeaderProvider.java | 32 +++++++++++++++ .../KafkaRecordSerializationSchemaBuilder.java | 48 ++++++++++++++++++---- .../KafkaRecordSerializationSchemaBuilderTest.java | 24 +++++++++++ 3 files changed, 96 insertions(+), 8 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProvider.java new file mode 100644 index 00000000..2c0c080b --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; + +import java.io.Serializable; + +/** Creates an {@link Iterable} of {@link Header}s from the input element. */ +@PublicEvolving +public interface HeaderProvider<IN> extends Serializable { + Headers getHeaders(IN input); +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java index 59864a37..1cc92201 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java @@ -84,6 +84,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> { @Nullable private SerializationSchema<? super IN> valueSerializationSchema; @Nullable private FlinkKafkaPartitioner<? super IN> partitioner; @Nullable private SerializationSchema<? super IN> keySerializationSchema; + @Nullable private HeaderProvider<? super IN> headerProvider; /** * Sets a custom partitioner determining the target partition of the target topic. @@ -190,6 +191,20 @@ public class KafkaRecordSerializationSchemaBuilder<IN> { return self; } + /** + * Sets a {@link HeaderProvider} which is used to add headers to the {@link ProducerRecord} for + * the current element. + * + * @param headerProvider + * @return {@code this} + */ + public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setHeaderProvider( + HeaderProvider<? 
super T> headerProvider) { + KafkaRecordSerializationSchemaBuilder<T> self = self(); + self.headerProvider = checkNotNull(headerProvider); + return self; + } + @SuppressWarnings("unchecked") private <T extends IN> KafkaRecordSerializationSchemaBuilder<T> self() { return (KafkaRecordSerializationSchemaBuilder<T>) this; @@ -239,7 +254,11 @@ public class KafkaRecordSerializationSchemaBuilder<IN> { checkState(valueSerializationSchema != null, "No value serializer is configured."); checkState(topicSelector != null, "No topic selector is configured."); return new KafkaRecordSerializationSchemaWrapper<>( - topicSelector, valueSerializationSchema, keySerializationSchema, partitioner); + topicSelector, + valueSerializationSchema, + keySerializationSchema, + partitioner, + headerProvider); } private void checkValueSerializerNotSet() { @@ -278,16 +297,19 @@ public class KafkaRecordSerializationSchemaBuilder<IN> { private final Function<? super IN, String> topicSelector; private final FlinkKafkaPartitioner<? super IN> partitioner; private final SerializationSchema<? super IN> keySerializationSchema; + private final HeaderProvider<? super IN> headerProvider; KafkaRecordSerializationSchemaWrapper( Function<? super IN, String> topicSelector, SerializationSchema<? super IN> valueSerializationSchema, @Nullable SerializationSchema<? super IN> keySerializationSchema, - @Nullable FlinkKafkaPartitioner<? super IN> partitioner) { + @Nullable FlinkKafkaPartitioner<? super IN> partitioner, + @Nullable HeaderProvider<? 
super IN> headerProvider) { this.topicSelector = checkNotNull(topicSelector); this.valueSerializationSchema = checkNotNull(valueSerializationSchema); this.partitioner = partitioner; this.keySerializationSchema = keySerializationSchema; + this.headerProvider = headerProvider; } @Override @@ -325,12 +347,22 @@ public class KafkaRecordSerializationSchemaBuilder<IN> { context.getPartitionsForTopic(targetTopic))) : OptionalInt.empty(); - return new ProducerRecord<>( - targetTopic, - partition.isPresent() ? partition.getAsInt() : null, - timestamp == null || timestamp < 0L ? null : timestamp, - key, - value); + if (headerProvider != null) { + return new ProducerRecord<>( + targetTopic, + partition.isPresent() ? partition.getAsInt() : null, + timestamp == null || timestamp < 0L ? null : timestamp, + key, + value, + headerProvider.getHeaders(element)); + } else { + return new ProducerRecord<>( + targetTopic, + partition.isPresent() ? partition.getAsInt() : null, + timestamp == null || timestamp < 0L ? 
null : timestamp, + key, + value); + } } } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java index 614624ea..6dd5baed 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java @@ -28,12 +28,16 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Before; import org.junit.Test; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -145,6 +149,26 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger { assertThat(opened.get()).isTrue(); } + @Test + public void testSerializeRecordWithHeaderProvider() throws Exception { + final HeaderProvider<String> headerProvider = + (ignored) -> + new RecordHeaders(ImmutableList.of(new RecordHeader("a", "a".getBytes()))); + + final KafkaRecordSerializationSchema<String> schema = + KafkaRecordSerializationSchema.builder() + .setTopic(DEFAULT_TOPIC) + .setValueSerializationSchema(new SimpleStringSchema()) + .setHeaderProvider(headerProvider) + .build(); + final ProducerRecord<byte[], byte[]> record = schema.serialize("a", null, null); + 
assertThat(record).isNotNull(); + assertThat(record.headers()) + .singleElement() + .extracting(Header::key, Header::value) + .containsExactly("a", "a".getBytes(StandardCharsets.UTF_8)); + } + @Test public void testSerializeRecordWithKey() { final SerializationSchema<String> serializationSchema = new SimpleStringSchema();