Hi,

I don't have any code for the foreachBatch approach; I mentioned it because of
this answer to my question on Stack Overflow:
https://stackoverflow.com/a/65803718/1017130

I have added some very simple code below that I think shows what I'm trying
to do:
import org.apache.spark.sql.types._

val schema = StructType(
  Array(
    StructField("senderId1", LongType),
    StructField("senderId2", LongType),
    StructField("destId1", LongType),
    StructField("eventType", IntegerType),
    StructField("cost", LongType)
  )
)

val fileStreamDf = spark.readStream
  .schema(schema)
  .option("delimiter", "\t")
  .csv("D:\\SparkTest")

fileStreamDf.createOrReplaceTempView("myTable")

spark.sql("SELECT senderId1, count(*) AS num_events FROM myTable GROUP BY
senderId1 HAVING count(*) >
10000").writeStream.format("console").outputMode("complete").start()
spark.sql("SELECT senderId2, sum(cost) AS total_cost FROM myTable WHERE
eventType = 3 GROUP BY senderId2 HAVING sum(cost) >
500").writeStream.format("console").outputMode("complete").start()
spark.sql("SELECT destId1, count(*) AS num_events WHERE event_type = 5 GROUP
BY destId1 HAVING count(*) >
1000").writeStream.format("console").outputMode("complete").start()

Of course, this is simplified; there are a lot more columns and the queries
should also group by time period, but I didn't want to complicate it.
With this example, I have three queries running on the same input files, but
Spark would need to read the files from disk three times; those extra reads
are what I'm trying to avoid.
In the real application the number of queries would be much higher and
dynamic (they are generated from configuration created by the end users).
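
If I understand the foreachBatch suggestion correctly, something like the
untested sketch below would read each micro-batch once, cache it, and then run
all of the generated queries against the cached data. The view name, query
list and checkpoint path are just placeholders, and note that the aggregations
would only cover a single micro-batch rather than the whole stream, so running
totals would have to be kept somewhere else:

import org.apache.spark.sql.DataFrame

// Queries generated from the user configuration (placeholders here).
val generatedQueries = Seq(
  "SELECT senderId1, count(*) AS num_events FROM batchTable GROUP BY senderId1 HAVING count(*) > 10000",
  "SELECT senderId2, sum(cost) AS total_cost FROM batchTable WHERE eventType = 3 GROUP BY senderId2 HAVING sum(cost) > 500",
  "SELECT destId1, count(*) AS num_events FROM batchTable WHERE eventType = 5 GROUP BY destId1 HAVING count(*) > 1000"
)

fileStreamDf.writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    batchDf.persist()                               // cache so the files are read only once per batch
    batchDf.createOrReplaceTempView("batchTable")   // expose the batch to SQL
    generatedQueries.foreach { q =>
      batchDf.sparkSession.sql(q).show()            // run every generated query on the cached batch
    }
    batchDf.unpersist()
  }
  .option("checkpointLocation", "D:\\SparkCheckpoint")  // placeholder path
  .start()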


