Repository: spark
Updated Branches:
  refs/heads/master 109935fc5 -> 2824f12b8
[SPARK-23565][SS] New error message for structured streaming sources assertion

## What changes were proposed in this pull request?

A more informative message to tell you why a structured streaming query cannot continue if you have added more sources than there are in the existing checkpoint offsets.

## How was this patch tested?

I added a unit test.

Author: Patrick McGloin <mcgloin.patr...@gmail.com>

Closes #20946 from patrickmcgloin/master.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2824f12b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2824f12b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2824f12b

Branch: refs/heads/master
Commit: 2824f12b8bac5d86a82339d4dfb4d2625e978a15
Parents: 109935f
Author: Patrick McGloin <mcgloin.patr...@gmail.com>
Authored: Fri Apr 27 23:04:14 2018 +0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Fri Apr 27 23:04:14 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/streaming/OffsetSeq.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/2824f12b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index 73945b3..7871744 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -39,7 +39,9 @@ case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMet
    * cannot be serialized).
    */
   def toStreamProgress(sources: Seq[BaseStreamingSource]): StreamProgress = {
-    assert(sources.size == offsets.size)
+    assert(sources.size == offsets.size, s"There are [${offsets.size}] sources in the " +
+      s"checkpoint offsets and now there are [${sources.size}] sources requested by the query. " +
+      s"Cannot continue.")
     new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }
   }
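
For readers without a Spark checkout, the following is a minimal, standalone sketch of the behavior the patch changes. It does not use Spark: `Offset`, `Source`, `OffsetSeqSketch`, `toProgress`, and `OffsetSeqDemo` are hypothetical stand-ins invented for illustration, and a plain `Map` takes the place of `StreamProgress`. Only the assertion message and the `zip`/`collect` pairing are taken from the diff.

```scala
// Hypothetical stand-ins for Spark's internal Offset and BaseStreamingSource types.
final case class Offset(json: String)
final case class Source(name: String)

// Simplified model of OffsetSeq: one optional offset per source, in source order.
final case class OffsetSeqSketch(offsets: Seq[Option[Offset]]) {

  // Pairs each source with its checkpointed offset, skipping sources with no offset.
  // A plain Map is used here in place of Spark's StreamProgress (an assumption).
  def toProgress(sources: Seq[Source]): Map[Source, Offset] = {
    // Same check and message as the patched assertion in the diff.
    assert(sources.size == offsets.size, s"There are [${offsets.size}] sources in the " +
      s"checkpoint offsets and now there are [${sources.size}] sources requested by the query. " +
      s"Cannot continue.")
    sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }.toMap
  }
}

object OffsetSeqDemo {
  // Returns the assertion message if the source count does not match, otherwise None.
  def mismatchMessage(checkpoint: OffsetSeqSketch, sources: Seq[Source]): Option[String] =
    try {
      checkpoint.toProgress(sources)
      None
    } catch {
      case e: AssertionError => Some(e.getMessage)
    }

  def main(args: Array[String]): Unit = {
    // Checkpoint written by a query with two sources; the second has no offset yet.
    val checkpoint = OffsetSeqSketch(Seq(Some(Offset("10")), None))

    // Matching source count: only the source that has an offset is kept.
    println(checkpoint.toProgress(Seq(Source("a"), Source("b"))))

    // The restarted query now has three sources: the assertion fails with the new message.
    println(mismatchMessage(checkpoint, Seq(Source("a"), Source("b"), Source("c"))))
  }
}
```

Scala's `Predef.assert` prefixes the supplied text with "assertion failed: " and throws `java.lang.AssertionError`, so the message from the patch appears after that prefix. Assertions can also be elided at compile time, in which case the size check in this sketch would not run at all.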