[ https://issues.apache.org/jira/browse/SPARK-27564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kineret updated SPARK-27564:
----------------------------
Description: 

I get a 'No plan for EventTimeWatermark' error when running a query with column pruning, using Structured Streaming with a custom data source that implements the Spark Data Source V2 API.

My data source implementation that handles the schemas includes the following:

{code:java}
class MyDataSourceReader extends DataSourceReader with SupportsPushDownRequiredColumns {
  var schema: StructType = createSchema()

  override def readSchema(): StructType = schema

  override def pruneColumns(requiredSchema: StructType): Unit = {
    this.schema = requiredSchema
  }
}
{code}

and then:

{code:java}
class MyDataSourceReaderStream extends MyDataSourceReader {
  ...
}
{code}
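The snippet above calls a createSchema() helper that is not shown in the report; for context, a minimal hypothetical sketch consistent with the columns that appear in the plan below (epoch and id, both long) could look like this:

{code:java}
// Hypothetical sketch, not part of the original report: a minimal schema
// matching the attributes in the failing plan (epoch#320L, id#367L).
import org.apache.spark.sql.types.{LongType, StructField, StructType}

def createSchema(): StructType = StructType(Seq(
  StructField("epoch", LongType, nullable = true),
  StructField("id", LongType, nullable = true)
))
{code}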
This is my test code:

{code:java}
def x(): Unit = {
  val df1 = sparkSession.readStream.format(myV2Source).load()

  val df2 = df1
    .withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 30).cast(TimestampType))
    .withWatermark("epoch", "1 milliseconds")
    .groupBy(col("epoch"), col("id")).count()

  val streamingQuery = df2
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .outputMode(OutputMode.Append())
    .start()

  streamingQuery.awaitTermination()
}
{code}

I get the following exception:

{code}
Caused by: java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds
+- Project [cast((round((cast(epoch#320L as double) / 30000.0), 0) * 30.0) as timestamp) AS epoch#201, id#367L]
   +- DataSourceV2Relation [epoch#320L, id#367L], com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c
  at scala.Predef$.assert(Predef.scala:170)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
{code}

Note that in the logical plan I got *DataSourceV2Relation* and not *StreamingDataSourceV2Relation*, although I am using streaming.
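To see which relation node the analyzer produces, the plan can be inspected before the query is started. A small hypothetical diagnostic sketch, reusing the myV2Source placeholder from the test code above:

{code:java}
// Hypothetical diagnostic, not part of the original report: print the analyzed
// logical plan to check whether the source appears as DataSourceV2Relation
// or StreamingDataSourceV2Relation.
val df = sparkSession.readStream.format(myV2Source).load()
  .withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 30).cast(TimestampType))
  .withWatermark("epoch", "1 milliseconds")

// explain(true) prints the parsed/analyzed/optimized plans without starting the stream.
df.explain(true)
println(df.queryExecution.analyzed.treeString)
{code}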
> 'No plan for EventTimeWatermark' error while using structured streaming with
> column pruning
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27564
>                 URL: https://issues.apache.org/jira/browse/SPARK-27564
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: Kineret
>            Priority: Major
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org