[jira] [Assigned] (SPARK-20373) Batch queries with `Dataset/DataFrame.withWatermark()` do not execute
[ https://issues.apache.org/jira/browse/SPARK-20373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu reassigned SPARK-20373:
    Assignee: Genmao Yu

> Batch queries with `Dataset/DataFrame.withWatermark()` do not execute
> ----------------------------------------------------------------------
>
>                 Key: SPARK-20373
>                 URL: https://issues.apache.org/jira/browse/SPARK-20373
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Tathagata Das
>            Assignee: Genmao Yu
>            Priority: Minor
>             Fix For: 2.2.1, 2.3.0
>
> Any Dataset/DataFrame batch query that uses the operation `withWatermark` does
> not execute, because the batch planner has no rule to explicitly handle the
> EventTimeWatermark logical plan. The right solution is to simply remove the
> plan node, as the watermark should not affect a batch query in any way.
> {code}
> from pyspark.sql.functions import *
>
> eventsDF = spark.createDataFrame(
>     [("2016-03-11 09:00:07", "dev1", 123)]
> ).toDF(
>     "eventTime", "deviceId", "signal"
> ).select(
>     col("eventTime").cast("timestamp").alias("eventTime"),
>     "deviceId",
>     "signal")
>
> windowedCountsDF = \
>     eventsDF \
>         .withWatermark("eventTime", "10 minutes") \
>         .groupBy(
>             "deviceId",
>             window("eventTime", "5 minutes")) \
>         .count()
>
> windowedCountsDF.collect()
> {code}
> This throws an error:
> {code}
> java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark eventTime#3762657: timestamp, interval 10 minutes
> +- Project [cast(_1#3762643 as timestamp) AS eventTime#3762657, _2#3762644 AS deviceId#3762651]
>    +- LogicalRDD [_1#3762643, _2#3762644, _3#3762645L]
> 	at scala.Predef$.assert(Predef.scala:170)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> 	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
> 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> 	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
> 	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
> 	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at
> {code}
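A note on the fix the description proposes: removing the EventTimeWatermark node for batch plans can be expressed as a small Catalyst rule. The sketch below is illustrative rather than a verbatim copy of the patch that shipped in 2.2.1/2.3.0; the rule name and its exact placement in the analyzer are assumptions.

{code}
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative sketch (assumed name and placement, not necessarily the
// shipped patch): drop the EventTimeWatermark node whenever its child is a
// batch (non-streaming) plan, since a watermark has no effect on batch
// semantics. Streaming children keep the node untouched.
object EliminateEventTimeWatermark extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case EventTimeWatermark(_, _, child) if !child.isStreaming => child
  }
}
{code}

Running such a rule during analysis would mean a batch plan never reaches the planner with an unhandled EventTimeWatermark node, so the repro above would return the windowed counts instead of failing the "No plan for" assertion.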
[jira] [Assigned] (SPARK-20373) Batch queries with `Dataset/DataFrame.withWatermark()` do not execute
[ https://issues.apache.org/jira/browse/SPARK-20373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-20373:
    Assignee: Apache Spark
[jira] [Assigned] (SPARK-20373) Batch queries with `Dataset/DataFrame.withWatermark()` do not execute
[ https://issues.apache.org/jira/browse/SPARK-20373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-20373:
    Assignee: (was: Apache Spark)