[ https://issues.apache.org/jira/browse/SPARK-27564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kineret updated SPARK-27564:
----------------------------
    Description: 
I get a 'No plan for EventTimeWatermark' error when running a query with column pruning, using Structured Streaming with a custom data source that implements Spark Data Source V2.

My data source implementation handles the schema as follows:
{code:java}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

class MyDataSourceReader extends DataSourceReader with SupportsPushDownRequiredColumns {
    // Starts as the full schema; replaced with the pruned schema by pruneColumns().
    var schema: StructType = createSchema()

    override def readSchema(): StructType = schema

    override def pruneColumns(requiredSchema: StructType): Unit = {
        this.schema = requiredSchema
    }
}
{code}
and then:
{code:java}
class MyDataSourceReaderStream extends MyDataSourceReader { ... }
{code}
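For context, the streaming variant mixes in the Spark 2.3 DSv2 micro-batch interface. A simplified sketch of its shape is below; the offset handling and method bodies are placeholders, not the real implementation:
{code:java}
import java.util.Optional

import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}

// Sketch only: MicroBatchReader (org.apache.spark.sql.sources.v2.reader.streaming)
// is what lets the reader be driven from readStream; the real bodies are elided.
class MyDataSourceReaderStream extends MyDataSourceReader with MicroBatchReader {
    private var startOffset: Offset = _
    private var endOffset: Offset = _

    override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
        // Fall back to the offsets the source already tracks when none are given.
        startOffset = start.orElse(startOffset)
        endOffset = end.orElse(endOffset)
    }

    override def getStartOffset(): Offset = startOffset
    override def getEndOffset(): Offset = endOffset
    override def deserializeOffset(json: String): Offset = ???
    override def commit(end: Offset): Unit = ???
    override def stop(): Unit = ???
}
{code}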
This is my test code:
{code:java}
import org.apache.spark.sql.functions.{col, round}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.TimestampType

def x(): Unit = {
    val df1 = sparkSession.readStream.format(myV2Source).load()

    val df2 = df1
        .withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 30).cast(TimestampType))
        .withWatermark("epoch", "1 milliseconds")
        .groupBy(col("epoch"), col("id")).count()

    val streamingQuery = df2
        .writeStream
        .format("console")
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .outputMode(OutputMode.Append())
        .start()

    streamingQuery.awaitTermination()
}
{code}
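For clarity, the withColumn expression buckets the epoch values (milliseconds in my source) into 30-second windows: round(epoch / 30000) * 30 yields seconds since the Unix epoch, and casting a numeric column to TimestampType interprets the value as seconds. The same expression in isolation, with df standing in for any DataFrame that has a numeric epoch column:
{code:java}
import org.apache.spark.sql.functions.{col, round}
import org.apache.spark.sql.types.TimestampType

// e.g. epoch = 90000 ms -> round(90000 / 30000) * 30 = 90 s -> 1970-01-01 00:01:30 UTC
val bucketed = df.withColumn(
    "epoch", (round(col("epoch") / (30 * 1000)) * 30).cast(TimestampType))
{code}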
I get the following exception:
{code:java}
Caused by: java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds
+- Project [cast((round((cast(epoch#320L as double) / 30000.0), 0) * 30.0) as timestamp) AS epoch#201, id#367L]
   +- DataSourceV2Relation [epoch#320L, id#367L], com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c

at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
{code}
Note that the logical plan contains a *DataSourceV2Relation* and not a *StreamingDataSourceV2Relation*, although I am using streaming. Presumably this is why planning fails: the streaming planning strategies skip plans that are not flagged as streaming, so nothing matches the EventTimeWatermark node.
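For reference, the only planning strategy that produces a physical node for EventTimeWatermark is the streaming one, and its first case bails out when the plan is not marked as streaming. Abridged from Spark's SparkStrategies.scala (2.3.x; may differ in other versions):
{code:java}
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.EventTimeWatermarkExec

object StatefulAggregationStrategy extends Strategy {
    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        // With a (non-streaming) DataSourceV2Relation leaf, isStreaming is
        // false, so this returns Nil and the planner's assertion fires.
        case _ if !plan.isStreaming => Nil

        case EventTimeWatermark(columnName, delay, child) =>
            EventTimeWatermarkExec(columnName, delay, planLater(child)) :: Nil

        case _ => Nil // stateful-aggregation cases elided
    }
}
{code}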


> 'No plan for EventTimeWatermark' error while using structured streaming with 
> column pruning
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27564
>                 URL: https://issues.apache.org/jira/browse/SPARK-27564
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: Kineret
>            Priority: Major
>


