Repository: flink Updated Branches: refs/heads/master 8198967ea -> 786a6cbb3
[FLINK-7764][kafka]FlinkKafkaProducer010 does not accept name, uid, or parallelism Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/786a6cbb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/786a6cbb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/786a6cbb Branch: refs/heads/master Commit: 786a6cbb37d34ca918d178c36118dd2e142eacda Parents: 8198967 Author: Xingcan Cui <[email protected]> Authored: Tue Oct 10 21:15:02 2017 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Thu Nov 2 16:25:41 2017 +0100 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaProducer010.java | 57 ++++++++++++++++++-- 1 file changed, 54 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/786a6cbb/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java index 3b43a7e..8575268 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java @@ -20,6 +20,8 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.transformations.SinkTransformation; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; @@ -320,18 +322,23 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> { * Configuration object returned by the writeToKafkaWithTimestamps() call. * * <p>This is only kept because it's part of the public API. It is not necessary anymore, now - * that the {@link SinkFunction} interface provides timestamps. + * that the {@link SinkFunction} interface provides timestamps.</p> + * + * <p>To enable the settings, this fake sink must override all the public methods + * in {@link DataStreamSink}.</p> */ public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> { private final FlinkKafkaProducer010 producer; + private final SinkTransformation<T> transformation; private FlinkKafkaProducer010Configuration( - DataStreamSink originalSink, + DataStreamSink<T> originalSink, DataStream<T> inputStream, FlinkKafkaProducer010<T> producer) { //noinspection unchecked super(inputStream, originalSink.getTransformation().getOperator()); + this.transformation = originalSink.getTransformation(); this.producer = producer; } @@ -367,6 +374,50 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> { public void setWriteTimestampToKafka(boolean writeTimestampToKafka) { producer.writeTimestampToKafka = writeTimestampToKafka; } - } + // ************************************************************************* + // Override methods to use the transformation in this class. + // ************************************************************************* + + @Override + public SinkTransformation<T> getTransformation() { + return transformation; + } + + @Override + public DataStreamSink<T> name(String name) { + transformation.setName(name); + return this; + } + + @Override + public DataStreamSink<T> uid(String uid) { + transformation.setUid(uid); + return this; + } + + @Override + public DataStreamSink<T> setUidHash(String uidHash) { + transformation.setUidHash(uidHash); + return this; + } + + @Override + public DataStreamSink<T> setParallelism(int parallelism) { + transformation.setParallelism(parallelism); + return this; + } + + @Override + public DataStreamSink<T> disableChaining() { + this.transformation.setChainingStrategy(ChainingStrategy.NEVER); + return this; + } + + @Override + public DataStreamSink<T> slotSharingGroup(String slotSharingGroup) { + transformation.setSlotSharingGroup(slotSharingGroup); + return this; + } + } }
