Repository: spark
Updated Branches:
  refs/heads/master f2da738c7 -> 808e886b9


[SPARK-21667][STREAMING] ConsoleSink should not fail streaming query with 
checkpointLocation option

## What changes were proposed in this pull request?
Fix to allow recovery on console , avoid checkpoint exception

## How was this patch tested?
existing tests
manual tests [ Replicating error and seeing no checkpoint error after fix]

Author: Rekha Joshi <rekhajo...@gmail.com>
Author: rjoshi2 <rekhajo...@gmail.com>

Closes #19407 from rekhajoshm/SPARK-21667.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/808e886b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/808e886b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/808e886b

Branch: refs/heads/master
Commit: 808e886b9638ab2981dac676b594f09cda9722fe
Parents: f2da738
Author: Rekha Joshi <rekhajo...@gmail.com>
Authored: Fri Nov 10 15:18:11 2017 -0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Fri Nov 10 15:18:11 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/streaming/DataStreamWriter.scala | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/808e886b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 14e7df6..0be69b9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -267,12 +267,6 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
         useTempCheckpointLocation = true,
         trigger = trigger)
     } else {
-      val (useTempCheckpointLocation, recoverFromCheckpointLocation) =
-        if (source == "console") {
-          (true, false)
-        } else {
-          (false, true)
-        }
       val dataSource =
         DataSource(
           df.sparkSession,
@@ -285,8 +279,8 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
         df,
         dataSource.createSink(outputMode),
         outputMode,
-        useTempCheckpointLocation = useTempCheckpointLocation,
-        recoverFromCheckpointLocation = recoverFromCheckpointLocation,
+        useTempCheckpointLocation = source == "console",
+        recoverFromCheckpointLocation = true,
         trigger = trigger)
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to