Repository: flink
Updated Branches:
  refs/heads/master 8198967ea -> 786a6cbb3


[FLINK-7764][kafka] FlinkKafkaProducer010 does not accept name, uid, or 
parallelism


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/786a6cbb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/786a6cbb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/786a6cbb

Branch: refs/heads/master
Commit: 786a6cbb37d34ca918d178c36118dd2e142eacda
Parents: 8198967
Author: Xingcan Cui <[email protected]>
Authored: Tue Oct 10 21:15:02 2017 +0800
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Nov 2 16:25:41 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer010.java | 57 ++++++++++++++++++--
 1 file changed, 54 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/786a6cbb/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 3b43a7e..8575268 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -320,18 +322,23 @@ public class FlinkKafkaProducer010<T> extends 
FlinkKafkaProducer09<T> {
         * Configuration object returned by the writeToKafkaWithTimestamps() 
call.
         *
         * <p>This is only kept because it's part of the public API. It is not 
necessary anymore, now
-        * that the {@link SinkFunction} interface provides timestamps.
+        * that the {@link SinkFunction} interface provides timestamps.</p>
+        *
+        * <p>To enable the settings, this fake sink must override all the 
public methods
+        * in {@link DataStreamSink}.</p>
         */
        public static class FlinkKafkaProducer010Configuration<T> extends 
DataStreamSink<T> {
 
                private final FlinkKafkaProducer010 producer;
+               private final SinkTransformation<T> transformation;
 
                private FlinkKafkaProducer010Configuration(
-                               DataStreamSink originalSink,
+                               DataStreamSink<T> originalSink,
                                DataStream<T> inputStream,
                                FlinkKafkaProducer010<T> producer) {
                        //noinspection unchecked
                        super(inputStream, 
originalSink.getTransformation().getOperator());
+                       this.transformation = originalSink.getTransformation();
                        this.producer = producer;
                }
 
@@ -367,6 +374,50 @@ public class FlinkKafkaProducer010<T> extends 
FlinkKafkaProducer09<T> {
                public void setWriteTimestampToKafka(boolean 
writeTimestampToKafka) {
                        producer.writeTimestampToKafka = writeTimestampToKafka;
                }
-       }
 
+               // 
*************************************************************************
+               //  Override methods to use the transformation in this class.
+               // 
*************************************************************************
+
+               @Override
+               public SinkTransformation<T> getTransformation() {
+                       return transformation;
+               }
+
+               @Override
+               public DataStreamSink<T> name(String name) {
+                       transformation.setName(name);
+                       return this;
+               }
+
+               @Override
+               public DataStreamSink<T> uid(String uid) {
+                       transformation.setUid(uid);
+                       return this;
+               }
+
+               @Override
+               public DataStreamSink<T> setUidHash(String uidHash) {
+                       transformation.setUidHash(uidHash);
+                       return this;
+               }
+
+               @Override
+               public DataStreamSink<T> setParallelism(int parallelism) {
+                       transformation.setParallelism(parallelism);
+                       return this;
+               }
+
+               @Override
+               public DataStreamSink<T> disableChaining() {
+                       
this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
+                       return this;
+               }
+
+               @Override
+               public DataStreamSink<T> slotSharingGroup(String 
slotSharingGroup) {
+                       transformation.setSlotSharingGroup(slotSharingGroup);
+                       return this;
+               }
+       }
 }

Reply via email to