http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-flume/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-flume/pom.xml b/flink-streaming-connectors/flink-connector-flume/pom.xml deleted file mode 100644 index 1b1b810..0000000 --- a/flink-streaming-connectors/flink-connector-flume/pom.xml +++ /dev/null @@ -1,175 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-connector-flume_2.10</artifactId> - <name>flink-connector-flume</name> - - <packaging>jar</packaging> - - <!-- Allow users to pass custom connector versions --> - <properties> - <flume-ng.version>1.5.0</flume-ng.version> - </properties> - - <dependencies> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flume</groupId> - <artifactId>flume-ng-core</artifactId> - <version>${flume-ng.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </exclusion> - <exclusion> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - </exclusion> - <exclusion> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - </exclusion> - <exclusion> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-core-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-mapper-asl</artifactId> - </exclusion> - <exclusion> - <groupId>com.thoughtworks.paranamer</groupId> - <artifactId>paranamer</artifactId> - </exclusion> - <exclusion> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> - </exclusion> - <exclusion> - <groupId>org.tukaani</groupId> - <artifactId>xz</artifactId> - 
</exclusion> - <exclusion> - <groupId>org.apache.velocity</groupId> - <artifactId>velocity</artifactId> - </exclusion> - <exclusion> - <groupId>commons-collections</groupId> - <artifactId>commons-collections</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty-util</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.thrift</groupId> - <artifactId>libthrift</artifactId> - </exclusion> - </exclusions> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. --> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <id>shade-flink</id> - <configuration> - <artifactSet> - <includes combine.children="append"> - <!-- We include all dependencies that transitively depend on guava --> - <include>org.apache.flume:*</include> - </includes> - </artifactSet> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/> - </transformers> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project>
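
For orientation, a minimal, hypothetical usage sketch of the connector this pom built, based on the FlumeSink(host, port, schema) constructor shown in the next hunk; the host, port, and socket input below are placeholders, and SimpleStringSchema is assumed only because it is the stock Flink string schema of this era.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.flume.FlumeSink;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class FlumeSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder input; any DataStream<String> would do.
            DataStream<String> lines = env.socketTextStream("localhost", 9999);

            // Each record is serialized by the schema and forwarded to the Flume RPC source at host:port.
            lines.addSink(new FlumeSink<String>("flume-host", 4141, new SimpleStringSchema()));

            env.execute("Flume sink example");
        }
    }
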
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java deleted file mode 100644 index 2dc043b..0000000 --- a/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.flume; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.apache.flume.Event; -import org.apache.flume.EventDeliveryException; -import org.apache.flume.FlumeException; -import org.apache.flume.api.RpcClient; -import org.apache.flume.api.RpcClientFactory; -import org.apache.flume.event.EventBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FlumeSink<IN> extends RichSinkFunction<IN> { - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class); - - private transient FlinkRpcClientFacade client; - boolean initDone = false; - String host; - int port; - SerializationSchema<IN> schema; - - public FlumeSink(String host, int port, SerializationSchema<IN> schema) { - this.host = host; - this.port = port; - this.schema = schema; - } - - /** - * Receives tuples from the Apache Flink {@link DataStream} and forwards - * them to Apache Flume. - * - * @param value - * The tuple arriving from the datastream - */ - @Override - public void invoke(IN value) { - - byte[] data = schema.serialize(value); - client.sendDataToFlume(data); - - } - - private class FlinkRpcClientFacade { - private RpcClient client; - private String hostname; - private int port; - - /** - * Initializes the connection to Apache Flume. - * - * @param hostname - * The host - * @param port - * The port. 
- */ - public void init(String hostname, int port) { - // Setup the RPC connection - this.hostname = hostname; - this.port = port; - int initCounter = 0; - while (true) { - if (initCounter >= 90) { - throw new RuntimeException("Cannot establish connection with" + port + " at " - + host); - } - try { - this.client = RpcClientFactory.getDefaultInstance(hostname, port); - } catch (FlumeException e) { - // Wait one second if the connection failed before the next - // try - try { - Thread.sleep(1000); - } catch (InterruptedException e1) { - if (LOG.isErrorEnabled()) { - LOG.error("Interrupted while trying to connect {} at {}", port, host); - } - } - } - if (client != null) { - break; - } - initCounter++; - } - initDone = true; - } - - /** - * Sends byte arrays as {@link Event} series to Apache Flume. - * - * @param data - * The byte array to send to Apache FLume - */ - public void sendDataToFlume(byte[] data) { - Event event = EventBuilder.withBody(data); - - try { - client.append(event); - - } catch (EventDeliveryException e) { - // clean up and recreate the client - client.close(); - client = null; - client = RpcClientFactory.getDefaultInstance(hostname, port); - } - } - - } - - @Override - public void close() { - client.client.close(); - } - - @Override - public void open(Configuration config) { - client = new FlinkRpcClientFacade(); - client.init(host, port); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml deleted file mode 100644 index 04019f8..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml +++ /dev/null @@ -1,205 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-connector-kafka-0.10_2.10</artifactId> - <name>flink-connector-kafka-0.10</name> - - <packaging>jar</packaging> - - <!-- Allow users to pass custom connector versions --> - <properties> - <kafka.version>0.10.0.1</kafka.version> - </properties> - - <dependencies> - - <!-- core dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka-0.9_2.10</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${scala.binary.version}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- Add Kafka 0.10.x as a dependency --> - - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - <version>${kafka.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - <!-- Projects depending on this project, - won't depend on flink-table. --> - <optional>true</optional> - </dependency> - - <!-- test dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka-0.9_2.10</artifactId> - <version>${project.version}</version> - <exclusions> - <!-- exclude Kafka dependencies --> - <exclusion> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${scala.binary.version}</artifactId> - </exclusion> - </exclusions> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka-base_2.10</artifactId> - <version>${project.version}</version> - <exclusions> - <!-- exclude Kafka dependencies --> - <exclusion> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${scala.binary.version}</artifactId> - </exclusion> - </exclusions> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <!-- include 0.10 server for tests --> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${scala.binary.version}</artifactId> - <version>${kafka.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-tests_2.10</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_2.10</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics-jmx</artifactId> - 
<version>${project.version}</version> - <scope>test</scope> - </dependency> - - - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - <configuration> - <includes> - <include>**/KafkaTestEnvironmentImpl*</include> - </includes> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-source-plugin</artifactId> - <executions> - <execution> - <id>attach-test-sources</id> - <goals> - <goal>test-jar-no-fork</goal> - </goals> - <configuration> - <includes> - <include>**/KafkaTestEnvironmentImpl*</include> - </includes> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> - <forkCount>1</forkCount> - <argLine>-Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine> - </configuration> - </plugin> - </plugins> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java deleted file mode 100644 index a9ce336..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher; -import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; -import org.apache.flink.util.SerializedValue; - -import java.util.Collections; -import java.util.List; -import java.util.Properties; - - -/** - * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from - * Apache Kafka 0.10.x. The consumer can run in multiple parallel instances, each of which will pull - * data from one or more Kafka partitions. - * - * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost - * during a failure, and that the computation processes elements "exactly once". - * (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p> - * - * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets - * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view - * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer - * has consumed a topic.</p> - * - * <p>Please refer to Kafka's documentation for the available configuration properties: - * http://kafka.apache.org/documentation.html#newconsumerconfigs</p> - * - * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer - * is constructed. That means that the client that submits the program needs to be able to - * reach the Kafka brokers or ZooKeeper.</p> - */ -public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> { - - private static final long serialVersionUID = 2324564345203409112L; - - - // ------------------------------------------------------------------------ - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.10.x - * - * @param topic - * The name of the topic that should be consumed. - * @param valueDeserializer - * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties used to configure the Kafka consumer client, and the ZooKeeper client. - */ - public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { - this(Collections.singletonList(topic), valueDeserializer, props); - } - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.10.x - * - * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value - * pairs, offsets, and topic names from Kafka. - * - * @param topic - * The name of the topic that should be consumed. - * @param deserializer - * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties used to configure the Kafka consumer client, and the ZooKeeper client. 
- */ - public FlinkKafkaConsumer010(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) { - this(Collections.singletonList(topic), deserializer, props); - } - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.10.x - * - * This constructor allows passing multiple topics to the consumer. - * - * @param topics - * The Kafka topics to read from. - * @param deserializer - * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties that are used to configure both the fetcher and the offset handler. - */ - public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props) { - this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props); - } - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.10.x - * - * This constructor allows passing multiple topics and a key/value deserialization schema. - * - * @param topics - * The Kafka topics to read from. - * @param deserializer - * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties that are used to configure both the fetcher and the offset handler. - */ - public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) { - super(topics, deserializer, props); - } - - @Override - protected AbstractFetcher<T, ?> createFetcher( - SourceContext<T> sourceContext, - List<KafkaTopicPartition> thisSubtaskPartitions, - SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, - SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, - StreamingRuntimeContext runtimeContext) throws Exception { - - boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false")); - - return new Kafka010Fetcher<>( - sourceContext, - thisSubtaskPartitions, - watermarksPeriodic, - watermarksPunctuated, - runtimeContext.getProcessingTimeService(), - runtimeContext.getExecutionConfig().getAutoWatermarkInterval(), - runtimeContext.getUserCodeClassLoader(), - runtimeContext.isCheckpointingEnabled(), - runtimeContext.getTaskNameWithSubtasks(), - runtimeContext.getMetricGroup(), - deserializer, - properties, - pollTimeout, - useMetrics); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java deleted file mode 100644 index cc0194b..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java +++ /dev/null @@ -1,398 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.functions.IterationRuntimeContext; -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.apache.kafka.clients.producer.ProducerRecord; - -import java.util.Properties; - -import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList; - - -/** - * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x - * - * Implementation note: This producer is a hybrid between a regular regular sink function (a) - * and a custom operator (b). - * - * For (a), the class implements the SinkFunction and RichFunction interfaces. - * For (b), it extends the StreamTask class. - * - * Details about approach (a): - * - * Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the - * DataStream.addSink() method. - * Since the APIs exposed in that variant do not allow accessing the the timestamp attached to the record - * the Kafka 0.10 producer has a second invocation option, approach (b). - * - * Details about approach (b): - * Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the - * FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer - * can access the internal record timestamp of the record and write it to Kafka. - * - * All methods and constructors in this class are marked with the approach they are needed for. - */ -public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction { - - /** - * Flag controlling whether we are writing the Flink record's timestamp into Kafka. - */ - private boolean writeTimestampToKafka = false; - - // ---------------------- "Constructors" for timestamp writing ------------------ - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. 
- * - * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) - * - * @param inStream The stream to write to Kafka - * @param topicId ID of the Kafka topic. - * @param serializationSchema User defined serialization schema supporting key/value messages - * @param producerConfig Properties with the producer configuration. - */ - public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, - String topicId, - KeyedSerializationSchema<T> serializationSchema, - Properties producerConfig) { - return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FixedPartitioner<T>()); - } - - - /** - * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to - * the topic. - * - * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) - * - * @param inStream The stream to write to Kafka - * @param topicId ID of the Kafka topic. - * @param serializationSchema User defined (keyless) serialization schema. - * @param producerConfig Properties with the producer configuration. - */ - public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, - String topicId, - SerializationSchema<T> serializationSchema, - Properties producerConfig) { - return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) - * - * @param inStream The stream to write to Kafka - * @param topicId The name of the target topic - * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. - */ - public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, - String topicId, - KeyedSerializationSchema<T> serializationSchema, - Properties producerConfig, - KafkaPartitioner<T> customPartitioner) { - - GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class); - FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner); - SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer); - return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer); - } - - // ---------------------- Regular constructors w/o timestamp support ------------------ - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * @param brokerList - * Comma separated addresses of the brokers - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined (keyless) serialization schema. 
- */ - public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to - * the topic. - * - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined (keyless) serialization schema. - * @param producerConfig - * Properties with the producer configuration. - */ - public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to - * the topic. - * - * @param topicId The topic to write data to - * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) - */ - public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); - } - - // ------------------- Key/Value serialization schema constructors ---------------------- - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * @param brokerList - * Comma separated addresses of the brokers - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined serialization schema supporting key/value messages - */ - public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) { - this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined serialization schema supporting key/value messages - * @param producerConfig - * Properties with the producer configuration. - */ - public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) { - this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>()); - } - - /** - * Create Kafka producer - * - * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above) - */ - public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) { - // We create a Kafka 09 producer instance here and only "override" (by intercepting) the - // invoke call. 
- super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner)); - } - - - // ----------------------------- Generic element processing --------------------------- - - private void invokeInternal(T next, long elementTimestamp) throws Exception { - - final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; - - internalProducer.checkErroneous(); - - byte[] serializedKey = internalProducer.schema.serializeKey(next); - byte[] serializedValue = internalProducer.schema.serializeValue(next); - String targetTopic = internalProducer.schema.getTargetTopic(next); - if (targetTopic == null) { - targetTopic = internalProducer.defaultTopicId; - } - - Long timestamp = null; - if(this.writeTimestampToKafka) { - timestamp = elementTimestamp; - } - - ProducerRecord<byte[], byte[]> record; - if (internalProducer.partitioner == null) { - record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue); - } else { - record = new ProducerRecord<>(targetTopic, internalProducer.partitioner.partition(next, serializedKey, serializedValue, internalProducer.partitions.length), timestamp, serializedKey, serializedValue); - } - if (internalProducer.flushOnCheckpoint) { - synchronized (internalProducer.pendingRecordsLock) { - internalProducer.pendingRecords++; - } - } - internalProducer.producer.send(record, internalProducer.callback); - } - - - // ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ---- - - - // ---- Configuration setters - - /** - * Defines whether the producer should fail on errors, or only log them. - * If this is set to true, then exceptions will be only logged, if set to false, - * exceptions will be eventually thrown and cause the streaming program to - * fail (and enter recovery). - * - * Method is only accessible for approach (a) (see above) - * - * @param logFailuresOnly The flag to indicate logging-only on exceptions. - */ - public void setLogFailuresOnly(boolean logFailuresOnly) { - final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; - internalProducer.setLogFailuresOnly(logFailuresOnly); - } - - /** - * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers - * to be acknowledged by the Kafka producer on a checkpoint. - * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint. 
- * - * Method is only accessible for approach (a) (see above) - * - * @param flush Flag indicating the flushing mode (true = flush on checkpoint) - */ - public void setFlushOnCheckpoint(boolean flush) { - final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; - internalProducer.setFlushOnCheckpoint(flush); - } - - /** - * This method is used for approach (a) (see above) - * - */ - @Override - public void open(Configuration parameters) throws Exception { - final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; - internalProducer.open(parameters); - } - - /** - * This method is used for approach (a) (see above) - */ - @Override - public IterationRuntimeContext getIterationRuntimeContext() { - final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; - return internalProducer.getIterationRuntimeContext(); - } - - /** - * This method is used for approach (a) (see above) - */ - @Override - public void setRuntimeContext(RuntimeContext t) { - final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; - internalProducer.setRuntimeContext(t); - } - - /** - * Invoke method for using the Sink as DataStream.addSink() sink. - * - * This method is used for approach (a) (see above) - * - * @param value The input record. - */ - @Override - public void invoke(T value) throws Exception { - invokeInternal(value, Long.MAX_VALUE); - } - - - // ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ---- - - - /** - * Process method for using the sink with timestamp support. - * - * This method is used for approach (b) (see above) - */ - @Override - public void processElement(StreamRecord<T> element) throws Exception { - invokeInternal(element.getValue(), element.getTimestamp()); - } - - /** - * Configuration object returned by the writeToKafkaWithTimestamps() call. - */ - public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> { - - private final FlinkKafkaProducerBase wrappedProducerBase; - private final FlinkKafkaProducer010 producer; - - private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) { - //noinspection unchecked - super(stream, producer); - this.producer = producer; - this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction; - } - - /** - * Defines whether the producer should fail on errors, or only log them. - * If this is set to true, then exceptions will be only logged, if set to false, - * exceptions will be eventually thrown and cause the streaming program to - * fail (and enter recovery). - * - * @param logFailuresOnly The flag to indicate logging-only on exceptions. - */ - public void setLogFailuresOnly(boolean logFailuresOnly) { - this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly); - } - - /** - * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers - * to be acknowledged by the Kafka producer on a checkpoint. - * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint. - * - * @param flush Flag indicating the flushing mode (true = flush on checkpoint) - */ - public void setFlushOnCheckpoint(boolean flush) { - this.wrappedProducerBase.setFlushOnCheckpoint(flush); - } - - /** - * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka. - * Timestamps must be positive for Kafka to accept them. 
- * - * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka. - */ - public void setWriteTimestampToKafka(boolean writeTimestampToKafka) { - this.producer.writeTimestampToKafka = writeTimestampToKafka; - } - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java deleted file mode 100644 index ddf1ad3..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.sources.StreamTableSource; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; - -import java.util.Properties; - -/** - * Kafka {@link StreamTableSource} for Kafka 0.10. - */ -public class Kafka010JsonTableSource extends Kafka09JsonTableSource { - - /** - * Creates a Kafka 0.10 JSON {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka010JsonTableSource( - String topic, - Properties properties, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { - - super(topic, properties, fieldNames, fieldTypes); - } - - /** - * Creates a Kafka 0.10 JSON {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. 
- */ - public Kafka010JsonTableSource( - String topic, - Properties properties, - String[] fieldNames, - Class<?>[] fieldTypes) { - - super(topic, properties, fieldNames, fieldTypes); - } - - @Override - FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { - return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java deleted file mode 100644 index 732440b..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.sources.StreamTableSource; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; - -import java.util.Properties; - -/** - * Kafka {@link StreamTableSource} for Kafka 0.10. - */ -public class Kafka010TableSource extends Kafka09TableSource { - - /** - * Creates a Kafka 0.10 {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka010TableSource( - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { - - super(topic, properties, deserializationSchema, fieldNames, fieldTypes); - } - - /** - * Creates a Kafka 0.10 {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. 
- */ - public Kafka010TableSource( - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - Class<?>[] fieldTypes) { - - super(topic, properties, deserializationSchema, fieldNames, fieldTypes); - } - - @Override - FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { - return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java deleted file mode 100644 index 71dd29a..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internal; - -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.util.SerializedValue; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; - -import java.util.List; -import java.util.Properties; - -/** - * A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API. - * - * <p>This fetcher re-uses basically all functionality of the 0.9 fetcher. It only additionally - * takes the KafkaRecord-attached timestamp and attaches it to the Flink records. - * - * @param <T> The type of elements produced by the fetcher. 
- */ -public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> { - - public Kafka010Fetcher( - SourceContext<T> sourceContext, - List<KafkaTopicPartition> assignedPartitions, - SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, - SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, - ProcessingTimeService processingTimeProvider, - long autoWatermarkInterval, - ClassLoader userCodeClassLoader, - boolean enableCheckpointing, - String taskNameWithSubtasks, - MetricGroup metricGroup, - KeyedDeserializationSchema<T> deserializer, - Properties kafkaProperties, - long pollTimeout, - boolean useMetrics) throws Exception - { - super( - sourceContext, - assignedPartitions, - watermarksPeriodic, - watermarksPunctuated, - processingTimeProvider, - autoWatermarkInterval, - userCodeClassLoader, - enableCheckpointing, - taskNameWithSubtasks, - metricGroup, - deserializer, - kafkaProperties, - pollTimeout, - useMetrics); - } - - @Override - protected void emitRecord( - T record, - KafkaTopicPartitionState<TopicPartition> partition, - long offset, - ConsumerRecord<?, ?> consumerRecord) throws Exception { - - // we attach the Kafka 0.10 timestamp here - emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp()); - } - - /** - * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10, - * changing binary signatures - */ - @Override - protected KafkaConsumerCallBridge010 createCallBridge() { - return new KafkaConsumerCallBridge010(); - } - - @Override - protected String getFetcherName() { - return "Kafka 0.10 Fetcher"; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java deleted file mode 100644 index a81b098..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka.internal; - -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; - -import java.util.List; - -/** - * The ConsumerCallBridge simply calls the {@link KafkaConsumer#assign(java.util.Collection)} method. - * - * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10, - * changing {@code assign(List)} to {@code assign(Collection)}. - * - * Because of that, we need two versions whose compiled code goes against different method signatures. - */ -public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge { - - @Override - public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception { - consumer.assign(topicPartitions); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties deleted file mode 100644 index 6bdfb48..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties +++ /dev/null @@ -1,29 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-################################################################################ - -log4j.rootLogger=INFO, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger - - http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java deleted file mode 100644 index 6ee0429..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java +++ /dev/null @@ -1,484 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.core.testutils.MultiShotLatch; -import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.connectors.kafka.internal.Handover; -import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher; -import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.common.TopicPartition; - -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyLong; -import static org.powermock.api.mockito.PowerMockito.doAnswer; -import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.when; -import static org.powermock.api.mockito.PowerMockito.whenNew; - -/** - * Unit tests for the {@link Kafka010Fetcher}. 
- */ -@RunWith(PowerMockRunner.class) -@PrepareForTest(KafkaConsumerThread.class) -public class Kafka010FetcherTest { - - @Test - public void testCommitDoesNotBlock() throws Exception { - - // test data - final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); - final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>(); - testCommitData.put(testPartition, 11L); - - // to synchronize when the consumer is in its blocking method - final OneShotLatch sync = new OneShotLatch(); - - // ----- the mock consumer with blocking poll calls ---- - final MultiShotLatch blockerLatch = new MultiShotLatch(); - - KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); - when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { - - @Override - public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException { - sync.trigger(); - blockerLatch.await(); - return ConsumerRecords.empty(); - } - }); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) { - blockerLatch.trigger(); - return null; - } - }).when(mockConsumer).wakeup(); - - // make sure the fetcher creates the mock consumer - whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); - - // ----- create the test fetcher ----- - - @SuppressWarnings("unchecked") - SourceContext<String> sourceContext = mock(SourceContext.class); - List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); - KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - - final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( - sourceContext, - topics, - null, /* periodic assigner */ - null, /* punctuated assigner */ - new TestProcessingTimeService(), - 10, - getClass().getClassLoader(), - false, /* checkpointing */ - "taskname-with-subtask", - new UnregisteredMetricsGroup(), - schema, - new Properties(), - 0L, - false); - - // ----- run the fetcher ----- - - final AtomicReference<Throwable> error = new AtomicReference<>(); - final Thread fetcherRunner = new Thread("fetcher runner") { - - @Override - public void run() { - try { - fetcher.runFetchLoop(); - } catch (Throwable t) { - error.set(t); - } - } - }; - fetcherRunner.start(); - - // wait until the fetcher has reached the method of interest - sync.await(); - - // ----- trigger the offset commit ----- - - final AtomicReference<Throwable> commitError = new AtomicReference<>(); - final Thread committer = new Thread("committer runner") { - @Override - public void run() { - try { - fetcher.commitInternalOffsetsToKafka(testCommitData); - } catch (Throwable t) { - commitError.set(t); - } - } - }; - committer.start(); - - // ----- ensure that the committer finishes in time ----- - committer.join(30000); - assertFalse("The committer did not finish in time", committer.isAlive()); - - // ----- test done, wait till the fetcher is done for a clean shutdown ----- - fetcher.cancel(); - fetcherRunner.join(); - - // check that there were no errors in the fetcher - final Throwable fetcherError = error.get(); - if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) { - throw new Exception("Exception in the fetcher", fetcherError); - } - final Throwable committerError = commitError.get(); - if (committerError != null) { - throw new Exception("Exception in the committer", committerError); - } - } - - @Test - public void ensureOffsetsGetCommitted() throws Exception { - 
- // test data - final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42); - final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99); - - final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>(); - testCommitData1.put(testPartition1, 11L); - testCommitData1.put(testPartition2, 18L); - - final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>(); - testCommitData2.put(testPartition1, 19L); - testCommitData2.put(testPartition2, 28L); - - final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>(); - - - // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ---- - - final MultiShotLatch blockerLatch = new MultiShotLatch(); - - KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); - - when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { - @Override - public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException { - blockerLatch.await(); - return ConsumerRecords.empty(); - } - }); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) { - blockerLatch.trigger(); - return null; - } - }).when(mockConsumer).wakeup(); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) { - @SuppressWarnings("unchecked") - Map<TopicPartition, OffsetAndMetadata> offsets = - (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0]; - - OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1]; - - commitStore.add(offsets); - callback.onComplete(offsets, null); - - return null; - } - }).when(mockConsumer).commitAsync( - Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class)); - - // make sure the fetcher creates the mock consumer - whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); - - // ----- create the test fetcher ----- - - @SuppressWarnings("unchecked") - SourceContext<String> sourceContext = mock(SourceContext.class); - List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); - KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); - - final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( - sourceContext, - topics, - null, /* periodic assigner */ - null, /* punctuated assigner */ - new TestProcessingTimeService(), - 10, - getClass().getClassLoader(), - false, /* checkpointing */ - "taskname-with-subtask", - new UnregisteredMetricsGroup(), - schema, - new Properties(), - 0L, - false); - - - // ----- run the fetcher ----- - - final AtomicReference<Throwable> error = new AtomicReference<>(); - final Thread fetcherRunner = new Thread("fetcher runner") { - - @Override - public void run() { - try { - fetcher.runFetchLoop(); - } catch (Throwable t) { - error.set(t); - } - } - }; - fetcherRunner.start(); - - // ----- trigger the first offset commit ----- - - fetcher.commitInternalOffsetsToKafka(testCommitData1); - Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take(); - - for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) { - TopicPartition partition = entry.getKey(); - if (partition.topic().equals("test")) { - assertEquals(42, partition.partition()); - assertEquals(12L, entry.getValue().offset()); - } - else if 
(partition.topic().equals("another")) { - assertEquals(99, partition.partition()); - assertEquals(18L, entry.getValue().offset()); - } - } - - // ----- trigger the second offset commit ----- - - fetcher.commitInternalOffsetsToKafka(testCommitData2); - Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take(); - - for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) { - TopicPartition partition = entry.getKey(); - if (partition.topic().equals("test")) { - assertEquals(42, partition.partition()); - assertEquals(20L, entry.getValue().offset()); - } - else if (partition.topic().equals("another")) { - assertEquals(99, partition.partition()); - assertEquals(28L, entry.getValue().offset()); - } - } - - // ----- test done, wait till the fetcher is done for a clean shutdown ----- - fetcher.cancel(); - fetcherRunner.join(); - - // check that there were no errors in the fetcher - final Throwable caughtError = error.get(); - if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) { - throw new Exception("Exception in the fetcher", caughtError); - } - } - - @Test - public void testCancellationWhenEmitBlocks() throws Exception { - - // ----- some test data ----- - - final String topic = "test-topic"; - final int partition = 3; - final byte[] payload = new byte[] {1, 2, 3, 4}; - - final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList( - new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload), - new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload), - new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload)); - - final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>(); - data.put(new TopicPartition(topic, partition), records); - - final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data); - - // ----- the test consumer ----- - - final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); - when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { - @Override - public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) { - return consumerRecords; - } - }); - - whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); - - // ----- build a fetcher ----- - - BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>(); - List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition)); - KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - - final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( - sourceContext, - topics, - null, /* periodic watermark extractor */ - null, /* punctuated watermark extractor */ - new TestProcessingTimeService(), - 10, /* watermark interval */ - this.getClass().getClassLoader(), - true, /* checkpointing */ - "task_name", - new UnregisteredMetricsGroup(), - schema, - new Properties(), - 0L, - false); - - - // ----- run the fetcher ----- - - final AtomicReference<Throwable> error = new AtomicReference<>(); - final Thread fetcherRunner = new Thread("fetcher runner") { - - @Override - public void run() { - try { - fetcher.runFetchLoop(); - } catch (Throwable t) { - error.set(t); - } - } - }; - fetcherRunner.start(); - - // wait until the thread started to emit records to the source context - sourceContext.waitTillHasBlocker(); - - // now we try to cancel the fetcher, including the interruption usually done on the task thread - // 
once it has finished, there must be no more thread blocked on the source context - fetcher.cancel(); - fetcherRunner.interrupt(); - fetcherRunner.join(); - - assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking()); - } - - // ------------------------------------------------------------------------ - // test utilities - // ------------------------------------------------------------------------ - - private static final class BlockingSourceContext<T> implements SourceContext<T> { - - private final ReentrantLock lock = new ReentrantLock(); - private final OneShotLatch inBlocking = new OneShotLatch(); - - @Override - public void collect(T element) { - block(); - } - - @Override - public void collectWithTimestamp(T element, long timestamp) { - block(); - } - - @Override - public void emitWatermark(Watermark mark) { - block(); - } - - @Override - public Object getCheckpointLock() { - return new Object(); - } - - @Override - public void close() {} - - public void waitTillHasBlocker() throws InterruptedException { - inBlocking.await(); - } - - public boolean isStillBlocking() { - return lock.isLocked(); - } - - @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"}) - private void block() { - lock.lock(); - try { - inBlocking.trigger(); - - // put this thread to sleep indefinitely - final Object o = new Object(); - while (true) { - synchronized (o) { - o.wait(); - } - } - } - catch (InterruptedException e) { - // exit cleanly, simply reset the interruption flag - Thread.currentThread().interrupt(); - } - finally { - lock.unlock(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java deleted file mode 100644 index 08511c9..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; -import org.junit.Test; - -import javax.annotation.Nullable; -import java.io.ByteArrayInputStream; -import java.io.IOException; - - -public class Kafka010ITCase extends KafkaConsumerTestBase { - - // ------------------------------------------------------------------------ - // Suite of Tests - // ------------------------------------------------------------------------ - - - @Test(timeout = 60000) - public void testFailOnNoBroker() throws Exception { - runFailOnNoBrokerTest(); - } - - @Test(timeout = 60000) - public void testConcurrentProducerConsumerTopology() throws Exception { - runSimpleConcurrentProducerConsumerTopology(); - } - - @Test(timeout = 60000) - public void testKeyValueSupport() throws Exception { - runKeyValueTest(); - } - - // --- canceling / failures --- - - @Test(timeout = 60000) - public void testCancelingEmptyTopic() throws Exception { - runCancelingOnEmptyInputTest(); - } - - @Test(timeout = 60000) - public void testCancelingFullTopic() throws Exception { - runCancelingOnFullInputTest(); - } - - @Test(timeout = 60000) - public void testFailOnDeploy() throws Exception { - runFailOnDeployTest(); - } - - - // --- source to partition mappings and exactly once --- - - @Test(timeout = 60000) - public void testOneToOneSources() throws Exception { - runOneToOneExactlyOnceTest(); - } - - @Test(timeout = 60000) - public void testOneSourceMultiplePartitions() throws Exception { - runOneSourceMultiplePartitionsExactlyOnceTest(); - } - - @Test(timeout = 60000) - public void testMultipleSourcesOnePartition() throws Exception { - runMultipleSourcesOnePartitionExactlyOnceTest(); - } - - // --- broker failure --- - - @Test(timeout = 60000) - public void testBrokerFailure() throws Exception { - runBrokerFailureTest(); - } - - // --- special executions --- - - @Test(timeout = 60000) - public void testBigRecordJob() throws Exception { - runBigRecordTestTopology(); - } - - @Test(timeout = 60000) - public void testMultipleTopics() throws Exception { - runProduceConsumeMultipleTopics(); - } - - @Test(timeout = 60000) 
- public void testAllDeletes() throws Exception { - runAllDeletesTest(); - } - - @Test(timeout = 60000) - public void testMetricsAndEndOfStream() throws Exception { - runEndOfStreamTest(); - } - - // --- offset committing --- - - @Test(timeout = 60000) - public void testCommitOffsetsToKafka() throws Exception { - runCommitOffsetsToKafka(); - } - - @Test(timeout = 60000) - public void testStartFromKafkaCommitOffsets() throws Exception { - runStartFromKafkaCommitOffsets(); - } - - @Test(timeout = 60000) - public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception { - runAutoOffsetRetrievalAndCommitToKafka(); - } - - /** - * Kafka 0.10 specific test, ensuring Timestamps are properly written to and read from Kafka - */ - @Test(timeout = 60000) - public void testTimestamps() throws Exception { - - final String topic = "tstopic"; - createTestTopic(topic, 3, 1); - - // ---------- Produce an event time stream into Kafka ------------------- - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setParallelism(1); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - - DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() { - boolean running = true; - - @Override - public void run(SourceContext<Long> ctx) throws Exception { - long i = 0; - while(running) { - ctx.collectWithTimestamp(i, i*2); - if(i++ == 1000L) { - running = false; - } - } - } - - @Override - public void cancel() { - running = false; - } - }); - - final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig()); - FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new KafkaPartitioner<Long>() { - @Override - public int partition(Long next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { - return (int)(next % 3); - } - }); - prod.setParallelism(3); - prod.setWriteTimestampToKafka(true); - env.execute("Produce some"); - - // ---------- Consume stream from Kafka ------------------- - - env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setParallelism(1); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - - FlinkKafkaConsumer010<Long> kafkaSource = new FlinkKafkaConsumer010<>(topic, new LimitedLongDeserializer(), standardProps); - kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() { - @Nullable - @Override - public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) { - if(lastElement % 10 == 0) { - return new Watermark(lastElement); - } - return null; - } - - @Override - public long extractTimestamp(Long element, long previousElementTimestamp) { - return previousElementTimestamp; - } - }); - - DataStream<Long> stream = env.addSource(kafkaSource); - GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class); - stream.transform("timestamp validating operator", objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1); - - env.execute("Consume again"); - - deleteTestTopic(topic); - } - - private static class 
TimestampValidatingOperator extends StreamSink<Long> { - - public TimestampValidatingOperator() { - super(new SinkFunction<Long>() { - @Override - public void invoke(Long value) throws Exception { - throw new RuntimeException("Unexpected"); - } - }); - } - - long elCount = 0; - long wmCount = 0; - long lastWM = Long.MIN_VALUE; - - @Override - public void processElement(StreamRecord<Long> element) throws Exception { - elCount++; - if(element.getValue() * 2 != element.getTimestamp()) { - throw new RuntimeException("Invalid timestamp: " + element); - } - } - - @Override - public void processWatermark(Watermark mark) throws Exception { - wmCount++; - - if(lastWM <= mark.getTimestamp()) { - lastWM = mark.getTimestamp(); - } else { - throw new RuntimeException("Received watermark higher than the last one"); - } - - if( mark.getTimestamp() % 10 != 0 && mark.getTimestamp() != Long.MAX_VALUE ) { - throw new RuntimeException("Invalid watermark: " + mark.getTimestamp()); - } - } - - @Override - public void close() throws Exception { - super.close(); - if(elCount != 1000L) { - throw new RuntimeException("Wrong final element count " + elCount); - } - - if(wmCount <= 2) { - throw new RuntimeException("Almost no watermarks have been sent " + wmCount); - } - } - } - - private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> { - - private final TypeInformation<Long> ti; - private final TypeSerializer<Long> ser; - long cnt = 0; - - public LimitedLongDeserializer() { - this.ti = TypeInfoParser.parse("Long"); - this.ser = ti.createSerializer(new ExecutionConfig()); - } - @Override - public TypeInformation<Long> getProducedType() { - return ti; - } - - @Override - public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { - cnt++; - DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); - Long e = ser.deserialize(in); - return e; - } - - @Override - public boolean isEndOfStream(Long nextElement) { - return cnt > 1000L; - } - } - -}
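
A note on the offset values asserted in ensureOffsetsGetCommitted above: for the assigned partition ("test", 42) the test commits 11 and 19 but expects 12 and 20 on the OffsetAndMetadata handed to commitAsync(). This follows Kafka's convention that a committed offset names the next record to read, so the fetcher adds one to each checkpointed "last processed" offset before committing. Below is a minimal sketch of that conversion, assuming a map of last-processed offsets for assigned partitions; the class and method names are illustrative only, not connector API.

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch only (not the connector's implementation): converts Flink's
 * checkpointed "last processed" offsets into the values Kafka's commitAsync() expects.
 * Kafka treats a committed offset as the offset of the next record to read, hence the +1,
 * which is why the test above commits 11/19 and asserts 12/20 for partition ("test", 42).
 */
final class OffsetCommitConversionSketch {

    static Map<TopicPartition, OffsetAndMetadata> toKafkaCommitOffsets(
            Map<KafkaTopicPartition, Long> lastProcessedOffsets) {

        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>(lastProcessedOffsets.size());

        for (Map.Entry<KafkaTopicPartition, Long> entry : lastProcessedOffsets.entrySet()) {
            TopicPartition tp = new TopicPartition(
                    entry.getKey().getTopic(), entry.getKey().getPartition());

            // committed offset = last processed offset + 1 (the next record to consume)
            toCommit.put(tp, new OffsetAndMetadata(entry.getValue() + 1));
        }
        return toCommit;
    }
}

On restore, the same convention applies in reverse: an offset read back from Kafka marks the position to resume reading from, not the last record that was processed.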