This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d43e6b82ea16f49aee5006de8d63212af7fea8fc
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Fri Feb 22 15:57:07 2019 +0100

    [FLINK-11693] Add KafkaSerializationSchema that uses ProducerRecord
---
 .../streaming/connectors/kafka/Kafka010ITCase.java |   2 +-
 .../streaming/connectors/kafka/Kafka011ITCase.java |   2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |   1 -
 .../streaming/connectors/kafka/Kafka08ITCase.java  |   2 +-
 .../streaming/connectors/kafka/Kafka09ITCase.java  |   2 +-
 .../connectors/kafka/Kafka09SecuredRunITCase.java  |   2 +-
 .../connectors/kafka/KafkaSerializationSchema.java |  45 +++++
 .../serialization/KeyedSerializationSchema.java    |   4 +
 .../connectors/kafka/KafkaConsumerTestBase.java    |  63 ++++++-
 .../connectors/kafka/KafkaTestEnvironment.java     |   5 +
 .../connectors/kafka/FlinkKafkaProducer.java       | 197 +++++++++++++++++----
 .../streaming/connectors/kafka/KafkaITCase.java    |   9 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  10 +-
 13 files changed, 288 insertions(+), 56 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 6c1d1e2..0c8bdbb 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -115,7 +115,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 
        @Test(timeout = 60000)
        public void testMultipleTopics() throws Exception {
-               runProduceConsumeMultipleTopics();
+               runProduceConsumeMultipleTopics(true);
        }
 
        @Test(timeout = 60000)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
index 3677daa..0a68766 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
@@ -123,7 +123,7 @@ public class Kafka011ITCase extends KafkaConsumerTestBase {
 
        @Test(timeout = 60000)
        public void testMultipleTopics() throws Exception {
-               runProduceConsumeMultipleTopics();
+               runProduceConsumeMultipleTopics(true);
        }
 
        @Test(timeout = 60000)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 23cd57e..a3982ba 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -21,7 +21,6 @@ import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index e360c19..b97b3e9 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -237,7 +237,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
        @Test(timeout = 60000)
        public void testMultipleTopics() throws Exception {
-               runProduceConsumeMultipleTopics();
+               runProduceConsumeMultipleTopics(true);
        }
 
        @Test(timeout = 60000)
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 47d910d..8fdbbe7 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -109,7 +109,7 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 
        @Test(timeout = 60000)
        public void testMultipleTopics() throws Exception {
-               runProduceConsumeMultipleTopics();
+               runProduceConsumeMultipleTopics(true);
        }
 
        @Test(timeout = 60000)
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
index 8cd61cc..f8ce2f5 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -54,7 +54,7 @@ public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
        //The timeout for the test case is 2 times timeout of ZK connection
        @Test(timeout = 600000)
        public void testMultipleTopics() throws Exception {
-               runProduceConsumeMultipleTopics();
+               runProduceConsumeMultipleTopics(true);
        }
 
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
new file mode 100644
index 0000000..31fda0e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/**
+ * A {@link KafkaSerializationSchema} defines how to serialize values of type {@code T} into
+ * {@link ProducerRecord ProducerRecords}.
+ *
+ * @param <T> the type of values being serialized
+ */
+@PublicEvolving
+public interface KafkaSerializationSchema<T> extends Serializable {
+
+       /**
+        * Serializes given element and returns it as a {@link ProducerRecord}.
+        *
+        * @param element element to be serialized
+        * @param timestamp timestamp (can be null)
+        * @return Kafka {@link ProducerRecord}
+        */
+       ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
+}
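
For illustration only (this is not part of the patch): a minimal sketch of a
user-defined implementation of the new interface. The class name
ExampleStringSchema, the fixed target topic, and the UTF-8 String payload are
assumptions made up for this example.

    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;

    import org.apache.kafka.clients.producer.ProducerRecord;

    import javax.annotation.Nullable;

    import java.nio.charset.StandardCharsets;

    /**
     * Hypothetical schema: writes each String element as the record value of a
     * fixed topic and forwards the element timestamp when one is available.
     */
    public class ExampleStringSchema implements KafkaSerializationSchema<String> {

            private final String topic;

            public ExampleStringSchema(String topic) {
                    this.topic = topic;
            }

            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                    byte[] value = element.getBytes(StandardCharsets.UTF_8);
                    // topic, partition, timestamp, key, value -- partition and key are left null here
                    return new ProducerRecord<>(topic, null, timestamp, null, value);
            }
    }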
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
index 2f610c2..b254f15 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.util.serialization;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 
 import java.io.Serializable;
 
@@ -27,7 +28,10 @@ import java.io.Serializable;
  * to them in a specific format (for example as byte strings).
  *
  * @param <T> The type to be serialized.
+ *
+ * @deprecated Use {@link KafkaSerializationSchema}.
  */
+@Deprecated
 @PublicEvolving
 public interface KeyedSerializationSchema<T> extends Serializable {
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 8ca15b0..1582922 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -81,11 +81,13 @@ import kafka.server.KafkaServer;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 
+import javax.annotation.Nullable;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -1128,7 +1130,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
         * Test producing and consuming into multiple topics.
         * @throws Exception
         */
-       public void runProduceConsumeMultipleTopics() throws Exception {
+       public void runProduceConsumeMultipleTopics(boolean useLegacySchema) throws Exception {
                final int numTopics = 5;
                final int numElements = 20;
 
@@ -1166,12 +1168,17 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
                        }
                });
 
-               Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig());
-
                Properties props = new Properties();
                props.putAll(standardProps);
                props.putAll(secureProps);
-               kafkaServer.produceIntoKafka(stream, "dummy", schema, props, null);
+
+               if (useLegacySchema) {
+                       Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig());
+                       kafkaServer.produceIntoKafka(stream, "dummy", schema, props, null);
+               } else {
+                       TestDeSerializer schema = new TestDeSerializer(env.getConfig());
+                       kafkaServer.produceIntoKafka(stream, "dummy", schema, props);
+               }
 
                env.execute("Write to topics");
 
@@ -1179,7 +1186,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
                env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().disableSysoutLogging();
 
-               stream = env.addSource(kafkaServer.getConsumer(topics, schema, props));
+               if (useLegacySchema) {
+                       Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig());
+                       stream = env.addSource(kafkaServer.getConsumer(topics, schema, props));
+               } else {
+                       TestDeSerializer schema = new TestDeSerializer(env.getConfig());
+                       stream = env.addSource(kafkaServer.getConsumer(topics, schema, props));
+               }
 
                stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
                        Map<String, Integer> countPerTopic = new HashMap<>(numTopics);
@@ -2194,12 +2207,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
                }
        }
 
-       private static class Tuple2WithTopicSchema implements KafkaDeserializationSchema<Tuple3<Integer, Integer, String>>,
-               KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
+       private abstract static class TestDeserializer implements
+                       KafkaDeserializationSchema<Tuple3<Integer, Integer, String>> {
 
-               private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+               protected final TypeSerializer<Tuple2<Integer, Integer>> ts;
 
-               public Tuple2WithTopicSchema(ExecutionConfig ec) {
+               public TestDeserializer(ExecutionConfig ec) {
                        ts = TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}).createSerializer(ec);
                }
 
@@ -2219,6 +2232,14 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
                public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
                        return TypeInformation.of(new TypeHint<Tuple3<Integer, Integer, String>>(){});
                }
+       }
+
+       private static class Tuple2WithTopicSchema extends TestDeserializer
+                       implements KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
+
+               public Tuple2WithTopicSchema(ExecutionConfig ec) {
+                       super(ec);
+               }
 
                @Override
                public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
@@ -2242,4 +2263,28 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
                        return element.f2;
                }
        }
+
+       private static class TestDeSerializer extends TestDeserializer
+                       implements KafkaSerializationSchema<Tuple3<Integer, Integer, String>> {
+
+               public TestDeSerializer(ExecutionConfig ec) {
+                       super(ec);
+               }
+
+               @Override
+               public ProducerRecord<byte[], byte[]> serialize(
+                               Tuple3<Integer, Integer, String> element, @Nullable Long timestamp) {
+                       ByteArrayOutputStream by = new ByteArrayOutputStream();
+                       DataOutputView out = new DataOutputViewStreamWrapper(by);
+                       try {
+                               ts.serialize(new Tuple2<>(element.f0, element.f1), out);
+                       } catch (IOException e) {
+                               throw new RuntimeException("Error", e);
+                       }
+                       byte[] serializedValue = by.toByteArray();
+
+                       return new ProducerRecord<>(element.f2, serializedValue);
+               }
+
+       }
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 336372d..3682280 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -154,6 +154,11 @@ public abstract class KafkaTestEnvironment {
                                                                                KeyedSerializationSchema<T> serSchema, Properties props,
                                                                                FlinkKafkaPartitioner<T> partitioner);
 
+       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
+                       KafkaSerializationSchema<T> serSchema, Properties props) {
+               throw new RuntimeException("KafkaSerializationSchema is only supported on the modern Kafka Connector.");
+       }
+
        // -- offset handlers
 
        /**
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index c7375eb..a8bff7c 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -216,11 +216,20 @@ public class FlinkKafkaProducer<IN>
         * (Serializable) SerializationSchema for turning objects used with Flink into.
         * byte[] for Kafka.
         */
-       private final KeyedSerializationSchema<IN> schema;
+       @Nullable
+       private final KeyedSerializationSchema<IN> keyedSchema;
+
+       /**
+        * (Serializable) serialization schema for serializing records to
+        * {@link ProducerRecord ProducerRecords}.
+        */
+       @Nullable
+       private final KafkaSerializationSchema<IN> kafkaSchema;
 
        /**
         * User-provided partitioner for assigning an object to a Kafka partition for each topic.
         */
+       @Nullable
        private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
 
        /**
@@ -404,9 +413,6 @@ public class FlinkKafkaProducer<IN>
         * partition (i.e. all records received by a sink subtask will end up in the same
         * Kafka partition).
         *
-        * <p>To use a custom partitioner, please use
-        * {@link #FlinkKafkaProducer(String, KeyedSerializationSchema, Properties, Optional, FlinkKafkaProducer.Semantic, int)} instead.
-        *
         * @param topicId
         *                      ID of the Kafka topic.
         * @param serializationSchema
@@ -483,25 +489,135 @@ public class FlinkKafkaProducer<IN>
         * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link FlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
         */
        public FlinkKafkaProducer(
-               String defaultTopicId,
-               KeyedSerializationSchema<IN> serializationSchema,
+                       String defaultTopicId,
+                       KeyedSerializationSchema<IN> serializationSchema,
+                       Properties producerConfig,
+                       Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
+                       FlinkKafkaProducer.Semantic semantic,
+                       int kafkaProducersPoolSize) {
+               this(
+                               defaultTopicId,
+                               serializationSchema,
+                               customPartitioner.orElse(null),
+                               null, /* kafka serialization schema */
+                               producerConfig,
+                               semantic,
+                               kafkaProducersPoolSize);
+       }
+
+       /**
+        * Creates a {@link FlinkKafkaProducer} for a given topic. The sink produces its input to
+        * the topic. It accepts a {@link KafkaSerializationSchema} for serializing records to
+        * a {@link ProducerRecord}, including partitioning information.
+        *
+        * @param defaultTopic The default topic to write data to
+        * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+        * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param semantic Defines semantic that will be used by this producer (see {@link FlinkKafkaProducer.Semantic}).
+        */
+       public FlinkKafkaProducer(
+                       String defaultTopic,
+                       KafkaSerializationSchema<IN> serializationSchema,
+                       Properties producerConfig,
+                       FlinkKafkaProducer.Semantic semantic) {
+               this(
+                               defaultTopic,
+                               serializationSchema,
+                               producerConfig,
+                               semantic,
+                               DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+        * the topic. It accepts a {@link KafkaSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+        *
+        * @param defaultTopic The default topic to write data to
+        * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+        * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param semantic Defines semantic that will be used by this producer (see {@link FlinkKafkaProducer.Semantic}).
+        * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link FlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
+        */
+       public FlinkKafkaProducer(
+               String defaultTopic,
+               KafkaSerializationSchema<IN> serializationSchema,
                Properties producerConfig,
-               Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
                FlinkKafkaProducer.Semantic semantic,
                int kafkaProducersPoolSize) {
+               this(
+                               defaultTopic,
+                               null, null, /* keyed schema and FlinkKafkaPartitioner */
+                               serializationSchema,
+                               producerConfig,
+                               semantic,
+                               kafkaProducersPoolSize);
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+        * the topic. It accepts a {@link KafkaSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+        *
+        * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+        * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+        * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+        * will be distributed to Kafka partitions in a round-robin fashion.
+        *
+        * @param defaultTopic The default topic to write data to
+        * @param keyedSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+        *                          If a partitioner is not provided, records will be partitioned by the key of each record
+        *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+        *                          are {@code null}, then records will be distributed to Kafka partitions in a
+        *                          round-robin fashion.
+        * @param kafkaSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+        * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param semantic Defines semantic that will be used by this producer (see {@link FlinkKafkaProducer.Semantic}).
+        * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link FlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
+        */
+       private FlinkKafkaProducer(
+                       String defaultTopic,
+                       KeyedSerializationSchema<IN> keyedSchema,
+                       FlinkKafkaPartitioner<IN> customPartitioner,
+                       KafkaSerializationSchema<IN> kafkaSchema,
+                       Properties producerConfig,
+                       FlinkKafkaProducer.Semantic semantic,
+                       int kafkaProducersPoolSize) {
                super(new FlinkKafkaProducer.TransactionStateSerializer(), new FlinkKafkaProducer.ContextStateSerializer());
 
-               this.defaultTopicId = checkNotNull(defaultTopicId, "defaultTopicId is null");
-               this.schema = checkNotNull(serializationSchema, "serializationSchema is null");
+               this.defaultTopicId = checkNotNull(defaultTopic, "defaultTopic is null");
+
+               if (kafkaSchema != null) {
+                       this.keyedSchema = null;
+                       this.kafkaSchema = kafkaSchema;
+                       this.flinkKafkaPartitioner = null;
+                       ClosureCleaner.clean(
+                                       this.kafkaSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+
+                       if (customPartitioner != null) {
+                               throw new IllegalArgumentException("A custom partitioner can only be used " +
+                                               "with a KeyedSerializationSchema or SerializationSchema.");
+                       }
+               } else if (keyedSchema != null) {
+                       this.kafkaSchema = null;
+                       this.keyedSchema = keyedSchema;
+                       this.flinkKafkaPartitioner = customPartitioner;
+                       ClosureCleaner.clean(
+                                       this.flinkKafkaPartitioner,
+                                       ExecutionConfig.ClosureCleanerLevel.RECURSIVE,
+                                       true);
+                       ClosureCleaner.clean(
+                                       this.keyedSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               } else {
+                       throw new IllegalArgumentException(
+                                       "You must provide either a KafkaSerializationSchema or a " +
+                                                       "KeyedSerializationSchema.");
+               }
+
                this.producerConfig = checkNotNull(producerConfig, "producerConfig is null");
-               this.flinkKafkaPartitioner = checkNotNull(customPartitioner, "customPartitioner is null").orElse(null);
                this.semantic = checkNotNull(semantic, "semantic is null");
                this.kafkaProducersPoolSize = kafkaProducersPoolSize;
                checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty");
 
-               ClosureCleaner.clean(this.flinkKafkaPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
-               ClosureCleaner.ensureSerializable(serializationSchema);
-
                // set the producer configuration properties for kafka record key value serializers.
                if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
                        this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
@@ -539,7 +655,7 @@ public class FlinkKafkaProducer<IN>
                                transactionTimeout = ((Number) object).longValue();
                        } else {
                                throw new IllegalArgumentException(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG
-                                       + " must be numeric, was " + object);
+                                               + " must be numeric, was " + object);
                        }
                        super.setTransactionTimeout(transactionTimeout);
                        super.enableTransactionTimeoutWarnings(0.8);
@@ -625,34 +741,39 @@ public class FlinkKafkaProducer<IN>
        public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
                checkErroneous();
 
-               byte[] serializedKey = schema.serializeKey(next);
-               byte[] serializedValue = schema.serializeValue(next);
-               String targetTopic = schema.getTargetTopic(next);
-               if (targetTopic == null) {
-                       targetTopic = defaultTopicId;
-               }
+               ProducerRecord<byte[], byte[]> record;
+               if (keyedSchema != null) {
+                       byte[] serializedKey = keyedSchema.serializeKey(next);
+                       byte[] serializedValue = keyedSchema.serializeValue(next);
+                       String targetTopic = keyedSchema.getTargetTopic(next);
+                       if (targetTopic == null) {
+                               targetTopic = defaultTopicId;
+                       }
 
-               Long timestamp = null;
-               if (this.writeTimestampToKafka) {
-                       timestamp = context.timestamp();
-               }
+                       Long timestamp = null;
+                       if (this.writeTimestampToKafka) {
+                               timestamp = context.timestamp();
+                       }
 
-               ProducerRecord<byte[], byte[]> record;
-               int[] partitions = topicPartitionsMap.get(targetTopic);
-               if (null == partitions) {
-                       partitions = getPartitionsByTopic(targetTopic, transaction.producer);
-                       topicPartitionsMap.put(targetTopic, partitions);
-               }
-               if (flinkKafkaPartitioner != null) {
-                       record = new ProducerRecord<>(
-                               targetTopic,
-                               flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
-                               timestamp,
-                               serializedKey,
-                               serializedValue);
+                       int[] partitions = topicPartitionsMap.get(targetTopic);
+                       if (null == partitions) {
+                               partitions = getPartitionsByTopic(targetTopic, transaction.producer);
+                               topicPartitionsMap.put(targetTopic, partitions);
+                       }
+                       if (flinkKafkaPartitioner != null) {
+                               record = new ProducerRecord<>(
+                                               targetTopic,
+                                               flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
+                                               timestamp,
+                                               serializedKey,
+                                               serializedValue);
+                       } else {
+                               record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
+                       }
                } else {
-                       record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
+                       record = kafkaSchema.serialize(next, context.timestamp());
                }
+
                pendingRecords.incrementAndGet();
                transaction.producer.send(record, callback);
        }
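
For illustration only (this is not part of the patch): a minimal sketch of
wiring the new KafkaSerializationSchema-based constructor into a job. Only the
constructor shape (defaultTopic, KafkaSerializationSchema, Properties,
Semantic) comes from this commit; the topic name, bootstrap address, and the
ExampleStringSchema class from the sketch after the interface diff above are
assumptions.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    import java.util.Properties;

    public class ProducerExample {
            public static void main(String[] args) throws Exception {
                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                    DataStream<String> stream = env.fromElements("a", "b", "c");

                    Properties props = new Properties();
                    props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative address

                    // Constructor added by this commit: (defaultTopic, KafkaSerializationSchema, Properties, Semantic)
                    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                                    "my-topic",                            // hypothetical default topic
                                    new ExampleStringSchema("my-topic"),   // sketch from the interface example above
                                    props,
                                    FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

                    stream.addSink(producer);
                    env.execute("Write to Kafka");
            }
    }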
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 8729319..d79d509 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -122,8 +122,13 @@ public class KafkaITCase extends KafkaConsumerTestBase {
        }
 
        @Test(timeout = 60000)
-       public void testMultipleTopics() throws Exception {
-               runProduceConsumeMultipleTopics();
+       public void testMultipleTopicsWithLegacySerializer() throws Exception {
+               runProduceConsumeMultipleTopics(true);
+       }
+
+       @Test(timeout = 60000)
+       public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+               runProduceConsumeMultipleTopics(false);
        }
 
        @Test(timeout = 60000)
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 710c753..d5c955e 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -21,7 +21,6 @@ import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -274,6 +273,15 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
        }
 
        @Override
+       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KafkaSerializationSchema<T> serSchema, Properties props) {
+               return stream.addSink(new FlinkKafkaProducer<T>(
+                               topic,
+                               serSchema,
+                               props,
+                               producerSemantic));
+       }
+
+       @Override
        public KafkaOffsetHandler createOffsetHandler() {
                return new KafkaOffsetHandlerImpl();
        }
