Repository: spark Updated Branches: refs/heads/branch-2.0 b5d7217af -> 10525c294
[SPARK-18283][STRUCTURED STREAMING][KAFKA] Added test to check whether default starting offset is latest ## What changes were proposed in this pull request? Added test to check whether default starting offset is latest ## How was this patch tested? new unit test Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #15778 from tdas/SPARK-18283. (cherry picked from commit b06c23db9aedae48c9eba9d702ae82fa5647cfe5) Signed-off-by: Shixiong Zhu <shixi...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10525c29 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10525c29 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10525c29 Branch: refs/heads/branch-2.0 Commit: 10525c2947d9d1593e77e6af692573b03de6a71f Parents: b5d7217 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Mon Nov 7 10:43:36 2016 -0800 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Mon Nov 7 10:44:05 2016 -0800 ---------------------------------------------------------------------- .../spark/sql/kafka010/KafkaSourceSuite.scala | 24 ++++++++++++++++++++ 1 file changed, 24 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/10525c29/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index ed4cc75..89e713f 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -306,6 +306,30 @@ class KafkaSourceSuite extends KafkaSourceTest { ) } + test("starting 
offset is latest by default") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 5) + testUtils.sendMessages(topic, Array("0")) + require(testUtils.getLatestOffsets(Set(topic)).size === 5) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("subscribe", topic) + + val kafka = reader.load() + .selectExpr("CAST(value AS STRING)") + .as[String] + val mapped = kafka.map(_.toInt) + + testStream(mapped)( + makeSureGetOffsetCalled, + AddKafkaData(Set(topic), 1, 2, 3), + CheckAnswer(1, 2, 3) // should not have 0 + ) + } + test("bad source options") { def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = { val ex = intercept[IllegalArgumentException] { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org