Repository: spark
Updated Branches:
  refs/heads/master 5fd54b994 -> 9293734d3


http://git-wip-us.apache.org/repos/asf/spark/blob/9293734d/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
new file mode 100644
index 0000000..64bf503
--- /dev/null
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.util.Random
+
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.scalatest.BeforeAndAfter
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+
+abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
+
+  protected var testUtils: KafkaTestUtils = _
+
+  override val streamingTimeout = 30.seconds
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils
+    testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+    if (testUtils != null) {
+      testUtils.teardown()
+      testUtils = null
+      super.afterAll()
+    }
+  }
+
+  protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
+    // Because KafkaSource's initialPartitionOffsets is set lazily, we need to 
make sure
+    // its "getOffset" is called before pushing any data. Otherwise, because 
of the race contion,
+    // we don't know which data should be fetched when `startingOffset` is 
latest.
+    q.processAllAvailable()
+    true
+  }
+
+  /**
+   * Add data to Kafka.
+   *
+   * `topicAction` can be used to run actions for each topic before inserting 
data.
+   */
+  case class AddKafkaData(topics: Set[String], data: Int*)
+    (implicit ensureDataInMultiplePartition: Boolean = false,
+      concurrent: Boolean = false,
+      message: String = "",
+      topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends 
AddData {
+
+    override def addData(query: Option[StreamExecution]): (Source, Offset) = {
+      if (query.get.isActive) {
+        // Make sure no Spark job is running when deleting a topic
+        query.get.processAllAvailable()
+      }
+
+      val existingTopics = testUtils.getAllTopicsAndPartitionSize().toMap
+      val newTopics = topics.diff(existingTopics.keySet)
+      for (newTopic <- newTopics) {
+        topicAction(newTopic, None)
+      }
+      for (existingTopicPartitions <- existingTopics) {
+        topicAction(existingTopicPartitions._1, 
Some(existingTopicPartitions._2))
+      }
+
+      // Read all topics again in case some topics are delete.
+      val allTopics = testUtils.getAllTopicsAndPartitionSize().toMap.keys
+      require(
+        query.nonEmpty,
+        "Cannot add data when there is no query for finding the active kafka 
source")
+
+      val sources = query.get.logicalPlan.collect {
+        case StreamingExecutionRelation(source, _) if 
source.isInstanceOf[KafkaSource] =>
+          source.asInstanceOf[KafkaSource]
+      }
+      if (sources.isEmpty) {
+        throw new Exception(
+          "Could not find Kafka source in the StreamExecution logical plan to 
add data to")
+      } else if (sources.size > 1) {
+        throw new Exception(
+          "Could not select the Kafka source in the StreamExecution logical 
plan as there" +
+            "are multiple Kafka sources:\n\t" + sources.mkString("\n\t"))
+      }
+      val kafkaSource = sources.head
+      val topic = topics.toSeq(Random.nextInt(topics.size))
+      val sentMetadata = testUtils.sendMessages(topic, data.map { _.toString 
}.toArray)
+
+      def metadataToStr(m: (String, RecordMetadata)): String = {
+        s"Sent ${m._1} to partition ${m._2.partition()}, offset 
${m._2.offset()}"
+      }
+      // Verify that the test data gets inserted into multiple partitions
+      if (ensureDataInMultiplePartition) {
+        require(
+          sentMetadata.groupBy(_._2.partition).size > 1,
+          s"Added data does not test multiple partitions: 
${sentMetadata.map(metadataToStr)}")
+      }
+
+      val offset = KafkaSourceOffset(testUtils.getLatestOffsets(topics))
+      logInfo(s"Added data, expected offset $offset")
+      (kafkaSource, offset)
+    }
+
+    override def toString: String =
+      s"AddKafkaData(topics = $topics, data = $data, message = $message)"
+  }
+}
+
+
+class KafkaSourceSuite extends KafkaSourceTest {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  test("cannot stop Kafka stream") {
+    val topic = newTopic()
+    testUtils.createTopic(newTopic(), partitions = 5)
+    testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribePattern", s"topic-.*")
+
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+    testStream(mapped)(
+      StopStream
+    )
+  }
+
+  test("subscribing topic by name from latest offsets") {
+    val topic = newTopic()
+    testFromLatestOffsets(topic, "subscribe" -> topic)
+  }
+
+  test("subscribing topic by name from earliest offsets") {
+    val topic = newTopic()
+    testFromEarliestOffsets(topic, "subscribe" -> topic)
+  }
+
+  test("subscribing topic by pattern from latest offsets") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-suffix"
+    testFromLatestOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*")
+  }
+
+  test("subscribing topic by pattern from earliest offsets") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-suffix"
+    testFromEarliestOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*")
+  }
+
+  test("subscribing topic by pattern with topic deletions") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-seems"
+    val topic2 = topicPrefix + "-bad"
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("-1"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribePattern", s"$topicPrefix-.*")
+      .option("failOnDataLoss", "false")
+
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(2, 3, 4),
+      Assert {
+        testUtils.deleteTopic(topic)
+        testUtils.createTopic(topic2, partitions = 5)
+        true
+      },
+      AddKafkaData(Set(topic2), 4, 5, 6),
+      CheckAnswer(2, 3, 4, 5, 6, 7)
+    )
+  }
+
+  test("bad source options") {
+    def testBadOptions(options: (String, String)*)(expectedMsgs: String*): 
Unit = {
+      val ex = intercept[IllegalArgumentException] {
+        val reader = spark
+          .readStream
+          .format("kafka")
+        options.foreach { case (k, v) => reader.option(k, v) }
+        reader.load()
+      }
+      expectedMsgs.foreach { m =>
+        assert(ex.getMessage.toLowerCase.contains(m.toLowerCase))
+      }
+    }
+
+    // No strategy specified
+    testBadOptions()("options must be specified", "subscribe", 
"subscribePattern")
+
+    // Multiple strategies specified
+    testBadOptions("subscribe" -> "t", "subscribePattern" -> "t.*")(
+      "only one", "options can be specified")
+
+    testBadOptions("subscribe" -> "")("no topics to subscribe")
+    testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
+  }
+
+  test("unsupported kafka configs") {
+    def testUnsupportedConfig(key: String, value: String = "someValue"): Unit 
= {
+      val ex = intercept[IllegalArgumentException] {
+        val reader = spark
+          .readStream
+          .format("kafka")
+          .option("subscribe", "topic")
+          .option("kafka.bootstrap.servers", "somehost")
+          .option(s"$key", value)
+        reader.load()
+      }
+      assert(ex.getMessage.toLowerCase.contains("not supported"))
+    }
+
+    testUnsupportedConfig("kafka.group.id")
+    testUnsupportedConfig("kafka.auto.offset.reset")
+    testUnsupportedConfig("kafka.enable.auto.commit")
+    testUnsupportedConfig("kafka.interceptor.classes")
+    testUnsupportedConfig("kafka.key.deserializer")
+    testUnsupportedConfig("kafka.value.deserializer")
+
+    testUnsupportedConfig("kafka.auto.offset.reset", "none")
+    testUnsupportedConfig("kafka.auto.offset.reset", "someValue")
+    testUnsupportedConfig("kafka.auto.offset.reset", "earliest")
+    testUnsupportedConfig("kafka.auto.offset.reset", "latest")
+  }
+
+  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+  private def testFromLatestOffsets(topic: String, options: (String, 
String)*): Unit = {
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("-1"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("startingOffset", s"latest")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+    options.foreach { case (k, v) => reader.option(k, v) }
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(2, 3, 4),
+      StopStream,
+      StartStream(),
+      CheckAnswer(2, 3, 4), // Should get the data back on recovery
+      StopStream,
+      AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped
+      StartStream(),
+      CheckAnswer(2, 3, 4, 5, 6, 7), // Should get the added data
+      AddKafkaData(Set(topic), 7, 8),
+      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
+      AssertOnQuery("Add partitions") { query: StreamExecution =>
+        testUtils.addPartitions(topic, 10)
+        true
+      },
+      AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
+      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
+    )
+  }
+
+  private def testFromEarliestOffsets(topic: String, options: (String, 
String)*): Unit = {
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, (1 to 3).map { _.toString }.toArray)
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark.readStream
+    reader
+      .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
+      .option("startingOffset", s"earliest")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+    options.foreach { case (k, v) => reader.option(k, v) }
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+    testStream(mapped)(
+      AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped
+      CheckAnswer(2, 3, 4, 5, 6, 7),
+      StopStream,
+      StartStream(),
+      CheckAnswer(2, 3, 4, 5, 6, 7),
+      StopStream,
+      AddKafkaData(Set(topic), 7, 8),
+      StartStream(),
+      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
+      AssertOnQuery("Add partitions") { query: StreamExecution =>
+        testUtils.addPartitions(topic, 10)
+        true
+      },
+      AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
+      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
+    )
+  }
+}
+
+
+class KafkaSourceStressSuite extends KafkaSourceTest with BeforeAndAfter {
+
+  import testImplicits._
+
+  val topicId = new AtomicInteger(1)
+
+  @volatile var topics: Seq[String] = (1 to 5).map(_ => newStressTopic)
+
+  def newStressTopic: String = s"stress${topicId.getAndIncrement()}"
+
+  private def nextInt(start: Int, end: Int): Int = {
+    start + Random.nextInt(start + end - 1)
+  }
+
+  after {
+    for (topic <- testUtils.getAllTopicsAndPartitionSize().toMap.keys) {
+      testUtils.deleteTopic(topic)
+    }
+  }
+
+  test("stress test with multiple topics and partitions")  {
+    topics.foreach { topic =>
+      testUtils.createTopic(topic, partitions = nextInt(1, 6))
+      testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
+    }
+
+    // Create Kafka source that reads from latest offset
+    val kafka =
+      spark.readStream
+        .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("kafka.metadata.max.age.ms", "1")
+        .option("subscribePattern", "stress.*")
+        .option("failOnDataLoss", "false")
+        .load()
+        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+        .as[(String, String)]
+
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+    runStressTest(
+      mapped,
+      Seq(makeSureGetOffsetCalled),
+      (d, running) => {
+        Random.nextInt(5) match {
+          case 0 => // Add a new topic
+            topics = topics ++ Seq(newStressTopic)
+            AddKafkaData(topics.toSet, d: _*)(message = s"Add topic 
$newStressTopic",
+              topicAction = (topic, partition) => {
+                if (partition.isEmpty) {
+                  testUtils.createTopic(topic, partitions = nextInt(1, 6))
+                }
+              })
+          case 1 if running =>
+            // Only delete a topic when the query is running. Otherwise, we 
may lost data and
+            // cannot check the correctness.
+            val deletedTopic = topics(Random.nextInt(topics.size))
+            if (deletedTopic != topics.head) {
+              topics = topics.filterNot(_ == deletedTopic)
+            }
+            AddKafkaData(topics.toSet, d: _*)(message = s"Delete topic 
$deletedTopic",
+              topicAction = (topic, partition) => {
+                // Never remove the first topic to make sure we have at least 
one topic
+                if (topic == deletedTopic && deletedTopic != topics.head) {
+                  testUtils.deleteTopic(deletedTopic)
+                }
+              })
+          case 2 => // Add new partitions
+            AddKafkaData(topics.toSet, d: _*)(message = "Add partitiosn",
+              topicAction = (topic, partition) => {
+                testUtils.addPartitions(topic, partition.get + nextInt(1, 6))
+              })
+          case _ => // Just add new data
+            AddKafkaData(topics.toSet, d: _*)
+        }
+      },
+      iterations = 50)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9293734d/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
new file mode 100644
index 0000000..3eb8a73
--- /dev/null
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io.File
+import java.lang.{Integer => JInt}
+import java.net.InetSocketAddress
+import java.util.{Map => JMap, Properties}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.Random
+
+import kafka.admin.AdminUtils
+import kafka.api.Request
+import kafka.common.TopicAndPartition
+import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint}
+import kafka.utils.ZkUtils
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.{StringDeserializer, 
StringSerializer}
+import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
+
+/**
+ * This is a helper class for Kafka test suites. This has the functionality to 
set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ *
+ * The reason to put Kafka test utility class in src is to test Python related 
Kafka APIs.
+ */
+class KafkaTestUtils extends Logging {
+
+  // Zookeeper related configurations
+  private val zkHost = "localhost"
+  private var zkPort: Int = 0
+  private val zkConnectionTimeout = 60000
+  private val zkSessionTimeout = 6000
+
+  private var zookeeper: EmbeddedZookeeper = _
+
+  private var zkUtils: ZkUtils = _
+
+  // Kafka broker related configurations
+  private val brokerHost = "localhost"
+  private var brokerPort = 0
+  private var brokerConf: KafkaConfig = _
+
+  // Kafka broker server
+  private var server: KafkaServer = _
+
+  // Kafka producer
+  private var producer: Producer[String, String] = _
+
+  // Flag to test whether the system is correctly started
+  private var zkReady = false
+  private var brokerReady = false
+
+  def zkAddress: String = {
+    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get 
zookeeper address")
+    s"$zkHost:$zkPort"
+  }
+
+  def brokerAddress: String = {
+    assert(brokerReady, "Kafka not setup yet or already torn down, cannot get 
broker address")
+    s"$brokerHost:$brokerPort"
+  }
+
+  def zookeeperClient: ZkUtils = {
+    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get 
zookeeper client")
+    Option(zkUtils).getOrElse(
+      throw new IllegalStateException("Zookeeper client is not yet 
initialized"))
+  }
+
+  // Set up the Embedded Zookeeper server and get the proper Zookeeper port
+  private def setupEmbeddedZookeeper(): Unit = {
+    // Zookeeper server startup
+    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+    // Get the actual zookeeper binding port
+    zkPort = zookeeper.actualPort
+    zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, 
zkConnectionTimeout, false)
+    zkReady = true
+  }
+
+  // Set up the Embedded Kafka server
+  private def setupEmbeddedKafkaServer(): Unit = {
+    assert(zkReady, "Zookeeper should be set up beforehand")
+
+    // Kafka broker startup
+    Utils.startServiceOnPort(brokerPort, port => {
+      brokerPort = port
+      brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
+      server = new KafkaServer(brokerConf)
+      server.startup()
+      brokerPort = server.boundPort()
+      (server, brokerPort)
+    }, new SparkConf(), "KafkaBroker")
+
+    brokerReady = true
+  }
+
+  /** setup the whole embedded servers, including Zookeeper and Kafka brokers 
*/
+  def setup(): Unit = {
+    setupEmbeddedZookeeper()
+    setupEmbeddedKafkaServer()
+  }
+
+  /** Teardown the whole servers, including Kafka broker and Zookeeper */
+  def teardown(): Unit = {
+    brokerReady = false
+    zkReady = false
+
+    if (producer != null) {
+      producer.close()
+      producer = null
+    }
+
+    if (server != null) {
+      server.shutdown()
+      server = null
+    }
+
+    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+
+    if (zkUtils != null) {
+      zkUtils.close()
+      zkUtils = null
+    }
+
+    if (zookeeper != null) {
+      zookeeper.shutdown()
+      zookeeper = null
+    }
+  }
+
+  /** Create a Kafka topic and wait until it is propagated to the whole 
cluster */
+  def createTopic(topic: String, partitions: Int): Unit = {
+    AdminUtils.createTopic(zkUtils, topic, partitions, 1)
+    // wait until metadata is propagated
+    (0 until partitions).foreach { p =>
+      waitUntilMetadataIsPropagated(topic, p)
+    }
+  }
+
+  def getAllTopicsAndPartitionSize(): Seq[(String, Int)] = {
+    
zkUtils.getPartitionsForTopics(zkUtils.getAllTopics()).mapValues(_.size).toSeq
+  }
+
+  /** Create a Kafka topic and wait until it is propagated to the whole 
cluster */
+  def createTopic(topic: String): Unit = {
+    createTopic(topic, 1)
+  }
+
+  /** Delete a Kafka topic and wait until it is propagated to the whole 
cluster */
+  def deleteTopic(topic: String): Unit = {
+    val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size
+    AdminUtils.deleteTopic(zkUtils, topic)
+    verifyTopicDeletion(zkUtils, topic, partitions, List(this.server))
+  }
+
+  /** Add new paritions to a Kafka topic */
+  def addPartitions(topic: String, partitions: Int): Unit = {
+    AdminUtils.addPartitions(zkUtils, topic, partitions)
+    // wait until metadata is propagated
+    (0 until partitions).foreach { p =>
+      waitUntilMetadataIsPropagated(topic, p)
+    }
+  }
+
+  /** Java-friendly function for sending messages to the Kafka broker */
+  def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
+    sendMessages(topic, 
Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+  }
+
+  /** Send the messages to the Kafka broker */
+  def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
+    val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) 
}.toArray
+    sendMessages(topic, messages)
+  }
+
+  /** Send the array of messages to the Kafka broker */
+  def sendMessages(topic: String, messages: Array[String]): Seq[(String, 
RecordMetadata)] = {
+    producer = new KafkaProducer[String, String](producerConfiguration)
+    val offsets = try {
+      messages.map { m =>
+        val metadata =
+          producer.send(new ProducerRecord[String, String](topic, m)).get(10, 
TimeUnit.SECONDS)
+          logInfo(s"\tSent $m to partition ${metadata.partition}, offset 
${metadata.offset}")
+        (m, metadata)
+      }
+    } finally {
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
+    }
+    offsets
+  }
+
+  def getLatestOffsets(topics: Set[String]): Map[TopicPartition, Long] = {
+    val kc = new KafkaConsumer[String, String](consumerConfiguration)
+    logInfo("Created consumer to get latest offsets")
+    kc.subscribe(topics.asJavaCollection)
+    kc.poll(0)
+    val partitions = kc.assignment()
+    kc.pause(partitions)
+    kc.seekToEnd(partitions)
+    val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap
+    kc.close()
+    logInfo("Closed consumer to get latest offsets")
+    offsets
+  }
+
+  private def brokerConfiguration: Properties = {
+    val props = new Properties()
+    props.put("broker.id", "0")
+    props.put("host.name", "localhost")
+    props.put("advertised.host.name", "localhost")
+    props.put("port", brokerPort.toString)
+    props.put("log.dir", Utils.createTempDir().getAbsolutePath)
+    props.put("zookeeper.connect", zkAddress)
+    props.put("log.flush.interval.messages", "1")
+    props.put("replica.socket.timeout.ms", "1500")
+    props.put("delete.topic.enable", "true")
+    props
+  }
+
+  private def producerConfiguration: Properties = {
+    val props = new Properties()
+    props.put("bootstrap.servers", brokerAddress)
+    props.put("value.serializer", classOf[StringSerializer].getName)
+    props.put("key.serializer", classOf[StringSerializer].getName)
+    // wait for all in-sync replicas to ack sends
+    props.put("acks", "all")
+    props
+  }
+
+  private def consumerConfiguration: Properties = {
+    val props = new Properties()
+    props.put("bootstrap.servers", brokerAddress)
+    props.put("group.id", "group-KafkaTestUtils-" + Random.nextInt)
+    props.put("value.deserializer", classOf[StringDeserializer].getName)
+    props.put("key.deserializer", classOf[StringDeserializer].getName)
+    props.put("enable.auto.commit", "false")
+    props
+  }
+
+  private def verifyTopicDeletion(
+      zkUtils: ZkUtils,
+      topic: String,
+      numPartitions: Int,
+      servers: Seq[KafkaServer]) {
+    import ZkUtils._
+    val topicAndPartitions = (0 until 
numPartitions).map(TopicAndPartition(topic, _))
+    def isDeleted(): Boolean = {
+      // wait until admin path for delete topic is deleted, signaling 
completion of topic deletion
+      val deletePath = !zkUtils.pathExists(getDeleteTopicPath(topic))
+      val topicPath = !zkUtils.pathExists(getTopicPath(topic))
+      // ensure that the topic-partition has been deleted from all brokers' 
replica managers
+      val replicaManager = servers.forall(server => 
topicAndPartitions.forall(tp =>
+        server.replicaManager.getPartition(tp.topic, tp.partition) == None))
+      // ensure that logs from all replicas are deleted if delete topic is 
marked successful
+      val logManager = servers.forall(server => topicAndPartitions.forall(tp =>
+        server.getLogManager().getLog(tp).isEmpty))
+      // ensure that topic is removed from all cleaner offsets
+      val cleaner = servers.forall(server => topicAndPartitions.forall { tp =>
+        val checkpoints = server.getLogManager().logDirs.map { logDir =>
+          new OffsetCheckpoint(new File(logDir, 
"cleaner-offset-checkpoint")).read()
+        }
+        checkpoints.forall(checkpointsPerLogDir => 
!checkpointsPerLogDir.contains(tp))
+      })
+      deletePath && topicPath && replicaManager && logManager && cleaner
+    }
+    eventually(timeout(10.seconds)) {
+      assert(isDeleted, s"$topic not deleted after timeout")
+    }
+  }
+
+  private def waitUntilMetadataIsPropagated(topic: String, partition: Int): 
Unit = {
+    def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, 
partition) match {
+      case Some(partitionState) =>
+        val leaderAndInSyncReplicas = 
partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+        zkUtils.getLeaderForPartition(topic, partition).isDefined &&
+          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+          leaderAndInSyncReplicas.isr.size >= 1
+
+      case _ =>
+        false
+    }
+    eventually(timeout(10.seconds)) {
+      assert(isPropagated, s"Partition [$topic, $partition] metadata not 
propagated after timeout")
+    }
+  }
+
+  private class EmbeddedZookeeper(val zkConnect: String) {
+    val snapshotDir = Utils.createTempDir()
+    val logDir = Utils.createTempDir()
+
+    val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+    val (ip, port) = {
+      val splits = zkConnect.split(":")
+      (splits(0), splits(1).toInt)
+    }
+    val factory = new NIOServerCnxnFactory()
+    factory.configure(new InetSocketAddress(ip, port), 16)
+    factory.startup(zookeeper)
+
+    val actualPort = factory.getLocalPort
+
+    def shutdown() {
+      factory.shutdown()
+      Utils.deleteRecursively(snapshotDir)
+      Utils.deleteRecursively(logDir)
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/9293734d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8408f4b..37976b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,6 +111,7 @@
     <module>external/kafka-0-8-assembly</module>
     <module>external/kafka-0-10</module>
     <module>external/kafka-0-10-assembly</module>
+    <module>external/kafka-0-10-sql</module>
   </modules>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/spark/blob/9293734d/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 8e47e7f..88d5dc9 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -39,8 +39,8 @@ object BuildCommons {
 
   private val buildLocation = file(".").getAbsoluteFile.getParentFile
 
-  val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer) = Seq(
-    "catalyst", "sql", "hive", "hive-thriftserver"
+  val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer, sqlKafka010) = 
Seq(
+    "catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10"
   ).map(ProjectRef(buildLocation, _))
 
   val streamingProjects@Seq(
@@ -353,7 +353,7 @@ object SparkBuild extends PomBuild {
   val mimaProjects = allProjects.filterNot { x =>
     Seq(
       spark, hive, hiveThriftServer, catalyst, repl, networkCommon, 
networkShuffle, networkYarn,
-      unsafe, tags
+      unsafe, tags, sqlKafka010
     ).contains(x)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9293734d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 9825f19..b3a0d6a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -116,7 +116,7 @@ class StreamExecution(
    * [[HDFSMetadataLog]]. See SPARK-14131 for more details.
    */
   val microBatchThread =
-    new UninterruptibleThread(s"stream execution thread for $name") {
+    new StreamExecutionThread(s"stream execution thread for $name") {
       override def run(): Unit = {
         // To fix call site like "run at <unknown>:0", we bridge the call site 
from the caller
         // thread to this micro batch thread
@@ -530,3 +530,9 @@ object StreamExecution {
 
   def nextId: Long = _nextId.getAndIncrement()
 }
+
+/**
+ * A special thread to run the stream query. Some codes require to run in the 
StreamExecutionThread
+ * and will use `classOf[StreamExecutionThread]` to check.
+ */
+abstract class StreamExecutionThread(name: String) extends 
UninterruptibleThread(name)

http://git-wip-us.apache.org/repos/asf/spark/blob/9293734d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index aa6515b..09140a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -50,11 +50,11 @@ import org.apache.spark.util.{Clock, ManualClock, 
SystemClock, Utils}
  *
  * {{{
  *  val inputData = MemoryStream[Int]
-    val mapped = inputData.toDS().map(_ + 1)
-
-    testStream(mapped)(
-      AddData(inputData, 1, 2, 3),
-      CheckAnswer(2, 3, 4))
+ *  val mapped = inputData.toDS().map(_ + 1)
+ *
+ *  testStream(mapped)(
+ *    AddData(inputData, 1, 2, 3),
+ *    CheckAnswer(2, 3, 4))
  * }}}
  *
  * Note that while we do sleep to allow the other thread to progress without 
spinning,
@@ -477,21 +477,41 @@ trait StreamTest extends QueryTest with SharedSQLContext 
with Timeouts {
     }
   }
 
+
+  /**
+   * Creates a stress test that randomly starts/stops/adds data/checks the 
result.
+   *
+   * @param ds a dataframe that executes + 1 on a stream of integers, 
returning the result
+   * @param addData an add data action that adds the given numbers to the 
stream, encoding them
+   *                as needed
+   * @param iterations the iteration number
+   */
+  def runStressTest(
+    ds: Dataset[Int],
+    addData: Seq[Int] => StreamAction,
+    iterations: Int = 100): Unit = {
+    runStressTest(ds, Seq.empty, (data, running) => addData(data), iterations)
+  }
+
   /**
    * Creates a stress test that randomly starts/stops/adds data/checks the 
result.
    *
-   * @param ds a dataframe that executes + 1 on a stream of integers, 
returning the result.
-   * @param addData and add data action that adds the given numbers to the 
stream, encoding them
+   * @param ds a dataframe that executes + 1 on a stream of integers, 
returning the result
+   * @param prepareActions actions need to run before starting the stress test.
+   * @param addData an add data action that adds the given numbers to the 
stream, encoding them
    *                as needed
+   * @param iterations the iteration number
    */
   def runStressTest(
       ds: Dataset[Int],
-      addData: Seq[Int] => StreamAction,
-      iterations: Int = 100): Unit = {
+      prepareActions: Seq[StreamAction],
+      addData: (Seq[Int], Boolean) => StreamAction,
+      iterations: Int): Unit = {
     implicit val intEncoder = ExpressionEncoder[Int]()
     var dataPos = 0
     var running = true
     val actions = new ArrayBuffer[StreamAction]()
+    actions ++= prepareActions
 
     def addCheck() = { actions += CheckAnswer(1 to dataPos: _*) }
 
@@ -499,7 +519,7 @@ trait StreamTest extends QueryTest with SharedSQLContext 
with Timeouts {
       val numItems = Random.nextInt(10)
       val data = dataPos until (dataPos + numItems)
       dataPos += numItems
-      actions += addData(data)
+      actions += addData(data, running)
     }
 
     (1 to iterations).foreach { i =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to