Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20243#discussion_r161305572
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala 
---
    @@ -17,58 +17,36 @@
     
     package org.apache.spark.sql.execution.streaming
     
    -import org.apache.spark.internal.Logging
    -import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
    -import org.apache.spark.sql.execution.SQLExecution
    -import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister, StreamSinkProvider}
    -import org.apache.spark.sql.streaming.OutputMode
    -import org.apache.spark.sql.types.StructType
    -
    -class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
    -  // Number of rows to display, by default 20 rows
    -  private val numRowsToShow = 
options.get("numRows").map(_.toInt).getOrElse(20)
    -
    -  // Truncate the displayed data if it is too long, by default it is true
    -  private val isTruncated = 
options.get("truncate").map(_.toBoolean).getOrElse(true)
    +import java.util.Optional
     
    -  // Track the batch id
    -  private var lastBatchId = -1L
    -
    -  override def addBatch(batchId: Long, data: DataFrame): Unit = 
synchronized {
    -    val batchIdStr = if (batchId <= lastBatchId) {
    -      s"Rerun batch: $batchId"
    -    } else {
    -      lastBatchId = batchId
    -      s"Batch: $batchId"
    -    }
    -
    -    // scalastyle:off println
    -    println("-------------------------------------------")
    -    println(batchIdStr)
    -    println("-------------------------------------------")
    -    // scalastyle:off println
    -    data.sparkSession.createDataFrame(
    -      data.sparkSession.sparkContext.parallelize(data.collect()), 
data.schema)
    -      .show(numRowsToShow, isTruncated)
    -  }
    +import scala.collection.JavaConverters._
     
    -  override def toString(): String = s"ConsoleSink[numRows=$numRowsToShow, 
truncate=$isTruncated]"
    -}
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
    +import org.apache.spark.sql.sources.{BaseRelation, 
CreatableRelationProvider, DataSourceRegister}
    +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport
    +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
    +import org.apache.spark.sql.streaming.OutputMode
    +import org.apache.spark.sql.types.StructType
     
     case class ConsoleRelation(override val sqlContext: SQLContext, data: 
DataFrame)
       extends BaseRelation {
       override def schema: StructType = data.schema
     }
     
    -class ConsoleSinkProvider extends StreamSinkProvider
    +class ConsoleSinkProvider extends DataSourceV2
    +  with MicroBatchWriteSupport
       with DataSourceRegister
       with CreatableRelationProvider {
    -  def createSink(
    -      sqlContext: SQLContext,
    -      parameters: Map[String, String],
    -      partitionColumns: Seq[String],
    -      outputMode: OutputMode): Sink = {
    -    new ConsoleSink(parameters)
    +
    +  override def createMicroBatchWriter(
    +      queryId: String,
    +      epochId: Long,
    +      schema: StructType,
    +      mode: OutputMode,
    +      options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
    +    Optional.of(new ConsoleWriter(epochId, schema, 
options.asMap.asScala.toMap))
       }
     
       def createRelation(
    --- End diff --
    
    What is createRelation used for? For batch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to