This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d43e6b82ea16f49aee5006de8d63212af7fea8fc
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Fri Feb 22 15:57:07 2019 +0100

    [FLINK-11693] Add KafkaSerializationSchema that uses ProducerRecord
---
 .../streaming/connectors/kafka/Kafka010ITCase.java |   2 +-
 .../streaming/connectors/kafka/Kafka011ITCase.java |   2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |   1 -
 .../streaming/connectors/kafka/Kafka08ITCase.java  |   2 +-
 .../streaming/connectors/kafka/Kafka09ITCase.java  |   2 +-
 .../connectors/kafka/Kafka09SecuredRunITCase.java  |   2 +-
 .../connectors/kafka/KafkaSerializationSchema.java |  45 +++++
 .../serialization/KeyedSerializationSchema.java    |   4 +
 .../connectors/kafka/KafkaConsumerTestBase.java    |  63 ++++++-
 .../connectors/kafka/KafkaTestEnvironment.java     |   5 +
 .../connectors/kafka/FlinkKafkaProducer.java       | 197 +++++++++++++++++----
 .../streaming/connectors/kafka/KafkaITCase.java    |   9 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  10 +-
 13 files changed, 288 insertions(+), 56 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 6c1d1e2..0c8bdbb 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -115,7 +115,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 
 	@Test(timeout = 60000)
 	public void testMultipleTopics() throws Exception {
-		runProduceConsumeMultipleTopics();
+		runProduceConsumeMultipleTopics(true);
 	}
 
 	@Test(timeout = 60000)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
index 3677daa..0a68766 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
@@ -123,7 +123,7 @@ public class Kafka011ITCase extends KafkaConsumerTestBase {
 
 	@Test(timeout = 60000)
 	public void testMultipleTopics() throws Exception {
-		runProduceConsumeMultipleTopics();
+		runProduceConsumeMultipleTopics(true);
 	}
 
 	@Test(timeout = 60000)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 23cd57e..a3982ba 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -21,7 +21,6 @@ import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java index e360c19..b97b3e9 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -237,7 +237,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { @Test(timeout = 60000) public void testMultipleTopics() throws Exception { - runProduceConsumeMultipleTopics(); + runProduceConsumeMultipleTopics(true); } @Test(timeout = 60000) diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java index 47d910d..8fdbbe7 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java @@ -109,7 +109,7 @@ public class Kafka09ITCase extends KafkaConsumerTestBase { @Test(timeout = 60000) public void testMultipleTopics() throws Exception { - runProduceConsumeMultipleTopics(); + runProduceConsumeMultipleTopics(true); } @Test(timeout = 60000) diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java index 8cd61cc..f8ce2f5 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java @@ -54,7 +54,7 @@ public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase { //The timeout for the test case is 2 times timeout of ZK connection @Test(timeout = 600000) public void testMultipleTopics() throws Exception { - runProduceConsumeMultipleTopics(); + runProduceConsumeMultipleTopics(true); } } diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java new file mode 100644 index 0000000..31fda0e --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.kafka.clients.producer.ProducerRecord; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +/** + * A {@link KafkaSerializationSchema} defines how to serialize values of type {@code T} into + * {@link ProducerRecord ProducerRecords}. + * + * @param <T> the type of values being serialized + */ +@PublicEvolving +public interface KafkaSerializationSchema<T> extends Serializable { + + /** + * Serializes given element and returns it as a {@link ProducerRecord}. + * + * @param element element to be serialized + * @param timestamp timestamp (can be null) + * @return Kafka {@link ProducerRecord} + */ + ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp); +} diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java index 2f610c2..b254f15 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.util.serialization; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; import java.io.Serializable; @@ -27,7 +28,10 @@ import java.io.Serializable; * to them in a specific format (for example as byte strings). * * @param <T> The type to be serialized. + * + * @deprecated Use {@link KafkaSerializationSchema}. 
*/ +@Deprecated @PublicEvolving public interface KeyedSerializationSchema<T> extends Serializable { diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 8ca15b0..1582922 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -81,11 +81,13 @@ import kafka.server.KafkaServer; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.TimeoutException; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; +import javax.annotation.Nullable; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -1128,7 +1130,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { * Test producing and consuming into multiple topics. * @throws Exception */ - public void runProduceConsumeMultipleTopics() throws Exception { + public void runProduceConsumeMultipleTopics(boolean useLegacySchema) throws Exception { final int numTopics = 5; final int numElements = 20; @@ -1166,12 +1168,17 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { } }); - Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig()); - Properties props = new Properties(); props.putAll(standardProps); props.putAll(secureProps); - kafkaServer.produceIntoKafka(stream, "dummy", schema, props, null); + + if (useLegacySchema) { + Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig()); + kafkaServer.produceIntoKafka(stream, "dummy", schema, props, null); + } else { + TestDeSerializer schema = new TestDeSerializer(env.getConfig()); + kafkaServer.produceIntoKafka(stream, "dummy", schema, props); + } env.execute("Write to topics"); @@ -1179,7 +1186,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().disableSysoutLogging(); - stream = env.addSource(kafkaServer.getConsumer(topics, schema, props)); + if (useLegacySchema) { + Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig()); + stream = env.addSource(kafkaServer.getConsumer(topics, schema, props)); + } else { + TestDeSerializer schema = new TestDeSerializer(env.getConfig()); + stream = env.addSource(kafkaServer.getConsumer(topics, schema, props)); + } stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() { Map<String, Integer> countPerTopic = new HashMap<>(numTopics); @@ -2194,12 +2207,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { } } - private static class Tuple2WithTopicSchema implements KafkaDeserializationSchema<Tuple3<Integer, Integer, String>>, - KeyedSerializationSchema<Tuple3<Integer, Integer, String>> { + private abstract static class TestDeserializer implements + KafkaDeserializationSchema<Tuple3<Integer, Integer, String>> { - private final TypeSerializer<Tuple2<Integer, Integer>> ts; + protected final TypeSerializer<Tuple2<Integer, 
Integer>> ts; - public Tuple2WithTopicSchema(ExecutionConfig ec) { + public TestDeserializer(ExecutionConfig ec) { ts = TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}).createSerializer(ec); } @@ -2219,6 +2232,14 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() { return TypeInformation.of(new TypeHint<Tuple3<Integer, Integer, String>>(){}); } + } + + private static class Tuple2WithTopicSchema extends TestDeserializer + implements KeyedSerializationSchema<Tuple3<Integer, Integer, String>> { + + public Tuple2WithTopicSchema(ExecutionConfig ec) { + super(ec); + } @Override public byte[] serializeKey(Tuple3<Integer, Integer, String> element) { @@ -2242,4 +2263,28 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { return element.f2; } } + + private static class TestDeSerializer extends TestDeserializer + implements KafkaSerializationSchema<Tuple3<Integer, Integer, String>> { + + public TestDeSerializer(ExecutionConfig ec) { + super(ec); + } + + @Override + public ProducerRecord<byte[], byte[]> serialize( + Tuple3<Integer, Integer, String> element, @Nullable Long timestamp) { + ByteArrayOutputStream by = new ByteArrayOutputStream(); + DataOutputView out = new DataOutputViewStreamWrapper(by); + try { + ts.serialize(new Tuple2<>(element.f0, element.f1), out); + } catch (IOException e) { + throw new RuntimeException("Error" , e); + } + byte[] serializedValue = by.toByteArray(); + + return new ProducerRecord<>(element.f2, serializedValue); + } + + } } diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index 336372d..3682280 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -154,6 +154,11 @@ public abstract class KafkaTestEnvironment { KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner); + public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, + KafkaSerializationSchema<T> serSchema, Properties props) { + throw new RuntimeException("KafkaSerializationSchema is only supported on the modern Kafka Connector."); + } + // -- offset handlers /** diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java index c7375eb..a8bff7c 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java @@ -216,11 +216,20 @@ public class FlinkKafkaProducer<IN> * (Serializable) SerializationSchema for turning objects used with Flink into. * byte[] for Kafka. 
*/ - private final KeyedSerializationSchema<IN> schema; + @Nullable + private final KeyedSerializationSchema<IN> keyedSchema; + + /** + * (Serializable) serialization schema for serializing records to + * {@link ProducerRecord ProducerRecords}. + */ + @Nullable + private final KafkaSerializationSchema<IN> kafkaSchema; /** * User-provided partitioner for assigning an object to a Kafka partition for each topic. */ + @Nullable private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner; /** @@ -404,9 +413,6 @@ public class FlinkKafkaProducer<IN> * partition (i.e. all records received by a sink subtask will end up in the same * Kafka partition). * - * <p>To use a custom partitioner, please use - * {@link #FlinkKafkaProducer(String, KeyedSerializationSchema, Properties, Optional, FlinkKafkaProducer.Semantic, int)} instead. - * * @param topicId * ID of the Kafka topic. * @param serializationSchema @@ -483,25 +489,135 @@ public class FlinkKafkaProducer<IN> * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link FlinkKafkaProducer.Semantic#EXACTLY_ONCE}). */ public FlinkKafkaProducer( - String defaultTopicId, - KeyedSerializationSchema<IN> serializationSchema, + String defaultTopicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig, + Optional<FlinkKafkaPartitioner<IN>> customPartitioner, + FlinkKafkaProducer.Semantic semantic, + int kafkaProducersPoolSize) { + this( + defaultTopicId, + serializationSchema, + customPartitioner.orElse(null), + null, /* kafka serialization schema */ + producerConfig, + semantic, + kafkaProducersPoolSize); + } + + /** + * Creates a {@link FlinkKafkaProducer} for a given topic. The sink produces its input to + * the topic. It accepts a {@link KafkaSerializationSchema} for serializing records to + * a {@link ProducerRecord}, including partitioning information. + * + * @param defaultTopic The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param semantic Defines semantic that will be used by this producer (see {@link FlinkKafkaProducer.Semantic}). + */ + public FlinkKafkaProducer( + String defaultTopic, + KafkaSerializationSchema<IN> serializationSchema, + Properties producerConfig, + FlinkKafkaProducer.Semantic semantic) { + this( + defaultTopic, + serializationSchema, + producerConfig, + semantic, + DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to + * the topic. It accepts a {@link KafkaSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}. + * + * @param defaultTopic The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param semantic Defines semantic that will be used by this producer (see {@link FlinkKafkaProducer.Semantic}). + * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link FlinkKafkaProducer.Semantic#EXACTLY_ONCE}). 
+ */ + public FlinkKafkaProducer( + String defaultTopic, + KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, - Optional<FlinkKafkaPartitioner<IN>> customPartitioner, FlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize) { + this( + defaultTopic, + null, null, /* keyed schema and FlinkKafkaPartitioner */ + serializationSchema, + producerConfig, + semantic, + kafkaProducersPoolSize); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to + * the topic. It accepts a {@link KafkaSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}. + * + * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each + * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not + * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they + * will be distributed to Kafka partitions in a round-robin fashion. + * + * @param defaultTopic The default topic to write data to + * @param keyedSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. + * If a partitioner is not provided, records will be partitioned by the key of each record + * (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys + * are {@code null}, then records will be distributed to Kafka partitions in a + * round-robin fashion. + * @param kafkaSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param semantic Defines semantic that will be used by this producer (see {@link FlinkKafkaProducer.Semantic}). + * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link FlinkKafkaProducer.Semantic#EXACTLY_ONCE}). 
+ */ + private FlinkKafkaProducer( + String defaultTopic, + KeyedSerializationSchema<IN> keyedSchema, + FlinkKafkaPartitioner<IN> customPartitioner, + KafkaSerializationSchema<IN> kafkaSchema, + Properties producerConfig, + FlinkKafkaProducer.Semantic semantic, + int kafkaProducersPoolSize) { super(new FlinkKafkaProducer.TransactionStateSerializer(), new FlinkKafkaProducer.ContextStateSerializer()); - this.defaultTopicId = checkNotNull(defaultTopicId, "defaultTopicId is null"); - this.schema = checkNotNull(serializationSchema, "serializationSchema is null"); + this.defaultTopicId = checkNotNull(defaultTopic, "defaultTopic is null"); + + if (kafkaSchema != null) { + this.keyedSchema = null; + this.kafkaSchema = kafkaSchema; + this.flinkKafkaPartitioner = null; + ClosureCleaner.clean( + this.kafkaSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + + if (customPartitioner != null) { + throw new IllegalArgumentException("Customer partitioner can only be used when" + + "using a KeyedSerializationSchema or SerializationSchema."); + } + } else if (keyedSchema != null) { + this.kafkaSchema = null; + this.keyedSchema = keyedSchema; + this.flinkKafkaPartitioner = customPartitioner; + ClosureCleaner.clean( + this.flinkKafkaPartitioner, + ExecutionConfig.ClosureCleanerLevel.RECURSIVE, + true); + ClosureCleaner.clean( + this.keyedSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + } else { + throw new IllegalArgumentException( + "You must provide either a KafkaSerializationSchema or a" + + "KeyedSerializationSchema."); + } + this.producerConfig = checkNotNull(producerConfig, "producerConfig is null"); - this.flinkKafkaPartitioner = checkNotNull(customPartitioner, "customPartitioner is null").orElse(null); this.semantic = checkNotNull(semantic, "semantic is null"); this.kafkaProducersPoolSize = kafkaProducersPoolSize; checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty"); - ClosureCleaner.clean(this.flinkKafkaPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); - ClosureCleaner.ensureSerializable(serializationSchema); - // set the producer configuration properties for kafka record key value serializers. 
if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); @@ -539,7 +655,7 @@ public class FlinkKafkaProducer<IN> transactionTimeout = ((Number) object).longValue(); } else { throw new IllegalArgumentException(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG - + " must be numeric, was " + object); + + " must be numeric, was " + object); } super.setTransactionTimeout(transactionTimeout); super.enableTransactionTimeoutWarnings(0.8); @@ -625,34 +741,39 @@ public class FlinkKafkaProducer<IN> public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException { checkErroneous(); - byte[] serializedKey = schema.serializeKey(next); - byte[] serializedValue = schema.serializeValue(next); - String targetTopic = schema.getTargetTopic(next); - if (targetTopic == null) { - targetTopic = defaultTopicId; - } + ProducerRecord<byte[], byte[]> record; + if (keyedSchema != null) { + byte[] serializedKey = keyedSchema.serializeKey(next); + byte[] serializedValue = keyedSchema.serializeValue(next); + String targetTopic = keyedSchema.getTargetTopic(next); + if (targetTopic == null) { + targetTopic = defaultTopicId; + } - Long timestamp = null; - if (this.writeTimestampToKafka) { - timestamp = context.timestamp(); - } + Long timestamp = null; + if (this.writeTimestampToKafka) { + timestamp = context.timestamp(); + } - ProducerRecord<byte[], byte[]> record; - int[] partitions = topicPartitionsMap.get(targetTopic); - if (null == partitions) { - partitions = getPartitionsByTopic(targetTopic, transaction.producer); - topicPartitionsMap.put(targetTopic, partitions); - } - if (flinkKafkaPartitioner != null) { - record = new ProducerRecord<>( - targetTopic, - flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), - timestamp, - serializedKey, - serializedValue); + int[] partitions = topicPartitionsMap.get(targetTopic); + if (null == partitions) { + partitions = getPartitionsByTopic(targetTopic, transaction.producer); + topicPartitionsMap.put(targetTopic, partitions); + } + if (flinkKafkaPartitioner != null) { + record = new ProducerRecord<>( + targetTopic, + flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), + timestamp, + serializedKey, + serializedValue); + } else { + record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue); + } } else { - record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue); + record = kafkaSchema.serialize(next, context.timestamp()); } + pendingRecords.incrementAndGet(); transaction.producer.send(record, callback); } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java index 8729319..d79d509 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java @@ -122,8 +122,13 @@ public class KafkaITCase extends KafkaConsumerTestBase { } @Test(timeout = 60000) - public void testMultipleTopics() throws Exception { - runProduceConsumeMultipleTopics(); + public void testMultipleTopicsWithLegacySerializer() 
throws Exception { + runProduceConsumeMultipleTopics(true); + } + + @Test(timeout = 60000) + public void testMultipleTopicsWithKafkaSerializer() throws Exception { + runProduceConsumeMultipleTopics(false); } @Test(timeout = 60000) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 710c753..d5c955e 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -21,7 +21,6 @@ import org.apache.flink.networking.NetworkFailuresProxy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.util.NetUtils; @@ -274,6 +273,15 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override + public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KafkaSerializationSchema<T> serSchema, Properties props) { + return stream.addSink(new FlinkKafkaProducer<T>( + topic, + serSchema, + props, + producerSemantic)); + } + + @Override public KafkaOffsetHandler createOffsetHandler() { return new KafkaOffsetHandlerImpl(); }
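
For reference, a minimal usage sketch of the API this commit introduces. The KafkaSerializationSchema#serialize(T, Long) method and the new FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, Semantic) constructor are taken from the diff above; the record type Tuple2<String, Integer>, the topic name "word-counts", the bootstrap address, and the UTF-8 encoding are illustrative assumptions, not part of the commit.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;

import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * Hypothetical schema: builds the ProducerRecord directly, choosing topic, key,
 * value and timestamp in one place instead of implementing the three separate
 * methods of the now-deprecated KeyedSerializationSchema.
 */
public class WordCountSerializationSchema
		implements KafkaSerializationSchema<Tuple2<String, Integer>> {

	private final String topic; // target topic, supplied by the caller

	public WordCountSerializationSchema(String topic) {
		this.topic = topic;
	}

	@Override
	public ProducerRecord<byte[], byte[]> serialize(
			Tuple2<String, Integer> element, @Nullable Long timestamp) {
		byte[] key = element.f0.getBytes(StandardCharsets.UTF_8);
		byte[] value = String.valueOf(element.f1).getBytes(StandardCharsets.UTF_8);
		// timestamp may be null if the element carries no event-time timestamp
		return new ProducerRecord<>(topic, null, timestamp, key, value);
	}
}

Wiring it into a job would then look roughly like this, assuming `stream` is a DataStream<Tuple2<String, Integer>> built earlier in the job and a broker is reachable at localhost:9092:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

stream.addSink(new FlinkKafkaProducer<Tuple2<String, Integer>>(
	"word-counts",                                    // default topic
	new WordCountSerializationSchema("word-counts"),
	props,
	FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));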