Hi, I don't have any code for the foreachBatch approach; I mentioned it because of this response to my question on SO: https://stackoverflow.com/a/65803718/1017130
I have added some very simple code below that I think shows what I'm trying to do:

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("senderId1", LongType),
  StructField("senderId2", LongType),
  StructField("destId1", LongType),
  StructField("eventType", IntegerType),
  StructField("cost", LongType)
))

val fileStreamDf = spark.readStream
  .schema(schema)
  .option("delimiter", "\t")
  .csv("D:\\SparkTest")

fileStreamDf.createOrReplaceTempView("myTable")

spark.sql("SELECT senderId1, count(*) AS num_events FROM myTable GROUP BY senderId1 HAVING count(*) > 10000")
  .writeStream.format("console").outputMode("complete").start()

spark.sql("SELECT senderId2, sum(cost) AS total_cost FROM myTable WHERE eventType = 3 GROUP BY senderId2 HAVING sum(cost) > 500")
  .writeStream.format("console").outputMode("complete").start()

spark.sql("SELECT destId1, count(*) AS num_events FROM myTable WHERE eventType = 5 GROUP BY destId1 HAVING count(*) > 1000")
  .writeStream.format("console").outputMode("complete").start()

Of course, this is simplified; there are a lot more columns, and the queries should also group by time period, but I didn't want to complicate the example. With this example I have three queries running on the same input files, but Spark would need to read the files from disk three times. These extra reads are what I'm trying to avoid. In the real application the number of queries would be much higher and dynamic (they are generated in response to configuration made by the end users).
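To make the discussion concrete, this is roughly what I imagine the foreachBatch approach from that SO answer would look like (untested; the query list and the "batchTable" view name are just placeholders, since in the real application the queries would be generated from user configuration):

import org.apache.spark.sql.DataFrame

// The same three example queries as above, rewritten against a per-batch view.
val queries = Seq(
  "SELECT senderId1, count(*) AS num_events FROM batchTable GROUP BY senderId1 HAVING count(*) > 10000",
  "SELECT senderId2, sum(cost) AS total_cost FROM batchTable WHERE eventType = 3 GROUP BY senderId2 HAVING sum(cost) > 500",
  "SELECT destId1, count(*) AS num_events FROM batchTable WHERE eventType = 5 GROUP BY destId1 HAVING count(*) > 1000"
)

fileStreamDf.writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    batchDf.persist()                              // the input files are read once per micro-batch
    batchDf.createOrReplaceTempView("batchTable")
    queries.foreach(q => batchDf.sparkSession.sql(q).show())  // every query reuses the cached batch
    batchDf.unpersist()
  }
  .start()

If I understand it correctly, the catch is that each query then only sees the rows of the current micro-batch, so the HAVING thresholds would apply per batch rather than to running totals across the whole stream, which is part of what I'm unsure about.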