[hotfix] Don't use deprecated writeToKafkaWithTimestamps in Kafka 0.10 tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a3621b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a3621b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a3621b8

Branch: refs/heads/master
Commit: 9a3621b842d2bf6b76e394f1412dd27475180ac2
Parents: 08bfdae
Author: Aljoscha Krettek <[email protected]>
Authored: Thu Sep 28 14:53:24 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../streaming/connectors/kafka/KafkaTestEnvironmentImpl.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9a3621b8/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 5a5caad..d0e935b 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -168,7 +168,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	@Override
 	public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
-		return FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props);
+		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props);
+		prod.setFlushOnCheckpoint(true);
+		prod.setWriteTimestampToKafka(true);
+		return stream.addSink(prod);
 	}
 
 	@Override
