Repository: spark
Updated Branches:
  refs/heads/master 109935fc5 -> 2824f12b8


[SPARK-23565][SS] New error message for structured streaming sources assertion

## What changes were proposed in this pull request?

A more informative error message that explains why a structured streaming
query cannot continue if you have added more sources than there are in the
existing checkpoint offsets.
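
To illustrate the behavior, here is a minimal, self-contained sketch that reproduces the assertion outside Spark. `Source`, `Offset`, `toProgress`, and the source names are hypothetical stand-ins for `BaseStreamingSource`, Spark's `Offset`, and `OffsetSeq.toStreamProgress`. A plain `Map` replaces `StreamProgress`. Only the assertion and its message follow the patch.

```scala
// Hypothetical stand-ins for Spark's types, for illustration only.
object OffsetSeqSketch {
  final case class Source(name: String)
  final case class Offset(value: Long)

  // Same shape as OffsetSeq.toStreamProgress, but returns a plain Map
  // instead of Spark's StreamProgress.
  def toProgress(sources: Seq[Source], offsets: Seq[Option[Offset]]): Map[Source, Offset] = {
    assert(sources.size == offsets.size, s"There are [${offsets.size}] sources in the " +
      s"checkpoint offsets and now there are [${sources.size}] sources requested by the query. " +
      "Cannot continue.")
    // Keep only the sources that actually have a recorded offset.
    sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }.toMap
  }

  def main(args: Array[String]): Unit = {
    val checkpointed = Seq(Some(Offset(10L)))            // one source in the checkpoint
    val requested = Seq(Source("kafka"), Source("rate")) // two sources in the restarted query

    val message =
      try { toProgress(requested, checkpointed); "no error" }
      catch { case e: AssertionError => e.getMessage }
    println(message)
  }
}
```

Because Scala's `assert` prefixes its message with `assertion failed: `, the text from the patch appears after that prefix. The check fires on any size mismatch, so it also triggers if sources were removed rather than added.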

## How was this patch tested?

I added a unit test.

Author: Patrick McGloin <mcgloin.patr...@gmail.com>

Closes #20946 from patrickmcgloin/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2824f12b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2824f12b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2824f12b

Branch: refs/heads/master
Commit: 2824f12b8bac5d86a82339d4dfb4d2625e978a15
Parents: 109935f
Author: Patrick McGloin <mcgloin.patr...@gmail.com>
Authored: Fri Apr 27 23:04:14 2018 +0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Fri Apr 27 23:04:14 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/streaming/OffsetSeq.scala     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2824f12b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index 73945b3..7871744 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -39,7 +39,9 @@ case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMet
    * cannot be serialized).
    */
   def toStreamProgress(sources: Seq[BaseStreamingSource]): StreamProgress = {
-    assert(sources.size == offsets.size)
+    assert(sources.size == offsets.size, s"There are [${offsets.size}] sources in the " +
+      s"checkpoint offsets and now there are [${sources.size}] sources requested by the query. " +
+      s"Cannot continue.")
     new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
