[ https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656443#comment-16656443 ]
ASF GitHub Bot commented on FLINK-7964: --------------------------------------- yanghua closed pull request #6577: [FLINK-7964] Add Apache Kafka 1.0/1.1 connectors URL: https://github.com/apache/flink/pull/6577 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index b4416e8a209..73a331e6194 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -193,7 +193,7 @@ /** * User defined properties for the Producer. */ - private final Properties producerConfig; + protected final Properties producerConfig; /** * The name of the default topic this producer is writing data to. @@ -239,7 +239,7 @@ /** * Semantic chosen for this instance. */ - private Semantic semantic; + protected Semantic semantic; // -------------------------------- Runtime fields ------------------------------------------ @@ -893,6 +893,10 @@ protected void finishRecoveringContext() { LOG.info("Recovered transactionalIds {}", getUserContext().get().transactionalIds); } + protected FlinkKafkaProducer createProducer() { + return new FlinkKafkaProducer<>(this.producerConfig); + } + /** * After initialization make sure that all previous transactions from the current user context have been completed. 
*/ @@ -958,7 +962,7 @@ private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], byte[]> pro } private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean registerMetrics) { - FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(this.producerConfig); + FlinkKafkaProducer<byte[], byte[]> producer = createProducer(); RuntimeContext ctx = getRuntimeContext(); diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java index 8faff38749f..2f47bf15a62 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java @@ -106,10 +106,10 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> { private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class); - private final KafkaProducer<K, V> kafkaProducer; + protected final KafkaProducer<K, V> kafkaProducer; @Nullable - private final String transactionalId; + protected final String transactionalId; public FlinkKafkaProducer(Properties properties) { transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG); @@ -257,7 +257,7 @@ private TransactionalRequestResult enqueueNewPartitions() { } } - private static Enum<?> getEnum(String enumFullName) { + protected static Enum<?> getEnum(String enumFullName) { String[] x = enumFullName.split("\\.(?=[^\\.]+$)"); if (x.length == 2) { String enumClassName = x[0]; @@ -272,7 +272,7 @@ private TransactionalRequestResult enqueueNewPartitions() { return null; } - private static Object invoke(Object object, String methodName, Object... 
args) { + protected static Object invoke(Object object, String methodName, Object... args) { Class<?>[] argTypes = new Class[args.length]; for (int i = 0; i < args.length; i++) { argTypes[i] = args[i].getClass(); @@ -290,7 +290,7 @@ private static Object invoke(Object object, String methodName, Class<?>[] argTyp } } - private static Object getValue(Object object, String fieldName) { + protected static Object getValue(Object object, String fieldName) { return getValue(object, object.getClass(), fieldName); } @@ -304,7 +304,7 @@ private static Object getValue(Object object, Class<?> clazz, String fieldName) } } - private static void setValue(Object object, String fieldName, Object value) { + protected static void setValue(Object object, String fieldName, Object value) { try { Field field = object.getClass().getDeclaredField(fieldName); field.setAccessible(true); diff --git a/flink-connectors/flink-connector-kafka-1.0/pom.xml b/flink-connectors/flink-connector-kafka-1.0/pom.xml new file mode 100644 index 00000000000..b82dfefcf38 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/pom.xml @@ -0,0 +1,315 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flink-connectors</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.7-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-kafka-1.0_${scala.binary.version}</artifactId> + <name>flink-connector-kafka-1.0</name> + + <packaging>jar</packaging> + + <properties> + <kafka.version>1.0.0</kafka.version> + </properties> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- streaming-java dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <!-- Add Kafka 1.0.x as a dependency --> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <!-- Projects depending on this project, won't depend on flink-table. --> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${project.version}</version> + <!-- Projects depending on this project, won't depend on flink-avro. 
--> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-json</artifactId> + <version>${project.version}</version> + <!-- Projects depending on this project, won't depend on flink-json. --> + <optional>true</optional> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-json</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <exclusions> + <!-- exclude Kafka dependencies --> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <exclusions> + <!-- exclude Kafka dependencies --> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + 
</dependency> + + <dependency> + <!-- include 1.0 server for tests --> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + <version>${kafka.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-jmx</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + </dependencies> + + <profiles> + <!-- Create SQL Client uber jars by default --> + <profile> + <id>sql-jars</id> + <activation> + <property> + <name>!skipSqlJars</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadedArtifactAttached>true</shadedArtifactAttached> + <shadedClassifierName>sql-jar</shadedClassifierName> + <artifactSet> + <includes combine.children="append"> + <include>org.apache.kafka:*</include> + 
<include>org.apache.flink:flink-connector-kafka-base_${scala.binary.version}</include> + <include>org.apache.flink:flink-connector-kafka-0.9_${scala.binary.version}</include> + <include>org.apache.flink:flink-connector-kafka-0.10_${scala.binary.version}</include> + <include>org.apache.flink:flink-connector-kafka-0.11_${scala.binary.version}</include> + </includes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>kafka/kafka-version.properties</exclude> + </excludes> + </filter> + </filters> + <relocations> + <relocation> + <pattern>org.apache.kafka</pattern> + <shadedPattern>org.apache.flink.kafka10.shaded.org.apache.kafka</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + <configuration> + <includes> + <include>**/KafkaTestEnvironmentImpl*</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <executions> + <execution> + <id>attach-test-sources</id> + <goals> + <goal>test-jar-no-fork</goal> + </goals> + <configuration> + <includes> + <include>**/KafkaTestEnvironmentImpl*</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> + <forkCount>1</forkCount> + <argLine>-Xms256m -Xmx2048m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine> + </configuration> + </plugin> + </plugins> + 
</build> + +</project> diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer10.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer10.java new file mode 100644 index 00000000000..e45b97846c7 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer10.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; + +import java.util.List; +import java.util.Properties; +import java.util.regex.Pattern; + +/** + * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from + * Apache Kafka 1.0.x. The consumer can run in multiple parallel instances, each of which will pull + * data from one or more Kafka partitions. 
+ */ +public class FlinkKafkaConsumer10<T> extends FlinkKafkaConsumer011<T> { + + /** + * Creates a new Kafka streaming source consumer for Kafka 1.0.x. + * + * @param topic The name of the topic that should be consumed. + * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + */ + public FlinkKafkaConsumer10(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { + super(topic, valueDeserializer, props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 1.0.x + * + * <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * pairs, offsets, and topic names from Kafka. + * + * @param topic The name of the topic that should be consumed. + * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + */ + public FlinkKafkaConsumer10(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) { + super(topic, deserializer, props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 1.0.x + * + * <p>This constructor allows passing multiple topics to the consumer. + * + * @param topics The Kafka topics to read from. + * @param deserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + */ + public FlinkKafkaConsumer10(List<String> topics, DeserializationSchema<T> deserializer, Properties props) { + super(topics, deserializer, props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 1.0.x + * + * <p>This constructor allows passing multiple topics and a key/value deserialization schema. + * + * @param topics The Kafka topics to read from. + * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. 
+ * @param props + */ + public FlinkKafkaConsumer10(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) { + super(topics, deserializer, props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 1.0.x. Use this constructor to + * subscribe to multiple topics based on a regular expression pattern. + * + * <p>If partition discovery is enabled (by setting a non-negative value for + * {@link FlinkKafkaConsumer10#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics + * with names matching the pattern will also be subscribed to as they are created on the fly. + * + * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe to. + * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + */ + public FlinkKafkaConsumer10(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) { + super(subscriptionPattern, valueDeserializer, props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 1.0.x. Use this constructor to + * subscribe to multiple topics based on a regular expression pattern. + * + * <p>If partition discovery is enabled (by setting a non-negative value for + * {@link FlinkKafkaConsumer10#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics + * with names matching the pattern will also be subscribed to as they are created on the fly. + * + * <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * pairs, offsets, and topic names from Kafka. + * + * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe to. + * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. 
+ * @param props + */ + public FlinkKafkaConsumer10(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) { + super(subscriptionPattern, deserializer, props); + } +} diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10.java new file mode 100644 index 00000000000..de7e0b7b7c4 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafka10Producer; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.Properties; + +/** + * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 1.0.x. By default producer + * will use {@link FlinkKafkaProducer10.Semantic#AT_LEAST_ONCE} semantic. + * Before using {@link FlinkKafkaProducer10.Semantic#EXACTLY_ONCE} please refer to Flink's + * Kafka connector documentation. + */ +public class FlinkKafkaProducer10<IN> extends FlinkKafkaProducer011<IN> { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer10.class); + + /** + * Creates a FlinkKafka10Producer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param brokerList Comma separated addresses of the brokers + * @param topicId ID of the Kafka topic. + * @param serializationSchema + */ + public FlinkKafkaProducer10(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) { + super(brokerList, topicId, serializationSchema); + } + + /** + * Creates a FlinkKafka10Producer for a given topic. The sink produces a DataStream to + * the topic. + * + * <p>Using this constructor, the default + * {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner} will be used as + * the partitioner. This default partitioner maps each sink subtask to a single Kafka + * partition (i.e. all records received by a sink subtask will end up in the same + * Kafka partition). 
+ * + * <p>To use a custom partitioner, please use + * {@link #FlinkKafkaProducer10(String, SerializationSchema, Properties, Optional)} instead. + * + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined key-less serialization schema. + * @param producerConfig + */ + public FlinkKafkaProducer10(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) { + super(topicId, serializationSchema, producerConfig); + } + + /** + * Creates a FlinkKafka10Producer for a given topic. The sink produces its input to + * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}. + * + * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an + * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka + * partitions in a round-robin fashion. + * + * @param topicId The topic to write data to + * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[] + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. + * If a partitioner is not provided, records will be distributed to Kafka partitions + */ + public FlinkKafkaProducer10( + String topicId, + SerializationSchema<IN> serializationSchema, + Properties producerConfig, + Optional<FlinkKafkaPartitioner<IN>> customPartitioner) { + super(topicId, serializationSchema, producerConfig, customPartitioner); + } + + /** + * Creates a FlinkKafka10Producer for a given topic. The sink produces a DataStream to + * the topic. + * + * <p>Using this constructor, the default + * {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner} will be used as + * the partitioner. 
This default partitioner maps each sink subtask to a single Kafka + * partition (i.e. all records received by a sink subtask will end up in the same + * Kafka partition). + * + * <p>To use a custom partitioner, please use + * {@link #FlinkKafkaProducer10(String, KeyedSerializationSchema, Properties, Optional)} instead. + * + * @param brokerList Comma separated addresses of the brokers + * @param topicId ID of the Kafka topic. + * @param serializationSchema + */ + public FlinkKafkaProducer10( + String brokerList, + String topicId, + KeyedSerializationSchema<IN> serializationSchema) { + super(brokerList, topicId, serializationSchema); + } + + /** + * Creates a FlinkKafka10Producer for a given topic. The sink produces a DataStream to + * the topic. + * + * <p>Using this constructor, the default + * {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner} will be used as + * the partitioner. This default partitioner maps each sink subtask to a single Kafka + * partition (i.e. all records received by a sink subtask will end up in the same + * Kafka partition). + * + * <p>To use a custom partitioner, please use + * {@link #FlinkKafkaProducer10(String, KeyedSerializationSchema, Properties, Optional)} instead. + * + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined serialization schema supporting key/value messages + * @param producerConfig + */ + public FlinkKafkaProducer10( + String topicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig) { + super(topicId, serializationSchema, producerConfig); + } + + /** + * Creates a FlinkKafka10Producer for a given topic. The sink produces a DataStream to + * the topic. + * + * <p>Using this constructor, the default + * {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner} will be used as + * the partitioner. This default partitioner maps each sink subtask to a single Kafka + * partition (i.e. 
all records received by a sink subtask will end up in the same + * Kafka partition). + * + * <p>To use a custom partitioner, please use + * {@link #FlinkKafkaProducer10(String, KeyedSerializationSchema, Properties, Optional, Semantic, int)} instead. + * + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined serialization schema supporting key/value messages + * @param producerConfig Properties with the producer configuration. + * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}). + */ + public FlinkKafkaProducer10( + String topicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig, + Semantic semantic) { + super(topicId, serializationSchema, producerConfig, semantic); + } + + /** + * Creates a FlinkKafka10Producer for a given topic. The sink produces its input to + * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}. + * + * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each + * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not + * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they + * will be distributed to Kafka partitions in a round-robin fashion. + * + * @param defaultTopicId The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. 
+ * If a partitioner is not provided, records will be partitioned by the key of each record + * (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys + * are {@code null}, then records will be distributed to Kafka partitions in a + */ + public FlinkKafkaProducer10( + String defaultTopicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig, + Optional<FlinkKafkaPartitioner<IN>> customPartitioner) { + super(defaultTopicId, serializationSchema, producerConfig, customPartitioner); + } + + /** + * Creates a FlinkKafka10Producer for a given topic. The sink produces its input to + * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}. + * + * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each + * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not + * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they + * will be distributed to Kafka partitions in a round-robin fashion. + * + * @param defaultTopicId The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. + * If a partitioner is not provided, records will be partitioned by the key of each record + * (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys + * are {@code null}, then records will be distributed to Kafka partitions in a + * round-robin fashion. + * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}). 
+ * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}). + */ + public FlinkKafkaProducer10( + String defaultTopicId, + KeyedSerializationSchema<IN> serializationSchema, + Properties producerConfig, + Optional<FlinkKafkaPartitioner<IN>> customPartitioner, + Semantic semantic, + int kafkaProducersPoolSize) { + super(defaultTopicId, serializationSchema, producerConfig, customPartitioner, semantic, kafkaProducersPoolSize); + } + + @Override + protected FlinkKafka10Producer createProducer() { + return new FlinkKafka10Producer(this.producerConfig); + } + +} diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSink.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSink.java new file mode 100644 index 00000000000..4f61b33bba1 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSink.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +import java.util.Optional; +import java.util.Properties; + +/** + * Kafka 1.0 table sink for writing data into Kafka. + */ +@Internal +public class Kafka10TableSink extends KafkaTableSink { + + public Kafka10TableSink( + TableSchema schema, + String topic, + Properties properties, + Optional<FlinkKafkaPartitioner<Row>> partitioner, + SerializationSchema<Row> serializationSchema) { + + super(schema, topic, properties, partitioner, serializationSchema); + } + + @Override + protected SinkFunction<Row> createKafkaProducer( + String topic, + Properties properties, + SerializationSchema<Row> serializationSchema, + Optional<FlinkKafkaPartitioner<Row>> partitioner) { + return new FlinkKafkaProducer10<>( + topic, + new KeyedSerializationSchemaWrapper<>(serializationSchema), + properties, + partitioner); + } +} diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSource.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSource.java new file mode 100644 index 00000000000..2421ac764a5 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSource.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Kafka {@link StreamTableSource} for Kafka 1.0. + */ +@Internal +public class Kafka10TableSource extends KafkaTableSource { + + /** + * Creates a generic Kafka {@link StreamTableSource}. + * + * @param schema Schema of the produced table. + * @param proctimeAttribute Field name of the processing time attribute. + * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute + * @param fieldMapping Mapping for the fields of the table schema to + * fields of the physical returned type. + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. 
+ * @param deserializationSchema Deserialization schema for decoding records from Kafka. + * @param startupMode Startup mode for the contained consumer. + * @param specificStartupOffsets Specific startup offsets; only relevant when startup + * mode is {@link StartupMode#SPECIFIC_OFFSETS}. + */ + public Kafka10TableSource( + TableSchema schema, + Optional<String> proctimeAttribute, + List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, + Optional<Map<String, String>> fieldMapping, + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets) { + + super( + schema, + proctimeAttribute, + rowtimeAttributeDescriptors, + fieldMapping, + topic, + properties, + deserializationSchema, + startupMode, + specificStartupOffsets); + } + + /** + * Creates a generic Kafka {@link StreamTableSource}. + * + * @param schema Schema of the produced table. + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema for decoding records from Kafka. 
+ */ + public Kafka10TableSource( + TableSchema schema, + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema) { + + super(schema, topic, properties, deserializationSchema); + } + + @Override + protected FlinkKafkaConsumerBase<Row> createKafkaConsumer( + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema) { + + return new FlinkKafkaConsumer10<Row>(topic, deserializationSchema, properties); + } +} diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactory.java new file mode 100644 index 00000000000..5bfe479ccbe --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactory.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.KafkaValidator; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Factory for creating configured instances of {@link Kafka10TableSource}. + */ +public class Kafka10TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase { + + @Override + protected String kafkaVersion() { + return KafkaValidator.CONNECTOR_VERSION_VALUE_10; + } + + @Override + protected boolean supportsKafkaTimestamps() { + return true; + } + + @Override + protected KafkaTableSource createKafkaTableSource( + TableSchema schema, + Optional<String> proctimeAttribute, + List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, + Map<String, String> fieldMapping, + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets) { + + return new Kafka10TableSource( + schema, + proctimeAttribute, + rowtimeAttributeDescriptors, + Optional.of(fieldMapping), + topic, + properties, + deserializationSchema, + startupMode, + specificStartupOffsets); + } + + @Override + protected KafkaTableSink createKafkaTableSink( + TableSchema schema, + String topic, + Properties properties, + Optional<FlinkKafkaPartitioner<Row>> partitioner, + SerializationSchema<Row> serializationSchema) { + + return new 
Kafka10TableSink( + schema, + topic, + properties, + partitioner, + serializationSchema); + } +} diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafka10Producer.java b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafka10Producer.java new file mode 100644 index 00000000000..5eca36e57a4 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafka10Producer.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * Inner flink kafka producer. 
+ */ +@PublicEvolving +public class FlinkKafka10Producer<K, V> extends FlinkKafkaProducer<K, V> { + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafka10Producer.class); + + public FlinkKafka10Producer(Properties properties) { + super(properties); + } + + /** + * Instead of obtaining producerId and epoch from the transaction coordinator, re-use previously obtained ones, + * so that we can resume transaction after a restart. Implementation of this method is based on + * {@link KafkaProducer#initTransactions}. + * https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630 + */ + public void resumeTransaction(long producerId, short epoch) { + Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId {} and epoch {}", producerId, epoch); + LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch); + + Object transactionManager = getValue(kafkaProducer, "transactionManager"); + synchronized (transactionManager) { + Object nextSequence = getValue(transactionManager, "nextSequence"); + + invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING")); + invoke(nextSequence, "clear"); + + Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch"); + setValue(producerIdAndEpoch, "producerId", producerId); + setValue(producerIdAndEpoch, "epoch", epoch); + + invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY")); + + invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION")); + setValue(transactionManager, "transactionStarted", true); + } + } + +} diff --git 
a/flink-connectors/flink-connector-kafka-1.0/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-connector-kafka-1.0/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory new file mode 100644 index 00000000000..b8951b9c3ed --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.streaming.connectors.kafka.Kafka10TableSourceSinkFactory diff --git a/flink-connectors/flink-connector-kafka-1.0/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kafka-1.0/src/main/resources/log4j.properties new file mode 100644 index 00000000000..6eef1747ddf --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/main/resources/log4j.properties @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger + diff --git a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka10ProducerITCase.java b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka10ProducerITCase.java new file mode 100644 index 00000000000..cd137f7c7bc --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka10ProducerITCase.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafka10Producer; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Properties; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for our own {@link FlinkKafka10Producer}. 
+ */ +@SuppressWarnings("serial") +public class FlinkKafka10ProducerITCase extends KafkaTestBase { + protected String transactionalId; + protected Properties extraProperties; + + @Before + public void before() { + transactionalId = UUID.randomUUID().toString(); + extraProperties = new Properties(); + extraProperties.putAll(standardProps); + extraProperties.put("transactional.id", transactionalId); + extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + extraProperties.put("isolation.level", "read_committed"); + } + + @Test(timeout = 30000L) + public void testHappyPath() throws IOException { + String topicName = "flink-kafka-producer-happy-path"; + try (Producer<String, String> kafkaProducer = new FlinkKafka10Producer<>(extraProperties)) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42")); + kafkaProducer.commitTransaction(); + } + assertRecord(topicName, "42", "42"); + deleteTestTopic(topicName); + } + + @Test(timeout = 30000L) + public void testResumeTransaction() throws IOException { + String topicName = "flink-kafka-producer-resume-transaction"; + try (FlinkKafka10Producer<String, String> kafkaProducer = new FlinkKafka10Producer<>(extraProperties)) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42")); + kafkaProducer.flush(); + long producerId = kafkaProducer.getProducerId(); + short epoch = kafkaProducer.getEpoch(); + + try (FlinkKafka10Producer<String, String> resumeProducer = new FlinkKafka10Producer<>(extraProperties)) { + 
resumeProducer.resumeTransaction(producerId, epoch); + resumeProducer.commitTransaction(); + } + + assertRecord(topicName, "42", "42"); + + // this shouldn't throw - in case of network split, old producer might attempt to commit it's transaction + kafkaProducer.commitTransaction(); + + // this shouldn't fail also, for same reason as above + try (FlinkKafka10Producer<String, String> resumeProducer = new FlinkKafka10Producer<>(extraProperties)) { + resumeProducer.resumeTransaction(producerId, epoch); + resumeProducer.commitTransaction(); + } + } + deleteTestTopic(topicName); + } + + private void assertRecord(String topicName, String expectedKey, String expectedValue) { + try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) { + kafkaConsumer.subscribe(Collections.singletonList(topicName)); + ConsumerRecords<String, String> records = kafkaConsumer.poll(10000); + + ConsumerRecord<String, String> record = Iterables.getOnlyElement(records); + assertEquals(expectedKey, record.key()); + assertEquals(expectedValue, record.value()); + } + } +} diff --git a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10ITCase.java b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10ITCase.java new file mode 100644 index 00000000000..b178890773f --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10ITCase.java @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; + +import kafka.server.KafkaServer; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.ExceptionUtils.findThrowable; +import static org.apache.flink.util.Preconditions.checkState; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assert.assertFalse; +import static 
org.junit.Assert.assertThat; + +/** + * IT cases for the {@link FlinkKafkaProducer10}. + */ +public class FlinkKafkaProducer10ITCase extends KafkaTestBase { + + protected String transactionalId; + protected Properties extraProperties; + + protected TypeInformationSerializationSchema<Integer> integerSerializationSchema = + new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); + protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema = + new KeyedSerializationSchemaWrapper<>(integerSerializationSchema); + + @Before + public void before() { + transactionalId = UUID.randomUUID().toString(); + extraProperties = new Properties(); + extraProperties.putAll(standardProps); + extraProperties.put("transactional.id", transactionalId); + extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + extraProperties.put("isolation.level", "read_committed"); + } + + @Test + public void resourceCleanUpNone() throws Exception { + resourceCleanUp(FlinkKafkaProducer10.Semantic.NONE); + } + + @Test + public void resourceCleanUpAtLeastOnce() throws Exception { + resourceCleanUp(FlinkKafkaProducer10.Semantic.AT_LEAST_ONCE); + } + + /** + * This tests checks whether there is some resource leak in form of growing threads number. 
+ */ + public void resourceCleanUp(FlinkKafkaProducer10.Semantic semantic) throws Exception { + String topic = "flink-kafka-producer-resource-cleanup-" + semantic; + + final int allowedEpsilonThreadCountGrow = 50; + + Optional<Integer> initialActiveThreads = Optional.empty(); + for (int i = 0; i < allowedEpsilonThreadCountGrow * 2; i++) { + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = + createTestHarness(topic, 1, 1, 0, semantic)) { + testHarness1.setup(); + testHarness1.open(); + } + + if (initialActiveThreads.isPresent()) { + assertThat("active threads count", + Thread.activeCount(), + lessThan(initialActiveThreads.get() + allowedEpsilonThreadCountGrow)); + } + else { + initialActiveThreads = Optional.of(Thread.activeCount()); + } + } + } + + /** + * This test ensures that transactions reusing transactional.ids (after returning to the pool) will not clash + * with previous transactions using same transactional.ids. + */ + @Test + public void testRestoreToCheckpointAfterExceedingProducersPool() throws Exception { + String topic = "flink-kafka-producer-fail-before-notify"; + + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = createTestHarness(topic)) { + testHarness1.setup(); + testHarness1.open(); + testHarness1.processElement(42, 0); + OperatorSubtaskState snapshot = testHarness1.snapshot(0, 0); + testHarness1.processElement(43, 0); + testHarness1.notifyOfCompletedCheckpoint(0); + try { + for (int i = 0; i < FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE; i++) { + testHarness1.snapshot(i + 1, 0); + testHarness1.processElement(i, 0); + } + throw new IllegalStateException("This should not be reached."); + } + catch (Exception ex) { + if (!isCausedBy(FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY, ex)) { + throw ex; + } + } + + // Resume transactions before testHarness1 is being closed (in case of failures close() might not be called) + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = 
createTestHarness(topic)) { + testHarness2.setup(); + // restore from snapshot1, transactions with records 43 and 44 should be aborted + testHarness2.initializeState(snapshot); + testHarness2.open(); + } + + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42), 30_000L); + deleteTestTopic(topic); + } + catch (Exception ex) { + // testHarness1 will be fenced off after creating and closing testHarness2 + if (!findThrowable(ex, ProducerFencedException.class).isPresent()) { + throw ex; + } + } + } + + @Test + public void testFlinkKafkaProducer10FailBeforeNotify() throws Exception { + String topic = "flink-kafka-producer-fail-before-notify"; + + OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic); + + testHarness.setup(); + testHarness.open(); + testHarness.processElement(42, 0); + testHarness.snapshot(0, 1); + testHarness.processElement(43, 2); + OperatorSubtaskState snapshot = testHarness.snapshot(1, 3); + + int leaderId = kafkaServer.getLeaderToShutDown(topic); + failBroker(leaderId); + + try { + testHarness.processElement(44, 4); + testHarness.snapshot(2, 5); + assertFalse(true); + } + catch (Exception ex) { + // expected + } + try { + testHarness.close(); + } + catch (Exception ex) { + } + + kafkaServer.restartBroker(leaderId); + + testHarness = createTestHarness(topic); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.close(); + + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L); + + deleteTestTopic(topic); + } + + @Test + public void testFlinkKafkaProducer10FailTransactionCoordinatorBeforeNotify() throws Exception { + String topic = "flink-kafka-producer-fail-transaction-coordinator-before-notify"; + + Properties properties = createProperties(); + + FlinkKafkaProducer10<Integer> kafkaProducer = new FlinkKafkaProducer10<>( + topic, + integerKeyedSerializationSchema, + properties, + FlinkKafkaProducer10.Semantic.EXACTLY_ONCE); + + 
OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>( + new StreamSink<>(kafkaProducer), + IntSerializer.INSTANCE); + + testHarness1.setup(); + testHarness1.open(); + testHarness1.processElement(42, 0); + testHarness1.snapshot(0, 1); + testHarness1.processElement(43, 2); + int transactionCoordinatorId = kafkaProducer.getTransactionCoordinatorId(); + OperatorSubtaskState snapshot = testHarness1.snapshot(1, 3); + + failBroker(transactionCoordinatorId); + + try { + testHarness1.processElement(44, 4); + testHarness1.notifyOfCompletedCheckpoint(1); + testHarness1.close(); + } + catch (Exception ex) { + // Expected... some random exception could be thrown by any of the above operations. + } + finally { + kafkaServer.restartBroker(transactionCoordinatorId); + } + + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic)) { + testHarness2.setup(); + testHarness2.initializeState(snapshot); + testHarness2.open(); + } + + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L); + + deleteTestTopic(topic); + } + + /** + * This tests checks whether FlinkKafkaProducer10 correctly aborts lingering transactions after a failure. + * If such transactions were left alone lingering it consumers would be unable to read committed records + * that were created after this lingering transaction. 
+ */ + @Test + public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception { + String topic = "flink-kafka-producer-fail-before-notify"; + + OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic); + + testHarness.setup(); + testHarness.open(); + testHarness.processElement(42, 0); + testHarness.snapshot(0, 1); + testHarness.processElement(43, 2); + OperatorSubtaskState snapshot1 = testHarness.snapshot(1, 3); + + testHarness.processElement(44, 4); + testHarness.snapshot(2, 5); + testHarness.processElement(45, 6); + + // do not close previous testHarness to make sure that closing do not clean up something (in case of failure + // there might not be any close) + testHarness = createTestHarness(topic); + testHarness.setup(); + // restore from snapshot1, transactions with records 44 and 45 should be aborted + testHarness.initializeState(snapshot1); + testHarness.open(); + + // write and commit more records, after potentially lingering transactions + testHarness.processElement(46, 7); + testHarness.snapshot(4, 8); + testHarness.processElement(47, 9); + testHarness.notifyOfCompletedCheckpoint(4); + + //now we should have: + // - records 42 and 43 in committed transactions + // - aborted transactions with records 44 and 45 + // - committed transaction with record 46 + // - pending transaction with record 47 + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46), 30_000L); + + testHarness.close(); + deleteTestTopic(topic); + } + + @Test + public void testFailAndRecoverSameCheckpointTwice() throws Exception { + String topic = "flink-kafka-producer-fail-and-recover-same-checkpoint-twice"; + + OperatorSubtaskState snapshot1; + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) { + testHarness.setup(); + testHarness.open(); + testHarness.processElement(42, 0); + testHarness.snapshot(0, 1); + testHarness.processElement(43, 2); + snapshot1 = 
testHarness.snapshot(1, 3); + + testHarness.processElement(44, 4); + } + + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) { + testHarness.setup(); + // restore from snapshot1, transactions with records 44 and 45 should be aborted + testHarness.initializeState(snapshot1); + testHarness.open(); + + // write and commit more records, after potentially lingering transactions + testHarness.processElement(44, 7); + testHarness.snapshot(2, 8); + testHarness.processElement(45, 9); + } + + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) { + testHarness.setup(); + // restore from snapshot1, transactions with records 44 and 45 should be aborted + testHarness.initializeState(snapshot1); + testHarness.open(); + + // write and commit more records, after potentially lingering transactions + testHarness.processElement(44, 7); + testHarness.snapshot(3, 8); + testHarness.processElement(45, 9); + } + + //now we should have: + // - records 42 and 43 in committed transactions + // - aborted transactions with records 44 and 45 + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L); + deleteTestTopic(topic); + } + + /** + * This test checks whether FlinkKafkaProducer10 correctly aborts lingering transactions after a failure, + * which happened before the first checkpoint and was followed up by reducing the parallelism. + * If such transactions were left alone lingering its consumers would be unable to read committed records + * that were created after this lingering transaction. 
+ */ + @Test + public void testScaleDownBeforeFirstCheckpoint() throws Exception { + String topic = "scale-down-before-first-checkpoint"; + + List<AutoCloseable> operatorsToClose = new ArrayList<>(); + int preScaleDownParallelism = Math.max(2, FlinkKafkaProducer10.SAFE_SCALE_DOWN_FACTOR); + for (int subtaskIndex = 0; subtaskIndex < preScaleDownParallelism; subtaskIndex++) { + OneInputStreamOperatorTestHarness<Integer, Object> preScaleDownOperator = createTestHarness( + topic, + preScaleDownParallelism, + preScaleDownParallelism, + subtaskIndex, + FlinkKafkaProducer10.Semantic.EXACTLY_ONCE); + + preScaleDownOperator.setup(); + preScaleDownOperator.open(); + preScaleDownOperator.processElement(subtaskIndex * 2, 0); + preScaleDownOperator.snapshot(0, 1); + preScaleDownOperator.processElement(subtaskIndex * 2 + 1, 2); + + operatorsToClose.add(preScaleDownOperator); + } + + // do not close previous testHarnesses to make sure that closing do not clean up something (in case of failure + // there might not be any close) + + // After previous failure simulate restarting application with smaller parallelism + OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0, FlinkKafkaProducer10.Semantic.EXACTLY_ONCE); + + postScaleDownOperator1.setup(); + postScaleDownOperator1.open(); + + // write and commit more records, after potentially lingering transactions + postScaleDownOperator1.processElement(46, 7); + postScaleDownOperator1.snapshot(4, 8); + postScaleDownOperator1.processElement(47, 9); + postScaleDownOperator1.notifyOfCompletedCheckpoint(4); + + //now we should have: + // - records 42, 43, 44 and 45 in aborted transactions + // - committed transaction with record 46 + // - pending transaction with record 47 + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(46), 30_000L); + + postScaleDownOperator1.close(); + // ignore ProducerFencedExceptions, because postScaleDownOperator1 could reuse 
transactional ids. + for (AutoCloseable operatorToClose : operatorsToClose) { + closeIgnoringProducerFenced(operatorToClose); + } + deleteTestTopic(topic); + } + + /** + * Each instance of FlinkKafkaProducer10 uses its own pool of transactional ids. After the restore from checkpoint + * transactional ids are redistributed across the subtasks. In case of scale down, the surplus transactional ids + * are dropped. In case of scale up, new ones are generated (for the new subtasks). This test makes sure that the sequence + * of scaling down and up again works fine. Especially it checks whether the newly generated ids in scaling up + * do not overlap with ids that were used before scaling down. For example we start with 4 ids and parallelism 4: + * [1], [2], [3], [4] - one assigned per each subtask + * we scale down to parallelism 2: + * [1, 2], [3, 4] - first subtask got id 1 and 2, second got ids 3 and 4 + * surplus ids are dropped from the pools and we scale up to parallelism 3: + * [1 or 2], [3 or 4], [???] + * a new subtask has to generate new id(s), but it cannot use ids that are potentially in use, so it has to generate + * new ones that are greater than 4. 
+ */ + @Test + public void testScaleUpAfterScalingDown() throws Exception { + String topic = "scale-down-before-first-checkpoint"; + + final int parallelism1 = 4; + final int parallelism2 = 2; + final int parallelism3 = 3; + final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3)); + + List<OperatorStateHandle> operatorSubtaskState = repartitionAndExecute( + topic, + Collections.emptyList(), + parallelism1, + maxParallelism, + IntStream.range(0, parallelism1).boxed().iterator()); + + operatorSubtaskState = repartitionAndExecute( + topic, + operatorSubtaskState, + parallelism2, + maxParallelism, + IntStream.range(parallelism1, parallelism1 + parallelism2).boxed().iterator()); + + operatorSubtaskState = repartitionAndExecute( + topic, + operatorSubtaskState, + parallelism3, + maxParallelism, + IntStream.range(parallelism1 + parallelism2, parallelism1 + parallelism2 + parallelism3).boxed().iterator()); + + // After each previous repartitionAndExecute call, we are left with some lingering transactions, that would + // not allow us to read all committed messages from the topic. Thus we initialize operators from + // OperatorSubtaskState once more, but without any new data. This should terminate all ongoing transactions. 
+ + operatorSubtaskState = repartitionAndExecute( + topic, + operatorSubtaskState, + 1, + maxParallelism, + Collections.emptyIterator()); + + assertExactlyOnceForTopic( + createProperties(), + topic, + 0, + IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()), + 30_000L); + deleteTestTopic(topic); + } + + private List<OperatorStateHandle> repartitionAndExecute( + String topic, + List<OperatorStateHandle> inputStates, + int parallelism, + int maxParallelism, + Iterator<Integer> inputData) throws Exception { + + List<OperatorStateHandle> outputStates = new ArrayList<>(); + List<OneInputStreamOperatorTestHarness<Integer, Object>> testHarnesses = new ArrayList<>(); + + for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) { + OneInputStreamOperatorTestHarness<Integer, Object> testHarness = + createTestHarness(topic, maxParallelism, parallelism, subtaskIndex, FlinkKafkaProducer10.Semantic.EXACTLY_ONCE); + testHarnesses.add(testHarness); + + testHarness.setup(); + + testHarness.initializeState(new OperatorSubtaskState( + new StateObjectCollection<>(inputStates), + StateObjectCollection.empty(), + StateObjectCollection.empty(), + StateObjectCollection.empty())); + testHarness.open(); + + if (inputData.hasNext()) { + int nextValue = inputData.next(); + testHarness.processElement(nextValue, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); + + outputStates.addAll(snapshot.getManagedOperatorState()); + checkState(snapshot.getRawOperatorState().isEmpty(), "Unexpected raw operator state"); + checkState(snapshot.getManagedKeyedState().isEmpty(), "Unexpected managed keyed state"); + checkState(snapshot.getRawKeyedState().isEmpty(), "Unexpected raw keyed state"); + + for (int i = 1; i < FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) { + testHarness.processElement(-nextValue, 0); + testHarness.snapshot(i, 0); + } + } + } + + for (OneInputStreamOperatorTestHarness<Integer, Object> 
testHarness : testHarnesses) { + testHarness.close(); + } + + return outputStates; + } + + @Test + public void testRecoverCommittedTransaction() throws Exception { + String topic = "flink-kafka-producer-recover-committed-transaction"; + + OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic); + + testHarness.setup(); + testHarness.open(); // producerA - start transaction (txn) 0 + testHarness.processElement(42, 0); // producerA - write 42 in txn 0 + OperatorSubtaskState checkpoint0 = testHarness.snapshot(0, 1); // producerA - pre commit txn 0, producerB - start txn 1 + testHarness.processElement(43, 2); // producerB - write 43 in txn 1 + testHarness.notifyOfCompletedCheckpoint(0); // producerA - commit txn 0 and return to the pool + testHarness.snapshot(1, 3); // producerB - pre txn 1, producerA - start txn 2 + testHarness.processElement(44, 4); // producerA - write 44 in txn 2 + testHarness.close(); // producerA - abort txn 2 + + testHarness = createTestHarness(topic); + testHarness.initializeState(checkpoint0); // recover state 0 - producerA recover and commit txn 0 + testHarness.close(); + + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42), 30_000L); + + deleteTestTopic(topic); + } + + @Test + public void testRunOutOfProducersInThePool() throws Exception { + String topic = "flink-kafka-run-out-of-producers"; + + try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) { + + testHarness.setup(); + testHarness.open(); + + for (int i = 0; i < FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE * 2; i++) { + testHarness.processElement(i, i * 2); + testHarness.snapshot(i, i * 2 + 1); + } + } + catch (Exception ex) { + if (!ex.getCause().getMessage().startsWith("Too many ongoing")) { + throw ex; + } + } + deleteTestTopic(topic); + } + + // shut down a Kafka broker + private void failBroker(int brokerId) { + KafkaServer toShutDown = null; + for (KafkaServer server : 
kafkaServer.getBrokers()) { + + if (kafkaServer.getBrokerId(server) == brokerId) { + toShutDown = server; + break; + } + } + + if (toShutDown == null) { + StringBuilder listOfBrokers = new StringBuilder(); + for (KafkaServer server : kafkaServer.getBrokers()) { + listOfBrokers.append(kafkaServer.getBrokerId(server)); + listOfBrokers.append(" ; "); + } + + throw new IllegalArgumentException("Cannot find broker to shut down: " + brokerId + + " ; available brokers: " + listOfBrokers.toString()); + } else { + toShutDown.shutdown(); + toShutDown.awaitShutdown(); + } + } + + private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception { + try { + autoCloseable.close(); + } + catch (Exception ex) { + if (!(ex.getCause() instanceof ProducerFencedException)) { + throw ex; + } + } + } + + private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception { + return createTestHarness(topic, 1, 1, 0, FlinkKafkaProducer10.Semantic.EXACTLY_ONCE); + } + + private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness( + String topic, + int maxParallelism, + int parallelism, + int subtaskIndex, + FlinkKafkaProducer10.Semantic semantic) throws Exception { + Properties properties = createProperties(); + + FlinkKafkaProducer10<Integer> kafkaProducer = new FlinkKafkaProducer10<>( + topic, + integerKeyedSerializationSchema, + properties, + semantic); + + return new OneInputStreamOperatorTestHarness<>( + new StreamSink<>(kafkaProducer), + maxParallelism, + parallelism, + subtaskIndex, + IntSerializer.INSTANCE, + new OperatorID(42, 44)); + } + + private Properties createProperties() { + Properties properties = new Properties(); + properties.putAll(standardProps); + properties.putAll(secureProps); + properties.put(FlinkKafkaProducer10.KEY_DISABLE_METRICS, "true"); + return properties; + } + + private boolean isCausedBy(FlinkKafka011ErrorCode expectedErrorCode, Throwable ex) { + 
Optional<FlinkKafka011Exception> cause = findThrowable(ex, FlinkKafka011Exception.class); + if (cause.isPresent()) { + return cause.get().getErrorCode().equals(expectedErrorCode); + } + return false; + } + +} diff --git a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10StateSerializerTest.java b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10StateSerializerTest.java new file mode 100644 index 00000000000..47cc2ea972e --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer10StateSerializerTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; +import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.TransactionHolder; + +import java.util.Collections; +import java.util.Optional; + +/** + * A test for the {@link TypeSerializer TypeSerializers} used for the Kafka producer state. + */ +public class FlinkKafkaProducer10StateSerializerTest + extends SerializerTestBase< + TwoPhaseCommitSinkFunction.State< + FlinkKafkaProducer10.KafkaTransactionState, + FlinkKafkaProducer10.KafkaTransactionContext>> { + + @Override + protected TypeSerializer< + TwoPhaseCommitSinkFunction.State< + FlinkKafkaProducer10.KafkaTransactionState, + FlinkKafkaProducer10.KafkaTransactionContext>> createSerializer() { + return new TwoPhaseCommitSinkFunction.StateSerializer<>( + new FlinkKafkaProducer10.TransactionStateSerializer(), + new FlinkKafkaProducer10.ContextStateSerializer()); + } + + @Override + protected Class<TwoPhaseCommitSinkFunction.State< + FlinkKafkaProducer10.KafkaTransactionState, + FlinkKafkaProducer10.KafkaTransactionContext>> getTypeClass() { + return (Class) TwoPhaseCommitSinkFunction.State.class; + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected TwoPhaseCommitSinkFunction.State< + FlinkKafkaProducer10.KafkaTransactionState, + FlinkKafkaProducer10.KafkaTransactionContext>[] getTestData() { + //noinspection unchecked + return new TwoPhaseCommitSinkFunction.State[] { + new TwoPhaseCommitSinkFunction.State< + FlinkKafkaProducer10.KafkaTransactionState, + FlinkKafkaProducer10.KafkaTransactionContext>( + new TransactionHolder(new FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0), + Collections.emptyList(), + Optional.empty()), + new 
TwoPhaseCommitSinkFunction.State< + FlinkKafkaProducer10.KafkaTransactionState, + FlinkKafkaProducer10.KafkaTransactionContext>( + new TransactionHolder(new FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 2711), + Collections.singletonList(new TransactionHolder(new FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 42)), + Optional.empty()), + new TwoPhaseCommitSinkFunction.State< + FlinkKafkaProducer10.KafkaTransactionState, + FlinkKafkaProducer10.KafkaTransactionContext>( + new TransactionHolder(new FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0), + Collections.emptyList(), + Optional.of(new FlinkKafkaProducer10.KafkaTransactionContext(Collections.emptySet()))), + new TwoPhaseCommitSinkFunction.State< + FlinkKafkaProducer10.KafkaTransactionState, + FlinkKafkaProducer10.KafkaTransactionContext>( + new TransactionHolder(new FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0), + Collections.emptyList(), + Optional.of(new FlinkKafkaProducer10.KafkaTransactionContext(Collections.singleton("hello")))), + new TwoPhaseCommitSinkFunction.State< + FlinkKafkaProducer10.KafkaTransactionState, + FlinkKafkaProducer10.KafkaTransactionContext>( + new TransactionHolder(new FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0), + Collections.singletonList(new TransactionHolder(new FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0)), + Optional.of(new FlinkKafkaProducer10.KafkaTransactionContext(Collections.emptySet()))), + new TwoPhaseCommitSinkFunction.State< + FlinkKafkaProducer10.KafkaTransactionState, + FlinkKafkaProducer10.KafkaTransactionContext>( + new TransactionHolder(new FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0), + Collections.singletonList(new TransactionHolder(new FlinkKafkaProducer10.KafkaTransactionState("fake", 1L, (short) 42, null), 0)), + Optional.of(new 
FlinkKafkaProducer10.KafkaTransactionContext(Collections.singleton("hello")))) + }; + } + + @Override + public void testInstantiate() { + // this serializer does not support instantiation + } +} diff --git a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ITCase.java b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ITCase.java new file mode 100644 index 00000000000..d5521728061 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ITCase.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; + +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Optional; + +/** + * IT cases for Kafka 1.0 . 
+ */ +public class Kafka10ITCase extends KafkaConsumerTestBase { + + @BeforeClass + public static void prepare() throws ClassNotFoundException { + KafkaProducerTestBase.prepare(); + ((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer10.Semantic.AT_LEAST_ONCE); + } + + // ------------------------------------------------------------------------ + // Suite of Tests + // ------------------------------------------------------------------------ + + @Test(timeout = 60000) + public void testFailOnNoBroker() throws Exception { + runFailOnNoBrokerTest(); + } + + @Test(timeout = 60000) + public void testConcurrentProducerConsumerTopology() throws Exception { + runSimpleConcurrentProducerConsumerTopology(); + } + + @Test(timeout = 60000) + public void testKeyValueSupport() throws Exception { + runKeyValueTest(); + } + + // --- canceling / failures --- + + @Test(timeout = 60000) + public void testCancelingEmptyTopic() throws Exception { + runCancelingOnEmptyInputTest(); + } + + @Test(timeout = 60000) + public void testCancelingFullTopic() throws Exception { + runCancelingOnFullInputTest(); + } + + // --- source to partition mappings and exactly once --- + + @Test(timeout = 60000) + public void testOneToOneSources() throws Exception { + runOneToOneExactlyOnceTest(); + } + + @Test(timeout = 60000) + public void testOneSourceMultiplePartitions() throws Exception { + runOneSourceMultiplePartitionsExactlyOnceTest(); + } + + @Test(timeout = 60000) + public void testMultipleSourcesOnePartition() throws Exception { + runMultipleSourcesOnePartitionExactlyOnceTest(); + } + + // --- broker failure --- + + @Test(timeout = 60000) + public void testBrokerFailure() throws Exception { + runBrokerFailureTest(); + } + + // --- special executions --- + + @Test(timeout = 60000) + public void testBigRecordJob() throws Exception { + runBigRecordTestTopology(); + } + + @Test(timeout = 60000) + public void testMultipleTopics() throws Exception { + 
runProduceConsumeMultipleTopics(); + } + + @Test(timeout = 60000) + public void testAllDeletes() throws Exception { + runAllDeletesTest(); + } + + @Test(timeout = 60000) + public void testMetricsAndEndOfStream() throws Exception { + runEndOfStreamTest(); + } + + // --- startup mode --- + + @Test(timeout = 60000) + public void testStartFromEarliestOffsets() throws Exception { + runStartFromEarliestOffsets(); + } + + @Test(timeout = 60000) + public void testStartFromLatestOffsets() throws Exception { + runStartFromLatestOffsets(); + } + + @Test(timeout = 60000) + public void testStartFromGroupOffsets() throws Exception { + runStartFromGroupOffsets(); + } + + @Test(timeout = 60000) + public void testStartFromSpecificOffsets() throws Exception { + runStartFromSpecificOffsets(); + } + + @Test(timeout = 60000) + public void testStartFromTimestamp() throws Exception { + runStartFromTimestamp(); + } + + // --- offset committing --- + + @Test(timeout = 60000) + public void testCommitOffsetsToKafka() throws Exception { + runCommitOffsetsToKafka(); + } + + @Test(timeout = 60000) + public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception { + runAutoOffsetRetrievalAndCommitToKafka(); + } + + /** + * Kafka 10 specific test, ensuring Timestamps are properly written to and read from Kafka. 
+ */ + @Test(timeout = 60000) + public void testTimestamps() throws Exception { + + final String topic = "tstopic"; + createTestTopic(topic, 3, 1); + + // ---------- Produce an event time stream into Kafka ------------------- + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env.getConfig().disableSysoutLogging(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() { + private static final long serialVersionUID = -2255115836471289626L; + boolean running = true; + + @Override + public void run(SourceContext<Long> ctx) throws Exception { + long i = 0; + while (running) { + ctx.collectWithTimestamp(i, i * 2); + if (i++ == 1110L) { + running = false; + } + } + } + + @Override + public void cancel() { + running = false; + } + }); + + final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(Types.LONG, env.getConfig()); + FlinkKafkaProducer10<Long> prod = new FlinkKafkaProducer10<>(topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, Optional.of(new FlinkKafkaPartitioner<Long>() { + private static final long serialVersionUID = -6730989584364230617L; + + @Override + public int partition(Long next, byte[] key, byte[] value, String targetTopic, int[] partitions) { + return (int) (next % 3); + } + })); + prod.setWriteTimestampToKafka(true); + + streamWithTimestamps.addSink(prod).setParallelism(3); + + env.execute("Produce some"); + + // ---------- Consume stream from Kafka ------------------- + + env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env.getConfig().disableSysoutLogging(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + FlinkKafkaConsumer10<Long> 
kafkaSource = new FlinkKafkaConsumer10<>(topic, new Kafka10ITCase.LimitedLongDeserializer(), standardProps); + kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() { + private static final long serialVersionUID = -4834111173247835189L; + + @Nullable + @Override + public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) { + if (lastElement % 11 == 0) { + return new Watermark(lastElement); + } + return null; + } + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + return previousElementTimestamp; + } + }); + + DataStream<Long> stream = env.addSource(kafkaSource); + GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class); + stream.transform("timestamp validating operator", objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1); + + env.execute("Consume again"); + + deleteTestTopic(topic); + } + + private static class TimestampValidatingOperator extends StreamSink<Long> { + + private static final long serialVersionUID = 1353168781235526806L; + + public TimestampValidatingOperator() { + super(new SinkFunction<Long>() { + private static final long serialVersionUID = -6676565693361786524L; + + @Override + public void invoke(Long value) throws Exception { + throw new RuntimeException("Unexpected"); + } + }); + } + + long elCount = 0; + long wmCount = 0; + long lastWM = Long.MIN_VALUE; + + @Override + public void processElement(StreamRecord<Long> element) throws Exception { + elCount++; + if (element.getValue() * 2 != element.getTimestamp()) { + throw new RuntimeException("Invalid timestamp: " + element); + } + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + wmCount++; + + if (lastWM <= mark.getTimestamp()) { + lastWM = mark.getTimestamp(); + } else { + throw new RuntimeException("Received watermark higher than the last one"); + } + + if (mark.getTimestamp() % 11 != 0 && mark.getTimestamp() != 
Long.MAX_VALUE) { + throw new RuntimeException("Invalid watermark: " + mark.getTimestamp()); + } + } + + @Override + public void close() throws Exception { + super.close(); + if (elCount != 1110L) { + throw new RuntimeException("Wrong final element count " + elCount); + } + + if (wmCount <= 2) { + throw new RuntimeException("Almost no watermarks have been sent " + wmCount); + } + } + } + + private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> { + + private static final long serialVersionUID = 6966177118923713521L; + private final TypeInformation<Long> ti; + private final TypeSerializer<Long> ser; + long cnt = 0; + + public LimitedLongDeserializer() { + this.ti = Types.LONG; + this.ser = ti.createSerializer(new ExecutionConfig()); + } + + @Override + public TypeInformation<Long> getProducedType() { + return ti; + } + + @Override + public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + cnt++; + DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); + Long e = ser.deserialize(in); + return e; + } + + @Override + public boolean isEndOfStream(Long nextElement) { + return cnt > 1110L; + } + } + +} diff --git a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerAtLeastOnceITCase.java b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerAtLeastOnceITCase.java new file mode 100644 index 00000000000..b064d9fa6b6 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerAtLeastOnceITCase.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.junit.BeforeClass; + +/** + * IT cases for the {@link FlinkKafkaProducer10}. + */ +@SuppressWarnings("serial") +public class Kafka10ProducerAtLeastOnceITCase extends KafkaProducerTestBase { + + @BeforeClass + public static void prepare() throws ClassNotFoundException { + KafkaProducerTestBase.prepare(); + ((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer10.Semantic.AT_LEAST_ONCE); + } + + @Override + public void testExactlyOnceRegularSink() throws Exception { + // disable test for at least once semantic + } + + @Override + public void testExactlyOnceCustomOperator() throws Exception { + // disable test for at least once semantic + } +} diff --git a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerExactlyOnceITCase.java new file mode 100644 index 00000000000..8365e25e6f4 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ProducerExactlyOnceITCase.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * IT cases for the {@link FlinkKafkaProducer10}. + */ +@SuppressWarnings("serial") +public class Kafka10ProducerExactlyOnceITCase extends KafkaProducerTestBase { + @BeforeClass + public static void prepare() throws ClassNotFoundException { + KafkaProducerTestBase.prepare(); + ((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer10.Semantic.EXACTLY_ONCE); + } + + @Override + public void testOneToOneAtLeastOnceRegularSink() throws Exception { + // TODO: fix this test + // currently very often (~50% cases) KafkaProducer live locks itself on commitTransaction call. + // Somehow Kafka 10 doesn't play along with NetworkFailureProxy. This can either mean a bug in Kafka + // that it doesn't work well with some weird network failures, or the NetworkFailureProxy is a broken design + // and this test should be reimplemented in completely different way... + } + + @Override + public void testOneToOneAtLeastOnceCustomOperator() throws Exception { + // TODO: fix this test + // currently very often (~50% cases) KafkaProducer live locks itself on commitTransaction call. + // Somehow Kafka 10 doesn't play along with NetworkFailureProxy. 
This can either mean a bug in Kafka + // that it doesn't work well with some weird network failures, or the NetworkFailureProxy is a broken design + // and this test should be reimplemented in completely different way... + } + + @Test + public void testMultipleSinkOperators() throws Exception { + testExactlyOnce(false, 2); + } +} diff --git a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactoryTest.java new file mode 100644 index 00000000000..e557e519570 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10TableSourceSinkFactoryTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.KafkaValidator; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; +import org.apache.flink.types.Row; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Test for {@link Kafka10TableSource} and {@link Kafka10TableSink} created + * by {@link Kafka10TableSourceSinkFactory}. + */ +public class Kafka10TableSourceSinkFactoryTest extends KafkaTableSourceSinkFactoryTestBase { + + @Override + protected String getKafkaVersion() { + return KafkaValidator.CONNECTOR_VERSION_VALUE_10; + } + + @Override + protected Class<FlinkKafkaConsumerBase<Row>> getExpectedFlinkKafkaConsumer() { + return (Class) FlinkKafkaConsumer10.class; + } + + @Override + protected Class<?> getExpectedFlinkKafkaProducer() { + return FlinkKafkaProducer10.class; + } + + @Override + protected KafkaTableSource getExpectedKafkaTableSource( + TableSchema schema, + Optional<String> proctimeAttribute, + List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors, + Map<String, String> fieldMapping, + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets) { + + return new Kafka10TableSource( + schema, + proctimeAttribute, + rowtimeAttributeDescriptors, + Optional.of(fieldMapping), + topic, + properties, + deserializationSchema, + startupMode, + specificStartupOffsets); + } + + 
@Override + protected KafkaTableSink getExpectedKafkaTableSink( + TableSchema schema, + String topic, + Properties properties, + Optional<FlinkKafkaPartitioner<Row>> partitioner, + SerializationSchema<Row> serializationSchema) { + + return new Kafka10TableSink( + schema, + topic, + properties, + partitioner, + serializationSchema); + } +} diff --git a/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java new file mode 100644 index 00000000000..02d5d0a3a81 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.networking.NetworkFailuresProxy; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.util.NetUtils; + +import kafka.common.KafkaException; +import kafka.metrics.KafkaMetricsReporter; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import org.apache.commons.collections.list.UnmodifiableList; +import org.apache.commons.io.FileUtils; +import org.apache.curator.test.TestingServer; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.BindException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.UUID; + +import scala.collection.mutable.ArraySeq; + 
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * An implementation of the KafkaServerProvider for Kafka 1.0 . + */ +public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { + + protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class); + private File tmpZkDir; + private File tmpKafkaParent; + private List<File> tmpKafkaDirs; + private List<KafkaServer> brokers; + private TestingServer zookeeper; + private String zookeeperConnectionString; + private String brokerConnectionString = ""; + private Properties standardProps; + private FlinkKafkaProducer10.Semantic producerSemantic = FlinkKafkaProducer10.Semantic.EXACTLY_ONCE; + // 6 seconds is default. Seems to be too small for travis. 30 seconds + private int zkTimeout = 30000; + private Config config; + + public void setProducerSemantic(FlinkKafkaProducer10.Semantic producerSemantic) { + this.producerSemantic = producerSemantic; + } + + @Override + public void prepare(Config config) { + //increase the timeout since in Travis ZK connection takes long time for secure connection. 
+ if (config.isSecureMode()) { + //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout + config.setKafkaServersNumber(1); + zkTimeout = zkTimeout * 15; + } + this.config = config; + + File tempDir = new File(System.getProperty("java.io.tmpdir")); + tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); + assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs()); + + tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString())); + assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs()); + + tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber()); + for (int i = 0; i < config.getKafkaServersNumber(); i++) { + File tmpDir = new File(tmpKafkaParent, "server-" + i); + assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); + tmpKafkaDirs.add(tmpDir); + } + + zookeeper = null; + brokers = null; + + try { + zookeeper = new TestingServer(-1, tmpZkDir); + zookeeperConnectionString = zookeeper.getConnectString(); + LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString); + + LOG.info("Starting KafkaServer"); + brokers = new ArrayList<>(config.getKafkaServersNumber()); + + ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? 
SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT); + for (int i = 0; i < config.getKafkaServersNumber(); i++) { + KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i)); + brokers.add(kafkaServer); + brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName)); + brokerConnectionString += ","; + } + + LOG.info("ZK and KafkaServer started."); + } + catch (Throwable t) { + t.printStackTrace(); + fail("Test setup failed: " + t.getMessage()); + } + + standardProps = new Properties(); + standardProps.setProperty("zookeeper.connect", zookeeperConnectionString); + standardProps.setProperty("bootstrap.servers", brokerConnectionString); + standardProps.setProperty("group.id", "flink-tests"); + standardProps.setProperty("enable.auto.commit", "false"); + standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout)); + standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout)); + standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 1.0 value) + standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) 
+ } + + @Override + public void deleteTestTopic(String topic) { + LOG.info("Deleting topic {}", topic); + try (AdminClient adminClient = AdminClient.create(getStandardProperties())) { + adminClient.deleteTopics(Collections.singleton(topic)).all().get(); + } catch (Exception e) { + e.printStackTrace(); + fail("Delete test topic : " + topic + " failed, " + e.getMessage()); + } + } + + @Override + public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties properties) { + LOG.info("Creating topic {}", topic); + try (AdminClient adminClient = AdminClient.create(getStandardProperties())) { + NewTopic topicObj = new NewTopic(topic, numberOfPartitions, (short) replicationFactor); + adminClient.createTopics(Collections.singleton(topicObj)).all().get(); + } catch (Exception e) { + e.printStackTrace(); + fail("Create test topic : " + topic + " failed, " + e.getMessage()); + } + } + + @Override + public Properties getStandardProperties() { + return standardProps; + } + + @Override + public Properties getSecureProperties() { + Properties prop = new Properties(); + if (config.isSecureMode()) { + prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); + prop.put("security.protocol", "SASL_PLAINTEXT"); + prop.put("sasl.kerberos.service.name", "kafka"); + + //add special timeout for Travis + prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout)); + prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout)); + prop.setProperty("metadata.fetch.timeout.ms", "120000"); + } + return prop; + } + + @Override + public String getBrokerConnectionString() { + return brokerConnectionString; + } + + @Override + public String getVersion() { + return "1.0"; + } + + @Override + public List<KafkaServer> getBrokers() { + return brokers; + } + + @Override + public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) { + return new 
FlinkKafkaConsumer10<T>(topics, readSchema, props); + } + + @Override + public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) { + List<ConsumerRecord<K, V>> result = new ArrayList<>(); + + try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) { + consumer.assign(Arrays.asList(new TopicPartition(topic, partition))); + + while (true) { + boolean processedAtLeastOneRecord = false; + + // wait for new records with timeout and break the loop if we didn't get any + Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator(); + while (iterator.hasNext()) { + ConsumerRecord<K, V> record = iterator.next(); + result.add(record); + processedAtLeastOneRecord = true; + } + + if (!processedAtLeastOneRecord) { + break; + } + } + consumer.commitSync(); + } + + return UnmodifiableList.decorate(result); + } + + @Override + public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) { + return new StreamSink<>(new FlinkKafkaProducer10<T>( + topic, + serSchema, + props, + Optional.ofNullable(partitioner), + producerSemantic, + FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE)); + } + + @Override + public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) { + return stream.addSink(new FlinkKafkaProducer10<T>( + topic, + serSchema, + props, + Optional.ofNullable(partitioner), + producerSemantic, + FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE)); + } + + @Override + public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) { + FlinkKafkaProducer10<T> prod = new FlinkKafkaProducer10<T>( + topic, + serSchema, + props, + Optional.of(new FlinkFixedPartitioner<>()), + 
producerSemantic, + FlinkKafkaProducer10.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); + + prod.setWriteTimestampToKafka(true); + + return stream.addSink(prod); + } + + @Override + public KafkaOffsetHandler createOffsetHandler() { + return new KafkaOffsetHandlerImpl(); + } + + @Override + public void restartBroker(int leaderId) throws Exception { + brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId))); + } + + @Override + public int getLeaderToShutDown(String topic) throws Exception { + AdminClient client = AdminClient.create(getStandardProperties()); + TopicDescription result = client.describeTopics(Collections.singleton(topic)).all().get().get(topic); + return result.partitions().get(0).leader().id(); + } + + @Override + public int getBrokerId(KafkaServer server) { + return server.config().brokerId(); + } + + @Override + public boolean isSecureRunSupported() { + return true; + } + + @Override + public void shutdown() throws Exception { + for (KafkaServer broker : brokers) { + if (broker != null) { + broker.shutdown(); + } + } + brokers.clear(); + + if (zookeeper != null) { + try { + zookeeper.stop(); + } + catch (Exception e) { + LOG.warn("ZK.stop() failed", e); + } + zookeeper = null; + } + + // clean up the temp spaces + + if (tmpKafkaParent != null && tmpKafkaParent.exists()) { + try { + FileUtils.deleteDirectory(tmpKafkaParent); + } + catch (Exception e) { + // ignore + } + } + if (tmpZkDir != null && tmpZkDir.exists()) { + try { + FileUtils.deleteDirectory(tmpZkDir); + } + catch (Exception e) { + // ignore + } + } + } + + protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception { + Properties kafkaProperties = new Properties(); + + // properties have to be Strings + kafkaProperties.put("advertised.host.name", KAFKA_HOST); + kafkaProperties.put("broker.id", Integer.toString(brokerId)); + kafkaProperties.put("log.dir", tmpFolder.toString()); + kafkaProperties.put("zookeeper.connect", zookeeperConnectionString); + 
kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024)); + kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024)); + kafkaProperties.put("transaction.max.timeout.ms", Integer.toString(1000 * 60 * 60 * 2)); // 2hours + + // for CI stability, increase zookeeper session timeout + kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout); + kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout); + if (config.getKafkaServerProperties() != null) { + kafkaProperties.putAll(config.getKafkaServerProperties()); + } + + final int numTries = 5; + + for (int i = 1; i <= numTries; i++) { + int kafkaPort = NetUtils.getAvailablePort(); + kafkaProperties.put("port", Integer.toString(kafkaPort)); + + if (config.isHideKafkaBehindProxy()) { + NetworkFailuresProxy proxy = createProxy(KAFKA_HOST, kafkaPort); + kafkaProperties.put("advertised.port", proxy.getLocalPort()); + } + + //to support secure kafka cluster + if (config.isSecureMode()) { + LOG.info("Adding Kafka secure configurations"); + kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); + kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); + kafkaProperties.putAll(getSecureProperties()); + } + + KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); + + try { + scala.Option<String> stringNone = scala.Option.apply(null); + KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, new ArraySeq<KafkaMetricsReporter>(0)); + server.startup(); + return server; + } + catch (KafkaException e) { + if (e.getCause() instanceof BindException) { + // port conflict, retry... + LOG.info("Port conflict when starting Kafka Broker. 
Retrying..."); + } + else { + throw e; + } + } + } + + throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts."); + } + + private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler { + + private final KafkaConsumer<byte[], byte[]> offsetClient; + + public KafkaOffsetHandlerImpl() { + Properties props = new Properties(); + props.putAll(standardProps); + props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + + offsetClient = new KafkaConsumer<>(props); + } + + @Override + public Long getCommittedOffset(String topicName, int partition) { + OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition)); + return (committed != null) ? committed.offset() : null; + } + + @Override + public void setCommittedOffset(String topicName, int partition, long offset) { + Map<TopicPartition, OffsetAndMetadata> partitionAndOffset = new HashMap<>(); + partitionAndOffset.put(new TopicPartition(topicName, partition), new OffsetAndMetadata(offset)); + offsetClient.commitSync(partitionAndOffset); + } + + @Override + public void close() { + offsetClient.close(); + } + } +} diff --git a/flink-connectors/flink-connector-kafka-1.0/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-1.0/src/test/resources/log4j-test.properties new file mode 100644 index 00000000000..fbeb110350f --- /dev/null +++ b/flink-connectors/flink-connector-kafka-1.0/src/test/resources/log4j-test.properties @@ -0,0 +1,30 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger +log4j.logger.org.apache.zookeeper=OFF, testlogger +log4j.logger.state.change.logger=OFF, testlogger +log4j.logger.kafka=OFF, testlogger diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java index cad37f8f8cd..ca308d13ea9 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java @@ -36,6 +36,7 @@ public static final String CONNECTOR_VERSION_VALUE_09 = "0.9"; public static final String CONNECTOR_VERSION_VALUE_010 = "0.10"; public static final String CONNECTOR_VERSION_VALUE_011 = "0.11"; + public static final String 
CONNECTOR_VERSION_VALUE_10 = "1.0"; public static final String CONNECTOR_TOPIC = "connector.topic"; public static final String CONNECTOR_STARTUP_MODE = "connector.startup-mode"; public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset"; @@ -73,7 +74,8 @@ private void validateVersion(DescriptorProperties properties) { CONNECTOR_VERSION_VALUE_08, CONNECTOR_VERSION_VALUE_09, CONNECTOR_VERSION_VALUE_010, - CONNECTOR_VERSION_VALUE_011); + CONNECTOR_VERSION_VALUE_011, + CONNECTOR_VERSION_VALUE_10); properties.validateEnumValues(CONNECTOR_VERSION(), false, versions); properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE); } diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 7d88f0d94ea..c0df351c625 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -184,7 +184,10 @@ public void runFailOnNoBrokerTest() throws Exception { stream.print(); see.execute("No broker test"); } catch (JobExecutionException jee) { - if (kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10") || kafkaServer.getVersion().equals("0.11")) { + if (kafkaServer.getVersion().equals("0.9") || + kafkaServer.getVersion().equals("0.10") || + kafkaServer.getVersion().equals("0.11") || + kafkaServer.getVersion().equals("1.0")) { assertTrue(jee.getCause() instanceof TimeoutException); TimeoutException te = (TimeoutException) jee.getCause(); diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index cacea91578e..87a90683988 100644 --- a/flink-connectors/pom.xml +++ 
b/flink-connectors/pom.xml @@ -56,6 +56,7 @@ under the License. <module>flink-connector-nifi</module> <module>flink-connector-cassandra</module> <module>flink-connector-filesystem</module> + <module>flink-connector-kafka-1.0</module> </modules> <!-- override these root dependencies as 'provided', so they don't end up diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala index 211d374de58..8b69e750a9c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala @@ -46,7 +46,7 @@ object ConnectorDescriptorValidator { /** * Key for describing the version of the connector. This property can be used for different - * connector versions (e.g. Kafka 0.8 or Kafka 0.11). + * connector versions (e.g. Kafka 0.8 or Kafka 1.0). */ val CONNECTOR_VERSION = "connector.version" ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Apache Kafka 1.0/1.1 connectors > ----------------------------------- > > Key: FLINK-7964 > URL: https://issues.apache.org/jira/browse/FLINK-7964 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector > Affects Versions: 1.4.0 > Reporter: Hai Zhou > Assignee: vinoyang > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Kafka 1.0.0 is no mere bump of the version number. The Apache Kafka Project > Management Committee has packed a number of valuable enhancements into the > release. 
Here is a summary of a few of them: > * Since its introduction in version 0.10, the Streams API has become hugely > popular among Kafka users, including the likes of Pinterest, Rabobank, > Zalando, and The New York Times. In 1.0, the API continues to evolve at a > healthy pace. To begin with, the builder API has been improved (KIP-120). A > new API has been added to expose the state of active tasks at runtime > (KIP-130). The new cogroup API makes it much easier to deal with partitioned > aggregates with fewer StateStores and fewer moving parts in your code > (KIP-150). Debuggability gets easier with enhancements to the print() and > writeAsText() methods (KIP-160). And if that’s not enough, check out KIP-138 > and KIP-161 too. For more on streams, check out the Apache Kafka Streams > documentation, including some helpful new tutorial videos. > * Operating Kafka at scale requires that the system remain observable, and to > make that easier, we’ve made a number of improvements to metrics. These are > too many to summarize without becoming tedious, but Connect metrics have been > significantly improved (KIP-196), a litany of new health check metrics are > now exposed (KIP-188), and we now have a global topic and partition count > (KIP-168). Check out KIP-164 and KIP-187 for even more. > * We now support Java 9, leading, among other things, to significantly faster > TLS and CRC32C implementations. Over-the-wire encryption will be faster now, > which will keep Kafka fast and compute costs low when encryption is enabled. > * In keeping with the security theme, KIP-152 cleans up the error handling on > Simple Authentication Security Layer (SASL) authentication attempts. > Previously, some authentication error conditions were indistinguishable from > broker failures and were not logged in a clear way. This is cleaner now. > * Kafka can now tolerate disk failures better.
Historically, JBOD storage > configurations have not been recommended, but the architecture has > nevertheless been tempting: after all, why not rely on Kafka’s own > replication mechanism to protect against storage failure rather than using > RAID? With KIP-112, Kafka now handles disk failure more gracefully. A single > disk failure in a JBOD broker will not bring the entire broker down; rather, > the broker will continue serving any log files that remain on functioning > disks. > * Since release 0.11.0, the idempotent producer (which is the producer used > in the presence of a transaction, which of course is the producer we use for > exactly-once processing) required max.in.flight.requests.per.connection to be > equal to one. As anyone who has written or tested a wire protocol can attest, > this put an upper bound on throughput. Thanks to KAFKA-5949, this can now be > as large as five, relaxing the throughput constraint quite a bit. -- This message was sent by Atlassian JIRA (v7.6.3#76005)