This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d9c71e323d7765d03ecc0359692ec33d79abf426
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Wed Jun 26 12:01:02 2019 +0200

    [FLINK-11693] Support FlinkKafkaPartitioner functionality with new KafkaSerializationSchema
---
 .../connectors/kafka/KafkaContextAware.java        | 61 ++++++++++++++++++++++
 .../connectors/kafka/KafkaSerializationSchema.java |  9 +++-
 .../connectors/kafka/FlinkKafkaProducer.java       | 32 +++++++++++-
 3 files changed, 99 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java
new file mode 100644
index 0000000..22399c0
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An interface for {@link KafkaSerializationSchema KafkaSerializationSchemas} that need information
+ * about the context where the Kafka Producer is running along with information about the available
+ * partitions.
+ *
+ * <p>You only need to override the methods for the information that you need. However, {@link
+ * #getTargetTopic(Object)} is required because it is used to determine the available partitions.
+ */
+@PublicEvolving
+public interface KafkaContextAware<T> {
+
+       /**
+        * Sets the number of the parallel subtask that the Kafka Producer is running on. The
+        * numbering starts from 0 and goes up to parallelism-1 (parallelism as provided via
+        * {@link #setNumParallelInstances(int)}).
+        */
+       default void setParallelInstanceId(int parallelInstanceId) {
+       }
+
+       /**
+        * Sets the parallelism with which the parallel task of the Kafka Producer runs.
+        */
+       default void setNumParallelInstances(int numParallelInstances) {
+       }
+
+       /**
+        * Sets the available partitions for the topic returned from {@link #getTargetTopic(Object)}.
+        */
+       default void setPartitions(int[] partitions) {
+       }
+
+       /**
+        * Returns the topic that the presented element should be sent to. This is not used for
+        * setting the topic (that is done via the {@link org.apache.kafka.clients.producer.ProducerRecord}
+        * returned from {@link KafkaSerializationSchema#serialize(Object, Long)}); it is only used
+        * for determining the available partitions that are presented to {@link #setPartitions(int[])}.
+        */
+       String getTargetTopic(T element);
+}
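
For illustration, here is a minimal sketch of a schema that implements both
KafkaSerializationSchema and the new KafkaContextAware interface to reproduce
the FlinkFixedPartitioner behavior (each parallel subtask pins itself to one
partition). The class name FixedPartitionSchema and the String payload are
hypothetical; the producer change below guarantees that setPartitions() is
called before each serialize() call.

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    /** Hypothetical sketch: pins each parallel subtask to a single partition. */
    public class FixedPartitionSchema
            implements KafkaSerializationSchema<String>, KafkaContextAware<String> {

        private final String topic;

        private int parallelInstanceId;
        private int[] partitions;

        public FixedPartitionSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public void setParallelInstanceId(int parallelInstanceId) {
            this.parallelInstanceId = parallelInstanceId;
        }

        @Override
        public void setPartitions(int[] partitions) {
            this.partitions = partitions;
        }

        @Override
        public String getTargetTopic(String element) {
            return topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
            // Same mapping as FlinkFixedPartitioner: subtask i always writes to
            // partitions[i % partitions.length] of the target topic.
            int partition = partitions[parallelInstanceId % partitions.length];
            return new ProducerRecord<>(
                    topic, partition, null, element.getBytes(StandardCharsets.UTF_8));
        }
    }
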
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
index 31fda0e..37d30fc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
@@ -26,8 +26,13 @@ import javax.annotation.Nullable;
 import java.io.Serializable;
 
 /**
- * A {@link KafkaSerializationSchema} defines how to serialize values of type {@code T} into
- * {@link ProducerRecord ProducerRecords}.
+ * A {@link KafkaSerializationSchema} defines how to serialize values of type {@code T} into {@link
+ * ProducerRecord ProducerRecords}.
+ *
+ * <p>Please also implement {@link KafkaContextAware} if your serialization schema needs information
+ * about the available partitions and the number of parallel subtasks along with the subtask ID on
+ * which the Kafka Producer is running.
  *
  * @param <T> the type of values being serialized
  */
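
A schema does not have to be context aware at all. As a baseline, a minimal
sketch (the class name and String payload are hypothetical) that leaves
partition selection entirely to the Kafka producer by passing a null partition:

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    /** Hypothetical sketch: no key, no explicit partition. */
    public class PlainStringSchema implements KafkaSerializationSchema<String> {

        private final String topic;

        public PlainStringSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
            // A null partition (and no key) lets the Kafka producer's own
            // partitioner choose the target partition.
            return new ProducerRecord<>(topic, null, timestamp,
                    null, element.getBytes(StandardCharsets.UTF_8));
        }
    }
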
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index a8bff7c..e2c8bbc 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -770,8 +770,30 @@ public class FlinkKafkaProducer<IN>
                        } else {
                                record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
                        }
-               } else {
+               } else if (kafkaSchema != null) {
+                       if (kafkaSchema instanceof KafkaContextAware) {
+                               @SuppressWarnings("unchecked")
+                               KafkaContextAware<IN> contextAwareSchema =
+                                               (KafkaContextAware<IN>) kafkaSchema;
+
+                               String targetTopic = contextAwareSchema.getTargetTopic(next);
+                               if (targetTopic == null) {
+                                       targetTopic = defaultTopicId;
+                               }
+                               int[] partitions = topicPartitionsMap.get(targetTopic);
+
+                               if (null == partitions) {
+                                       partitions = getPartitionsByTopic(targetTopic, transaction.producer);
+                                       topicPartitionsMap.put(targetTopic, partitions);
+                               }
+
+                               contextAwareSchema.setPartitions(partitions);
+                       }
                        record = kafkaSchema.serialize(next, context.timestamp());
+               } else {
+                       throw new RuntimeException(
+                                       "We have neither KafkaSerializationSchema nor KeyedSerializationSchema, " +
+                                                       "this is a bug.");
                }
 
                pendingRecords.incrementAndGet();
@@ -1101,6 +1123,14 @@ public class FlinkKafkaProducer<IN>
                        flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
                }
 
+               if (kafkaSchema instanceof KafkaContextAware) {
+                       KafkaContextAware<IN> contextAwareSchema =
+                                       (KafkaContextAware<IN>) kafkaSchema;
+
+                       contextAwareSchema.setParallelInstanceId(ctx.getIndexOfThisSubtask());
+                       contextAwareSchema.setNumParallelInstances(ctx.getNumberOfParallelSubtasks());
+               }
+
                LOG.info("Starting FlinkKafkaInternalProducer ({}/{}) to 
produce into default topic {}",
                        ctx.getIndexOfThisSubtask() + 1, 
ctx.getNumberOfParallelSubtasks(), defaultTopicId);
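
Putting the pieces together, a sketch of wiring a context-aware schema into the
producer. The topic name and broker address are placeholders, FixedPartitionSchema
is the hypothetical sketch from further up, and this assumes the FlinkKafkaProducer
constructor that takes a KafkaSerializationSchema and a Semantic:

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    public class ContextAwareSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> stream = env.fromElements("a", "b", "c");

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

            // The producer checks whether the schema also implements KafkaContextAware;
            // if it does, open() supplies the subtask ID and parallelism, and invoke()
            // supplies the partitions of the target topic before each serialize() call.
            stream.addSink(new FlinkKafkaProducer<>(
                    "my-topic",                           // default topic (placeholder)
                    new FixedPartitionSchema("my-topic"), // hypothetical sketch from above
                    props,
                    FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));

            env.execute("context-aware-kafka-sink");
        }
    }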
 
