[ https://issues.apache.org/jira/browse/SPARK-21565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16107968#comment-16107968 ]
Amit Assudani commented on SPARK-21565: --------------------------------------- I am not sure, does it mean we can not use watermark without windows - kind of on a microbatch level ? If that is the case, it should not work with current_timestamp and should have validation not to allow without windows. > aggregate query fails with watermark on eventTime but works with watermark on > timestamp column generated by current_timestamp > ----------------------------------------------------------------------------------------------------------------------------- > > Key: SPARK-21565 > URL: https://issues.apache.org/jira/browse/SPARK-21565 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 2.2.0 > Reporter: Amit Assudani > > *Short Description: * > Aggregation query fails with eventTime as watermark column while works with > newTimeStamp column generated by running SQL with current_timestamp, > *Exception:* > Caused by: java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:347) > at scala.None$.get(Option.scala:345) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172) > at > org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70) > at > org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65) > at > org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > *Code to replicate:* > package test > import java.nio.file.{Files, Path, Paths} > import java.text.SimpleDateFormat > import org.apache.spark.sql.types._ > import org.apache.spark.sql.{SparkSession} > import scala.collection.JavaConverters._ > object Test1 { > def main(args: Array[String]) { > val sparkSession = SparkSession > .builder() > .master("local[*]") > .appName("Spark SQL basic example") > .config("spark.some.config.option", "some-value") > .getOrCreate() > val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") > val checkpointPath = "target/cp1" > val newEventsPath = Paths.get("target/newEvents/").toAbsolutePath > delete(newEventsPath) > delete(Paths.get(checkpointPath).toAbsolutePath) > Files.createDirectories(newEventsPath) > val dfNewEvents= newEvents(sparkSession) > dfNewEvents.createOrReplaceTempView("dfNewEvents") > //The below works - Start > // val dfNewEvents2 = sparkSession.sql("select *,current_timestamp as > newTimeStamp from dfNewEvents ").withWatermark("newTimeStamp","2 seconds") > // dfNewEvents2.createOrReplaceTempView("dfNewEvents2") > // val groupEvents = sparkSession.sql("select symbol,newTimeStamp, > count(price) as count1 from dfNewEvents2 group by symbol,newTimeStamp") > // End > > > //The below doesn't work - Start > val dfNewEvents2 = sparkSession.sql("select * from dfNewEvents > ").withWatermark("eventTime","2 seconds") > dfNewEvents2.createOrReplaceTempView("dfNewEvents2") > val groupEvents = sparkSession.sql("select symbol,eventTime, > count(price) as count1 from dfNewEvents2 group by symbol,eventTime") > // - End > > > val query1 = groupEvents.writeStream > .outputMode("append") > .format("console") > .option("checkpointLocation", checkpointPath) > .start("./myop") > val newEventFile1=newEventsPath.resolve("eventNew1.json") > Files.write(newEventFile1, List( > """{"symbol": > "GOOG","price":100,"eventTime":"2017-07-25T16:00:00.000-04:00"}""", > """{"symbol": > "GOOG","price":200,"eventTime":"2017-07-25T16:00:00.000-04:00"}""" > ).toIterable.asJava) > query1.processAllAvailable() > sparkSession.streams.awaitAnyTermination(10000) > } > private def newEvents(sparkSession: SparkSession) = { > val newEvents = Paths.get("target/newEvents/").toAbsolutePath > delete(newEvents) > Files.createDirectories(newEvents) > val dfNewEvents = > sparkSession.readStream.schema(eventsSchema).json(newEvents.toString)//.withWatermark("eventTime","2 > seconds") > dfNewEvents > } > private val eventsSchema = StructType(List( > StructField("symbol", StringType, true), > StructField("price", DoubleType, true), > StructField("eventTime", TimestampType, false) > )) > private def delete(dir: Path) = { > if(Files.exists(dir)) { > Files.walk(dir).iterator().asScala.toList > .map(p => p.toFile) > .sortWith((o1, o2) => o1.compareTo(o2) > 0) > .foreach(_.delete) > } > } > } -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org