[SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API
## What changes were proposed in this pull request? New Kafka consumer api for the released 0.10 version of Kafka ## How was this patch tested? Unit tests, manual tests Author: cody koeninger <c...@koeninger.org> Closes #11863 from koeninger/kafka-0.9. (cherry picked from commit dedbceec1ef33ccd88101016de969a1ef3e3e142) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3134f116 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3134f116 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3134f116 Branch: refs/heads/branch-2.0 Commit: 3134f116a3565c3a299fa2e7094acd7304d64280 Parents: a548523 Author: cody koeninger <c...@koeninger.org> Authored: Wed Jun 29 23:21:03 2016 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Wed Jun 29 23:21:15 2016 -0700 ---------------------------------------------------------------------- external/kafka-0-10-assembly/pom.xml | 176 ++++++ external/kafka-0-10/pom.xml | 98 +++ .../kafka010/CachedKafkaConsumer.scala | 189 ++++++ .../streaming/kafka010/ConsumerStrategy.scala | 314 ++++++++++ .../kafka010/DirectKafkaInputDStream.scala | 318 ++++++++++ .../spark/streaming/kafka010/KafkaRDD.scala | 232 +++++++ .../streaming/kafka010/KafkaRDDPartition.scala | 45 ++ .../streaming/kafka010/KafkaTestUtils.scala | 277 +++++++++ .../spark/streaming/kafka010/KafkaUtils.scala | 175 ++++++ .../streaming/kafka010/LocationStrategy.scala | 77 +++ .../spark/streaming/kafka010/OffsetRange.scala | 153 +++++ .../spark/streaming/kafka010/package-info.java | 21 + .../spark/streaming/kafka010/package.scala | 23 + .../kafka010/JavaConsumerStrategySuite.java | 84 +++ .../kafka010/JavaDirectKafkaStreamSuite.java | 180 ++++++ .../streaming/kafka010/JavaKafkaRDDSuite.java | 122 ++++ .../kafka010/JavaLocationStrategySuite.java | 58 ++ .../src/test/resources/log4j.properties | 28 + .../kafka010/DirectKafkaStreamSuite.scala | 612 +++++++++++++++++++ .../streaming/kafka010/KafkaRDDSuite.scala | 169 +++++ pom.xml | 2 + project/SparkBuild.scala | 12 +- 22 files changed, 3359 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml new file mode 100644 index 0000000..f2468d1 --- /dev/null +++ b/external/kafka-0-10-assembly/pom.xml @@ -0,0 +1,176 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent_2.11</artifactId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka-0-10-assembly_2.11</artifactId> + <packaging>jar</packaging> + <name>Spark Integration for Kafka 0.10 Assembly</name> + <url>http://spark.apache.org/</url> + + <properties> + <sbt.project.name>streaming-kafka-0-10-assembly</sbt.project.name> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <!-- + Demote already included in the Spark assembly. + --> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>net.jpountz.lz4</groupId> + <artifactId>lz4</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + <classifier>${avro.mapred.classifier}</classifier> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>net.java.dev.jets3t</groupId> + <artifactId>jets3t</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <configuration> + <shadedArtifactAttached>false</shadedArtifactAttached> + <artifactSet> + <includes> + <include>*:*</include> + </includes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> + <resource>reference.conf</resource> + </transformer> + <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer"> + <resource>log4j.properties</resource> + </transformer> + <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> +</build> +</project> + http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/pom.xml ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml new file mode 100644 index 0000000..50395f6 --- /dev/null +++ b/external/kafka-0-10/pom.xml @@ -0,0 +1,98 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent_2.11</artifactId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> + <properties> + <sbt.project.name>streaming-kafka-0-10</sbt.project.name> + </properties> + <packaging>jar</packaging> + <name>Spark Integration for Kafka 0.10</name> + <url>http://spark.apache.org/</url> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + <version>0.10.0.0</version> + <exclusions> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>net.sf.jopt-simple</groupId> + <artifactId>jopt-simple</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>net.sf.jopt-simple</groupId> + <artifactId>jopt-simple</artifactId> + <version>3.2</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-tags_${scala.binary.version}</artifactId> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + </build> +</project> http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala new file mode 100644 index 0000000..fa3ea61 --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{ util => ju } + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer } +import org.apache.kafka.common.{ KafkaException, TopicPartition } + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +/** + * Consumer of single topicpartition, intended for cached reuse. + * Underlying consumer is not threadsafe, so neither is this, + * but processing the same topicpartition and group id in multiple threads is usually bad anyway. + */ +private[kafka010] +class CachedKafkaConsumer[K, V] private( + val groupId: String, + val topic: String, + val partition: Int, + val kafkaParams: ju.Map[String, Object]) extends Logging { + + assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG), + "groupId used for cache key must match the groupId in kafkaParams") + + val topicPartition = new TopicPartition(topic, partition) + + protected val consumer = { + val c = new KafkaConsumer[K, V](kafkaParams) + val tps = new ju.ArrayList[TopicPartition]() + tps.add(topicPartition) + c.assign(tps) + c + } + + // TODO if the buffer was kept around as a random-access structure, + // could possibly optimize re-calculating of an RDD in the same batch + protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator + protected var nextOffset = -2L + + def close(): Unit = consumer.close() + + /** + * Get the record for the given offset, waiting up to timeout ms if IO is necessary. + * Sequential forward access will use buffers, but random access will be horribly inefficient. + */ + def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = { + logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset") + if (offset != nextOffset) { + logInfo(s"Initial fetch for $groupId $topic $partition $offset") + seek(offset) + poll(timeout) + } + + if (!buffer.hasNext()) { poll(timeout) } + assert(buffer.hasNext(), + s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") + var record = buffer.next() + + if (record.offset != offset) { + logInfo(s"Buffer miss for $groupId $topic $partition $offset") + seek(offset) + poll(timeout) + assert(buffer.hasNext(), + s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") + record = buffer.next() + assert(record.offset == offset, + s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset") + } + + nextOffset = offset + 1 + record + } + + private def seek(offset: Long): Unit = { + logDebug(s"Seeking to $topicPartition $offset") + consumer.seek(topicPartition, offset) + } + + private def poll(timeout: Long): Unit = { + val p = consumer.poll(timeout) + val r = p.records(topicPartition) + logDebug(s"Polled ${p.partitions()} ${r.size}") + buffer = r.iterator + } + +} + +private[kafka010] +object CachedKafkaConsumer extends Logging { + + private case class CacheKey(groupId: String, topic: String, partition: Int) + + // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap + private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null + + /** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */ + def init( + initialCapacity: Int, + maxCapacity: Int, + loadFactor: Float): Unit = CachedKafkaConsumer.synchronized { + if (null == cache) { + logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor") + cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]]( + initialCapacity, loadFactor, true) { + override def removeEldestEntry( + entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): Boolean = { + if (this.size > maxCapacity) { + try { + entry.getValue.consumer.close() + } catch { + case x: KafkaException => + logError("Error closing oldest Kafka consumer", x) + } + true + } else { + false + } + } + } + } + } + + /** + * Get a cached consumer for groupId, assigned to topic and partition. + * If matching consumer doesn't already exist, will be created using kafkaParams. + */ + def get[K, V]( + groupId: String, + topic: String, + partition: Int, + kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] = + CachedKafkaConsumer.synchronized { + val k = CacheKey(groupId, topic, partition) + val v = cache.get(k) + if (null == v) { + logInfo(s"Cache miss for $k") + logDebug(cache.keySet.toString) + val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams) + cache.put(k, c) + c + } else { + // any given topicpartition should have a consistent key and value type + v.asInstanceOf[CachedKafkaConsumer[K, V]] + } + } + + /** + * Get a fresh new instance, unassociated with the global cache. + * Caller is responsible for closing + */ + def getUncached[K, V]( + groupId: String, + topic: String, + partition: Int, + kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] = + new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams) + + /** remove consumer for given groupId, topic, and partition, if it exists */ + def remove(groupId: String, topic: String, partition: Int): Unit = { + val k = CacheKey(groupId, topic, partition) + logInfo(s"Removing $k from cache") + val v = CachedKafkaConsumer.synchronized { + cache.remove(k) + } + if (null != v) { + v.close() + logInfo(s"Removed $k from cache") + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala new file mode 100644 index 0000000..079a07d --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{ util => ju } + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.annotation.Experimental + + +/** + * :: Experimental :: + * Choice of how to create and configure underlying Kafka Consumers on driver and executors. + * Kafka 0.10 consumers can require additional, sometimes complex, setup after object + * instantiation. This interface encapsulates that process, and allows it to be checkpointed. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +@Experimental +trait ConsumerStrategy[K, V] { + /** + * Kafka <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs"> + * configuration parameters</a> to be used on executors. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + */ + def executorKafkaParams: ju.Map[String, Object] + + /** + * Must return a fully configured Kafka Consumer, including subscribed or assigned topics. + * This consumer will be used on the driver to query for offsets only, not messages. + * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver + * has successfully read. Will be empty on initial start, possibly non-empty on restart from + * checkpoint. + */ + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] +} + +/** + * :: Experimental :: + * Subscribe to a collection of topics. + * @param topics collection of topics to subscribe + * @param kafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs"> + * configuration parameters</a> to be used on driver. The same params will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsets: offsets to begin at on initial startup. If no offset is given for a + * TopicPartition, the committed offset (if applicable) or kafka param + * auto.offset.reset will be used. + */ +@Experimental +case class Subscribe[K, V] private( + topics: ju.Collection[java.lang.String], + kafkaParams: ju.Map[String, Object], + offsets: ju.Map[TopicPartition, Long] + ) extends ConsumerStrategy[K, V] { + + def executorKafkaParams: ju.Map[String, Object] = kafkaParams + + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = { + val consumer = new KafkaConsumer[K, V](kafkaParams) + consumer.subscribe(topics) + if (currentOffsets.isEmpty) { + offsets.asScala.foreach { case (topicPartition, offset) => + consumer.seek(topicPartition, offset) + } + } + + consumer + } +} + +/** + * :: Experimental :: + * Companion object for creating [[Subscribe]] strategy + */ +@Experimental +object Subscribe { + /** + * :: Experimental :: + * Subscribe to a collection of topics. + * @param topics collection of topics to subscribe + * @param kafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs"> + * configuration parameters</a> to be used on driver. The same params will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsets: offsets to begin at on initial startup. If no offset is given for a + * TopicPartition, the committed offset (if applicable) or kafka param + * auto.offset.reset will be used. + */ + @Experimental + def apply[K, V]( + topics: Iterable[java.lang.String], + kafkaParams: collection.Map[String, Object], + offsets: collection.Map[TopicPartition, Long]): Subscribe[K, V] = { + Subscribe[K, V]( + new ju.ArrayList(topics.asJavaCollection), + new ju.HashMap[String, Object](kafkaParams.asJava), + new ju.HashMap[TopicPartition, Long](offsets.asJava)) + } + + /** + * :: Experimental :: + * Subscribe to a collection of topics. + * @param topics collection of topics to subscribe + * @param kafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs"> + * configuration parameters</a> to be used on driver. The same params will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + */ + @Experimental + def apply[K, V]( + topics: Iterable[java.lang.String], + kafkaParams: collection.Map[String, Object]): Subscribe[K, V] = { + Subscribe[K, V]( + new ju.ArrayList(topics.asJavaCollection), + new ju.HashMap[String, Object](kafkaParams.asJava), + ju.Collections.emptyMap[TopicPartition, Long]()) + } + + /** + * :: Experimental :: + * Subscribe to a collection of topics. + * @param topics collection of topics to subscribe + * @param kafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs"> + * configuration parameters</a> to be used on driver. The same params will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsets: offsets to begin at on initial startup. If no offset is given for a + * TopicPartition, the committed offset (if applicable) or kafka param + * auto.offset.reset will be used. + */ + @Experimental + def create[K, V]( + topics: ju.Collection[java.lang.String], + kafkaParams: ju.Map[String, Object], + offsets: ju.Map[TopicPartition, Long]): Subscribe[K, V] = { + Subscribe[K, V](topics, kafkaParams, offsets) + } + + /** + * :: Experimental :: + * Subscribe to a collection of topics. + * @param topics collection of topics to subscribe + * @param kafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs"> + * configuration parameters</a> to be used on driver. The same params will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + */ + @Experimental + def create[K, V]( + topics: ju.Collection[java.lang.String], + kafkaParams: ju.Map[String, Object]): Subscribe[K, V] = { + Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]()) + } + +} + +/** + * :: Experimental :: + * Assign a fixed collection of TopicPartitions + * @param topicPartitions collection of TopicPartitions to assign + * @param kafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs"> + * configuration parameters</a> to be used on driver. The same params will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsets: offsets to begin at on initial startup. If no offset is given for a + * TopicPartition, the committed offset (if applicable) or kafka param + * auto.offset.reset will be used. + */ +@Experimental +case class Assign[K, V] private( + topicPartitions: ju.Collection[TopicPartition], + kafkaParams: ju.Map[String, Object], + offsets: ju.Map[TopicPartition, Long] + ) extends ConsumerStrategy[K, V] { + + def executorKafkaParams: ju.Map[String, Object] = kafkaParams + + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = { + val consumer = new KafkaConsumer[K, V](kafkaParams) + consumer.assign(topicPartitions) + if (currentOffsets.isEmpty) { + offsets.asScala.foreach { case (topicPartition, offset) => + consumer.seek(topicPartition, offset) + } + } + + consumer + } +} + +/** + * :: Experimental :: + * Companion object for creating [[Assign]] strategy + */ +@Experimental +object Assign { + /** + * :: Experimental :: + * Assign a fixed collection of TopicPartitions + * @param topicPartitions collection of TopicPartitions to assign + * @param kafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs"> + * configuration parameters</a> to be used on driver. The same params will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsets: offsets to begin at on initial startup. If no offset is given for a + * TopicPartition, the committed offset (if applicable) or kafka param + * auto.offset.reset will be used. + */ + @Experimental + def apply[K, V]( + topicPartitions: Iterable[TopicPartition], + kafkaParams: collection.Map[String, Object], + offsets: collection.Map[TopicPartition, Long]): Assign[K, V] = { + Assign[K, V]( + new ju.ArrayList(topicPartitions.asJavaCollection), + new ju.HashMap[String, Object](kafkaParams.asJava), + new ju.HashMap[TopicPartition, Long](offsets.asJava)) + } + + /** + * :: Experimental :: + * Assign a fixed collection of TopicPartitions + * @param topicPartitions collection of TopicPartitions to assign + * @param kafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs"> + * configuration parameters</a> to be used on driver. The same params will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + */ + @Experimental + def apply[K, V]( + topicPartitions: Iterable[TopicPartition], + kafkaParams: collection.Map[String, Object]): Assign[K, V] = { + Assign[K, V]( + new ju.ArrayList(topicPartitions.asJavaCollection), + new ju.HashMap[String, Object](kafkaParams.asJava), + ju.Collections.emptyMap[TopicPartition, Long]()) + } + + /** + * :: Experimental :: + * Assign a fixed collection of TopicPartitions + * @param topicPartitions collection of TopicPartitions to assign + * @param kafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs"> + * configuration parameters</a> to be used on driver. The same params will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsets: offsets to begin at on initial startup. If no offset is given for a + * TopicPartition, the committed offset (if applicable) or kafka param + * auto.offset.reset will be used. + */ + @Experimental + def create[K, V]( + topicPartitions: ju.Collection[TopicPartition], + kafkaParams: ju.Map[String, Object], + offsets: ju.Map[TopicPartition, Long]): Assign[K, V] = { + Assign[K, V](topicPartitions, kafkaParams, offsets) + } + + /** + * :: Experimental :: + * Assign a fixed collection of TopicPartitions + * @param topicPartitions collection of TopicPartitions to assign + * @param kafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs"> + * configuration parameters</a> to be used on driver. The same params will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + */ + @Experimental + def create[K, V]( + topicPartitions: ju.Collection[TopicPartition], + kafkaParams: ju.Map[String, Object]): Assign[K, V] = { + Assign[K, V](topicPartitions, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]()) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala new file mode 100644 index 0000000..acd1841 --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{ util => ju } +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.{ PartitionInfo, TopicPartition } + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.streaming.scheduler.rate.RateEstimator + +/** + * A DStream where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number + * of messages + * per second that each '''partition''' will accept. + * @param locationStrategy In most cases, pass in [[PreferConsistent]], + * see [[LocationStrategy]] for more details. + * @param executorKafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs"> + * configuration parameters</a>. + * Requires "bootstrap.servers" to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param consumerStrategy In most cases, pass in [[Subscribe]], + * see [[ConsumerStrategy]] for more details + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +private[spark] class DirectKafkaInputDStream[K, V]( + _ssc: StreamingContext, + locationStrategy: LocationStrategy, + consumerStrategy: ConsumerStrategy[K, V] + ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets { + + val executorKafkaParams = { + val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams) + KafkaUtils.fixKafkaParams(ekp) + ekp + } + + protected var currentOffsets = Map[TopicPartition, Long]() + + @transient private var kc: Consumer[K, V] = null + def consumer(): Consumer[K, V] = this.synchronized { + if (null == kc) { + kc = consumerStrategy.onStart(currentOffsets) + } + kc + } + + override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = { + logError("Kafka ConsumerRecord is not serializable. " + + "Use .map to extract fields before calling .persist or .window") + super.persist(newLevel) + } + + protected def getBrokers = { + val c = consumer + val result = new ju.HashMap[TopicPartition, String]() + val hosts = new ju.HashMap[TopicPartition, String]() + val assignments = c.assignment().iterator() + while (assignments.hasNext()) { + val tp: TopicPartition = assignments.next() + if (null == hosts.get(tp)) { + val infos = c.partitionsFor(tp.topic).iterator() + while (infos.hasNext()) { + val i = infos.next() + hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host()) + } + } + result.put(tp, hosts.get(tp)) + } + result + } + + protected def getPreferredHosts: ju.Map[TopicPartition, String] = { + locationStrategy match { + case PreferBrokers => getBrokers + case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]() + case PreferFixed(hostMap) => hostMap + } + } + + // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]") + private[streaming] override def name: String = s"Kafka 0.10 direct stream [$id]" + + protected[streaming] override val checkpointData = + new DirectKafkaInputDStreamCheckpointData + + + /** + * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker. + */ + override protected[streaming] val rateController: Option[RateController] = { + if (RateController.isBackPressureEnabled(ssc.conf)) { + Some(new DirectKafkaRateController(id, + RateEstimator.create(ssc.conf, context.graph.batchDuration))) + } else { + None + } + } + + private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt( + "spark.streaming.kafka.maxRatePerPartition", 0) + + protected[streaming] def maxMessagesPerPartition( + offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = { + val estimatedRateLimit = rateController.map(_.getLatestRate().toInt) + + // calculate a per-partition rate limit based on current lag + val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match { + case Some(rate) => + val lagPerPartition = offsets.map { case (tp, offset) => + tp -> Math.max(offset - currentOffsets(tp), 0) + } + val totalLag = lagPerPartition.values.sum + + lagPerPartition.map { case (tp, lag) => + val backpressureRate = Math.round(lag / totalLag.toFloat * rate) + tp -> (if (maxRateLimitPerPartition > 0) { + Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate) + } + case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition } + } + + if (effectiveRateLimitPerPartition.values.sum > 0) { + val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 + Some(effectiveRateLimitPerPartition.map { + case (tp, limit) => tp -> (secsPerBatch * limit).toLong + }) + } else { + None + } + } + + /** + * Returns the latest (highest) available offsets, taking new partitions into account. + */ + protected def latestOffsets(): Map[TopicPartition, Long] = { + val c = consumer + c.poll(0) + val parts = c.assignment().asScala + + // make sure new partitions are reflected in currentOffsets + val newPartitions = parts.diff(currentOffsets.keySet) + // position for new partitions determined by auto.offset.reset if no commit + currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap + // don't want to consume messages, so pause + c.pause(newPartitions.asJava) + // find latest available offsets + c.seekToEnd(currentOffsets.keySet.asJava) + parts.map(tp => tp -> c.position(tp)).toMap + } + + // limits the maximum number of messages per partition + protected def clamp( + offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = { + + maxMessagesPerPartition(offsets).map { mmp => + mmp.map { case (tp, messages) => + val uo = offsets(tp) + tp -> Math.min(currentOffsets(tp) + messages, uo) + } + }.getOrElse(offsets) + } + + override def compute(validTime: Time): Option[KafkaRDD[K, V]] = { + val untilOffsets = clamp(latestOffsets()) + val offsetRanges = untilOffsets.map { case (tp, uo) => + val fo = currentOffsets(tp) + OffsetRange(tp.topic, tp.partition, fo, uo) + } + val rdd = new KafkaRDD[K, V]( + context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true) + + // Report the record number and metadata of this batch interval to InputInfoTracker. + val description = offsetRanges.filter { offsetRange => + // Don't display empty ranges. + offsetRange.fromOffset != offsetRange.untilOffset + }.map { offsetRange => + s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" + + s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}" + }.mkString("\n") + // Copy offsetRanges to immutable.List to prevent from being modified by the user + val metadata = Map( + "offsets" -> offsetRanges.toList, + StreamInputInfo.METADATA_KEY_DESCRIPTION -> description) + val inputInfo = StreamInputInfo(id, rdd.count, metadata) + ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) + + currentOffsets = untilOffsets + commitAll() + Some(rdd) + } + + override def start(): Unit = { + val c = consumer + c.poll(0) + if (currentOffsets.isEmpty) { + currentOffsets = c.assignment().asScala.map { tp => + tp -> c.position(tp) + }.toMap + } + + // don't actually want to consume any messages, so pause all partitions + c.pause(currentOffsets.keySet.asJava) + } + + override def stop(): Unit = this.synchronized { + if (kc != null) { + kc.close() + } + } + + protected val commitQueue = new ConcurrentLinkedQueue[OffsetRange] + protected val commitCallback = new AtomicReference[OffsetCommitCallback] + + /** + * Queue up offset ranges for commit to Kafka at a future time. Threadsafe. + * @param offsetRanges The maximum untilOffset for a given partition will be used at commit. + */ + def commitAsync(offsetRanges: Array[OffsetRange]): Unit = { + commitAsync(offsetRanges, null) + } + + /** + * Queue up offset ranges for commit to Kafka at a future time. Threadsafe. + * @param offsetRanges The maximum untilOffset for a given partition will be used at commit. + * @param callback Only the most recently provided callback will be used at commit. + */ + def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit = { + commitCallback.set(callback) + commitQueue.addAll(ju.Arrays.asList(offsetRanges: _*)) + } + + protected def commitAll(): Unit = { + val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]() + val it = commitQueue.iterator() + while (it.hasNext) { + val osr = it.next + val tp = osr.topicPartition + val x = m.get(tp) + val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) } + m.put(tp, new OffsetAndMetadata(offset)) + } + if (!m.isEmpty) { + consumer.commitAsync(m, commitCallback.get) + } + } + + private[streaming] + class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) { + def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = { + data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]] + } + + override def update(time: Time): Unit = { + batchForTime.clear() + generatedRDDs.foreach { kv => + val a = kv._2.asInstanceOf[KafkaRDD[K, V]].offsetRanges.map(_.toTuple).toArray + batchForTime += kv._1 -> a + } + } + + override def cleanup(time: Time): Unit = { } + + override def restore(): Unit = { + batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) => + logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}") + generatedRDDs += t -> new KafkaRDD[K, V]( + context.sparkContext, + executorKafkaParams, + b.map(OffsetRange(_)), + getPreferredHosts, + // during restore, it's possible same partition will be consumed from multiple + // threads, so dont use cache + false + ) + } + } + } + + /** + * A RateController to retrieve the rate from RateEstimator. + */ + private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator) + extends RateController(id, estimator) { + override def publish(rate: Long): Unit = () + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala new file mode 100644 index 0000000..c15c163 --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{ util => ju } + +import scala.collection.mutable.ArrayBuffer + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.partial.{BoundedDouble, PartialResult} +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.storage.StorageLevel + +/** + * A batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. + * @param kafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs"> + * configuration parameters</a>. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD + * @param preferredHosts map from TopicPartition to preferred host for processing that partition. + * In most cases, use [[DirectKafkaInputDStream.preferConsistent]] + * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on same nodes as brokers. + * @param useConsumerCache whether to use a consumer from a per-jvm cache + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +private[spark] class KafkaRDD[K, V]( + sc: SparkContext, + val kafkaParams: ju.Map[String, Object], + val offsetRanges: Array[OffsetRange], + val preferredHosts: ju.Map[TopicPartition, String], + useConsumerCache: Boolean +) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges { + + assert("none" == + kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String], + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + + " must be set to none for executor kafka params, else messages may not match offsetRange") + + assert(false == + kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean], + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + + " must be set to false for executor kafka params, else offsets may commit before processing") + + // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time? + private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256) + private val cacheInitialCapacity = + conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16) + private val cacheMaxCapacity = + conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64) + private val cacheLoadFactor = + conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat + + override def persist(newLevel: StorageLevel): this.type = { + logError("Kafka ConsumerRecord is not serializable. " + + "Use .map to extract fields before calling .persist or .window") + super.persist(newLevel) + } + + override def getPartitions: Array[Partition] = { + offsetRanges.zipWithIndex.map { case (o, i) => + new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset) + }.toArray + } + + override def count(): Long = offsetRanges.map(_.count).sum + + override def countApprox( + timeout: Long, + confidence: Double = 0.95 + ): PartialResult[BoundedDouble] = { + val c = count + new PartialResult(new BoundedDouble(c, 1.0, c, c), true) + } + + override def isEmpty(): Boolean = count == 0L + + override def take(num: Int): Array[ConsumerRecord[K, V]] = { + val nonEmptyPartitions = this.partitions + .map(_.asInstanceOf[KafkaRDDPartition]) + .filter(_.count > 0) + + if (num < 1 || nonEmptyPartitions.isEmpty) { + return new Array[ConsumerRecord[K, V]](0) + } + + // Determine in advance how many messages need to be taken from each partition + val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) => + val remain = num - result.values.sum + if (remain > 0) { + val taken = Math.min(remain, part.count) + result + (part.index -> taken.toInt) + } else { + result + } + } + + val buf = new ArrayBuffer[ConsumerRecord[K, V]] + val res = context.runJob( + this, + (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) => + it.take(parts(tc.partitionId)).toArray, parts.keys.toArray + ) + res.foreach(buf ++= _) + buf.toArray + } + + private def executors(): Array[ExecutorCacheTaskLocation] = { + val bm = sparkContext.env.blockManager + bm.master.getPeers(bm.blockManagerId).toArray + .map(x => ExecutorCacheTaskLocation(x.host, x.executorId)) + .sortWith(compareExecutors) + } + + protected[kafka010] def compareExecutors( + a: ExecutorCacheTaskLocation, + b: ExecutorCacheTaskLocation): Boolean = + if (a.host == b.host) { + a.executorId > b.executorId + } else { + a.host > b.host + } + + /** + * Non-negative modulus, from java 8 math + */ + private def floorMod(a: Int, b: Int): Int = ((a % b) + b) % b + + override def getPreferredLocations(thePart: Partition): Seq[String] = { + // The intention is best-effort consistent executor for a given topicpartition, + // so that caching consumers can be effective. + // TODO what about hosts specified by ip vs name + val part = thePart.asInstanceOf[KafkaRDDPartition] + val allExecs = executors() + val tp = part.topicPartition + val prefHost = preferredHosts.get(tp) + val prefExecs = if (null == prefHost) allExecs else allExecs.filter(_.host == prefHost) + val execs = if (prefExecs.isEmpty) allExecs else prefExecs + if (execs.isEmpty) { + Seq() + } else { + // execs is sorted, tp.hashCode depends only on topic and partition, so consistent index + val index = this.floorMod(tp.hashCode, execs.length) + val chosen = execs(index) + Seq(chosen.toString) + } + } + + private def errBeginAfterEnd(part: KafkaRDDPartition): String = + s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " + + s"for topic ${part.topic} partition ${part.partition}. " + + "You either provided an invalid fromOffset, or the Kafka topic has been damaged" + + override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = { + val part = thePart.asInstanceOf[KafkaRDDPartition] + assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) + if (part.fromOffset == part.untilOffset) { + logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " + + s"skipping ${part.topic} ${part.partition}") + Iterator.empty + } else { + new KafkaRDDIterator(part, context) + } + } + + /** + * An iterator that fetches messages directly from Kafka for the offsets in partition. + * Uses a cached consumer where possible to take advantage of prefetching + */ + private class KafkaRDDIterator( + part: KafkaRDDPartition, + context: TaskContext) extends Iterator[ConsumerRecord[K, V]] { + + logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " + + s"offsets ${part.fromOffset} -> ${part.untilOffset}") + + val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] + + context.addTaskCompletionListener{ context => closeIfNeeded() } + + val consumer = if (useConsumerCache) { + CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor) + if (context.attemptNumber > 1) { + // just in case the prior attempt failures were cache related + CachedKafkaConsumer.remove(groupId, part.topic, part.partition) + } + CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams) + } else { + CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams) + } + + var requestOffset = part.fromOffset + + def closeIfNeeded(): Unit = { + if (!useConsumerCache && consumer != null) { + consumer.close + } + } + + override def hasNext(): Boolean = requestOffset < part.untilOffset + + override def next(): ConsumerRecord[K, V] = { + assert(hasNext(), "Can't call getNext() once untilOffset has been reached") + val r = consumer.get(requestOffset, pollTimeout) + requestOffset += 1 + r + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala new file mode 100644 index 0000000..95569b1 --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.Partition + + +/** + * @param topic kafka topic name + * @param partition kafka partition id + * @param fromOffset inclusive starting offset + * @param untilOffset exclusive ending offset + */ +private[kafka010] +class KafkaRDDPartition( + val index: Int, + val topic: String, + val partition: Int, + val fromOffset: Long, + val untilOffset: Long +) extends Partition { + /** Number of messages this partition refers to */ + def count(): Long = untilOffset - fromOffset + + /** Kafka TopicPartition object, for convenience */ + def topicPartition(): TopicPartition = new TopicPartition(topic, partition) + +} http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala new file mode 100644 index 0000000..13c0843 --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.io.File +import java.lang.{Integer => JInt} +import java.net.InetSocketAddress +import java.util.{Map => JMap, Properties} +import java.util.concurrent.TimeoutException + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.language.postfixOps +import scala.util.control.NonFatal + +import kafka.admin.AdminUtils +import kafka.api.Request +import kafka.producer.{KeyedMessage, Producer, ProducerConfig} +import kafka.serializer.StringEncoder +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.ZkUtils +import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.streaming.Time +import org.apache.spark.util.Utils + +/** + * This is a helper class for Kafka test suites. This has the functionality to set up + * and tear down local Kafka servers, and to push data using Kafka producers. + * + * The reason to put Kafka test utility class in src is to test Python related Kafka APIs. + */ +private[kafka010] class KafkaTestUtils extends Logging { + + // Zookeeper related configurations + private val zkHost = "localhost" + private var zkPort: Int = 0 + private val zkConnectionTimeout = 60000 + private val zkSessionTimeout = 6000 + + private var zookeeper: EmbeddedZookeeper = _ + + private var zkUtils: ZkUtils = _ + + // Kafka broker related configurations + private val brokerHost = "localhost" + private var brokerPort = 9092 + private var brokerConf: KafkaConfig = _ + + // Kafka broker server + private var server: KafkaServer = _ + + // Kafka producer + private var producer: Producer[String, String] = _ + + // Flag to test whether the system is correctly started + private var zkReady = false + private var brokerReady = false + + def zkAddress: String = { + assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address") + s"$zkHost:$zkPort" + } + + def brokerAddress: String = { + assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address") + s"$brokerHost:$brokerPort" + } + + def zookeeperClient: ZkUtils = { + assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client") + Option(zkUtils).getOrElse( + throw new IllegalStateException("Zookeeper client is not yet initialized")) + } + + // Set up the Embedded Zookeeper server and get the proper Zookeeper port + private def setupEmbeddedZookeeper(): Unit = { + // Zookeeper server startup + zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort") + // Get the actual zookeeper binding port + zkPort = zookeeper.actualPort + zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false) + zkReady = true + } + + // Set up the Embedded Kafka server + private def setupEmbeddedKafkaServer(): Unit = { + assert(zkReady, "Zookeeper should be set up beforehand") + + // Kafka broker startup + Utils.startServiceOnPort(brokerPort, port => { + brokerPort = port + brokerConf = new KafkaConfig(brokerConfiguration, doLog = false) + server = new KafkaServer(brokerConf) + server.startup() + (server, port) + }, new SparkConf(), "KafkaBroker") + + brokerReady = true + } + + /** setup the whole embedded servers, including Zookeeper and Kafka brokers */ + def setup(): Unit = { + setupEmbeddedZookeeper() + setupEmbeddedKafkaServer() + } + + /** Teardown the whole servers, including Kafka broker and Zookeeper */ + def teardown(): Unit = { + brokerReady = false + zkReady = false + + if (producer != null) { + producer.close() + producer = null + } + + if (server != null) { + server.shutdown() + server = null + } + + brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) } + + if (zkUtils != null) { + zkUtils.close() + zkUtils = null + } + + if (zookeeper != null) { + zookeeper.shutdown() + zookeeper = null + } + } + + /** Create a Kafka topic and wait until it is propagated to the whole cluster */ + def createTopic(topic: String, partitions: Int): Unit = { + AdminUtils.createTopic(zkUtils, topic, partitions, 1) + // wait until metadata is propagated + (0 until partitions).foreach { p => + waitUntilMetadataIsPropagated(topic, p) + } + } + + /** Create a Kafka topic and wait until it is propagated to the whole cluster */ + def createTopic(topic: String): Unit = { + createTopic(topic, 1) + } + + /** Java-friendly function for sending messages to the Kafka broker */ + def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = { + sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*)) + } + + /** Send the messages to the Kafka broker */ + def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = { + val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray + sendMessages(topic, messages) + } + + /** Send the array of messages to the Kafka broker */ + def sendMessages(topic: String, messages: Array[String]): Unit = { + producer = new Producer[String, String](new ProducerConfig(producerConfiguration)) + producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*) + producer.close() + producer = null + } + + private def brokerConfiguration: Properties = { + val props = new Properties() + props.put("broker.id", "0") + props.put("host.name", "localhost") + props.put("port", brokerPort.toString) + props.put("log.dir", Utils.createTempDir().getAbsolutePath) + props.put("zookeeper.connect", zkAddress) + props.put("log.flush.interval.messages", "1") + props.put("replica.socket.timeout.ms", "1500") + props + } + + private def producerConfiguration: Properties = { + val props = new Properties() + props.put("metadata.broker.list", brokerAddress) + props.put("serializer.class", classOf[StringEncoder].getName) + // wait for all in-sync replicas to ack sends + props.put("request.required.acks", "-1") + props + } + + // A simplified version of scalatest eventually, rewritten here to avoid adding extra test + // dependency + def eventually[T](timeout: Time, interval: Time)(func: => T): T = { + def makeAttempt(): Either[Throwable, T] = { + try { + Right(func) + } catch { + case e if NonFatal(e) => Left(e) + } + } + + val startTime = System.currentTimeMillis() + @tailrec + def tryAgain(attempt: Int): T = { + makeAttempt() match { + case Right(result) => result + case Left(e) => + val duration = System.currentTimeMillis() - startTime + if (duration < timeout.milliseconds) { + Thread.sleep(interval.milliseconds) + } else { + throw new TimeoutException(e.getMessage) + } + + tryAgain(attempt + 1) + } + } + + tryAgain(1) + } + + private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = { + def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match { + case Some(partitionState) => + val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr + + zkUtils.getLeaderForPartition(topic, partition).isDefined && + Request.isValidBrokerId(leaderAndInSyncReplicas.leader) && + leaderAndInSyncReplicas.isr.size >= 1 + + case _ => + false + } + eventually(Time(10000), Time(100)) { + assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout") + } + } + + private class EmbeddedZookeeper(val zkConnect: String) { + val snapshotDir = Utils.createTempDir() + val logDir = Utils.createTempDir() + + val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500) + val (ip, port) = { + val splits = zkConnect.split(":") + (splits(0), splits(1).toInt) + } + val factory = new NIOServerCnxnFactory() + factory.configure(new InetSocketAddress(ip, port), 16) + factory.startup(zookeeper) + + val actualPort = factory.getLocalPort + + def shutdown() { + factory.shutdown() + Utils.deleteRecursively(snapshotDir) + Utils.deleteRecursively(logDir) + } + } +} + http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala new file mode 100644 index 0000000..c052499 --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{ util => ju } + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkContext +import org.apache.spark.annotation.Experimental +import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext } +import org.apache.spark.api.java.function.{ Function0 => JFunction0 } +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingContext } +import org.apache.spark.streaming.dstream._ + +/** + * :: Experimental :: + * Companion object for constructing Kafka streams and RDDs + */ +@Experimental +object KafkaUtils extends Logging { + /** + * :: Experimental :: + * Scala constructor for a batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. + * @param kafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs"> + * configuration parameters</a>. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD + * @param locationStrategy In most cases, pass in [[PreferConsistent]], + * see [[LocationStrategy]] for more details. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ + @Experimental + def createRDD[K, V]( + sc: SparkContext, + kafkaParams: ju.Map[String, Object], + offsetRanges: Array[OffsetRange], + locationStrategy: LocationStrategy + ): RDD[ConsumerRecord[K, V]] = { + val preferredHosts = locationStrategy match { + case PreferBrokers => + throw new AssertionError( + "If you want to prefer brokers, you must provide a mapping using PreferFixed " + + "A single KafkaRDD does not have a driver consumer and cannot look up brokers for you.") + case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]() + case PreferFixed(hostMap) => hostMap + } + val kp = new ju.HashMap[String, Object](kafkaParams) + fixKafkaParams(kp) + val osr = offsetRanges.clone() + + new KafkaRDD[K, V](sc, kp, osr, preferredHosts, true) + } + + /** + * :: Experimental :: + * Java constructor for a batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. + * @param keyClass Class of the keys in the Kafka records + * @param valueClass Class of the values in the Kafka records + * @param kafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs"> + * configuration parameters</a>. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD + * @param locationStrategy In most cases, pass in [[PreferConsistent]], + * see [[LocationStrategy]] for more details. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ + @Experimental + def createRDD[K, V]( + jsc: JavaSparkContext, + kafkaParams: ju.Map[String, Object], + offsetRanges: Array[OffsetRange], + locationStrategy: LocationStrategy + ): JavaRDD[ConsumerRecord[K, V]] = { + + new JavaRDD(createRDD[K, V](jsc.sc, kafkaParams, offsetRanges, locationStrategy)) + } + + /** + * :: Experimental :: + * Scala constructor for a DStream where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number + * of messages + * per second that each '''partition''' will accept. + * @param locationStrategy In most cases, pass in [[PreferConsistent]], + * see [[LocationStrategy]] for more details. + * @param consumerStrategy In most cases, pass in [[Subscribe]], + * see [[ConsumerStrategy]] for more details + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ + @Experimental + def createDirectStream[K, V]( + ssc: StreamingContext, + locationStrategy: LocationStrategy, + consumerStrategy: ConsumerStrategy[K, V] + ): InputDStream[ConsumerRecord[K, V]] = { + new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy) + } + + /** + * :: Experimental :: + * Java constructor for a DStream where + * each given Kafka topic/partition corresponds to an RDD partition. + * @param keyClass Class of the keys in the Kafka records + * @param valueClass Class of the values in the Kafka records + * @param locationStrategy In most cases, pass in [[PreferConsistent]], + * see [[LocationStrategy]] for more details. + * @param consumerStrategy In most cases, pass in [[Subscribe]], + * see [[ConsumerStrategy]] for more details + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ + @Experimental + def createDirectStream[K, V]( + jssc: JavaStreamingContext, + locationStrategy: LocationStrategy, + consumerStrategy: ConsumerStrategy[K, V] + ): JavaInputDStream[ConsumerRecord[K, V]] = { + new JavaInputDStream( + createDirectStream[K, V]( + jssc.ssc, locationStrategy, consumerStrategy)) + } + + /** + * Tweak kafka params to prevent issues on executors + */ + private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = { + logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor") + kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean) + + logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor") + kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") + + // driver and executor should be in different consumer groups + val groupId = "spark-executor-" + kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) + logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}") + kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) + + // possible workaround for KAFKA-3135 + val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG) + if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) { + logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135") + kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala new file mode 100644 index 0000000..df62030 --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{ util => ju } + +import scala.collection.JavaConverters._ + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.annotation.Experimental + + +/** + * :: Experimental :: + * Choice of how to schedule consumers for a given TopicPartition on an executor. + * Kafka 0.10 consumers prefetch messages, so it's important for performance + * to keep cached consumers on appropriate executors, not recreate them for every partition. + * Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere. + */ +@Experimental +sealed trait LocationStrategy + +/** + * :: Experimental :: + * Use this only if your executors are on the same nodes as your Kafka brokers. + */ +@Experimental +case object PreferBrokers extends LocationStrategy { + def create: PreferBrokers.type = this +} + +/** + * :: Experimental :: + * Use this in most cases, it will consistently distribute partitions across all executors. + */ +@Experimental +case object PreferConsistent extends LocationStrategy { + def create: PreferConsistent.type = this +} + +/** + * :: Experimental :: + * Use this to place particular TopicPartitions on particular hosts if your load is uneven. + * Any TopicPartition not specified in the map will use a consistent location. + */ +@Experimental +case class PreferFixed private(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy + +/** + * :: Experimental :: + * Use this to place particular TopicPartitions on particular hosts if your load is uneven. + * Any TopicPartition not specified in the map will use a consistent location. + */ +@Experimental +object PreferFixed { + def apply(hostMap: collection.Map[TopicPartition, String]): PreferFixed = { + PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava)) + } + def create(hostMap: ju.Map[TopicPartition, String]): PreferFixed = + PreferFixed(hostMap) +} http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala new file mode 100644 index 0000000..c66d3c9 --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import org.apache.kafka.clients.consumer.OffsetCommitCallback +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.annotation.Experimental + +/** + * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the + * offset ranges in RDDs generated by the direct Kafka DStream (see + * [[KafkaUtils.createDirectStream]]). + * {{{ + * KafkaUtils.createDirectStream(...).foreachRDD { rdd => + * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + * ... + * } + * }}} + */ +trait HasOffsetRanges { + def offsetRanges: Array[OffsetRange] +} + +/** + * :: Experimental :: + * Represents any object that can commit a collection of [[OffsetRange]]s. + * The direct Kafka DStream implements this interface (see + * [[KafkaUtils.createDirectStream]]). + * {{{ + * val stream = KafkaUtils.createDirectStream(...) + * ... + * stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets, new OffsetCommitCallback() { + * def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception) { + * if (null != e) { + * // error + * } else { + * // success + * } + * } + * }) + * }}} + */ +@Experimental +trait CanCommitOffsets { + /** + * :: Experimental :: + * Queue up offset ranges for commit to Kafka at a future time. Threadsafe. + * This is only needed if you intend to store offsets in Kafka, instead of your own store. + * @param offsetRanges The maximum untilOffset for a given partition will be used at commit. + */ + @Experimental + def commitAsync(offsetRanges: Array[OffsetRange]): Unit + + /** + * :: Experimental :: + * Queue up offset ranges for commit to Kafka at a future time. Threadsafe. + * This is only needed if you intend to store offsets in Kafka, instead of your own store. + * @param offsetRanges The maximum untilOffset for a given partition will be used at commit. + * @param callback Only the most recently provided callback will be used at commit. + */ + @Experimental + def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit +} + +/** + * Represents a range of offsets from a single Kafka TopicPartition. Instances of this class + * can be created with `OffsetRange.create()`. + * @param topic Kafka topic name + * @param partition Kafka partition id + * @param fromOffset Inclusive starting offset + * @param untilOffset Exclusive ending offset + */ +final class OffsetRange private( + val topic: String, + val partition: Int, + val fromOffset: Long, + val untilOffset: Long) extends Serializable { + import OffsetRange.OffsetRangeTuple + + /** Kafka TopicPartition object, for convenience */ + def topicPartition(): TopicPartition = new TopicPartition(topic, partition) + + /** Number of messages this OffsetRange refers to */ + def count(): Long = untilOffset - fromOffset + + override def equals(obj: Any): Boolean = obj match { + case that: OffsetRange => + this.topic == that.topic && + this.partition == that.partition && + this.fromOffset == that.fromOffset && + this.untilOffset == that.untilOffset + case _ => false + } + + override def hashCode(): Int = { + toTuple.hashCode() + } + + override def toString(): String = { + s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset])" + } + + /** this is to avoid ClassNotFoundException during checkpoint restore */ + private[streaming] + def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset) +} + +/** + * Companion object the provides methods to create instances of [[OffsetRange]]. + */ +object OffsetRange { + def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = + new OffsetRange(topic, partition, fromOffset, untilOffset) + + def create( + topicPartition: TopicPartition, + fromOffset: Long, + untilOffset: Long): OffsetRange = + new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset) + + def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = + new OffsetRange(topic, partition, fromOffset, untilOffset) + + def apply( + topicPartition: TopicPartition, + fromOffset: Long, + untilOffset: Long): OffsetRange = + new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset) + + /** this is to avoid ClassNotFoundException during checkpoint restore */ + private[kafka010] + type OffsetRangeTuple = (String, Int, Long, Long) + + private[kafka010] + def apply(t: OffsetRangeTuple) = + new OffsetRange(t._1, t._2, t._3, t._4) +} http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java new file mode 100644 index 0000000..ebfcf87 --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Spark Integration for Kafka 0.10 + */ +package org.apache.spark.streaming.kafka010; http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala new file mode 100644 index 0000000..2bfc1e8 --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +/** + * Spark Integration for Kafka 0.10 + */ +package object kafka --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org