I am trying to run a Spark Structured Streaming program that simulates a basic scenario: ingesting events and calculating aggregates over a window with a watermark. I am observing an inordinate amount of disk IO from Spark.
The basic structure of the program is like this:

```java
SparkSession sparkSession = SparkSession.builder()
        .appName(....)
        .master("local[*]")
        .config("spark.executor.memory", "8g")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.kryoserializer.buffer", "8m")
        .config("spark.local.dir", ...local directory...)
        .getOrCreate();

sparkSession.sparkContext().setCheckpointDir(... checkpoint dir for the app ...);

// Ingest mock events from the custom streaming source
Dataset<Row> dataset = sparkSession.readStream()
        .option("checkpointLocation", ... checkpoint dir for source ...)
        .format(MockStreamingSource.class.getName())
        .load();

// 2-minute tumbling windows per device, with a 10-minute watermark on event time
Dataset<Row> ds = dataset
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
                functions.window(functions.col("timestamp"), "2 minutes"),
                functions.col("source"))
        .agg(
                functions.avg("D0").as("AVG_D0"),
                functions.avg("I0").as("AVG_I0"));

DataStreamWriter<Row> dsw = ds.writeStream()
        // .trigger(Trigger.ProcessingTime("1 minute"))
        .option("checkpointLocation", ... checkpoint dir for writer ...);

// Print finalized windows to the console as the watermark advances
dsw.outputMode(OutputMode.Append())
        .format("console")
        .option("truncate", "false")
        .option("numRows", Integer.MAX_VALUE)
        .start()
        .awaitTermination();
```

MockStreamingSource is just that: a source intended to provide simulated input. It generates microbatches of mock events and feeds them to the app.

In the test scenario, the source simulates 20,000 devices, each sending an event every 15 seconds, for 11.5 minutes of logical time (just under the 12 minutes of window size + watermark). That is 46 events per device, for a total of 920,000 events.

I initially started with microbatches of 500 events, and processing performance was dismal because of disk IO. I then increased the microbatch size and performance improved, but it remained very poor; smaller batches consistently performed worse. The microbatch size is now 13,334 events per batch, which corresponds to an ingestion interval of 10 seconds (about 69 microbatches for the whole run). But even at 13,334 events per batch, performance is poor because of the excessive disk IO Spark generates: just ingesting data produced inside the app takes physical time equal to 40% of window size + watermark.

Using strace, I measured that the checkpoint directory for the stream writer receives the following numbers of Linux system calls:

- create/open file: 60,500 calls
- mkdir: 57,000
- readlink: 59,000
- unlink: 41,900
- rename: 14,700
- execve of readlink: 353,000 (incl. repetitive searches for the readlink executable in 6 different locations)
- execve of chmod: 340,620 (incl. repetitive searches for the chmod executable in 6 different locations)

In addition, the Spark local directory received:

- create/open file: 55,000 calls
- unlink: 13,800
- stat: 42,000

That is for a mere 920,000 small events (each event Row takes 600 bytes on the Java heap).

I also tried trigger(...) to see whether it could improve anything, but it just made things worse.

Spark version: 2.4.6.

Is this an expected amount of disk IO for Spark, or am I doing something wrong? Is there a way to keep Spark from generating this much disk IO?
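For anyone who wants to reproduce the IO pattern without my custom source, something roughly equivalent can be built on Spark's built-in rate source. This is only a sketch: the device-id, D0, and I0 expressions are illustrative stand-ins for what MockStreamingSource actually generates.

```java
// Sketch of a stand-in for MockStreamingSource, built on the built-in "rate" source.
// 1,334 rows/second is roughly 13,334 events per 10-second microbatch, as in my test.
Dataset<Row> mock = sparkSession.readStream()
        .format("rate")
        .option("rowsPerSecond", 1334)
        .load() // the rate source emits columns: timestamp (TimestampType), value (LongType)
        // spread events across 20,000 simulated devices (illustrative expressions)
        .withColumn("source", functions.expr("concat('device-', cast(value % 20000 as string))"))
        .withColumn("D0", functions.rand())
        .withColumn("I0", functions.expr("cast(value % 100 as int)"));
```

Feeding this stream into the same withWatermark/groupBy/agg pipeline above should exercise the same state-store and checkpoint code paths, though the exact syscall counts will of course differ.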
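For completeness, this is roughly how I collected the syscall counts above (a sketch; the jar name, main class, and paths are placeholders):

```
strace -f -e trace=file,process -o /tmp/spark.strace \
    java -cp app.jar com.example.StreamingApp
# then count the calls touching the writer checkpoint directory, e.g.:
grep '<writer checkpoint dir>' /tmp/spark.strace | grep -c 'mkdir('
```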