[ 
https://issues.apache.org/jira/browse/SPARK-21565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16107968#comment-16107968
 ] 

Amit Assudani commented on SPARK-21565:
---------------------------------------

I am not sure, does it mean we can not use watermark without windows - kind of 
on a microbatch level ? 

If that is the case, it should not work with current_timestamp and should have 
validation not to allow without windows.

> aggregate query fails with watermark on eventTime but works with watermark on 
> timestamp column generated by current_timestamp
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21565
>                 URL: https://issues.apache.org/jira/browse/SPARK-21565
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Amit Assudani
>
> *Short Description: *
> Aggregation query fails with eventTime as watermark column while works with 
> newTimeStamp column generated by running SQL with current_timestamp,
> *Exception:*
> Caused by: java.util.NoSuchElementException: None.get
>       at scala.None$.get(Option.scala:347)
>       at scala.None$.get(Option.scala:345)
>       at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:204)
>       at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:172)
>       at 
> org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70)
>       at 
> org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65)
>       at 
> org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> *Code to replicate:*
> package test
> import java.nio.file.{Files, Path, Paths}
> import java.text.SimpleDateFormat
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.{SparkSession}
> import scala.collection.JavaConverters._
> object Test1 {
>   def main(args: Array[String]) {
>     val sparkSession = SparkSession
>       .builder()
>       .master("local[*]")
>       .appName("Spark SQL basic example")
>       .config("spark.some.config.option", "some-value")
>       .getOrCreate()
>     val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
>     val checkpointPath = "target/cp1"
>     val newEventsPath = Paths.get("target/newEvents/").toAbsolutePath
>     delete(newEventsPath)
>     delete(Paths.get(checkpointPath).toAbsolutePath)
>     Files.createDirectories(newEventsPath)
>     val dfNewEvents= newEvents(sparkSession)
>     dfNewEvents.createOrReplaceTempView("dfNewEvents")
>     //The below works - Start
> //    val dfNewEvents2 = sparkSession.sql("select *,current_timestamp as 
> newTimeStamp from dfNewEvents ").withWatermark("newTimeStamp","2 seconds")
> //    dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
> //    val groupEvents = sparkSession.sql("select symbol,newTimeStamp, 
> count(price) as count1 from dfNewEvents2 group by symbol,newTimeStamp")
>     // End
>     
>     
>     //The below doesn't work - Start
>     val dfNewEvents2 = sparkSession.sql("select * from dfNewEvents 
> ").withWatermark("eventTime","2 seconds")
>      dfNewEvents2.createOrReplaceTempView("dfNewEvents2")
>       val groupEvents = sparkSession.sql("select symbol,eventTime, 
> count(price) as count1 from dfNewEvents2 group by symbol,eventTime")
>     // - End
>     
>     
>     val query1 = groupEvents.writeStream
>       .outputMode("append")
>         .format("console")
>       .option("checkpointLocation", checkpointPath)
>       .start("./myop")
>     val newEventFile1=newEventsPath.resolve("eventNew1.json")
>     Files.write(newEventFile1, List(
>       """{"symbol": 
> "GOOG","price":100,"eventTime":"2017-07-25T16:00:00.000-04:00"}""",
>       """{"symbol": 
> "GOOG","price":200,"eventTime":"2017-07-25T16:00:00.000-04:00"}"""
>     ).toIterable.asJava)
>     query1.processAllAvailable()
>     sparkSession.streams.awaitAnyTermination(10000)
>   }
>   private def newEvents(sparkSession: SparkSession) = {
>     val newEvents = Paths.get("target/newEvents/").toAbsolutePath
>     delete(newEvents)
>     Files.createDirectories(newEvents)
>     val dfNewEvents = 
> sparkSession.readStream.schema(eventsSchema).json(newEvents.toString)//.withWatermark("eventTime","2
>  seconds")
>     dfNewEvents
>   }
>   private val eventsSchema = StructType(List(
>     StructField("symbol", StringType, true),
>     StructField("price", DoubleType, true),
>     StructField("eventTime", TimestampType, false)
>   ))
>   private def delete(dir: Path) = {
>     if(Files.exists(dir)) {
>       Files.walk(dir).iterator().asScala.toList
>         .map(p => p.toFile)
>         .sortWith((o1, o2) => o1.compareTo(o2) > 0)
>         .foreach(_.delete)
>     }
>   }
> }



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to