Hi, I am playing around with Spark Structured Streaming, and we have a use case for using it as a CEP engine.
I am reading from three different Kafka topics together. I want to apply windowing to this structured stream and then run queries over each window on a sliding basis. All of this needs to happen on event time, and each event carries its own timestamp. Right now I have something like this:

    val windowedEvents = filteredBamEvents
      .withWatermark("timestamp", "10 minutes")
      .groupBy(functions.window($"timestamp", "10 minutes", "5 minutes"))

where timestamp is of type Long in my case class:

    case class BAMIngestedEvent(id: String, eventName: String, eventID: String, correlationID: Seq[String], timestamp: Long)

But when I run this against my data from Kafka, I am getting the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Event time must be defined on a window or a timestamp, but timestamp is of type bigint;;
EventTimeWatermark timestamp#36: bigint, interval 10 minutes
+- TypedFilter <function1>, class com.airplus.poc.edl.model.BAMIngestedEvent, [StructField(id,StringType,true), StructField(eventName,StringType,true), StructField(eventID,StringType,true), StructField(correlationID,ArrayType(StringType,true),true), StructField(timestamp,LongType,false)], newInstance(class com.airplus.poc.edl.model.BAMIngestedEvent)
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent, true]).id, true) AS id#32, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent, true]).eventName, true) AS eventName#33, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent, true]).eventID, true) AS eventID#34, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.String), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.String)), true), assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent, true]).correlationID) AS correlationID#35, assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent, true]).timestamp AS timestamp#36L]
      +- MapElements <function1>, interface org.apache.spark.sql.Row, [StructField(value,StringType,true)], obj#31: com.airplus.poc.edl.model.BAMIngestedEvent
         +- DeserializeToObject createexternalrow(value#16.toString, StructField(value,StringType,true)), obj#30: org.apache.spark.sql.Row
            +- Project [value#16]
               +- Project [cast(key#0 as string) AS key#15, cast(value#1 as string) AS value#16]
                  +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@222acad,kafka,List(),None,List(),None,Map(startingOffsets -> latest, subscribe -> iom, edl, kafka.bootstrap.servers -> airpluspoc-hdp-dn0.germanycentral.cloudapp.microsoftazure.de:9092,airpluspoc-hdp-dn1.germanycentral.cloudapp.microsoftazure.de:9092,airpluspoc-hdp-dn2.germanycentral.cloudapp.microsoftazure.de:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:204)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:76)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
    at org.apache.spark.sql.Dataset$.apply(Dataset.scala:58)
    at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2850)
    at org.apache.spark.sql.Dataset.withWatermark(Dataset.scala:571)
    at com.airplus.poc.edl.CEPForBAM$.main(CEPForBAM.scala:47)
    at com.airplus.poc.edl.CEPForBAM.main(CEPForBAM.scala)
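
From the error message it looks like withWatermark() expects a column of TimestampType (or a window), not a bigint, so I am guessing I need to convert my Long field first. Below is a minimal sketch of what I am planning to try; the eventTime column name and the withEventTime value are just names I made up, and I am assuming the Long holds epoch seconds (casting a LongType column to TimestampType interprets it as seconds since the epoch):

    import org.apache.spark.sql.functions.{col, window}
    import org.apache.spark.sql.types.TimestampType

    // Assumption: the Long `timestamp` field is epoch seconds; a cast to
    // TimestampType treats a Long as seconds since the epoch. If it were
    // epoch milliseconds, I would divide by 1000 before casting.
    val withEventTime = filteredBamEvents
      .withColumn("eventTime", col("timestamp").cast(TimestampType))

    val windowedEvents = withEventTime
      .withWatermark("eventTime", "10 minutes")
      .groupBy(window(col("eventTime"), "10 minutes", "5 minutes"))

Is casting like this the right way to get an event-time column for the watermark, or is there a better approach when the source field is a Long?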