Repository: spark
Updated Branches:
  refs/heads/master 19331b8e4 -> e55a105ae
[SPARK-20599][SS] ConsoleSink should work with write (batch)

## What changes were proposed in this pull request?

Currently, if we read a batch and want to display it on the console sink, it will lead to a runtime exception.

Changes:

- In this PR, we add a match rule to check whether the provider is a ConsoleSinkProvider, and we display the Dataset when the console format is used.

## How was this patch tested?

spark.read.schema().json(path).write.format("console").save

Author: Lubo Zhang <lubo.zh...@intel.com>
Author: lubozhan <lubo.zh...@intel.com>

Closes #18347 from lubozhan/dev.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e55a105a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e55a105a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e55a105a

Branch: refs/heads/master
Commit: e55a105ae04f1d1c35ee8f02005a3ab71d789124
Parents: 19331b8
Author: Lubo Zhang <lubo.zh...@intel.com>
Authored: Thu Jun 22 11:18:58 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Jun 22 11:18:58 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/console.scala | 28 ++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e55a105a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 38c6319..9e889ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -19,8 +19,10 @@ package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.types.StructType
 
 class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
   // Number of rows to display, by default 20 rows
@@ -51,7 +53,14 @@ class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
   }
 }
 
-class ConsoleSinkProvider extends StreamSinkProvider with DataSourceRegister {
+case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
+  extends BaseRelation {
+  override def schema: StructType = data.schema
+}
+
+class ConsoleSinkProvider extends StreamSinkProvider
+  with DataSourceRegister
+  with CreatableRelationProvider {
   def createSink(
       sqlContext: SQLContext,
       parameters: Map[String, String],
@@ -60,5 +69,20 @@ class ConsoleSinkProvider extends StreamSinkProvider with DataSourceRegister {
     new ConsoleSink(parameters)
   }
 
+  def createRelation(
+      sqlContext: SQLContext,
+      mode: SaveMode,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    // Number of rows to display, by default 20 rows
+    val numRowsToShow = parameters.get("numRows").map(_.toInt).getOrElse(20)
+
+    // Truncate the displayed data if it is too long, by default it is true
+    val isTruncated = parameters.get("truncate").map(_.toBoolean).getOrElse(true)
+    data.showInternal(numRowsToShow, isTruncated)
+
+    ConsoleRelation(sqlContext, data)
+  }
+
   def shortName(): String = "console"
 }
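
For context, here is a minimal batch usage sketch of the code path this commit enables. The session setup, schema, and input path below are illustrative assumptions for the example, not part of the commit:

// Hypothetical demo: batch write to the "console" format, which this patch enables.
// The app name, schema, and "/tmp/people.json" path are assumed for illustration.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder()
  .appName("console-batch-demo")
  .master("local[*]")
  .getOrCreate()

val schema = StructType(Seq(StructField("name", StringType)))

spark.read.schema(schema).json("/tmp/people.json")
  .write
  .format("console")
  .option("numRows", "30")     // rows to display; the sink defaults to 20
  .option("truncate", "false") // truncate long cells; the sink defaults to true
  .save()

Before this change, DataFrameWriter.save() on the "console" format failed at runtime because ConsoleSinkProvider implemented only StreamSinkProvider; with createRelation implemented, the rows are printed and a ConsoleRelation wrapping the input DataFrame is returned.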