http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-flume/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-flume/pom.xml b/flink-connectors/flink-connector-flume/pom.xml new file mode 100644 index 0000000..64860de --- /dev/null +++ b/flink-connectors/flink-connector-flume/pom.xml @@ -0,0 +1,175 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-flume_2.10</artifactId> + <name>flink-connector-flume</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <flume-ng.version>1.5.0</flume-ng.version> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + <version>${flume-ng.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </exclusion> + <exclusion> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </exclusion> + <exclusion> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </exclusion> + <exclusion> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + <exclusion> + <groupId>com.thoughtworks.paranamer</groupId> + <artifactId>paranamer</artifactId> + </exclusion> + <exclusion> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </exclusion> + <exclusion> + <groupId>org.tukaani</groupId> + <artifactId>xz</artifactId> + </exclusion> + <exclusion> + 
<groupId>org.apache.velocity</groupId> + <artifactId>velocity</artifactId> + </exclusion> + <exclusion> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + </exclusion> + </exclusions> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. --> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <configuration> + <artifactSet> + <includes combine.children="append"> + <!-- We include all dependencies that transitively depend on guava --> + <include>org.apache.flume:*</include> + </includes> + </artifactSet> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project>
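For orientation, the FlumeSink class added in the next file is attached to a job like any other sink function. A minimal sketch of such a job follows; the host, port, and the use of SimpleStringSchema are illustrative assumptions, not part of this commit:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.flume.FlumeSink;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class FlumeSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Any upstream source works; a socket source keeps the sketch small.
            DataStream<String> lines = env.socketTextStream("localhost", 9999);

            // FlumeSink(host, port, schema) serializes each record with the given schema
            // and forwards it as a Flume event over an RPC connection (see FlumeSink below).
            lines.addSink(new FlumeSink<>("flume-host", 4141, new SimpleStringSchema()));

            env.execute("Flink-to-Flume example");
        }
    }

The sink establishes (and retries) the RPC connection in open(), so the Flume agent should already be listening when the job starts.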
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java new file mode 100644 index 0000000..2dc043b --- /dev/null +++ b/flink-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.flume; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.FlumeException; +import org.apache.flume.api.RpcClient; +import org.apache.flume.api.RpcClientFactory; +import org.apache.flume.event.EventBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlumeSink<IN> extends RichSinkFunction<IN> { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class); + + private transient FlinkRpcClientFacade client; + boolean initDone = false; + String host; + int port; + SerializationSchema<IN> schema; + + public FlumeSink(String host, int port, SerializationSchema<IN> schema) { + this.host = host; + this.port = port; + this.schema = schema; + } + + /** + * Receives tuples from the Apache Flink {@link DataStream} and forwards + * them to Apache Flume. + * + * @param value + * The tuple arriving from the datastream + */ + @Override + public void invoke(IN value) { + + byte[] data = schema.serialize(value); + client.sendDataToFlume(data); + + } + + private class FlinkRpcClientFacade { + private RpcClient client; + private String hostname; + private int port; + + /** + * Initializes the connection to Apache Flume. + * + * @param hostname + * The host + * @param port + * The port. 
+	 */
+	public void init(String hostname, int port) {
+		// Set up the RPC connection
+		this.hostname = hostname;
+		this.port = port;
+		int initCounter = 0;
+		while (true) {
+			if (initCounter >= 90) {
+				throw new RuntimeException("Cannot establish connection to port " + port + " on host "
+					+ host);
+			}
+			try {
+				this.client = RpcClientFactory.getDefaultInstance(hostname, port);
+			} catch (FlumeException e) {
+				// Wait one second if the connection failed before the next
+				// try
+				try {
+					Thread.sleep(1000);
+				} catch (InterruptedException e1) {
+					if (LOG.isErrorEnabled()) {
+						LOG.error("Interrupted while trying to connect to port {} on host {}", port, host);
+					}
+				}
+			}
+			if (client != null) {
+				break;
+			}
+			initCounter++;
+		}
+		initDone = true;
+	}
+
+	/**
+	 * Sends byte arrays as {@link Event} series to Apache Flume.
+	 *
+	 * @param data
+	 *		The byte array to send to Apache Flume
+	 */
+	public void sendDataToFlume(byte[] data) {
+		Event event = EventBuilder.withBody(data);
+
+		try {
+			client.append(event);
+
+		} catch (EventDeliveryException e) {
+			// clean up and recreate the client
+			client.close();
+			client = null;
+			client = RpcClientFactory.getDefaultInstance(hostname, port);
+		}
+	}
+
+	}
+
+	@Override
+	public void close() {
+		client.client.close();
+	}
+
+	@Override
+	public void open(Configuration config) {
+		client = new FlinkRpcClientFacade();
+		client.init(host, port);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
new file mode 100644
index 0000000..26352bb
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -0,0 +1,205 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-kafka-0.10_2.10</artifactId> + <name>flink-connector-kafka-0.10</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <kafka.version>0.10.0.1</kafka.version> + </properties> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.9_2.10</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Add Kafka 0.10.x as a dependency --> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <!-- Projects depending on this project, + won't depend on flink-table. --> + <optional>true</optional> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.9_2.10</artifactId> + <version>${project.version}</version> + <exclusions> + <!-- exclude Kafka dependencies --> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-base_2.10</artifactId> + <version>${project.version}</version> + <exclusions> + <!-- exclude Kafka dependencies --> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <!-- include 0.10 server for tests --> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + <version>${kafka.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-jmx</artifactId> + 
<version>${project.version}</version> + <scope>test</scope> + </dependency> + + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + <configuration> + <includes> + <include>**/KafkaTestEnvironmentImpl*</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <executions> + <execution> + <id>attach-test-sources</id> + <goals> + <goal>test-jar-no-fork</goal> + </goals> + <configuration> + <includes> + <include>**/KafkaTestEnvironmentImpl*</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> + <forkCount>1</forkCount> + <argLine>-Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java new file mode 100644 index 0000000..a9ce336 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.util.SerializedValue; + +import java.util.Collections; +import java.util.List; +import java.util.Properties; + + +/** + * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from + * Apache Kafka 0.10.x. The consumer can run in multiple parallel instances, each of which will pull + * data from one or more Kafka partitions. + * + * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost + * during a failure, and that the computation processes elements "exactly once". + * (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p> + * + * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets + * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view + * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer + * has consumed a topic.</p> + * + * <p>Please refer to Kafka's documentation for the available configuration properties: + * http://kafka.apache.org/documentation.html#newconsumerconfigs</p> + * + * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer + * is constructed. That means that the client that submits the program needs to be able to + * reach the Kafka brokers or ZooKeeper.</p> + */ +public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> { + + private static final long serialVersionUID = 2324564345203409112L; + + + // ------------------------------------------------------------------------ + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.10.x + * + * @param topic + * The name of the topic that should be consumed. + * @param valueDeserializer + * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties used to configure the Kafka consumer client, and the ZooKeeper client. + */ + public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { + this(Collections.singletonList(topic), valueDeserializer, props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.10.x + * + * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * pairs, offsets, and topic names from Kafka. + * + * @param topic + * The name of the topic that should be consumed. + * @param deserializer + * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties used to configure the Kafka consumer client, and the ZooKeeper client. 
+ */ + public FlinkKafkaConsumer010(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) { + this(Collections.singletonList(topic), deserializer, props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.10.x + * + * This constructor allows passing multiple topics to the consumer. + * + * @param topics + * The Kafka topics to read from. + * @param deserializer + * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties that are used to configure both the fetcher and the offset handler. + */ + public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props) { + this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.10.x + * + * This constructor allows passing multiple topics and a key/value deserialization schema. + * + * @param topics + * The Kafka topics to read from. + * @param deserializer + * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties that are used to configure both the fetcher and the offset handler. + */ + public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) { + super(topics, deserializer, props); + } + + @Override + protected AbstractFetcher<T, ?> createFetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> thisSubtaskPartitions, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + StreamingRuntimeContext runtimeContext) throws Exception { + + boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false")); + + return new Kafka010Fetcher<>( + sourceContext, + thisSubtaskPartitions, + watermarksPeriodic, + watermarksPunctuated, + runtimeContext.getProcessingTimeService(), + runtimeContext.getExecutionConfig().getAutoWatermarkInterval(), + runtimeContext.getUserCodeClassLoader(), + runtimeContext.isCheckpointingEnabled(), + runtimeContext.getTaskNameWithSubtasks(), + runtimeContext.getMetricGroup(), + deserializer, + properties, + pollTimeout, + useMetrics); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java new file mode 100644 index 0000000..cc0194b --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x.
+ *
+ * Implementation note: This producer is a hybrid between a regular sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction interfaces.
+ * For (b), it extends the StreamSink operator class.
+ *
+ * Details about approach (a):
+ *
+ * Producers for Kafka versions before 0.10 only follow approach (a), allowing users to attach the
+ * producer via the DataStream.addSink() method.
+ * Since the APIs exposed in that variant do not allow accessing the timestamp attached to the record,
+ * the Kafka 0.10 producer has a second invocation option, approach (b).
+ *
+ * Details about approach (b):
+ * Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the
+ * FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
+ * can access the record's internal timestamp and write it to Kafka.
+ *
+ * All methods and constructors in this class are marked with the approach they are needed for.
+ */
+public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction {
+
+	/**
+	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
+	 */
+	private boolean writeTimestampToKafka = false;
+
+	// ---------------------- "Constructors" for timestamp writing ------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+ * + * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined serialization schema supporting key/value messages + * @param producerConfig Properties with the producer configuration. + */ + public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, + String topicId, + KeyedSerializationSchema<T> serializationSchema, + Properties producerConfig) { + return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FixedPartitioner<T>()); + } + + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined (keyless) serialization schema. + * @param producerConfig Properties with the producer configuration. + */ + public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, + String topicId, + SerializationSchema<T> serializationSchema, + Properties producerConfig) { + return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId The name of the target topic + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. + */ + public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, + String topicId, + KeyedSerializationSchema<T> serializationSchema, + Properties producerConfig, + KafkaPartitioner<T> customPartitioner) { + + GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class); + FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner); + SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer); + return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer); + } + + // ---------------------- Regular constructors w/o timestamp support ------------------ + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param brokerList + * Comma separated addresses of the brokers + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined (keyless) serialization schema. 
+ */ + public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined (keyless) serialization schema. + * @param producerConfig + * Properties with the producer configuration. + */ + public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * @param topicId The topic to write data to + * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) + */ + public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); + } + + // ------------------- Key/Value serialization schema constructors ---------------------- + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param brokerList + * Comma separated addresses of the brokers + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined serialization schema supporting key/value messages + */ + public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) { + this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined serialization schema supporting key/value messages + * @param producerConfig + * Properties with the producer configuration. + */ + public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) { + this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>()); + } + + /** + * Create Kafka producer + * + * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above) + */ + public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) { + // We create a Kafka 09 producer instance here and only "override" (by intercepting) the + // invoke call. 
+ super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner)); + } + + + // ----------------------------- Generic element processing --------------------------- + + private void invokeInternal(T next, long elementTimestamp) throws Exception { + + final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; + + internalProducer.checkErroneous(); + + byte[] serializedKey = internalProducer.schema.serializeKey(next); + byte[] serializedValue = internalProducer.schema.serializeValue(next); + String targetTopic = internalProducer.schema.getTargetTopic(next); + if (targetTopic == null) { + targetTopic = internalProducer.defaultTopicId; + } + + Long timestamp = null; + if(this.writeTimestampToKafka) { + timestamp = elementTimestamp; + } + + ProducerRecord<byte[], byte[]> record; + if (internalProducer.partitioner == null) { + record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue); + } else { + record = new ProducerRecord<>(targetTopic, internalProducer.partitioner.partition(next, serializedKey, serializedValue, internalProducer.partitions.length), timestamp, serializedKey, serializedValue); + } + if (internalProducer.flushOnCheckpoint) { + synchronized (internalProducer.pendingRecordsLock) { + internalProducer.pendingRecords++; + } + } + internalProducer.producer.send(record, internalProducer.callback); + } + + + // ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ---- + + + // ---- Configuration setters + + /** + * Defines whether the producer should fail on errors, or only log them. + * If this is set to true, then exceptions will be only logged, if set to false, + * exceptions will be eventually thrown and cause the streaming program to + * fail (and enter recovery). + * + * Method is only accessible for approach (a) (see above) + * + * @param logFailuresOnly The flag to indicate logging-only on exceptions. + */ + public void setLogFailuresOnly(boolean logFailuresOnly) { + final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; + internalProducer.setLogFailuresOnly(logFailuresOnly); + } + + /** + * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers + * to be acknowledged by the Kafka producer on a checkpoint. + * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint. 
+ * + * Method is only accessible for approach (a) (see above) + * + * @param flush Flag indicating the flushing mode (true = flush on checkpoint) + */ + public void setFlushOnCheckpoint(boolean flush) { + final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; + internalProducer.setFlushOnCheckpoint(flush); + } + + /** + * This method is used for approach (a) (see above) + * + */ + @Override + public void open(Configuration parameters) throws Exception { + final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; + internalProducer.open(parameters); + } + + /** + * This method is used for approach (a) (see above) + */ + @Override + public IterationRuntimeContext getIterationRuntimeContext() { + final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; + return internalProducer.getIterationRuntimeContext(); + } + + /** + * This method is used for approach (a) (see above) + */ + @Override + public void setRuntimeContext(RuntimeContext t) { + final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; + internalProducer.setRuntimeContext(t); + } + + /** + * Invoke method for using the Sink as DataStream.addSink() sink. + * + * This method is used for approach (a) (see above) + * + * @param value The input record. + */ + @Override + public void invoke(T value) throws Exception { + invokeInternal(value, Long.MAX_VALUE); + } + + + // ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ---- + + + /** + * Process method for using the sink with timestamp support. + * + * This method is used for approach (b) (see above) + */ + @Override + public void processElement(StreamRecord<T> element) throws Exception { + invokeInternal(element.getValue(), element.getTimestamp()); + } + + /** + * Configuration object returned by the writeToKafkaWithTimestamps() call. + */ + public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> { + + private final FlinkKafkaProducerBase wrappedProducerBase; + private final FlinkKafkaProducer010 producer; + + private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) { + //noinspection unchecked + super(stream, producer); + this.producer = producer; + this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction; + } + + /** + * Defines whether the producer should fail on errors, or only log them. + * If this is set to true, then exceptions will be only logged, if set to false, + * exceptions will be eventually thrown and cause the streaming program to + * fail (and enter recovery). + * + * @param logFailuresOnly The flag to indicate logging-only on exceptions. + */ + public void setLogFailuresOnly(boolean logFailuresOnly) { + this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly); + } + + /** + * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers + * to be acknowledged by the Kafka producer on a checkpoint. + * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint. + * + * @param flush Flag indicating the flushing mode (true = flush on checkpoint) + */ + public void setFlushOnCheckpoint(boolean flush) { + this.wrappedProducerBase.setFlushOnCheckpoint(flush); + } + + /** + * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka. + * Timestamps must be positive for Kafka to accept them. 
+ * + * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka. + */ + public void setWriteTimestampToKafka(boolean writeTimestampToKafka) { + this.producer.writeTimestampToKafka = writeTimestampToKafka; + } + } + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java new file mode 100644 index 0000000..ddf1ad3 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sources.StreamTableSource; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import java.util.Properties; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.10. + */ +public class Kafka010JsonTableSource extends Kafka09JsonTableSource { + + /** + * Creates a Kafka 0.10 JSON {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + public Kafka010JsonTableSource( + String topic, + Properties properties, + String[] fieldNames, + TypeInformation<?>[] fieldTypes) { + + super(topic, properties, fieldNames, fieldTypes); + } + + /** + * Creates a Kafka 0.10 JSON {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. 
+ */ + public Kafka010JsonTableSource( + String topic, + Properties properties, + String[] fieldNames, + Class<?>[] fieldTypes) { + + super(topic, properties, fieldNames, fieldTypes); + } + + @Override + FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { + return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java new file mode 100644 index 0000000..732440b --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sources.StreamTableSource; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import java.util.Properties; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.10. + */ +public class Kafka010TableSource extends Kafka09TableSource { + + /** + * Creates a Kafka 0.10 {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + public Kafka010TableSource( + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + String[] fieldNames, + TypeInformation<?>[] fieldTypes) { + + super(topic, properties, deserializationSchema, fieldNames, fieldTypes); + } + + /** + * Creates a Kafka 0.10 {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. 
+ */ + public Kafka010TableSource( + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + String[] fieldNames, + Class<?>[] fieldTypes) { + + super(topic, properties, deserializationSchema, fieldNames, fieldTypes); + } + + @Override + FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { + return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java new file mode 100644 index 0000000..71dd29a --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.util.SerializedValue; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + +import java.util.List; +import java.util.Properties; + +/** + * A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API. + * + * <p>This fetcher re-uses basically all functionality of the 0.9 fetcher. It only additionally + * takes the KafkaRecord-attached timestamp and attaches it to the Flink records. + * + * @param <T> The type of elements produced by the fetcher. 
+ */ +public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> { + + public Kafka010Fetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> assignedPartitions, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + ProcessingTimeService processingTimeProvider, + long autoWatermarkInterval, + ClassLoader userCodeClassLoader, + boolean enableCheckpointing, + String taskNameWithSubtasks, + MetricGroup metricGroup, + KeyedDeserializationSchema<T> deserializer, + Properties kafkaProperties, + long pollTimeout, + boolean useMetrics) throws Exception + { + super( + sourceContext, + assignedPartitions, + watermarksPeriodic, + watermarksPunctuated, + processingTimeProvider, + autoWatermarkInterval, + userCodeClassLoader, + enableCheckpointing, + taskNameWithSubtasks, + metricGroup, + deserializer, + kafkaProperties, + pollTimeout, + useMetrics); + } + + @Override + protected void emitRecord( + T record, + KafkaTopicPartitionState<TopicPartition> partition, + long offset, + ConsumerRecord<?, ?> consumerRecord) throws Exception { + + // we attach the Kafka 0.10 timestamp here + emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp()); + } + + /** + * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10, + * changing binary signatures + */ + @Override + protected KafkaConsumerCallBridge010 createCallBridge() { + return new KafkaConsumerCallBridge010(); + } + + @Override + protected String getFetcherName() { + return "Kafka 0.10 Fetcher"; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java new file mode 100644 index 0000000..a81b098 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +import java.util.List; + +/** + * The ConsumerCallBridge simply calls the {@link KafkaConsumer#assign(java.util.Collection)} method. + * + * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10, + * changing {@code assign(List)} to {@code assign(Collection)}. + * + * Because of that, we need two versions whose compiled code goes against different method signatures. + */ +public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge { + + @Override + public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception { + consumer.assign(topicPartitions); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties new file mode 100644 index 0000000..6bdfb48 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties @@ -0,0 +1,29 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger + + http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java new file mode 100644 index 0000000..6ee0429 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.internal.Handover; +import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher; +import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +/** + * Unit tests for the {@link Kafka010Fetcher}. 
+ */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(KafkaConsumerThread.class) +public class Kafka010FetcherTest { + + @Test + public void testCommitDoesNotBlock() throws Exception { + + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // to synchronize when the consumer is in its blocking method + final OneShotLatch sync = new OneShotLatch(); + + // ----- the mock consumer with blocking poll calls ---- + final MultiShotLatch blockerLatch = new MultiShotLatch(); + + KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { + + @Override + public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException { + sync.trigger(); + blockerLatch.await(); + return ConsumerRecords.empty(); + } + }); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) { + blockerLatch.trigger(); + return null; + } + }).when(mockConsumer).wakeup(); + + // make sure the fetcher creates the mock consumer + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // ----- create the test fetcher ----- + + @SuppressWarnings("unchecked") + SourceContext<String> sourceContext = mock(SourceContext.class); + List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); + KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + + final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( + sourceContext, + topics, + null, /* periodic assigner */ + null, /* punctuated assigner */ + new TestProcessingTimeService(), + 10, + getClass().getClassLoader(), + false, /* checkpointing */ + "taskname-with-subtask", + new UnregisteredMetricsGroup(), + schema, + new Properties(), + 0L, + false); + + // ----- run the fetcher ----- + + final AtomicReference<Throwable> error = new AtomicReference<>(); + final Thread fetcherRunner = new Thread("fetcher runner") { + + @Override + public void run() { + try { + fetcher.runFetchLoop(); + } catch (Throwable t) { + error.set(t); + } + } + }; + fetcherRunner.start(); + + // wait until the fetcher has reached the method of interest + sync.await(); + + // ----- trigger the offset commit ----- + + final AtomicReference<Throwable> commitError = new AtomicReference<>(); + final Thread committer = new Thread("committer runner") { + @Override + public void run() { + try { + fetcher.commitInternalOffsetsToKafka(testCommitData); + } catch (Throwable t) { + commitError.set(t); + } + } + }; + committer.start(); + + // ----- ensure that the committer finishes in time ----- + committer.join(30000); + assertFalse("The committer did not finish in time", committer.isAlive()); + + // ----- test done, wait till the fetcher is done for a clean shutdown ----- + fetcher.cancel(); + fetcherRunner.join(); + + // check that there were no errors in the fetcher + final Throwable fetcherError = error.get(); + if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) { + throw new Exception("Exception in the fetcher", fetcherError); + } + final Throwable committerError = commitError.get(); + if (committerError != null) { + throw new Exception("Exception in the committer", committerError); + } + } + + @Test + public void ensureOffsetsGetCommitted() throws Exception { + 
+ // test data + final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42); + final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99); + + final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>(); + testCommitData1.put(testPartition1, 11L); + testCommitData1.put(testPartition2, 18L); + + final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>(); + testCommitData2.put(testPartition1, 19L); + testCommitData2.put(testPartition2, 28L); + + final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>(); + + + // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ---- + + final MultiShotLatch blockerLatch = new MultiShotLatch(); + + KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); + + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { + @Override + public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException { + blockerLatch.await(); + return ConsumerRecords.empty(); + } + }); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) { + blockerLatch.trigger(); + return null; + } + }).when(mockConsumer).wakeup(); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + Map<TopicPartition, OffsetAndMetadata> offsets = + (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0]; + + OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1]; + + commitStore.add(offsets); + callback.onComplete(offsets, null); + + return null; + } + }).when(mockConsumer).commitAsync( + Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class)); + + // make sure the fetcher creates the mock consumer + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // ----- create the test fetcher ----- + + @SuppressWarnings("unchecked") + SourceContext<String> sourceContext = mock(SourceContext.class); + List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); + KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); + + final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( + sourceContext, + topics, + null, /* periodic assigner */ + null, /* punctuated assigner */ + new TestProcessingTimeService(), + 10, + getClass().getClassLoader(), + false, /* checkpointing */ + "taskname-with-subtask", + new UnregisteredMetricsGroup(), + schema, + new Properties(), + 0L, + false); + + + // ----- run the fetcher ----- + + final AtomicReference<Throwable> error = new AtomicReference<>(); + final Thread fetcherRunner = new Thread("fetcher runner") { + + @Override + public void run() { + try { + fetcher.runFetchLoop(); + } catch (Throwable t) { + error.set(t); + } + } + }; + fetcherRunner.start(); + + // ----- trigger the first offset commit ----- + + fetcher.commitInternalOffsetsToKafka(testCommitData1); + Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take(); + + for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) { + TopicPartition partition = entry.getKey(); + if (partition.topic().equals("test")) { + assertEquals(42, partition.partition()); + assertEquals(12L, entry.getValue().offset()); + } + else if 
(partition.topic().equals("another")) { + assertEquals(99, partition.partition()); + assertEquals(18L, entry.getValue().offset()); + } + } + + // ----- trigger the second offset commit ----- + + fetcher.commitInternalOffsetsToKafka(testCommitData2); + Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take(); + + for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) { + TopicPartition partition = entry.getKey(); + if (partition.topic().equals("test")) { + assertEquals(42, partition.partition()); + assertEquals(20L, entry.getValue().offset()); + } + else if (partition.topic().equals("another")) { + assertEquals(99, partition.partition()); + assertEquals(28L, entry.getValue().offset()); + } + } + + // ----- test done, wait till the fetcher is done for a clean shutdown ----- + fetcher.cancel(); + fetcherRunner.join(); + + // check that there were no errors in the fetcher + final Throwable caughtError = error.get(); + if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) { + throw new Exception("Exception in the fetcher", caughtError); + } + } + + @Test + public void testCancellationWhenEmitBlocks() throws Exception { + + // ----- some test data ----- + + final String topic = "test-topic"; + final int partition = 3; + final byte[] payload = new byte[] {1, 2, 3, 4}; + + final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList( + new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload), + new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload), + new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload)); + + final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>(); + data.put(new TopicPartition(topic, partition), records); + + final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data); + + // ----- the test consumer ----- + + final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { + @Override + public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) { + return consumerRecords; + } + }); + + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // ----- build a fetcher ----- + + BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>(); + List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition)); + KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + + final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( + sourceContext, + topics, + null, /* periodic watermark extractor */ + null, /* punctuated watermark extractor */ + new TestProcessingTimeService(), + 10, /* watermark interval */ + this.getClass().getClassLoader(), + true, /* checkpointing */ + "task_name", + new UnregisteredMetricsGroup(), + schema, + new Properties(), + 0L, + false); + + + // ----- run the fetcher ----- + + final AtomicReference<Throwable> error = new AtomicReference<>(); + final Thread fetcherRunner = new Thread("fetcher runner") { + + @Override + public void run() { + try { + fetcher.runFetchLoop(); + } catch (Throwable t) { + error.set(t); + } + } + }; + fetcherRunner.start(); + + // wait until the thread started to emit records to the source context + sourceContext.waitTillHasBlocker(); + + // now we try to cancel the fetcher, including the interruption usually done on the task thread + // 
once it has finished, there must be no more thread blocked on the source context + fetcher.cancel(); + fetcherRunner.interrupt(); + fetcherRunner.join(); + + assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking()); + } + + // ------------------------------------------------------------------------ + // test utilities + // ------------------------------------------------------------------------ + + private static final class BlockingSourceContext<T> implements SourceContext<T> { + + private final ReentrantLock lock = new ReentrantLock(); + private final OneShotLatch inBlocking = new OneShotLatch(); + + @Override + public void collect(T element) { + block(); + } + + @Override + public void collectWithTimestamp(T element, long timestamp) { + block(); + } + + @Override + public void emitWatermark(Watermark mark) { + block(); + } + + @Override + public Object getCheckpointLock() { + return new Object(); + } + + @Override + public void close() {} + + public void waitTillHasBlocker() throws InterruptedException { + inBlocking.await(); + } + + public boolean isStillBlocking() { + return lock.isLocked(); + } + + @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"}) + private void block() { + lock.lock(); + try { + inBlocking.trigger(); + + // put this thread to sleep indefinitely + final Object o = new Object(); + while (true) { + synchronized (o) { + o.wait(); + } + } + } + catch (InterruptedException e) { + // exit cleanly, simply reset the interruption flag + Thread.currentThread().interrupt(); + } + finally { + lock.unlock(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java new file mode 100644 index 0000000..08511c9 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; +import java.io.IOException; + + +public class Kafka010ITCase extends KafkaConsumerTestBase { + + // ------------------------------------------------------------------------ + // Suite of Tests + // ------------------------------------------------------------------------ + + + @Test(timeout = 60000) + public void testFailOnNoBroker() throws Exception { + runFailOnNoBrokerTest(); + } + + @Test(timeout = 60000) + public void testConcurrentProducerConsumerTopology() throws Exception { + runSimpleConcurrentProducerConsumerTopology(); + } + + @Test(timeout = 60000) + public void testKeyValueSupport() throws Exception { + runKeyValueTest(); + } + + // --- canceling / failures --- + + @Test(timeout = 60000) + public void testCancelingEmptyTopic() throws Exception { + runCancelingOnEmptyInputTest(); + } + + @Test(timeout = 60000) + public void testCancelingFullTopic() throws Exception { + runCancelingOnFullInputTest(); + } + + @Test(timeout = 60000) + public void testFailOnDeploy() throws Exception { + runFailOnDeployTest(); + } + + + // --- source to partition mappings and exactly once --- + + @Test(timeout = 60000) + public void testOneToOneSources() throws Exception { + runOneToOneExactlyOnceTest(); + } + + @Test(timeout = 60000) + public void testOneSourceMultiplePartitions() throws Exception { + runOneSourceMultiplePartitionsExactlyOnceTest(); + } + + @Test(timeout = 60000) + public void testMultipleSourcesOnePartition() throws Exception { + runMultipleSourcesOnePartitionExactlyOnceTest(); + } + + // --- broker failure --- + + @Test(timeout = 60000) + public void testBrokerFailure() throws Exception { + runBrokerFailureTest(); + } + + // --- special executions --- + + @Test(timeout = 60000) + public void testBigRecordJob() throws Exception { + runBigRecordTestTopology(); + } + + @Test(timeout = 60000) + public void testMultipleTopics() throws Exception { + runProduceConsumeMultipleTopics(); + } + + @Test(timeout = 60000) 
+ public void testAllDeletes() throws Exception { + runAllDeletesTest(); + } + + @Test(timeout = 60000) + public void testMetricsAndEndOfStream() throws Exception { + runEndOfStreamTest(); + } + + // --- offset committing --- + + @Test(timeout = 60000) + public void testCommitOffsetsToKafka() throws Exception { + runCommitOffsetsToKafka(); + } + + @Test(timeout = 60000) + public void testStartFromKafkaCommitOffsets() throws Exception { + runStartFromKafkaCommitOffsets(); + } + + @Test(timeout = 60000) + public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception { + runAutoOffsetRetrievalAndCommitToKafka(); + } + + /** + * Kafka 0.10 specific test, ensuring Timestamps are properly written to and read from Kafka + */ + @Test(timeout = 60000) + public void testTimestamps() throws Exception { + + final String topic = "tstopic"; + createTestTopic(topic, 3, 1); + + // ---------- Produce an event time stream into Kafka ------------------- + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.setParallelism(1); + env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env.getConfig().disableSysoutLogging(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() { + boolean running = true; + + @Override + public void run(SourceContext<Long> ctx) throws Exception { + long i = 0; + while(running) { + ctx.collectWithTimestamp(i, i*2); + if(i++ == 1000L) { + running = false; + } + } + } + + @Override + public void cancel() { + running = false; + } + }); + + final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig()); + FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new KafkaPartitioner<Long>() { + @Override + public int partition(Long next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { + return (int)(next % 3); + } + }); + prod.setParallelism(3); + prod.setWriteTimestampToKafka(true); + env.execute("Produce some"); + + // ---------- Consume stream from Kafka ------------------- + + env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.setParallelism(1); + env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env.getConfig().disableSysoutLogging(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + FlinkKafkaConsumer010<Long> kafkaSource = new FlinkKafkaConsumer010<>(topic, new LimitedLongDeserializer(), standardProps); + kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() { + @Nullable + @Override + public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) { + if(lastElement % 10 == 0) { + return new Watermark(lastElement); + } + return null; + } + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + return previousElementTimestamp; + } + }); + + DataStream<Long> stream = env.addSource(kafkaSource); + GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class); + stream.transform("timestamp validating operator", objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1); + + env.execute("Consume again"); + + deleteTestTopic(topic); + } + + private static class 
TimestampValidatingOperator extends StreamSink<Long> { + + public TimestampValidatingOperator() { + super(new SinkFunction<Long>() { + @Override + public void invoke(Long value) throws Exception { + throw new RuntimeException("Unexpected"); + } + }); + } + + long elCount = 0; + long wmCount = 0; + long lastWM = Long.MIN_VALUE; + + @Override + public void processElement(StreamRecord<Long> element) throws Exception { + elCount++; + if(element.getValue() * 2 != element.getTimestamp()) { + throw new RuntimeException("Invalid timestamp: " + element); + } + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + wmCount++; + + if(lastWM <= mark.getTimestamp()) { + lastWM = mark.getTimestamp(); + } else { + throw new RuntimeException("Received watermark lower than the last one"); + } + + if( mark.getTimestamp() % 10 != 0 && mark.getTimestamp() != Long.MAX_VALUE ) { + throw new RuntimeException("Invalid watermark: " + mark.getTimestamp()); + } + } + + @Override + public void close() throws Exception { + super.close(); + if(elCount != 1000L) { + throw new RuntimeException("Wrong final element count " + elCount); + } + + if(wmCount <= 2) { + throw new RuntimeException("Almost no watermarks have been sent " + wmCount); + } + } + } + + private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> { + + private final TypeInformation<Long> ti; + private final TypeSerializer<Long> ser; + long cnt = 0; + + public LimitedLongDeserializer() { + this.ti = TypeInfoParser.parse("Long"); + this.ser = ti.createSerializer(new ExecutionConfig()); + } + @Override + public TypeInformation<Long> getProducedType() { + return ti; + } + + @Override + public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + cnt++; + DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); + Long e = ser.deserialize(in); + return e; + } + + @Override + public boolean isEndOfStream(Long nextElement) { + return cnt > 1000L; + } + } + +}
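
A note on the KafkaConsumerCallBridge010 class at the top of this excerpt: it only makes sense next to the base bridge it overrides, which is not part of this excerpt. Assuming the base class mirrors the 0.10 override and simply compiles against the List-based assign(...) of the 0.9 client, a minimal sketch would look like the following (only the class name and method signature are taken from the code above; the body is an assumption):

package org.apache.flink.streaming.connectors.kafka.internal;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.List;

/**
 * Sketch of the base call bridge. The body is textually the same as in the 0.10
 * subclass; the difference is purely at compile time: this class is built against
 * the 0.9 kafka-clients jar, where assign(...) takes a List, while
 * KafkaConsumerCallBridge010 is built against the Collection-based signature.
 */
public class KafkaConsumerCallBridge {

    public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
        // Compiled against kafka-clients 0.9, this call resolves to assign(List<TopicPartition>).
        consumer.assign(topicPartitions);
    }
}

Because each connector module ships with the matching kafka-clients version, code that calls assignPartitions(...) never has to care which of the two incompatible assign(...) signatures is actually on the classpath.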
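
The three Kafka010FetcherTest cases above all rely on the same mocking recipe: PowerMock intercepts the KafkaConsumer constructor inside KafkaConsumerThread so that the fetch loop runs against a scripted consumer rather than a real broker. Stripped of the test-specific answers, the recipe looks roughly like this (the class name ConsumerMockingSketch and the empty-poll stubbing are illustrative, not part of the commit):

import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import static org.mockito.Mockito.anyLong;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
import static org.powermock.api.mockito.PowerMockito.whenNew;

@RunWith(PowerMockRunner.class)
@PrepareForTest(KafkaConsumerThread.class) // the class that actually executes 'new KafkaConsumer(...)'
public class ConsumerMockingSketch {

    @Test
    public void fetcherOnlyEverSeesTheMock() throws Exception {
        KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);

        // Every poll() returns an empty batch here; the real tests plug in blocking
        // or record-producing answers at this point.
        when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
            @Override
            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
                return ConsumerRecords.empty();
            }
        });

        // From now on, any 'new KafkaConsumer(...)' executed by a prepared class yields the mock,
        // which is how the Kafka010Fetcher built in the tests above picks it up.
        whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);

        // ... construct and run a Kafka010Fetcher exactly as in the tests above ...
    }
}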
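
One detail in ensureOffsetsGetCommitted that is easy to misread: Flink checkpoints the offset of the last record it processed, but the offset handed to KafkaConsumer#commitAsync names the next record to read, which is why a checkpointed offset of 11 for the "test" partition is asserted as a committed value of 12 (and 19 as 20 on the second commit). A self-contained sketch of that convention, with a made-up helper name and assuming kafka-clients on the classpath:

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class OffsetCommitConventionSketch {

    // Hypothetical helper: convert a "last processed offset" per partition into the map
    // that KafkaConsumer#commitAsync expects, where each value is the next offset to read.
    static Map<TopicPartition, OffsetAndMetadata> toCommitMap(Map<TopicPartition, Long> lastProcessed) {
        Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> e : lastProcessed.entrySet()) {
            // Kafka interprets a committed offset as the position of the next record,
            // hence the +1 relative to the last record that was processed.
            commits.put(e.getKey(), new OffsetAndMetadata(e.getValue() + 1));
        }
        return commits;
    }

    public static void main(String[] args) {
        Map<TopicPartition, Long> lastProcessed = new HashMap<>();
        lastProcessed.put(new TopicPartition("test", 42), 11L);
        // Prints 12, matching the value asserted for the "test" partition in the test above.
        System.out.println(toCommitMap(lastProcessed).get(new TopicPartition("test", 42)).offset());
    }
}

The +1 only affects what is written back to Kafka for outside offset-monitoring tools; on restore, Flink keeps using its own checkpointed offsets.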