rmetzger closed pull request #2705: [FLINK-2597][FLINK-4050] Add wrappers for Kafka serializers, test for partitioner and documentation
URL: https://github.com/apache/flink/pull/2705
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 9a360d47bfb..32f08d353fe 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -279,7 +279,7 @@ FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleS
 </div>
 </div>
 
-You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
+You can also define a custom Kafka producer configuration for the FlinkKafkaProducer with the constructor. Please refer to
 the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
 Kafka Producers.
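
For illustration, a minimal sketch of passing such a configuration, assuming the `FlinkKafkaProducer010(topic, schema, properties)` constructor used elsewhere in this pull request; the broker address and topic name are placeholders:

{% highlight java %}
Properties producerConfig = new Properties();
// standard Kafka producer settings, see the Kafka documentation
producerConfig.setProperty("bootstrap.servers", "localhost:9092");
producerConfig.setProperty("retries", "3");

FlinkKafkaProducer010<String> producer =
        new FlinkKafkaProducer010<>("my-topic", new SimpleStringSchema(), producerConfig);
{% endhighlight %}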
 
@@ -289,6 +289,11 @@ one producer instance can send data to multiple topics.
 
 The interface of the serialization schema is called `KeyedSerializationSchema`.
 
+The last argument of the `FlinkKafkaProducer` constructor allows passing a custom partitioner that extends Flink's `KafkaPartitioner`
+class. The partitioner is provided with some Flink-specific data, such as the number of parallel sources and the source id.
+
+The producer also supports Kafka's `Partitioner` interface. To use it, pass `null` as the partitioner to the
+constructor and set the `partitioner.class` property to point to your implementation.
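
A rough sketch of this, mirroring the test added in this pull request (the partitioner class name is a placeholder for your own implementation of `org.apache.kafka.clients.producer.Partitioner`):

{% highlight java %}
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// fully qualified name of a class implementing Kafka's Partitioner interface
props.setProperty("partitioner.class", "com.example.MyKafkaPartitioner");

// passing null as the Flink partitioner makes the producer fall back to Kafka's partitioner
FlinkKafkaProducer010<String> producer =
        new FlinkKafkaProducer010<>("my-topic", new SimpleStringSchema(), props, /* no Flink partitioner */ null);
{% endhighlight %}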
 
 **Note**: By default, the number of retries is set to "0". This means that the producer fails immediately on errors,
 including leader changes. The value is set to "0" by default to avoid duplicate messages in the target topic.
@@ -330,3 +335,74 @@ config.setWriteTimestampToKafka(true);
 {% endhighlight %}
 
 
+### Using Flink's Kafka connector with Kafka serializers
+
+Flink uses its own interfaces for specifying serializers for the Kafka connector. However, we also provide a wrapper to use
+Kafka serializers with Flink.
+
+The wrappers are called `KafkaSerializerWrapper` and `KafkaDeserializerWrapper`. The following example shows how to use Confluent's
+`KafkaAvroSerializer` with the schema registry.
+
+To use the Avro serializers in a Maven project, first add Confluent's repository and the serializer dependency.
+
+{% highlight xml %}
+
+<!-- Add repository, because the packages are not available in Maven central -->
+<repositories>
+    <repository>
+        <id>confluent</id>
+        <url>http://packages.confluent.io/maven</url>
+    </repository>
+</repositories>
+       
+<dependency>
+    <groupId>io.confluent</groupId>
+    <artifactId>kafka-avro-serializer</artifactId>
+    <version>3.0.0</version>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+{% endhighlight %}
+
+
+**The serializer for the `FlinkKafkaProducer` is specified like this**:
+
+{% highlight java %}
+Map<String, String> config = null;
+KeyedSerializationSchema<Tuple2<Void, GenericRecord>> serSchema = new KafkaSerializerWrapper<>(KafkaAvroSerializer.class, KafkaAvroSerializer.class, config);
+FlinkKafkaProducer09<Tuple2<Void, GenericRecord>> producer = new FlinkKafkaProducer09<>("topic", serSchema, new Properties());
+{% endhighlight %}
+
+The first two arguments are the Kafka serializer classes for the keys and values in the topic. The last argument `config` allows
+you to pass configuration values to the serializers. The `KafkaAvroSerializer`, for instance, reads the schema registry URL from these configs.
+
+The wrapper internally calls Kafka's `Serializer.serialize(topic, message)` method, which expects the topic as an argument as well.
+Since Flink's serialize*() methods don't provide the topic, there is a special configuration key to
+set the topic for the serializer through this wrapper. The configuration key is stored in the `KafkaSerializerWrapper.SERIALIZER_TOPIC` constant.
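
A sketch of such a config map (the schema registry URL is a placeholder; `SERIALIZER_TOPIC` is the constant described above):

{% highlight java %}
Map<String, String> config = new HashMap<>();
// the KafkaAvroSerializer reads the registry location from this key
config.put("schema.registry.url", "http://localhost:8081");
// tell the wrapper which topic to pass to Serializer.serialize(topic, message)
config.put(KafkaSerializerWrapper.SERIALIZER_TOPIC, "my-topic");

KeyedSerializationSchema<Tuple2<Void, GenericRecord>> serSchema =
        new KafkaSerializerWrapper<>(KafkaAvroSerializer.class, KafkaAvroSerializer.class, config);
{% endhighlight %}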
+
+**The deserializer for the `FlinkKafkaConsumer` is specified like this**:
+
+{% highlight java %}
+Map<String, String> config = null;
+KafkaDeserializerWrapper<Void, GenericRecord> kvDeSer = new KafkaDeserializerWrapper<>(KafkaAvroDeserializer.class, KafkaAvroDeserializer.class, Void.class, GenericRecord.class, config);
+FlinkKafkaConsumer09<Tuple2<Void, GenericRecord>> consumer = new FlinkKafkaConsumer09<>("topic", kvDeSer, new Properties());
+{% endhighlight %}
+
+Similar to the serializer, the deserializer wrapper takes the two Kafka deserializers for the keys and values. The wrapper also needs to know the target types
+the deserializers produce.
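
For illustration, a sketch of consuming the resulting `Tuple2` stream with the consumer built above (assuming `env` is a `StreamExecutionEnvironment`):

{% highlight java %}
DataStream<Tuple2<Void, GenericRecord>> fromKafka = env.addSource(consumer);

fromKafka.map(new MapFunction<Tuple2<Void, GenericRecord>, String>() {
    @Override
    public String map(Tuple2<Void, GenericRecord> record) throws Exception {
        // f1 holds the deserialized Avro value; the key (f0) is Void here
        return record.f1.toString();
    }
});
{% endhighlight %}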
+
+
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
index 8108afc1a22..644eeef6c13 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
@@ -40,6 +40,13 @@ under the License.
                <kafka.version>0.10.0.1</kafka.version>
        </properties>
 
+       <repositories>
+               <repository>
+                       <id>confluent</id>
+                       <url>http://packages.confluent.io/maven</url>
+               </repository>
+       </repositories>
+
        <dependencies>
 
                <!-- core dependencies -->
@@ -146,6 +153,26 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>io.confluent</groupId>
+                       <artifactId>kafka-avro-serializer</artifactId>
+                       <version>3.0.0</version>
+            <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.kafka</groupId>
+                                       <artifactId>kafka-clients</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>log4j</groupId>
+                                       <artifactId>log4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       <artifactId>slf4j-log4j12</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
 
        </dependencies>
 
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 08511c9a9ce..00ab6b8fe19 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -131,6 +131,13 @@ public void testMetricsAndEndOfStream() throws Exception {
                runEndOfStreamTest();
        }
 
+       @Test(timeout = 60000)
+       public void testKafkaAvroSerializerTest() throws Exception {
+               runKafkaAvroSerializerTest();
+       }
+
+
+
        // --- offset committing ---
 
        @Test(timeout = 60000)
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
index 42b96828403..1b40b572dd0 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
@@ -19,8 +19,34 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
 import org.junit.Test;
 
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 
 @SuppressWarnings("serial")
 public class Kafka010ProducerITCase extends KafkaProducerTestBase {
@@ -30,4 +56,158 @@ public void testCustomPartitioning() {
                runCustomPartitioningTest();
        }
 
+       /**
+        * Test that the Kafka producer supports a custom Kafka partitioner
+        */
+       @Test
+       public void testKafkaPartitioner() throws Exception {
+               Properties props = new Properties();
+               
props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
+               props.putAll(secureProps);
+               props.setProperty("partitioner.class", 
"org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase$CustomKafkaPartitioner");
+               FlinkKafkaProducer010<String> producer = new 
FlinkKafkaProducer010<>("topic", new SimpleStringSchema(), props, /* pass no 
partitioner*/ null);
+               producer.setRuntimeContext(new MockRuntimeContext());
+               producer.open(new Configuration());
+               try {
+                       producer.invoke("abc");
+               } catch(RuntimeException re) {
+                       if(!re.getMessage().equals("Success")) {
+                               throw re;
+                       }
+               }
+               producer.close();
+       }
+
+       public static class CustomKafkaPartitioner implements Partitioner {
+               boolean configureCalled = false;
+               @Override
+               public int partition(String topic, Object key, byte[] keyBytes, 
Object value, byte[] valueBytes, Cluster cluster) {
+                       if(!configureCalled) {
+                               throw new IllegalArgumentException("Configure 
has not been called");
+                       }
+                       throw new RuntimeException("Success");
+               }
+
+               @Override
+               public void close() {
+
+               }
+
+               @Override
+               public void configure(Map<String, ?> configs) {
+                       configureCalled = true;
+               }
+       }
+
+       private static class MockRuntimeContext implements RuntimeContext {
+               @Override
+               public String getTaskName() {
+                       return null;
+               }
+
+               @Override
+               public MetricGroup getMetricGroup() {
+                       return new UnregisteredTaskMetricsGroup();
+               }
+
+               @Override
+               public int getNumberOfParallelSubtasks() {
+                       return 2;
+               }
+
+               @Override
+               public int getIndexOfThisSubtask() {
+                       return 0;
+               }
+
+               @Override
+               public int getAttemptNumber() {
+                       return 0;
+               }
+
+               @Override
+               public String getTaskNameWithSubtasks() {
+                       return null;
+               }
+
+               @Override
+               public ExecutionConfig getExecutionConfig() {
+                       return null;
+               }
+
+               @Override
+               public ClassLoader getUserCodeClassLoader() {
+                       return null;
+               }
+
+               @Override
+               public <V, A extends Serializable> void addAccumulator(String 
name, Accumulator<V, A> accumulator) {
+
+               }
+
+               @Override
+               public <V, A extends Serializable> Accumulator<V, A> 
getAccumulator(String name) {
+                       return null;
+               }
+
+               @Override
+               public Map<String, Accumulator<?, ?>> getAllAccumulators() {
+                       return null;
+               }
+
+               @Override
+               public IntCounter getIntCounter(String name) {
+                       return null;
+               }
+
+               @Override
+               public LongCounter getLongCounter(String name) {
+                       return null;
+               }
+
+               @Override
+               public DoubleCounter getDoubleCounter(String name) {
+                       return null;
+               }
+
+               @Override
+               public Histogram getHistogram(String name) {
+                       return null;
+               }
+
+               @Override
+               public boolean hasBroadcastVariable(String name) {
+                       return false;
+               }
+
+               @Override
+               public <RT> List<RT> getBroadcastVariable(String name) {
+                       return null;
+               }
+
+               @Override
+               public <T, C> C getBroadcastVariableWithInitializer(String 
name, BroadcastVariableInitializer<T, C> initializer) {
+                       return null;
+               }
+
+               @Override
+               public DistributedCache getDistributedCache() {
+                       return null;
+               }
+
+               @Override
+               public <T> ValueState<T> getState(ValueStateDescriptor<T> 
stateProperties) {
+                       return null;
+               }
+
+               @Override
+               public <T> ListState<T> getListState(ListStateDescriptor<T> 
stateProperties) {
+                       return null;
+               }
+
+               @Override
+               public <T> ReducingState<T> 
getReducingState(ReducingStateDescriptor<T> stateProperties) {
+                       return null;
+               }
+       }
 }
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
index f17f9aea612..41ec8b741d9 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
@@ -40,6 +40,14 @@ under the License.
                <kafka.version>0.8.2.2</kafka.version>
        </properties>
 
+
+       <repositories>
+               <repository>
+                       <id>confluent</id>
+                       <url>http://packages.confluent.io/maven</url>
+               </repository>
+       </repositories>
+
        <dependencies>
 
                <!-- core dependencies -->
@@ -173,6 +181,27 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>io.confluent</groupId>
+                       <artifactId>kafka-avro-serializer</artifactId>
+                       <version>3.0.0</version>
+                       <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.kafka</groupId>
+                                       <artifactId>kafka-clients</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>log4j</groupId>
+                                       <artifactId>log4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       <artifactId>slf4j-log4j12</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
        </dependencies>
 
 
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index fabb0fec75a..d6ddb0aaff3 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -245,4 +245,9 @@ public void testEndOfStream() throws Exception {
        public void testMetrics() throws Throwable {
                runMetricsTest();
        }
+
+       @Test(timeout = 60000)
+       public void testKafkaAvroSerializerTest() throws Exception {
+               runKafkaAvroSerializerTest();
+       }
 }
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
index f638c7aca2b..45561c3ee4c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
@@ -40,6 +40,13 @@ under the License.
                <kafka.version>0.9.0.1</kafka.version>
        </properties>
 
+       <repositories>
+               <repository>
+                       <id>confluent</id>
+                       <url>http://packages.confluent.io/maven</url>
+               </repository>
+       </repositories>
+
        <dependencies>
 
                <!-- core dependencies -->
@@ -149,6 +156,27 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>io.confluent</groupId>
+                       <artifactId>kafka-avro-serializer</artifactId>
+                       <version>3.0.0</version>
+                       <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.kafka</groupId>
+                                       <artifactId>kafka-clients</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>log4j</groupId>
+                                       <artifactId>log4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       <artifactId>slf4j-log4j12</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
        </dependencies>
 
        <build>
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index d18e2a9a563..793a954f26b 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -110,6 +110,11 @@ public void testMetrics() throws Throwable {
                runMetricsTest();
        }
 
+       @Test(timeout = 60000)
+       public void testKafkaAvroSerializerTest() throws Exception {
+               runKafkaAvroSerializerTest();
+       }
+
        // --- offset committing ---
 
        @Test(timeout = 60000)
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 1802e0c9b78..dee9f2c4036 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -75,6 +75,7 @@
        // 6 seconds is default. Seems to be too small for travis. 30 seconds
        private String zkTimeout = "30000";
 
+       @Override
        public String getBrokerConnectionString() {
                return brokerConnectionString;
        }
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
index ef71bde9d74..abb145e0028 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
@@ -40,6 +40,13 @@ under the License.
                <kafka.version>0.8.2.2</kafka.version>
        </properties>
 
+       <repositories>
+               <repository>
+                       <id>confluent</id>
+                       <url>http://packages.confluent.io/maven</url>
+               </repository>
+       </repositories>
+
        <dependencies>
 
                <!-- core dependencies -->
@@ -169,6 +176,27 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>io.confluent</groupId>
+                       <artifactId>kafka-avro-serializer</artifactId>
+                       <version>3.0.0</version>
+                       <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.kafka</groupId>
+                                       <artifactId>*</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>log4j</groupId>
+                                       <artifactId>log4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       <artifactId>slf4j-log4j12</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
        </dependencies>
 
        <dependencyManagement>
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KafkaDeserializerWrapper.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KafkaDeserializerWrapper.java
new file mode 100644
index 00000000000..a96f245ac29
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KafkaDeserializerWrapper.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Wrapper for using a {@link org.apache.kafka.common.serialization.Deserializer} with Apache Flink.
+ */
+public class KafkaDeserializerWrapper<K, V> implements 
KeyedDeserializationSchema<Tuple2<K,V>> {
+
+       private final Class<Deserializer<K>> keyDeserializerClass;
+       private final Class<Deserializer<V>> valueDeserializerClass;
+       private final TupleTypeInfo<Tuple2<K,V>> producedType;
+       private Map<String, ?> configs;
+       private boolean isInitialized = false;
+       private Deserializer<K> keyDeserializer;
+       private Deserializer<V> valueDeserializer;
+
+       @SuppressWarnings("unchecked")
+       public KafkaDeserializerWrapper(Class keyDeserializerClass, Class 
valueDeserializerClass, Class<K> keyType, Class<V> valueType) {
+               InstantiationUtil.checkForInstantiation(keyDeserializerClass);
+               InstantiationUtil.checkForInstantiation(valueDeserializerClass);
+               if(!Deserializer.class.isAssignableFrom(keyDeserializerClass)) {
+                       throw new IllegalArgumentException("Key deserializer is not implementing the org.apache.kafka.common.serialization.Deserializer interface");
+               }
+               if(!Deserializer.class.isAssignableFrom(valueDeserializerClass)) {
+                       throw new IllegalArgumentException("Value deserializer is not implementing the org.apache.kafka.common.serialization.Deserializer interface");
+               }
+
+               this.keyDeserializerClass = 
Objects.requireNonNull(keyDeserializerClass, "Key deserializer is null");
+               this.valueDeserializerClass = 
Objects.requireNonNull(valueDeserializerClass, "Value deserializer is null");
+               this.producedType = new 
TupleTypeInfo<>(TypeExtractor.getForClass(keyType), 
TypeExtractor.getForClass(valueType));
+       }
+
+       public KafkaDeserializerWrapper(Class keyDeserializerClass, Class 
valueDeserializerClass, Class<K> keyType, Class<V> valueType, Map<String, ?> 
configs) {
+               this(keyDeserializerClass, valueDeserializerClass, keyType, 
valueType);
+               this.configs = configs;
+       }
+
+       // ------- Methods for deserialization -------
+
+       @Override
+       public TypeInformation<Tuple2<K,V>> getProducedType() {
+               return this.producedType;
+       }
+
+       @Override
+       public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, 
String topic, int partition, long offset) throws IOException {
+               if(!isInitialized) {
+                       keyDeserializer = 
InstantiationUtil.instantiate(keyDeserializerClass, Deserializer.class);
+                       keyDeserializer.configure(configs, true);
+                       valueDeserializer = 
InstantiationUtil.instantiate(valueDeserializerClass, Deserializer.class);
+                       valueDeserializer.configure(configs, false);
+                       isInitialized = true;
+               }
+               K key = keyDeserializer.deserialize(topic, messageKey);
+               V value = valueDeserializer.deserialize(topic, message);
+               return Tuple2.of(key, value);
+       }
+
+
+       @Override
+       public boolean isEndOfStream(Tuple2<K, V> nextElement) {
+               return false;
+       }
+}
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KafkaSerializerWrapper.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KafkaSerializerWrapper.java
new file mode 100644
index 00000000000..85cec2f9ba1
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KafkaSerializerWrapper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Wrapper for using a {@link org.apache.kafka.common.serialization.Serializer} with Apache Flink.
+ */
+public class KafkaSerializerWrapper<K, V> implements 
KeyedSerializationSchema<Tuple2<K,V>> {
+
+       private final Class<Serializer<K>> keySerializerClass;
+       private final Class<Serializer<V>> valueSerializerClass;
+       private Map<String, ?> configs;
+       private boolean isInitialized = false;
+       private Serializer<K> keySerializer;
+       private Serializer<V> valueSerializer;
+
+       // Kafka's Serializer.serialize(topic, message) method expects a 
topic-argument.
+       // Flink's serialize*() methods don't provide the topic.
+       // Some Kafka serializers expect the topic to be set. Therefore, there 
is a special
+       // configuration key to set the topic for the serializer through this 
wrapper.
+       private String topic = null;
+       public final static String SERIALIZER_TOPIC = 
"__FLINK_SERIALIZER_TOPIC";
+
+       @SuppressWarnings("unchecked")
+       public KafkaSerializerWrapper(Class keySerializerClass, Class 
valueSerializerClass) {
+               InstantiationUtil.checkForInstantiation(keySerializerClass);
+               InstantiationUtil.checkForInstantiation(valueSerializerClass);
+               if(!Serializer.class.isAssignableFrom(keySerializerClass)) {
+                       throw new IllegalArgumentException("Key serializer is 
not implementing the org.apache.kafka.common.serialization.Serializer 
interface");
+               }
+               if(!Serializer.class.isAssignableFrom(valueSerializerClass)) {
+                       throw new IllegalArgumentException("Value serializer is 
not implementing the org.apache.kafka.common.serialization.Serializer 
interface");
+               }
+
+               this.keySerializerClass = 
Objects.requireNonNull(keySerializerClass, "Key serializer is null");
+               this.valueSerializerClass = 
Objects.requireNonNull(valueSerializerClass, "Value serializer is null");
+       }
+
+       public KafkaSerializerWrapper(Class keySerializerClass, Class 
valueSerializerClass, Map<String, ?> configs) {
+               this(keySerializerClass, valueSerializerClass);
+               this.configs = configs;
+               Object topic = configs.get(SERIALIZER_TOPIC);
+               if(topic != null) {
+                       if(!(topic instanceof String)) {
+                               throw new IllegalArgumentException("The 
provided topic is not of type String");
+                       }
+                       this.topic = (String) topic;
+               }
+       }
+
+       private void initialize() {
+               keySerializer = 
InstantiationUtil.instantiate(keySerializerClass, Serializer.class);
+               keySerializer.configure(configs, true);
+               valueSerializer = 
InstantiationUtil.instantiate(valueSerializerClass, Serializer.class);
+               valueSerializer.configure(configs, false);
+               isInitialized = true;
+       }
+
+       @Override
+       public byte[] serializeKey(Tuple2<K, V> element) {
+               if(!isInitialized) {
+                       initialize();
+               }
+               return keySerializer.serialize(this.topic, element.f0);
+       }
+
+       @Override
+       public byte[] serializeValue(Tuple2<K, V> element) {
+               if(!isInitialized) {
+                       initialize();
+               }
+               return valueSerializer.serialize(this.topic, element.f1);
+       }
+
+       @Override
+       public String getTargetTopic(Tuple2<K, V> element) {
+               return null;
+       }
+}
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 7b06cfd9593..94a5799f6e9 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -18,6 +18,11 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
@@ -25,9 +30,11 @@
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
 import kafka.server.KafkaServer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -53,19 +60,14 @@
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
@@ -73,8 +75,9 @@
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KafkaDeserializerWrapper;
+import org.apache.flink.streaming.util.serialization.KafkaSerializerWrapper;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -88,6 +91,7 @@
 import org.apache.flink.util.Collector;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.sling.commons.json.JSONObject;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -96,7 +100,9 @@
 import javax.management.ObjectName;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
@@ -1428,11 +1434,127 @@ public void flatMap(Tuple2<Integer, Integer> value, 
Collector<Void> out) throws
                        }
                });
 
-               JobExecutionResult result = tryExecute(env1, "Consume " + 
ELEMENT_COUNT + " elements from Kafka");
+               tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from 
Kafka");
 
                deleteTestTopic(topic);
        }
 
+
+       private static class SchemaRegistryFakeHandler implements HttpHandler {
+               static int n = 0;
+               @Override
+               public void handle(HttpExchange t) throws IOException {
+                       String response = "{\"id\":"+(n++)+"}";
+                       
if(t.getRequestURI().toString().equals("/schemas/ids/0")) {
+                               String userSchema = "{\"type\":\"record\"," +
+                                               "\"name\":\"myrecord\"," +
+                                               
"\"fields\":[{\"name\":\"f1\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"int\"}]}";
+                               response = "{\"schema\": 
"+JSONObject.quote(userSchema)+"}";
+
+                       }
+
+                       LOG.info("Received request. Method {}, URI {}. Res: 
{}", t.getRequestMethod(), t.getRequestURI(), response);
+
+                       t.sendResponseHeaders(200, response.length());
+                       OutputStream os = t.getResponseBody();
+                       os.write(response.getBytes());
+                       os.close();
+               }
+       }
+
+
+       /**
+        * Test Flink's Kafka consumer / producer with a Kafka serializer and 
partitioner.
+        *
+        */
+       public void runKafkaAvroSerializerTest() throws Exception {
+
+               final String topic = "kafkaAvro";
+
+               // start a web server faking a registry server
+               InetSocketAddress port = new InetSocketAddress(0);
+               HttpServer server = HttpServer.create(port, 0);
+               server.createContext("/", new SchemaRegistryFakeHandler());
+               server.setExecutor(null); // creates a default executor
+               server.start();
+
+               createTestTopic(topic, 2, 1);
+
+               // write Avro data
+               final StreamExecutionEnvironment env1 = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env1.setParallelism(1);
+               
env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+               env1.getConfig().disableSysoutLogging();
+               env1.getConfig().enableObjectReuse(); // otherwise, the schema 
registry cache is not working
+
+               Properties props = new Properties();
+               props.putAll(standardProps);
+               props.putAll(secureProps);
+               props.setProperty("schema.registry.url", 
"http://localhost:"+server.getAddress().getPort());
+               props.setProperty(KafkaSerializerWrapper.SERIALIZER_TOPIC, 
topic);
+
+               DataStream<Tuple2<Void, GenericRecord>> finiteAvroStream = 
env1.addSource(new SourceFunction<GenericRecord>() {
+                       @Override
+                       public void run(SourceContext<GenericRecord> ctx) 
throws Exception {
+                               // This code has been taken from
+                               // 
https://github.com/confluentinc/schema-registry/blob/master/docs/serializer-formatter.rst
+                               // (ASL 2.0 license)
+                               String userSchema = "{\"type\":\"record\"," +
+                                               "\"name\":\"myrecord\"," +
+                                               
"\"fields\":[{\"name\":\"f1\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"int\"}]}";
+                               Schema.Parser parser = new Schema.Parser();
+                               Schema schema = parser.parse(userSchema);
+                               int i = 0;
+                               while(i++ < 2000) {
+                                       GenericRecord element = new 
GenericData.Record(schema);
+                                       element.put("id", i);
+                                       element.put("f1", 
UUID.randomUUID().toString());
+                                       ctx.collect(element);
+                               }
+                               ctx.close();
+                       }
+
+                       @Override
+                       public void cancel() {
+                               // ign
+                       }
+               }).map(new MapFunction<GenericRecord, Tuple2<Void, 
GenericRecord>>() {
+                       @Override
+                       public Tuple2<Void, GenericRecord> map(GenericRecord 
value) throws Exception {
+                               return Tuple2.of(null, value);
+                       }
+               });
+
+               KeyedSerializationSchema<Tuple2<Void, GenericRecord>> serSchema 
= new KafkaSerializerWrapper<>(KafkaAvroSerializer.class, 
KafkaAvroSerializer.class, (Map)props);
+               kafkaServer.produceIntoKafka(finiteAvroStream, topic, 
serSchema, props, null);
+
+               env1.execute("Produce Avro records into Kafka");
+
+               // read Avro data
+               KafkaDeserializerWrapper<Void, GenericRecord> kvSer = new 
KafkaDeserializerWrapper<>(KafkaAvroDeserializer.class, 
KafkaAvroDeserializer.class, Void.class, GenericRecord.class, (Map)props);
+               DataStream<Tuple2<Void, GenericRecord>> fromKafka = 
env1.addSource(kafkaServer.getConsumer(topic, kvSer, props));
+
+               // validate data
+               fromKafka.flatMap(new FlatMapFunction<Tuple2<Void, 
GenericRecord>, Object>() {
+                       int count = 0;
+                       @Override
+                       public void flatMap(Tuple2<Void, GenericRecord> value, 
Collector<Object> out) throws Exception {
+                               int id = (int)value.f1.get("id");
+                               if(id < 1 || id > 2000) {
+                                       throw new RuntimeException("Id out of 
range");
+                               }
+                               if(++count == 2000) {
+                                       throw new SuccessException();
+                               }
+                       }
+               });
+
+               tryExecute(env1, "Consume Avro records from Kafka");
+
+               server.stop(0);
+               deleteTestTopic(topic);
+       }
+
        /**
         * Test metrics reporting for consumer
         *


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
