Hi,

I've been developing a data source with a source and sink for Spark
Structured Streaming.

I've got a question about Source.getBatch [1]:

    def getBatch(start: Option[Offset], end: Offset): DataFrame

getBatch returns a streaming DataFrame with the data between the two offsets, so
the idiom (?) seems to be code like the following:

    val relation = new MyRelation(...)(sparkSession)
    val plan = LogicalRelation(relation, isStreaming = true)
    new Dataset[Row](sparkSession, plan, RowEncoder(schema))

Note the use of schema [2], which is another part of the Source abstraction:

    def schema: StructType
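
For context, this is roughly what I have in mind, end to end. It's only a
sketch against the DSv1 Source internals: MySource, MyRelation, the fields and
the offset handling are all made-up placeholders, not my actual code.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SparkSession}
    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.execution.datasources.LogicalRelation
    import org.apache.spark.sql.execution.streaming.{Offset, Source}
    import org.apache.spark.sql.sources.{BaseRelation, TableScan}
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    // Hypothetical relation that scans the rows belonging to the (start, end] range.
    class MyRelation(
        override val sqlContext: SQLContext,
        override val schema: StructType,
        start: Option[Offset],
        end: Offset)
      extends BaseRelation with TableScan {

      // Produce the rows for the requested offset range here (empty for the sketch).
      override def buildScan(): RDD[Row] =
        sqlContext.sparkContext.emptyRDD[Row]
    }

    class MySource(sparkSession: SparkSession) extends Source {

      override val schema: StructType = StructType(Seq(
        StructField("id", LongType),
        StructField("value", StringType)))

      // Track and report the latest available offset here (None for the sketch).
      override def getOffset: Option[Offset] = None

      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
        val relation = new MyRelation(sparkSession.sqlContext, schema, start, end)
        // isStreaming = true marks the plan as a streaming relation
        val plan = LogicalRelation(relation, isStreaming = true)
        // reuse the Source's own schema for the encoder
        new Dataset[Row](sparkSession, plan, RowEncoder(schema))
      }

      override def stop(): Unit = ()
    }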

This is the "source" of my question: is the above OK in a streaming source's
Source.getBatch?

Since there are no interim operators that could change the attributes (the
schema), I think it's OK.

I've also seen the following code, which made me wonder whether it's any
better than the solution above:

    val relation = new MyRelation(...)(sparkSession)
    val plan = LogicalRelation(relation, isStreaming = true)

    // When would we have to execute the plan here?
    val qe = sparkSession.sessionState.executePlan(plan)
    new Dataset[Row](sparkSession, plan, RowEncoder(qe.analyzed.schema))

When would we have to use qe.analyzed.schema rather than simply schema? Could
qe.analyzed.schema help avoid some edge cases, and is it the preferred
approach?
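
FWIW, here is a quick sanity check I could add inside getBatch (reusing the
hypothetical MyRelation from the sketch above, with start, end, schema and
sparkSession in scope). With a bare LogicalRelation and no interim operators
I'd expect both schemas to come out identical:

    // With nothing between the LogicalRelation and the Dataset, analysis should
    // not change the attributes, so both schemas should be the same.
    val relation = new MyRelation(sparkSession.sqlContext, schema, start, end)
    val plan = LogicalRelation(relation, isStreaming = true)
    val qe = sparkSession.sessionState.executePlan(plan)
    assert(qe.analyzed.schema == relation.schema)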

Thank you for any help you can offer. Much appreciated.

[1] https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L61
[2] https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L35

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski
