Repository: spark Updated Branches: refs/heads/master f2da738c7 -> 808e886b9
[SPARK-21667][STREAMING] ConsoleSink should not fail streaming query with checkpointLocation option ## What changes were proposed in this pull request? Fix to allow recovery on the console sink and avoid the checkpoint exception. ## How was this patch tested? Existing tests and manual tests (replicating the error and seeing no checkpoint error after the fix). Author: Rekha Joshi <rekhajo...@gmail.com> Author: rjoshi2 <rekhajo...@gmail.com> Closes #19407 from rekhajoshm/SPARK-21667. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/808e886b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/808e886b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/808e886b Branch: refs/heads/master Commit: 808e886b9638ab2981dac676b594f09cda9722fe Parents: f2da738 Author: Rekha Joshi <rekhajo...@gmail.com> Authored: Fri Nov 10 15:18:11 2017 -0800 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Fri Nov 10 15:18:11 2017 -0800 ---------------------------------------------------------------------- .../org/apache/spark/sql/streaming/DataStreamWriter.scala | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/808e886b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 14e7df6..0be69b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -267,12 +267,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { useTempCheckpointLocation = true, trigger = trigger) } else { - val (useTempCheckpointLocation, 
recoverFromCheckpointLocation) = - if (source == "console") { - (true, false) - } else { - (false, true) - } val dataSource = DataSource( df.sparkSession, @@ -285,8 +279,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { df, dataSource.createSink(outputMode), outputMode, - useTempCheckpointLocation = useTempCheckpointLocation, - recoverFromCheckpointLocation = recoverFromCheckpointLocation, + useTempCheckpointLocation = source == "console", + recoverFromCheckpointLocation = true, trigger = trigger) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org