This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d9c71e323d7765d03ecc0359692ec33d79abf426 Author: Aljoscha Krettek <aljos...@apache.org> AuthorDate: Wed Jun 26 12:01:02 2019 +0200 [FLINK-11693] Support FlinkKafkaPartitioner functionality with new KafkaSerializationSchema --- .../connectors/kafka/KafkaContextAware.java | 61 ++++++++++++++++++++++ .../connectors/kafka/KafkaSerializationSchema.java | 9 +++- .../connectors/kafka/FlinkKafkaProducer.java | 32 +++++++++++- 3 files changed, 99 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java new file mode 100644 index 0000000..22399c0 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * An interface for {@link KafkaSerializationSchema KafkaSerializationSchemas} that need information + * about the context where the Kafka Producer is running along with information about the available + * partitions. + * + * <p>You only need to override the methods for the information that you need. However, {@link + * #getTargetTopic(Object)} is required because it is used to determine the available partitions. + */ +@PublicEvolving +public interface KafkaContextAware<T> { + + + /** + * Sets the number of the parallel subtask that the Kafka Producer is running on. The numbering + * starts from 0 and goes up to parallelism-1 (parallelism as provided to {@link + * #setNumParallelInstances(int)}). + */ + default void setParallelInstanceId(int parallelInstanceId) { + } + + /** + * Sets the parallelism with which the parallel task of the Kafka Producer runs. + */ + default void setNumParallelInstances(int numParallelInstances) { + } + + /** + * Sets the available partitions for the topic returned from {@link #getTargetTopic(Object)}. + */ + default void setPartitions(int[] partitions) { + } + + /** + * Returns the topic that the presented element should be sent to. This is not used for setting + * the topic (this is done via the {@link org.apache.kafka.clients.producer.ProducerRecord} that + * is returned from {@link KafkaSerializationSchema#serialize(Object, Long)}), it is only used + * for getting the available partitions that are presented to {@link #setPartitions(int[])}. 
+ */ + String getTargetTopic(T element); +} diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java index 31fda0e..37d30fc 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java @@ -26,8 +26,13 @@ import javax.annotation.Nullable; import java.io.Serializable; /** - * A {@link KafkaSerializationSchema} defines how to serialize values of type {@code T} into - * {@link ProducerRecord ProducerRecords}. + * A {@link KafkaSerializationSchema} defines how to serialize values of type {@code T} into {@link + * ProducerRecord ProducerRecords}. + * + * <p>Please also implement {@link KafkaContextAware} if your serialization schema needs + * information + * about the available partitions and the number of parallel subtasks along with the subtask ID on + * which the Kafka Producer is running. 
* * @param <T> the type of values being serialized */ diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java index a8bff7c..e2c8bbc 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java @@ -770,8 +770,30 @@ public class FlinkKafkaProducer<IN> } else { record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue); } - } else { + } else if (kafkaSchema != null) { + if (kafkaSchema instanceof KafkaContextAware) { + @SuppressWarnings("unchecked") + KafkaContextAware<IN> contextAwareSchema = + (KafkaContextAware<IN>) kafkaSchema; + + String targetTopic = contextAwareSchema.getTargetTopic(next); + if (targetTopic == null) { + targetTopic = defaultTopicId; + } + int[] partitions = topicPartitionsMap.get(targetTopic); + + if (null == partitions) { + partitions = getPartitionsByTopic(targetTopic, transaction.producer); + topicPartitionsMap.put(targetTopic, partitions); + } + + contextAwareSchema.setPartitions(partitions); + } record = kafkaSchema.serialize(next, context.timestamp()); + } else { + throw new RuntimeException( + "We have neither KafkaSerializationSchema nor KeyedSerializationSchema, this" + + "is a bug."); } pendingRecords.incrementAndGet(); @@ -1101,6 +1123,14 @@ public class FlinkKafkaProducer<IN> flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks()); } + if (kafkaSchema instanceof KafkaContextAware) { + KafkaContextAware<IN> contextAwareSchema = + (KafkaContextAware<IN>) kafkaSchema; + + contextAwareSchema.setParallelInstanceId(ctx.getIndexOfThisSubtask()); + 
contextAwareSchema.setNumParallelInstances(ctx.getNumberOfParallelSubtasks()); + } + LOG.info("Starting FlinkKafkaInternalProducer ({}/{}) to produce into default topic {}", ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId);