[GitHub] spark pull request #15367: [SPARK-17346][SQL][test-maven]Add Kafka source fo...

2016-10-07 Thread zsxwing
Github user zsxwing closed the pull request at:

https://github.com/apache/spark/pull/15367



[GitHub] spark pull request #15367: [SPARK-17346][SQL][test-maven]Add Kafka source fo...

2016-10-06 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15367#discussion_r82313748
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.util.Random
+
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.scalatest.BeforeAndAfter
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+
+abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
+
+  protected var testUtils: KafkaTestUtils = _
+
+  override val streamingTimeout = 30.seconds
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils
+    testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+    if (testUtils != null) {
+      testUtils.teardown()
+      testUtils = null
+      super.afterAll()
+    }
+  }
+
+  protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
+    // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure
+    // its "getOffset" is called before pushing any data. Otherwise, because of the race condition,
+    // we don't know which data should be fetched when `startingOffset` is latest.
+    q.processAllAvailable()
+    true
+  }
+
+  /**
+   * Add data to Kafka.
+   *
+   * `topicAction` can be used to run actions for each topic before inserting data.
+   */
+  case class AddKafkaData(topics: Set[String], data: Int*)
+    (implicit ensureDataInMultiplePartition: Boolean = false,
+      concurrent: Boolean = false,
+      message: String = "",
+      topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData {
+
+    override def addData(query: Option[StreamExecution]): (Source, Offset) = {
+      if (query.get.isActive) {
+        // Make sure no Spark job is running when deleting a topic
+        query.get.processAllAvailable()
+      }
+
+      val existingTopics = testUtils.getAllTopicsAndPartitionSize().toMap
+      val newTopics = topics.diff(existingTopics.keySet)
+      for (newTopic <- newTopics) {
+        topicAction(newTopic, None)
+      }
+      for (existingTopicPartitions <- existingTopics) {
+        topicAction(existingTopicPartitions._1, Some(existingTopicPartitions._2))
+      }
+
+      // Read all topics again in case some topics are deleted.
+      val allTopics = testUtils.getAllTopicsAndPartitionSize().toMap.keys
+      require(
+        query.nonEmpty,
+        "Cannot add data when there is no query for finding the active kafka source")
+
+      val sources = query.get.logicalPlan.collect {
+        case StreamingExecutionRelation(source, _) if source.isInstanceOf[KafkaSource] =>
+          source.asInstanceOf[KafkaSource]
+      }
+      if (sources.isEmpty) {
+        throw new Exception(
+          "Could not find Kafka source in the StreamExecution logical plan to add data to")
+      } else if (sources.size > 1) {
+        throw new Exception(
+          "Could not select the Kafka source in the StreamExecution logical plan as there " +
+            "are multiple Kafka sources:\n\t" + sources.mkString("\n\t"))
+      }
+      val kafkaSource = sources.head
+      val topic = topics.toSeq(Random.nextInt(topics.size))
+      val sentMetadata = testUtils.sendMessages(topic, data.map { _.toString }.toArray)
+
+      def metadataToStr(m: (String, RecordMetadata)): String = {
+        s"Sent ${m._1} to partition ${m._2.partition()}, offset ${m._2.offset()}"

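For readers following the diff above: the quoted helpers are StreamTest actions, so a concrete suite extending KafkaSourceTest would wire them into testStream(...). The sketch below only illustrates that wiring; the option names ("kafka.bootstrap.servers", "subscribe"), testUtils.brokerAddress, and testUtils.createTopic are assumptions based on the Kafka source and KafkaTestUtils added in this PR, not an excerpt from the suite.

    // Hypothetical usage sketch (not part of the quoted diff). Assumes the Kafka
    // source options and KafkaTestUtils helpers named in the lead-in above.
    test("subscribe to one topic (sketch)") {
      import testImplicits._
      val topic = "topic-" + Random.nextInt(1000)
      testUtils.createTopic(topic, 1)

      val kafka = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
        .option("subscribe", topic)
        .load()
        .selectExpr("CAST(value AS STRING)")
        .as[String]
      val mapped = kafka.map(_.toInt + 1)

      testStream(mapped)(
        makeSureGetOffsetCalled,            // force getOffset before any data is pushed
        AddKafkaData(Set(topic), 1, 2, 3),  // uses the helper defined in the diff
        CheckAnswer(2, 3, 4)                // values shifted by the map above
      )
    }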
[GitHub] spark pull request #15367: [SPARK-17346][SQL][test-maven]Add Kafka source fo...

2016-10-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15367#discussion_r82313116
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---