Repository: flink
Updated Branches:
  refs/heads/release-1.3 168378d98 -> 6714f4a39


[FLINK-7764] [kafka] Enable the operator settings for FlinkKafkaProducer010

[hotfix] [kafka] Fix the config parameter names in KafkaTestBase.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6714f4a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6714f4a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6714f4a3

Branch: refs/heads/release-1.3
Commit: 6714f4a3917bc1fd29529c34115011c71f0686e3
Parents: 168378d
Author: Xingcan Cui <[email protected]>
Authored: Wed Oct 11 23:42:29 2017 +0800
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Nov 2 16:29:12 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer010.java | 63 ++++++++++++++++++--
 1 file changed, 57 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6714f4a3/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 7909ba6..1019c09 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -31,7 +31,10 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -135,8 +138,9 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 
                GenericTypeInfo<Object> objectTypeInfo = new 
GenericTypeInfo<>(Object.class);
                FlinkKafkaProducer010<T> kafkaProducer = new 
FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, 
customPartitioner);
-               SingleOutputStreamOperator<Object> transformation = 
inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
-               return new FlinkKafkaProducer010Configuration<>(transformation, 
kafkaProducer);
+               SingleOutputStreamOperator<Object> streamOperator = 
inStream.transform
+                               ("FlinkKafkaProducer 0.10.x", objectTypeInfo, 
kafkaProducer);
+               return new FlinkKafkaProducer010Configuration<>(streamOperator, 
kafkaProducer);
        }
 
        // ---------------------- Regular constructors w/o timestamp support  
------------------
@@ -255,8 +259,9 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
                GenericTypeInfo<Object> objectTypeInfo = new 
GenericTypeInfo<>(Object.class);
                FlinkKafkaProducer010<T> kafkaProducer =
                                new FlinkKafkaProducer010<>(topicId, 
serializationSchema, producerConfig, new 
FlinkKafkaDelegatePartitioner<>(customPartitioner));
-               SingleOutputStreamOperator<Object> transformation = 
inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
-               return new FlinkKafkaProducer010Configuration<>(transformation, 
kafkaProducer);
+               SingleOutputStreamOperator<Object> streamOperator = 
inStream.transform
+                               ("FlinkKafkaProducer 0.10.x", objectTypeInfo, 
kafkaProducer);
+               return new FlinkKafkaProducer010Configuration<>(streamOperator, 
kafkaProducer);
        }
 
        /**
@@ -440,12 +445,14 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 
                private final FlinkKafkaProducerBase wrappedProducerBase;
                private final FlinkKafkaProducer010 producer;
+               private final StreamTransformation transformation;
 
                private FlinkKafkaProducer010Configuration(DataStream stream, 
FlinkKafkaProducer010<T> producer) {
                        //noinspection unchecked
                        super(stream, producer);
-                       this.producer = producer;
                        this.wrappedProducerBase = (FlinkKafkaProducerBase) 
producer.userFunction;
+                       this.producer = producer;
+                       this.transformation = stream.getTransformation();
                }
 
                /**
@@ -480,7 +487,51 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
                public void setWriteTimestampToKafka(boolean 
writeTimestampToKafka) {
                        this.producer.writeTimestampToKafka = 
writeTimestampToKafka;
                }
-       }
 
+               // 
*************************************************************************
+               //  Override methods to use the transformation in this class.
+               // 
*************************************************************************
+
+               @Override
+               public SinkTransformation<T> getTransformation() {
+                       throw new UnsupportedOperationException("The 
SinkTransformation is not accessible " +
+                                       "from " + 
this.getClass().getSimpleName());
+               }
+
+               @Override
+               public DataStreamSink<T> name(String name) {
+                       transformation.setName(name);
+                       return this;
+               }
+
+               @Override
+               public DataStreamSink<T> uid(String uid) {
+                       transformation.setUid(uid);
+                       return this;
+               }
 
+               @Override
+               public DataStreamSink<T> setUidHash(String uidHash) {
+                       transformation.setUidHash(uidHash);
+                       return this;
+               }
+
+               @Override
+               public DataStreamSink<T> setParallelism(int parallelism) {
+                       transformation.setParallelism(parallelism);
+                       return this;
+               }
+
+               @Override
+               public DataStreamSink<T> disableChaining() {
+                       
this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
+                       return this;
+               }
+
+               @Override
+               public DataStreamSink<T> slotSharingGroup(String 
slotSharingGroup) {
+                       transformation.setSlotSharingGroup(slotSharingGroup);
+                       return this;
+               }
+       }
 }

Reply via email to