Repository: spark
Updated Branches:
  refs/heads/branch-2.1 6b332909f -> 7a84edb24


[SPARK-18283][STRUCTURED STREAMING][KAFKA] Added test to check whether default 
starting offset in latest

## What changes were proposed in this pull request?

Added test to check whether default starting offset in latest

## How was this patch tested?
new unit test

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #15778 from tdas/SPARK-18283.

(cherry picked from commit b06c23db9aedae48c9eba9d702ae82fa5647cfe5)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a84edb2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a84edb2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a84edb2

Branch: refs/heads/branch-2.1
Commit: 7a84edb2475446ff3a98e8cc8dcf62ee801fbbb9
Parents: 6b33290
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Mon Nov 7 10:43:36 2016 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Mon Nov 7 10:43:53 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 24 ++++++++++++++++++++
 1 file changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7a84edb2/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index ed4cc75..89e713f 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -306,6 +306,30 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
+  test("starting offset is latest by default") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("0"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+
+    val kafka = reader.load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+    val mapped = kafka.map(_.toInt)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(1, 2, 3)  // should not have 0
+    )
+  }
+
   test("bad source options") {
     def testBadOptions(options: (String, String)*)(expectedMsgs: String*): 
Unit = {
       val ex = intercept[IllegalArgumentException] {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to