[FLINK-4035] Refactor the Kafka 0.10 connector to be based upon the 0.9 connector
Add a test case for Kafka's new timestamp functionality and update the documentation. This closes #2369 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6731ec1e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6731ec1e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6731ec1e Branch: refs/heads/master Commit: 6731ec1e48d0a0092dd2330adda73bcf37fda8d7 Parents: 63859c6 Author: Robert Metzger <rmetz...@apache.org> Authored: Tue Aug 9 16:38:21 2016 +0200 Committer: Robert Metzger <rmetz...@apache.org> Committed: Tue Oct 11 10:04:25 2016 +0200 ---------------------------------------------------------------------- docs/dev/connectors/kafka.md | 67 +++- docs/page/js/flink.js | 3 +- .../flink-connector-kafka-0.10/pom.xml | 50 +-- .../connectors/kafka/FlinkKafkaConsumer010.java | 121 +------ .../connectors/kafka/FlinkKafkaProducer010.java | 315 +++++++++++++++++-- .../kafka/Kafka010JsonTableSource.java | 2 +- .../connectors/kafka/Kafka010TableSource.java | 2 +- .../kafka/internal/Kafka010Fetcher.java | 268 ++-------------- .../connectors/kafka/Kafka010ITCase.java | 266 +++++++++++----- .../connectors/kafka/KafkaProducerTest.java | 119 ------- .../kafka/KafkaTestEnvironmentImpl.java | 80 ++++- .../kafka/internals/SimpleConsumerThread.java | 2 +- .../kafka/KafkaTestEnvironmentImpl.java | 7 +- .../connectors/kafka/FlinkKafkaConsumer09.java | 4 +- .../connectors/kafka/FlinkKafkaProducer09.java | 2 +- .../kafka/internal/Kafka09Fetcher.java | 22 +- .../kafka/KafkaTestEnvironmentImpl.java | 6 +- .../kafka/FlinkKafkaConsumerBase.java | 4 + .../kafka/FlinkKafkaProducerBase.java | 4 +- .../kafka/internals/AbstractFetcher.java | 43 +-- ...picPartitionStateWithPeriodicWatermarks.java | 4 +- ...cPartitionStateWithPunctuatedWatermarks.java | 4 +- .../connectors/kafka/KafkaConsumerTestBase.java | 201 ++++++------ .../connectors/kafka/KafkaProducerTestBase.java | 5 +- .../kafka/KafkaShortRetentionTestBase.java | 4 +- .../connectors/kafka/KafkaTestEnvironment.java | 7 +- .../AbstractFetcherTimestampsTest.java | 68 ++-- .../kafka/testutils/DataGenerators.java | 87 ++--- .../testutils/JobManagerCommunicationUtils.java | 21 +- 29 files changed, 936 insertions(+), 852 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/docs/dev/connectors/kafka.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md index d2221fa..9a360d4 100644 --- a/docs/dev/connectors/kafka.md +++ b/docs/dev/connectors/kafka.md @@ -46,14 +46,6 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is </thead> <tbody> <tr> - <td>flink-connector-kafka</td> - <td>0.9.1, 0.10</td> - <td>FlinkKafkaConsumer082<br> - FlinkKafkaProducer</td> - <td>0.8.x</td> - <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td> - </tr> - <tr> <td>flink-connector-kafka-0.8{{ site.scala_version_suffix }}</td> <td>1.0.0</td> <td>FlinkKafkaConsumer08<br> @@ -61,7 +53,7 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is <td>0.8.x</td> <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. 
Offsets are committed to ZK by Flink.</td> </tr> - <tr> + <tr> <td>flink-connector-kafka-0.9{{ site.scala_version_suffix }}</td> <td>1.0.0</td> <td>FlinkKafkaConsumer09<br> @@ -69,6 +61,14 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is <td>0.9.x</td> <td>Uses the new <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Consumer API</a> Kafka.</td> </tr> + <tr> + <td>flink-connector-kafka-0.10{{ site.scala_version_suffix }}</td> + <td>1.2.0</td> + <td>FlinkKafkaConsumer010<br> + FlinkKafkaProducer010</td> + <td>0.10.x</td> + <td>This connector supports <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka messages with timestamps</a> both for producing and consuming.</td> + </tr> </tbody> </table> @@ -87,7 +87,6 @@ Note that the streaming connectors are currently not part of the binary distribu ### Installing Apache Kafka * Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application). -* On 32 bit computers [this](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in) problem may occur. * If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address. ### Kafka Consumer @@ -256,17 +255,28 @@ records to partitions. Example: + <div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> +<div data-lang="java, Kafka 0.8+" markdown="1"> {% highlight java %} stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema())); {% endhighlight %} </div> -<div data-lang="scala" markdown="1"> +<div data-lang="java, Kafka 0.10+" markdown="1"> +{% highlight java %} +FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties); +{% endhighlight %} +</div> +<div data-lang="scala, Kafka 0.8+" markdown="1"> {% highlight scala %} stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema())) {% endhighlight %} </div> +<div data-lang="scala, Kafka 0.10+" markdown="1"> +{% highlight scala %} +FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties); +{% endhighlight %} +</div> </div> You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to @@ -287,3 +297,36 @@ higher value. There is currently no transactional producer for Kafka, so Flink can not guarantee exactly-once delivery into a Kafka topic. + +### Using Kafka timestamps and Flink event time in Kafka 0.10 + +Since Apache Kafka 0.10., Kafka's messages can carry [timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message), indicating +the time the event has occurred (see ["event time" in Apache Flink](../event_time.html)) or the time when the message +has been written to the Kafka broker. + +The `FlinkKafkaConsumer010` will emit records with the timestamp attached, if the time characteristic in Flink is +set to `TimeCharacteristic.EventTime` (`StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`). + +The Kafka consumer does not emit watermarks. 
To emit watermarks, the same mechanisms as described above in +"Kafka Consumers and Timestamp Extraction/Watermark Emission" using the `assignTimestampsAndWatermarks` method are applicable. + +There is no need to define a timestamp extractor when using the timestamps from Kafka. The `previousElementTimestamp` argument of +the `extractTimestamp()` method contains the timestamp carried by the Kafka message. + +A timestamp extractor for a Kafka consumer would look like this: +{% highlight java %} +public long extractTimestamp(Long element, long previousElementTimestamp) { + return previousElementTimestamp; +} +{% endhighlight %} + + + +The `FlinkKafkaProducer010` only emits the record timestamp, if `setWriteTimestampToKafka(true)` is set. + +{% highlight java %} +FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps); +config.setWriteTimestampToKafka(true); +{% endhighlight %} + + http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/docs/page/js/flink.js ---------------------------------------------------------------------- diff --git a/docs/page/js/flink.js b/docs/page/js/flink.js index fdf972c..885a8ff 100644 --- a/docs/page/js/flink.js +++ b/docs/page/js/flink.js @@ -42,6 +42,7 @@ function codeTabs() { var image = $(this).data("image"); var notabs = $(this).data("notabs"); var capitalizedLang = lang.substr(0, 1).toUpperCase() + lang.substr(1); + lang = lang.replace(/[^a-zA-Z0-9]/g, "_"); var id = "tab_" + lang + "_" + counter; $(this).attr("id", id); if (image != null && langImages[lang]) { @@ -99,9 +100,7 @@ function viewSolution() { // A script to fix internal hash links because we have an overlapping top bar. // Based on https://github.com/twitter/bootstrap/issues/193#issuecomment-2281510 function maybeScrollToHash() { - console.log("HERE"); if (window.location.hash && $(window.location.hash).length) { - console.log("HERE2", $(window.location.hash), $(window.location.hash).offset().top); var newTop = $(window.location.hash).offset().top - 57; $(window).scrollTop(newTop); } http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml index f2bcb11..0b426b5 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml @@ -26,7 +26,7 @@ under the License. <parent> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-connectors</artifactId> - <version>1.1-SNAPSHOT</version> + <version>1.2-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -37,7 +37,7 @@ under the License. <!-- Allow users to pass custom connector versions --> <properties> - <kafka.version>0.10.0.0</kafka.version> + <kafka.version>0.10.0.1</kafka.version> </properties> <dependencies> @@ -46,21 +46,16 @@ under the License. 
<dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> + <artifactId>flink-connector-kafka-0.9_2.10</artifactId> <version>${project.version}</version> - <scope>provided</scope> </dependency> + <!-- Add Kafka 0.10.x as a dependency --> + <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka-base_2.10</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${scala.binary.version}</artifactId> - </exclusion> - </exclusions> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> </dependency> <dependency> @@ -73,20 +68,29 @@ under the License. <optional>true</optional> </dependency> + <!-- test dependencies --> + <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - <version>${kafka.version}</version> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.9_2.10</artifactId> + <version>${project.version}</version> + <exclusions> + <!-- exclude Kafka dependencies --> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> </dependency> - <!-- test dependencies --> - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-base_2.10</artifactId> <version>${project.version}</version> <exclusions> - <!-- exclude 0.8 dependencies --> + <!-- exclude Kafka dependencies --> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> @@ -127,6 +131,14 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-jmx</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java index 78ccd4a..267ff57 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java @@ -28,20 +28,10 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; import org.apache.flink.util.SerializedValue; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; -import static 
org.apache.flink.util.Preconditions.checkNotNull; /** * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from @@ -64,30 +54,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * is constructed. That means that the client that submits the program needs to be able to * reach the Kafka brokers or ZooKeeper.</p> */ -public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> { +public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> { private static final long serialVersionUID = 2324564345203409112L; - private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer010.class); - - /** Configuration key to change the polling timeout **/ - public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout"; - - /** Boolean configuration key to disable metrics tracking **/ - public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; - - /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not - * available. If 0, returns immediately with any records that are available now. */ - public static final long DEFAULT_POLL_TIMEOUT = 100L; - - // ------------------------------------------------------------------------ - - /** User-supplied properties for Kafka **/ - private final Properties properties; - - /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not - * available. If 0, returns immediately with any records that are available now */ - private final long pollTimeout; // ------------------------------------------------------------------------ @@ -151,51 +121,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> { * The properties that are used to configure both the fetcher and the offset handler. */ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) { - super(deserializer); - - checkNotNull(topics, "topics"); - this.properties = checkNotNull(props, "props"); - setDeserializer(this.properties); - - // configure the polling timeout - try { - if (properties.containsKey(KEY_POLL_TIMEOUT)) { - this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT)); - } else { - this.pollTimeout = DEFAULT_POLL_TIMEOUT; - } - } - catch (Exception e) { - throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e); - } - - // read the partitions that belong to the listed topics - final List<KafkaTopicPartition> partitions = new ArrayList<>(); - - try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) { - for (final String topic: topics) { - // get partitions for each topic - List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic); - // for non existing topics, the list might be null. - if (partitionsForTopic != null) { - partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic)); - } - } - } - - if (partitions.isEmpty()) { - throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics); - } - - // we now have a list of partitions which is the same for all parallel consumer instances. 
- LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics); - - if (LOG.isInfoEnabled()) { - logPartitionInfo(LOG, partitions); - } - - // register these partitions - setSubscribedPartitions(partitions); + super(topics, deserializer, props); } @Override @@ -212,48 +138,5 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> { watermarksPeriodic, watermarksPunctuated, runtimeContext, deserializer, properties, pollTimeout, useMetrics); - - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - /** - * Converts a list of Kafka PartitionInfo's to Flink's KafkaTopicPartition (which are serializable) - * - * @param partitions A list of Kafka PartitionInfos. - * @return A list of KafkaTopicPartitions - */ - private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) { - checkNotNull(partitions); - - List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size()); - for (PartitionInfo pi : partitions) { - ret.add(new KafkaTopicPartition(pi.topic(), pi.partition())); - } - return ret; - } - - /** - * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties. - * - * @param props The Kafka properties to register the serializer in. - */ - private static void setDeserializer(Properties props) { - final String deSerName = ByteArrayDeserializer.class.getCanonicalName(); - - Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); - Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); - - if (keyDeSer != null && !keyDeSer.equals(deSerName)) { - LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); - } - if (valDeSer != null && !valDeSer.equals(deSerName)) { - LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); - } - - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java index 49bce39..cc0194b 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java @@ -17,27 +17,123 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; 
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; +import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList; + /** - * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.8. + * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x + * + * Implementation note: This producer is a hybrid between a regular regular sink function (a) + * and a custom operator (b). + * + * For (a), the class implements the SinkFunction and RichFunction interfaces. + * For (b), it extends the StreamTask class. + * + * Details about approach (a): * - * Please note that this producer does not have any reliability guarantees. + * Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the + * DataStream.addSink() method. + * Since the APIs exposed in that variant do not allow accessing the the timestamp attached to the record + * the Kafka 0.10 producer has a second invocation option, approach (b). * - * @param <IN> Type of the messages to write into Kafka. + * Details about approach (b): + * Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the + * FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer + * can access the internal record timestamp of the record and write it to Kafka. + * + * All methods and constructors in this class are marked with the approach they are needed for. */ -public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> { +public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction { + + /** + * Flag controlling whether we are writing the Flink record's timestamp into Kafka. + */ + private boolean writeTimestampToKafka = false; + + // ---------------------- "Constructors" for timestamp writing ------------------ + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined serialization schema supporting key/value messages + * @param producerConfig Properties with the producer configuration. + */ + public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, + String topicId, + KeyedSerializationSchema<T> serializationSchema, + Properties producerConfig) { + return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FixedPartitioner<T>()); + } + + + /** + * Creates a FlinkKafkaProducer for a given topic. 
the sink produces a DataStream to + * the topic. + * + * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined (keyless) serialization schema. + * @param producerConfig Properties with the producer configuration. + */ + public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, + String topicId, + SerializationSchema<T> serializationSchema, + Properties producerConfig) { + return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId The name of the target topic + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. + */ + public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, + String topicId, + KeyedSerializationSchema<T> serializationSchema, + Properties producerConfig, + KafkaPartitioner<T> customPartitioner) { - private static final long serialVersionUID = 1L; + GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class); + FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner); + SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer); + return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer); + } - // ------------------- Keyless serialization schema constructors ---------------------- + // ---------------------- Regular constructors w/o timestamp support ------------------ /** * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to @@ -50,8 +146,8 @@ public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> { * @param serializationSchema * User defined (keyless) serialization schema. */ - public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>()); + public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>()); } /** @@ -65,8 +161,8 @@ public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> { * @param producerConfig * Properties with the producer configuration. 
*/ - public FlinkKafkaProducer010(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>()); + public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>()); } /** @@ -78,9 +174,8 @@ public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> { * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) */ - public FlinkKafkaProducer010(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { + public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) { this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); - } // ------------------- Key/Value serialization schema constructors ---------------------- @@ -96,8 +191,8 @@ public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> { * @param serializationSchema * User defined serialization schema supporting key/value messages */ - public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) { - this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>()); + public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) { + this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>()); } /** @@ -111,27 +206,193 @@ public class FlinkKafkaProducer010<IN> extends FlinkKafkaProducerBase<IN> { * @param producerConfig * Properties with the producer configuration. */ - public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) { - this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>()); + public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) { + this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>()); } /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. + * Create Kafka producer * - * @param topicId The topic to write data to - * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. 
+ * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above) */ - public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { - super(topicId, serializationSchema, producerConfig, customPartitioner); + public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) { + // We create a Kafka 09 producer instance here and only "override" (by intercepting) the + // invoke call. + super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner)); + } + + + // ----------------------------- Generic element processing --------------------------- + + private void invokeInternal(T next, long elementTimestamp) throws Exception { + + final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; + + internalProducer.checkErroneous(); + + byte[] serializedKey = internalProducer.schema.serializeKey(next); + byte[] serializedValue = internalProducer.schema.serializeValue(next); + String targetTopic = internalProducer.schema.getTargetTopic(next); + if (targetTopic == null) { + targetTopic = internalProducer.defaultTopicId; + } + + Long timestamp = null; + if(this.writeTimestampToKafka) { + timestamp = elementTimestamp; + } + + ProducerRecord<byte[], byte[]> record; + if (internalProducer.partitioner == null) { + record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue); + } else { + record = new ProducerRecord<>(targetTopic, internalProducer.partitioner.partition(next, serializedKey, serializedValue, internalProducer.partitions.length), timestamp, serializedKey, serializedValue); + } + if (internalProducer.flushOnCheckpoint) { + synchronized (internalProducer.pendingRecordsLock) { + internalProducer.pendingRecords++; + } + } + internalProducer.producer.send(record, internalProducer.callback); } + + // ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ---- + + + // ---- Configuration setters + + /** + * Defines whether the producer should fail on errors, or only log them. + * If this is set to true, then exceptions will be only logged, if set to false, + * exceptions will be eventually thrown and cause the streaming program to + * fail (and enter recovery). + * + * Method is only accessible for approach (a) (see above) + * + * @param logFailuresOnly The flag to indicate logging-only on exceptions. + */ + public void setLogFailuresOnly(boolean logFailuresOnly) { + final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; + internalProducer.setLogFailuresOnly(logFailuresOnly); + } + + /** + * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers + * to be acknowledged by the Kafka producer on a checkpoint. + * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint. 
+ * + * Method is only accessible for approach (a) (see above) + * + * @param flush Flag indicating the flushing mode (true = flush on checkpoint) + */ + public void setFlushOnCheckpoint(boolean flush) { + final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; + internalProducer.setFlushOnCheckpoint(flush); + } + + /** + * This method is used for approach (a) (see above) + * + */ + @Override + public void open(Configuration parameters) throws Exception { + final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; + internalProducer.open(parameters); + } + + /** + * This method is used for approach (a) (see above) + */ + @Override + public IterationRuntimeContext getIterationRuntimeContext() { + final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; + return internalProducer.getIterationRuntimeContext(); + } + + /** + * This method is used for approach (a) (see above) + */ @Override - protected void flush() { - if (this.producer != null) { - producer.flush(); + public void setRuntimeContext(RuntimeContext t) { + final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; + internalProducer.setRuntimeContext(t); + } + + /** + * Invoke method for using the Sink as DataStream.addSink() sink. + * + * This method is used for approach (a) (see above) + * + * @param value The input record. + */ + @Override + public void invoke(T value) throws Exception { + invokeInternal(value, Long.MAX_VALUE); + } + + + // ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ---- + + + /** + * Process method for using the sink with timestamp support. + * + * This method is used for approach (b) (see above) + */ + @Override + public void processElement(StreamRecord<T> element) throws Exception { + invokeInternal(element.getValue(), element.getTimestamp()); + } + + /** + * Configuration object returned by the writeToKafkaWithTimestamps() call. + */ + public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> { + + private final FlinkKafkaProducerBase wrappedProducerBase; + private final FlinkKafkaProducer010 producer; + + private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) { + //noinspection unchecked + super(stream, producer); + this.producer = producer; + this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction; + } + + /** + * Defines whether the producer should fail on errors, or only log them. + * If this is set to true, then exceptions will be only logged, if set to false, + * exceptions will be eventually thrown and cause the streaming program to + * fail (and enter recovery). + * + * @param logFailuresOnly The flag to indicate logging-only on exceptions. + */ + public void setLogFailuresOnly(boolean logFailuresOnly) { + this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly); + } + + /** + * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers + * to be acknowledged by the Kafka producer on a checkpoint. + * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint. 
+ * + * @param flush Flag indicating the flushing mode (true = flush on checkpoint) + */ + public void setFlushOnCheckpoint(boolean flush) { + this.wrappedProducerBase.setFlushOnCheckpoint(flush); + } + + /** + * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka. + * Timestamps must be positive for Kafka to accept them. + * + * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka. + */ + public void setWriteTimestampToKafka(boolean writeTimestampToKafka) { + this.producer.writeTimestampToKafka = writeTimestampToKafka; } } + + } http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java index cda68ce..ddf1ad3 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java @@ -28,7 +28,7 @@ import java.util.Properties; /** * Kafka {@link StreamTableSource} for Kafka 0.10. */ -public class Kafka010JsonTableSource extends KafkaJsonTableSource { +public class Kafka010JsonTableSource extends Kafka09JsonTableSource { /** * Creates a Kafka 0.10 JSON {@link StreamTableSource}. http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java index cee1b90..732440b 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java @@ -28,7 +28,7 @@ import java.util.Properties; /** * Kafka {@link StreamTableSource} for Kafka 0.10. */ -public class Kafka010TableSource extends KafkaTableSource { +public class Kafka010TableSource extends Kafka09TableSource { /** * Creates a Kafka 0.10 {@link StreamTableSource}. 
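The producer changes above replace the FlinkKafkaProducerBase subclass with a hybrid StreamSink/SinkFunction and introduce the writeToKafkaWithTimestamps() entry point. The following is a minimal usage sketch of that timestamp-aware path (approach (b)); the broker address, group id, topic names and job name are illustrative placeholders, not values taken from this commit.

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class Kafka010TimestampSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Event time is required for the consumer to attach Kafka record timestamps to emitted elements.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "timestamp-sketch");        // placeholder group id

        // Source: FlinkKafkaConsumer010 emits records with the Kafka timestamp attached.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer010<>("input-topic", new SimpleStringSchema(), props));

        // Sink, approach (b): attach the producer via writeToKafkaWithTimestamps() so it can
        // see each record's internal timestamp (the plain DataStream.addSink() path cannot).
        FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> config =
                FlinkKafkaProducer010.writeToKafkaWithTimestamps(
                        stream, "output-topic", new SimpleStringSchema(), props);

        // Forward Flink's timestamps into the Kafka 0.10 message timestamp field.
        config.setWriteTimestampToKafka(true);
        // Reliability knobs exposed on the returned configuration object.
        config.setFlushOnCheckpoint(true);
        config.setLogFailuresOnly(false);

        env.execute("Kafka 0.10 timestamp sketch");
    }
}

The setters on the returned FlinkKafkaProducer010Configuration delegate to the wrapped FlinkKafkaProducerBase, so logging of failures and flush-on-checkpoint behave the same as in the plain addSink() path (approach (a)).
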
http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java index 70f530b..47bee22 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java @@ -18,37 +18,20 @@ package org.apache.flink.streaming.connectors.kafka.internal; -import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; -import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; -import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.util.SerializedValue; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; /** @@ -56,40 +39,7 @@ import java.util.Properties; * * @param <T> The type of elements produced by the fetcher. 
*/ -public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(Kafka010Fetcher.class); - - // ------------------------------------------------------------------------ - - /** The schema to convert between Kafka's byte messages, and Flink's objects */ - private final KeyedDeserializationSchema<T> deserializer; - - /** The subtask's runtime context */ - private final RuntimeContext runtimeContext; - - /** The configuration for the Kafka consumer */ - private final Properties kafkaProperties; - - /** The maximum number of milliseconds to wait for a fetch batch */ - private final long pollTimeout; - - /** Flag whether to register Kafka metrics as Flink accumulators */ - private final boolean forwardKafkaMetrics; - - /** Mutex to guard against concurrent access to the non-threadsafe Kafka consumer */ - private final Object consumerLock = new Object(); - - /** Reference to the Kafka consumer, once it is created */ - private volatile KafkaConsumer<byte[], byte[]> consumer; - - /** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */ - private volatile ExceptionProxy errorHandler; - - /** Flag to mark the main work loop as alive */ - private volatile boolean running = true; - - // ------------------------------------------------------------------------ +public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> { public Kafka010Fetcher( SourceContext<T> sourceContext, @@ -100,213 +50,47 @@ public class Kafka010Fetcher<T> extends AbstractFetcher<T, TopicPartition> imple KeyedDeserializationSchema<T> deserializer, Properties kafkaProperties, long pollTimeout, - boolean forwardKafkaMetrics) throws Exception + boolean useMetrics) throws Exception { - super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext); - - this.deserializer = deserializer; - this.runtimeContext = runtimeContext; - this.kafkaProperties = kafkaProperties; - this.pollTimeout = pollTimeout; - this.forwardKafkaMetrics = forwardKafkaMetrics; - - // if checkpointing is enabled, we are not automatically committing to Kafka. - kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - Boolean.toString(!runtimeContext.isCheckpointingEnabled())); + super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext, deserializer, kafkaProperties, pollTimeout, useMetrics); } - // ------------------------------------------------------------------------ - // Fetcher work methods - // ------------------------------------------------------------------------ - @Override - public void runFetchLoop() throws Exception { - this.errorHandler = new ExceptionProxy(Thread.currentThread()); - - // rather than running the main fetch loop directly here, we spawn a dedicated thread - // this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code - Thread runner = new Thread(this, "Kafka 0.10 Fetcher for " + runtimeContext.getTaskNameWithSubtasks()); - runner.setDaemon(true); - runner.start(); - - try { - runner.join(); - } catch (InterruptedException e) { - // may be the result of a wake-up after an exception. 
we ignore this here and only - // restore the interruption state - Thread.currentThread().interrupt(); - } - - // make sure we propagate any exception that occurred in the concurrent fetch thread, - // before leaving this method - this.errorHandler.checkAndThrowException(); + protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) { + consumer.assign(topicPartitions); } @Override - public void cancel() { - // flag the main thread to exit - running = false; - - // NOTE: - // - We cannot interrupt the runner thread, because the Kafka consumer may - // deadlock when the thread is interrupted while in certain methods - // - We cannot call close() on the consumer, because it will actually throw - // an exception if a concurrent call is in progress - - // make sure the consumer finds out faster that we are shutting down - if (consumer != null) { - consumer.wakeup(); - } + protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception { + // get timestamp from provided ConsumerRecord (only possible with kafka 0.10.x) + super.emitRecord(record, partition, offset, consumerRecord.timestamp()); } + /** + * Emit record Kafka-timestamp aware. + */ @Override - public void run() { - // This method initializes the KafkaConsumer and guarantees it is torn down properly. - // This is important, because the consumer has multi-threading issues, - // including concurrent 'close()' calls. - - final KafkaConsumer<byte[], byte[]> consumer; - try { - consumer = new KafkaConsumer<>(kafkaProperties); - } - catch (Throwable t) { - running = false; - errorHandler.reportError(t); - return; - } - - // from here on, the consumer will be closed properly - try { - consumer.assign(convertKafkaPartitions(subscribedPartitions())); - - // register Kafka metrics to Flink accumulators - if (forwardKafkaMetrics) { - Map<MetricName, ? extends Metric> metrics = consumer.metrics(); - if (metrics == null) { - // MapR's Kafka implementation returns null here. - LOG.info("Consumer implementation does not support metrics"); - } else { - // we have metrics, register them where possible - for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) { - String name = "KafkaConsumer-" + metric.getKey().name(); - DefaultKafkaMetricAccumulator kafkaAccumulator = - DefaultKafkaMetricAccumulator.createFor(metric.getValue()); - - // best effort: we only add the accumulator if available. 
- if (kafkaAccumulator != null) { - runtimeContext.addAccumulator(name, kafkaAccumulator); - } - } - } - } - - // seek the consumer to the initial offsets - for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) { - if (partition.isOffsetDefined()) { - consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1); - } - } - - // from now on, external operations may call the consumer - this.consumer = consumer; - - // main fetch loop - while (running) { - // get the next batch of records - final ConsumerRecords<byte[], byte[]> records; - synchronized (consumerLock) { - try { - records = consumer.poll(pollTimeout); - } - catch (WakeupException we) { - if (running) { - throw we; - } else { - continue; - } - } - } - - // get the records for each topic partition - for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) { - - List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition.getKafkaPartitionHandle()); - - for (ConsumerRecord<byte[], byte[]> record : partitionRecords) { - T value = deserializer.deserialize( - record.key(), record.value(), - record.topic(), record.partition(), record.offset()); - - if (deserializer.isEndOfStream(value)) { - // end of stream signaled - running = false; - break; - } - - // emit the actual record. this also update offset state atomically - // and deals with timestamps and watermark generation - emitRecord(value, partition, record.offset()); - } - } + protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partitionState, long offset, long timestamp) throws Exception { + if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) { + // fast path logic, in case there are no watermarks + + // emit the record, using the checkpoint lock to guarantee + // atomicity of record emission and offset state update + synchronized (checkpointLock) { + sourceContext.collectWithTimestamp(record, timestamp); + partitionState.setOffset(offset); } - // end main fetch loop } - catch (Throwable t) { - if (running) { - running = false; - errorHandler.reportError(t); - } else { - LOG.debug("Stopped ConsumerThread threw exception", t); - } + else if (timestampWatermarkMode == PERIODIC_WATERMARKS) { + emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp); } - finally { - try { - synchronized (consumerLock) { - consumer.close(); - } - } catch (Throwable t) { - LOG.warn("Error while closing Kafka 0.10 consumer", t); - } + else { + emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp); } } - // ------------------------------------------------------------------------ - // Kafka 0.10 specific fetcher behavior - // ------------------------------------------------------------------------ - @Override - public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) { - return new TopicPartition(partition.getTopic(), partition.getPartition()); - } - - @Override - public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception { - KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions(); - Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length); - - for (KafkaTopicPartitionState<TopicPartition> partition : partitions) { - Long offset = offsets.get(partition.getKafkaTopicPartition()); - if (offset != null) { - offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offset, "")); - } - } - - 
if (this.consumer != null) { - synchronized (consumerLock) { - this.consumer.commitSync(offsetsToCommit); - } - } - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - public static Collection<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) { - ArrayList<TopicPartition> result = new ArrayList<>(partitions.length); - for (KafkaTopicPartitionState<TopicPartition> p : partitions) { - result.add(p.getKafkaPartitionHandle()); - } - return result; + protected String getFetcherName() { + return "Kafka 0.10 Fetcher"; } } http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java index 5427853..28bf6d5 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java @@ -17,14 +17,32 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; import org.junit.Test; -import java.util.UUID; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; +import java.io.IOException; public class Kafka010ITCase extends KafkaConsumerTestBase { @@ -33,10 +51,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase { // Suite of Tests // 
------------------------------------------------------------------------ - @Override - public String getExpectedKafkaVersion() { - return "0.10"; - } @Test(timeout = 60000) public void testFailOnNoBroker() throws Exception { @@ -48,16 +62,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase { runSimpleConcurrentProducerConsumerTopology(); } -// @Test(timeout = 60000) -// public void testPunctuatedExplicitWMConsumer() throws Exception { -// runExplicitPunctuatedWMgeneratingConsumerTest(false); -// } - -// @Test(timeout = 60000) -// public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception { -// runExplicitPunctuatedWMgeneratingConsumerTest(true); -// } - @Test(timeout = 60000) public void testKeyValueSupport() throws Exception { runKeyValueTest(); @@ -124,68 +128,168 @@ public class Kafka010ITCase extends KafkaConsumerTestBase { @Test(timeout = 60000) public void testMetricsAndEndOfStream() throws Exception { - runMetricsAndEndOfStreamTest(); - } - - @Test - public void testJsonTableSource() throws Exception { - String topic = UUID.randomUUID().toString(); - - // Names and types are determined in the actual test method of the - // base test class. - Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource( - topic, - standardProps, - new String[] { - "long", - "string", - "boolean", - "double", - "missing-field"}, - new TypeInformation<?>[] { - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.BOOLEAN_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO }); - - // Don't fail on missing field, but set to null (default) - tableSource.setFailOnMissingField(false); - - runJsonTableSource(topic, tableSource); - } - - @Test - public void testJsonTableSourceWithFailOnMissingField() throws Exception { - String topic = UUID.randomUUID().toString(); - - // Names and types are determined in the actual test method of the - // base test class. 
- Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource( - topic, - standardProps, - new String[] { - "long", - "string", - "boolean", - "double", - "missing-field"}, - new TypeInformation<?>[] { - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.BOOLEAN_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO }); - - // Don't fail on missing field, but set to null (default) - tableSource.setFailOnMissingField(true); - - try { - runJsonTableSource(topic, tableSource); - fail("Did not throw expected Exception"); - } catch (Exception e) { - Throwable rootCause = e.getCause().getCause().getCause(); - assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException); + runEndOfStreamTest(); + } + + /** + * Kafka 0.10 specific test, ensuring Timestamps are properly written to and read from Kafka + */ + @Test(timeout = 60000) + public void testTimestamps() throws Exception { + + final String topic = "tstopic"; + createTestTopic(topic, 3, 1); + + // ---------- Produce an event time stream into Kafka ------------------- + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.setParallelism(1); + env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env.getConfig().disableSysoutLogging(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() { + boolean running = true; + + @Override + public void run(SourceContext<Long> ctx) throws Exception { + long i = 0; + while(running) { + ctx.collectWithTimestamp(i, i*2); + if(i++ == 1000L) { + running = false; + } + } + } + + @Override + public void cancel() { + running = false; + } + }); + + final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig()); + FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new KafkaPartitioner<Long>() { + @Override + public int partition(Long next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { + return (int)(next % 3); + } + }); + prod.setParallelism(3); + prod.setWriteTimestampToKafka(true); + env.execute("Produce some"); + + // ---------- Consume stream from Kafka ------------------- + + env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.setParallelism(1); + env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env.getConfig().disableSysoutLogging(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + FlinkKafkaConsumer010<Long> kafkaSource = new FlinkKafkaConsumer010<>(topic, new LimitedLongDeserializer(), standardProps); + kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() { + @Nullable + @Override + public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) { + if(lastElement % 10 == 0) { + return new Watermark(lastElement); + } + return null; + } + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + return previousElementTimestamp; + } + }); + + DataStream<Long> stream = env.addSource(kafkaSource); + GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class); + stream.transform("timestamp validating operator", objectTypeInfo, new 
TimestampValidatingOperator()).setParallelism(1); + + env.execute("Consume again"); + + deleteTestTopic(topic); + } + + private static class TimestampValidatingOperator extends StreamSink<Long> { + + public TimestampValidatingOperator() { + super(new SinkFunction<Long>() { + @Override + public void invoke(Long value) throws Exception { + throw new RuntimeException("Unexpected"); + } + }); + } + + long elCount = 0; + long wmCount = 0; + long lastWM = Long.MIN_VALUE; + + @Override + public void processElement(StreamRecord<Long> element) throws Exception { + elCount++; + if(element.getValue() * 2 != element.getTimestamp()) { + throw new RuntimeException("Invalid timestamp: " + element); + } + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + wmCount++; + + if(lastWM <= mark.getTimestamp()) { + lastWM = mark.getTimestamp(); + } else { + throw new RuntimeException("Received watermark higher than the last one"); + } + + if( mark.getTimestamp() % 10 != 0 && mark.getTimestamp() != Long.MAX_VALUE ) { + throw new RuntimeException("Invalid watermark: " + mark.getTimestamp()); + } + } + + @Override + public void close() throws Exception { + super.close(); + if(elCount != 1000L) { + throw new RuntimeException("Wrong final element count " + elCount); + } + + if(wmCount <= 2) { + throw new RuntimeException("Almost no watermarks have been sent " + wmCount); + } + } + } + + private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> { + + private final TypeInformation<Long> ti; + private final TypeSerializer<Long> ser; + long cnt = 0; + + public LimitedLongDeserializer() { + this.ti = TypeInfoParser.parse("Long"); + this.ser = ti.createSerializer(new ExecutionConfig()); + } + @Override + public TypeInformation<Long> getProducedType() { + return ti; + } + + @Override + public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + cnt++; + DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); + Long e = ser.deserialize(in); + return e; + } + + @Override + public boolean isEndOfStream(Long nextElement) { + return cnt > 1000L; } } http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java deleted file mode 100644 index 5f5ac63..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.flink.util.TestLogger; - -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.PartitionInfo; - -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.Collections; -import java.util.Properties; -import java.util.concurrent.Future; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.whenNew; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(FlinkKafkaProducerBase.class) -public class KafkaProducerTest extends TestLogger { - - @Test - @SuppressWarnings("unchecked") - public void testPropagateExceptions() { - try { - // mock kafka producer - KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class); - - // partition setup - when(kafkaProducerMock.partitionsFor(anyString())).thenReturn( - Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null))); - - // failure when trying to send an element - when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class))) - .thenAnswer(new Answer<Future<RecordMetadata>>() { - @Override - public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable { - Callback callback = (Callback) invocation.getArguments()[1]; - callback.onCompletion(null, new Exception("Test error")); - return null; - } - }); - - // make sure the FlinkKafkaProducer instantiates our mock producer - whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock); - - // (1) producer that propagates errors - - FlinkKafkaProducer010<String> producerPropagating = new FlinkKafkaProducer010<>( - "mock_topic", new SimpleStringSchema(), new Properties(), null); - - producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3)); - producerPropagating.open(new Configuration()); - - try { - producerPropagating.invoke("value"); - producerPropagating.invoke("value"); - fail("This should fail with an exception"); - } - catch (Exception e) { - assertNotNull(e.getCause()); - assertNotNull(e.getCause().getMessage()); - assertTrue(e.getCause().getMessage().contains("Test error")); - } - - // (2) producer that only logs errors - - FlinkKafkaProducer010<String> producerLogging = new 
FlinkKafkaProducer010<>( - "mock_topic", new SimpleStringSchema(), new Properties(), null); - producerLogging.setLogFailuresOnly(true); - - producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3)); - producerLogging.open(new Configuration()); - - producerLogging.invoke("value"); - producerLogging.invoke("value"); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 45f0478..af6d254 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -28,6 +28,8 @@ import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; @@ -64,6 +66,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { private String brokerConnectionString = ""; private Properties standardProps; private Properties additionalServerProperties; + private boolean secureMode = false; + // 6 seconds is default. Seems to be too small for travis. 
30 seconds + private int zkTimeout = 30000; public String getBrokerConnectionString() { return brokerConnectionString; @@ -75,6 +80,22 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override + public Properties getSecureProperties() { + Properties prop = new Properties(); + if(secureMode) { + prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); + prop.put("security.protocol", "SASL_PLAINTEXT"); + prop.put("sasl.kerberos.service.name", "kafka"); + + //add special timeout for Travis + prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout)); + prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout)); + prop.setProperty("metadata.fetch.timeout.ms","120000"); + } + return prop; + } + + @Override public String getVersion() { return "0.10"; } @@ -90,10 +111,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override - public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { + public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner); prod.setFlushOnCheckpoint(true); - return prod; + return stream.addSink(prod); + /* FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<T> sink = FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props, partitioner); + sink.setFlushOnCheckpoint(true); + return sink; */ } @Override @@ -130,8 +154,21 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override - public void prepare(int numKafkaServers, Properties additionalServerProperties) { + public boolean isSecureRunSupported() { + return true; + } + + @Override + public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) { + //increase the timeout since in Travis ZK connection takes long time for secure connection. 
+ if(secureMode) { + //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout + numKafkaServers = 1; + zkTimeout = zkTimeout * 15; + } + this.additionalServerProperties = additionalServerProperties; + this.secureMode = secureMode; File tempDir = new File(System.getProperty("java.io.tmpdir")); tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); @@ -151,9 +188,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { brokers = null; try { - LOG.info("Starting Zookeeper"); - zookeeper = new TestingServer(-1, tmpZkDir); + zookeeper = new TestingServer(- 1, tmpZkDir); zookeeperConnectionString = zookeeper.getConnectString(); + LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString); LOG.info("Starting KafkaServer"); brokers = new ArrayList<>(numKafkaServers); @@ -161,8 +198,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { for (int i = 0; i < numKafkaServers; i++) { brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); - SocketServer socketServer = brokers.get(i).socketServer(); - brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ","; + if(secureMode) { + brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ","; + } else { + brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ","; + } } LOG.info("ZK and KafkaServer started."); @@ -177,8 +217,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { standardProps.setProperty("bootstrap.servers", brokerConnectionString); standardProps.setProperty("group.id", "flink-tests"); standardProps.setProperty("auto.commit.enable", "false"); - standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis. - standardProps.setProperty("zookeeper.connection.timeout.ms", "30000"); + standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout)); + standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout)); standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.10 value) standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) 
} @@ -244,7 +284,14 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { final long deadline = System.currentTimeMillis() + 30000; do { try { - Thread.sleep(100); + if(secureMode) { + //increase wait time since in Travis ZK timeout occurs frequently + int wait = zkTimeout / 100; + LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic); + Thread.sleep(wait); + } else { + Thread.sleep(100); + } } catch (InterruptedException e) { // restore interrupted state } @@ -295,8 +342,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024)); // for CI stability, increase zookeeper session timeout - kafkaProperties.put("zookeeper.session.timeout.ms", "30000"); - kafkaProperties.put("zookeeper.connection.timeout.ms", "30000"); + kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout); + kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout); if(additionalServerProperties != null) { kafkaProperties.putAll(additionalServerProperties); } @@ -306,6 +353,15 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { for (int i = 1; i <= numTries; i++) { int kafkaPort = NetUtils.getAvailablePort(); kafkaProperties.put("port", Integer.toString(kafkaPort)); + + //to support secure kafka cluster + if(secureMode) { + LOG.info("Adding Kafka secure configurations"); + kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); + kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); + kafkaProperties.putAll(getSecureProperties()); + } + KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); try { http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java index 35e491a..1302348 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java @@ -376,7 +376,7 @@ class SimpleConsumerThread<T> extends Thread { continue partitionsLoop; } - owner.emitRecord(value, currentPartition, offset); + owner.emitRecord(value, currentPartition, offset, Long.MIN_VALUE); } else { // no longer running http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index cbf3d06..a0d5002 100644 --- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -31,6 +31,9 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; @@ -101,10 +104,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override - public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { + public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner); prod.setFlushOnCheckpoint(true); - return prod; + return stream.addSink(prod); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java index 9708777..a97476a 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java @@ -81,11 +81,11 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { // ------------------------------------------------------------------------ /** User-supplied properties for Kafka **/ - private final Properties properties; + protected final Properties properties; /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not * available. 
If 0, returns immediately with any records that are available now */ - private final long pollTimeout; + protected final long pollTimeout; // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java index eb3440a..2a3e39d 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java @@ -27,7 +27,7 @@ import java.util.Properties; /** - * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.8. + * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.9. * * Please note that this producer does not have any reliability guarantees. * http://git-wip-us.apache.org/repos/asf/flink/blob/6731ec1e/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java index aaec9dc..37e40fc 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java @@ -131,7 +131,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem // rather than running the main fetch loop directly here, we spawn a dedicated thread // this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code - Thread runner = new Thread(this, "Kafka 0.9 Fetcher for " + runtimeContext.getTaskNameWithSubtasks()); + Thread runner = new Thread(this, getFetcherName() + " for " + runtimeContext.getTaskNameWithSubtasks()); runner.setDaemon(true); runner.start(); @@ -183,7 +183,8 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem // from here on, the consumer will be closed properly try { - consumer.assign(convertKafkaPartitions(subscribedPartitions())); + assignPartitionsToConsumer(consumer, convertKafkaPartitions(subscribedPartitions())); + if (useMetrics) { final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer"); @@ -250,7 +251,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem // emit the actual record. 
this also update offset state atomically // and deals with timestamps and watermark generation - emitRecord(value, partition, record.offset()); + emitRecord(value, partition, record.offset(), record); } } } @@ -274,6 +275,21 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem } } + // Kafka09Fetcher ignores the timestamp, Kafka010Fetcher is extracting the timestamp and passing it to the emitRecord() method. + protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception { + emitRecord(record, partition, offset, Long.MIN_VALUE); + } + /** + * Protected method to make the partition assignment pluggable, for different Kafka versions. + */ + protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) { + consumer.assign(topicPartitions); + } + + protected String getFetcherName() { + return "Kafka 0.9 Fetcher"; + } + // ------------------------------------------------------------------------ // Kafka 0.9 specific fetcher behavior // ------------------------------------------------------------------------
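
For reference, the timestamp round trip that Kafka010ITCase#testTimestamps exercises above reduces to a few lines of user code against the new FlinkKafkaProducer010 API. The following is a minimal sketch, not code from this commit: the class name, broker address, group id, topic names and partitioning function are illustrative placeholders, and it only uses calls that appear verbatim in the test (writeToKafkaWithTimestamps with an explicit KafkaPartitioner, setWriteTimestampToKafka(true)). The AssignerWithPunctuatedWatermarks used in the test is omitted here for brevity.

import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka010TimestampRoundTrip {

	public static void main(String[] args) throws Exception {
		// Placeholder connection settings; the ITCase uses the embedded test cluster instead.
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "timestamp-demo");

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

		// Consume with the 0.10 consumer; the 0.10 fetcher forwards the Kafka record
		// timestamp to emitRecord() (see the hook added to Kafka09Fetcher above).
		// As in the test, a watermark assigner would normally be attached to the source.
		DataStream<String> stream = env.addSource(
				new FlinkKafkaConsumer010<>("in-topic", new SimpleStringSchema(), props));

		// Produce back into Kafka, writing the Flink record timestamps into the 0.10 messages.
		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> sink =
				FlinkKafkaProducer010.writeToKafkaWithTimestamps(
						stream,
						"out-topic",
						new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
						props,
						new KafkaPartitioner<String>() {
							@Override
							public int partition(String next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
								// simple illustrative partitioning; masked to stay non-negative
								return (next.hashCode() & Integer.MAX_VALUE) % numPartitions;
							}
						});
		sink.setWriteTimestampToKafka(true);

		env.execute("Kafka 0.10 timestamp round trip");
	}
}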