Hi All,

I get 'No plan for EventTimeWatermark' error while doing a query with
columns pruning using structured streaming with a custom data source that
implements Spark datasource v2.

My data source implementation that handles the schemas includes the
following:

class MyDataSourceReader extends DataSourceReader with
SupportsPushDownRequiredColumns {
    var schema: StructType = createSchema()

    override def readSchema(): StructType = schema

    override def pruneColumns(requiredSchema: StructType) = {
        this.schema = requiredSchema
    }

and then:

class MyDataSourceReaderStream extends MyDataSourceReader { ...

This is my test code:

def x(): Unit = {
        val df1 = sparkSession.readStream.format(myV2Source).load()

        val df2 = df1
                .withColumn("epoch", (round(col("epoch") / (30 *
1000)) * 30).cast(TimestampType))
                .withWatermark("epoch", "1 milliseconds")
                .groupBy(col("epoch"), col("id")).count()

        val streamingQuery = df2
                .writeStream
                .format("console")
                .trigger(Trigger.ProcessingTime("10 seconds"))
                .outputMode(OutputMode.Append())
                .start()

        streamingQuery.awaitTermination()
   }

I get the following exception:

Caused by: java.lang.AssertionError: assertion failed: No plan for
EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds
+- Project [cast((round((cast(epoch#320L as double) / 30000.0), 0) *
30.0) as timestamp) AS epoch#201, id#367L]
   +- DataSourceV2Relation [epoch#320L, id#367L],
com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c

at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

Note that in the logical plan I got *DataSourceV2Relation* and not
*StreamingDataSourceV2Relation*although I use streaming.

Would appreciate the help.

Reply via email to