Repository: spark Updated Branches: refs/heads/branch-2.0 1a4be51d6 -> 6fb1f735f
[SPARK-17813][SQL][KAFKA] Maximum data per trigger ## What changes were proposed in this pull request? maxOffsetsPerTrigger option for rate limiting, proportionally based on volume of different topicpartitions. ## How was this patch tested? Added unit test Author: cody koeninger <c...@koeninger.org> Closes #15527 from koeninger/SPARK-17813. (cherry picked from commit 104232580528c097a284d753adb5795f6de8b0a5) Signed-off-by: Shixiong Zhu <shixi...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6fb1f735 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6fb1f735 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6fb1f735 Branch: refs/heads/branch-2.0 Commit: 6fb1f735f7206edd06a5aa0a0919da24e7f6dc4e Parents: 1a4be51 Author: cody koeninger <c...@koeninger.org> Authored: Thu Oct 27 10:30:59 2016 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Thu Oct 27 10:31:10 2016 -0700 ---------------------------------------------------------------------- docs/structured-streaming-kafka-integration.md | 6 ++ .../apache/spark/sql/kafka010/KafkaSource.scala | 107 ++++++++++++++----- .../spark/sql/kafka010/KafkaSourceSuite.scala | 71 +++++++++++- 3 files changed, 157 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6fb1f735/docs/structured-streaming-kafka-integration.md ---------------------------------------------------------------------- diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index e851f21..a6c3b3a 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -221,6 +221,12 @@ The following configurations are optional: <td>10</td> <td>milliseconds to wait before retrying to fetch Kafka offsets</td> </tr> +<tr> + <td>maxOffsetsPerTrigger</td> + <td>long</td> + <td>none</td> + <td>Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.</td> +</tr> </table> Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g, http://git-wip-us.apache.org/repos/asf/spark/blob/6fb1f735/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 537b7b0..61cba73 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -96,6 +96,9 @@ private[kafka010] case class KafkaSource( private val offsetFetchAttemptIntervalMs = sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong + private val maxOffsetsPerTrigger = + sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong) + /** * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the * offsets and never commits them. @@ -121,6 +124,8 @@ private[kafka010] case class KafkaSource( }.partitionToOffsets } + private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None + override def schema: StructType = KafkaSource.kafkaSchema /** Returns the maximum available offset for this source. */ @@ -128,9 +133,54 @@ private[kafka010] case class KafkaSource( // Make sure initialPartitionOffsets is initialized initialPartitionOffsets - val offset = KafkaSourceOffset(fetchLatestOffsets()) - logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}") - Some(offset) + val latest = fetchLatestOffsets() + val offsets = maxOffsetsPerTrigger match { + case None => + latest + case Some(limit) if currentPartitionOffsets.isEmpty => + rateLimit(limit, initialPartitionOffsets, latest) + case Some(limit) => + rateLimit(limit, currentPartitionOffsets.get, latest) + } + + currentPartitionOffsets = Some(offsets) + logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}") + Some(KafkaSourceOffset(offsets)) + } + + /** Proportionally distribute limit number of offsets among topicpartitions */ + private def rateLimit( + limit: Long, + from: Map[TopicPartition, Long], + until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = { + val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq) + val sizes = until.flatMap { + case (tp, end) => + // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it + from.get(tp).orElse(fromNew.get(tp)).flatMap { begin => + val size = end - begin + logDebug(s"rateLimit $tp size is $size") + if (size > 0) Some(tp -> size) else None + } + } + val total = sizes.values.sum.toDouble + if (total < 1) { + until + } else { + until.map { + case (tp, end) => + tp -> sizes.get(tp).map { size => + val begin = from.get(tp).getOrElse(fromNew(tp)) + val prorate = limit * (size / total) + logDebug(s"rateLimit $tp prorated amount is $prorate") + // Don't completely starve small topicpartitions + val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong + logDebug(s"rateLimit $tp new offset is $off") + // Paranoia, make sure not to return an offset that's past end + Math.min(end, off) + }.getOrElse(end) + } + } } /** @@ -153,11 +203,7 @@ private[kafka010] case class KafkaSource( // Find the new partitions, and get their earliest offsets val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet) - val newPartitionOffsets = if (newPartitions.nonEmpty) { - fetchNewPartitionEarliestOffsets(newPartitions.toSeq) - } else { - Map.empty[TopicPartition, Long] - } + val newPartitionOffsets = fetchNewPartitionEarliestOffsets(newPartitions.toSeq) if (newPartitionOffsets.keySet != newPartitions) { // We cannot get from offsets for some partitions. It means they got deleted. val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet) @@ -221,6 +267,12 @@ private[kafka010] case class KafkaSource( logInfo("GetBatch generating RDD of offset range: " + offsetRanges.sortBy(_.topicPartition.toString).mkString(", ")) + + // On recovery, getBatch will get called before getOffset + if (currentPartitionOffsets.isEmpty) { + currentPartitionOffsets = Some(untilPartitionOffsets) + } + sqlContext.createDataFrame(rdd, schema) } @@ -305,23 +357,28 @@ private[kafka010] case class KafkaSource( * some partitions if they are deleted. */ private def fetchNewPartitionEarliestOffsets( - newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = withRetriesWithoutInterrupt { - // Poll to get the latest assigned partitions - consumer.poll(0) - val partitions = consumer.assignment() - consumer.pause(partitions) - logDebug(s"\tPartitions assigned to consumer: $partitions") - - // Get the earliest offset of each partition - consumer.seekToBeginning(partitions) - val partitionOffsets = newPartitions.filter { p => - // When deleting topics happen at the same time, some partitions may not be in `partitions`. - // So we need to ignore them - partitions.contains(p) - }.map(p => p -> consumer.position(p)).toMap - logDebug(s"Got earliest offsets for new partitions: $partitionOffsets") - partitionOffsets - } + newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = + if (newPartitions.isEmpty) { + Map.empty[TopicPartition, Long] + } else { + withRetriesWithoutInterrupt { + // Poll to get the latest assigned partitions + consumer.poll(0) + val partitions = consumer.assignment() + consumer.pause(partitions) + logDebug(s"\tPartitions assigned to consumer: $partitions") + + // Get the earliest offset of each partition + consumer.seekToBeginning(partitions) + val partitionOffsets = newPartitions.filter { p => + // When deleting topics happen at the same time, some partitions may not be in + // `partitions`. So we need to ignore them + partitions.contains(p) + }.map(p => p -> consumer.position(p)).toMap + logDebug(s"Got earliest offsets for new partitions: $partitionOffsets") + partitionOffsets + } + } /** * Helper function that does multiple retries on the a body of code that returns offsets. http://git-wip-us.apache.org/repos/asf/spark/blob/6fb1f735/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index b50688e..ed4cc75 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -23,13 +23,14 @@ import scala.util.Random import org.apache.kafka.clients.producer.RecordMetadata import org.apache.kafka.common.TopicPartition +import org.scalatest.concurrent.Eventually._ +import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.streaming.{ ProcessingTime, StreamTest } import org.apache.spark.sql.test.SharedSQLContext - abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { protected var testUtils: KafkaTestUtils = _ @@ -133,6 +134,72 @@ class KafkaSourceSuite extends KafkaSourceTest { private val topicId = new AtomicInteger(0) + test("maxOffsetsPerTrigger") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 3) + testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0)) + testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1)) + testUtils.sendMessages(topic, Array("1"), Some(2)) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("maxOffsetsPerTrigger", 10) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + + val clock = new StreamManualClock + + val waitUntilBatchProcessed = AssertOnQuery { q => + eventually(Timeout(streamingTimeout)) { + if (!q.exception.isDefined) { + assert(clock.isStreamWaitingAt(clock.getTimeMillis())) + } + } + if (q.exception.isDefined) { + throw q.exception.get + } + true + } + + testStream(mapped)( + StartStream(ProcessingTime(100), clock), + waitUntilBatchProcessed, + // 1 from smallest, 1 from middle, 8 from biggest + CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107), + AdvanceManualClock(100), + waitUntilBatchProcessed, + // smallest now empty, 1 more from middle, 9 more from biggest + CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, + 11, 108, 109, 110, 111, 112, 113, 114, 115, 116 + ), + StopStream, + StartStream(ProcessingTime(100), clock), + waitUntilBatchProcessed, + AdvanceManualClock(100), + waitUntilBatchProcessed, + // smallest now empty, 1 more from middle, 9 more from biggest + CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, + 11, 108, 109, 110, 111, 112, 113, 114, 115, 116, + 12, 117, 118, 119, 120, 121, 122, 123, 124, 125 + ), + AdvanceManualClock(100), + waitUntilBatchProcessed, + // smallest now empty, 1 more from middle, 9 more from biggest + CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, + 11, 108, 109, 110, 111, 112, 113, 114, 115, 116, + 12, 117, 118, 119, 120, 121, 122, 123, 124, 125, + 13, 126, 127, 128, 129, 130, 131, 132, 133, 134 + ) + ) + } + test("cannot stop Kafka stream") { val topic = newTopic() testUtils.createTopic(newTopic(), partitions = 5) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org