I am trying to run a Spark Structured Streaming program that simulates a
basic scenario of ingesting events and calculating aggregates on a window
with a watermark, and I am observing an inordinate amount of disk IO
performed by Spark.

The basic structure of the program is like this:

sparkSession = SparkSession.builder()
                           .appName(....)
                           .master("local[*]")
                           .config("spark.executor.memory", "8g")
                           .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
                           .config("spark.kryoserializer.buffer", "8m")
                           .config("spark.local.dir", ...local directory...)
                           .getOrCreate();

sparkSession.sparkContext().setCheckpointDir(... checkpoint dir for the app ...);

dataset = sparkSession.readStream()
                      .option("checkpointLocation", ... checkpoint dir for
source ...)
                      .format(MockStreamingSource.class.getName())
                      .load();

Dataset<Row> ds = dataset
                      .withWatermark("timestamp", "10 minutes")
                      .groupBy(
                               functions.window(functions.col("timestamp"), "2 minutes"),
                              functions.col("source"))
                      .agg(
                              functions.avg("D0").as("AVG_D0"),
                              functions.avg("I0").as("AVG_I0"));

DataStreamWriter<Row> dsw = ds.writeStream()
                              // .trigger(Trigger.ProcessingTime("1 minute"))
                              .option("checkpointLocation", .. checkpoint dir for writer ... );

dsw.outputMode(OutputMode.Append())
   .format("console")
   .option("truncate", "false")
   .option("numRows", Integer.MAX_VALUE)
   .start()
   .awaitTermination();


MockStreamingSource is just that -- a source intended to provide simulated
input. It generates microbatches of mock events and sends them to the app.
In the testing scenario, the source simulates 20,000 devices, each sending
an event every 15 seconds, for 11.5 minutes of logical time (just under the
12 minutes of window size + watermark), for a total of 920,000 events.
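
For reference, a roughly comparable load could be produced without the
custom source by using Spark's built-in "rate" source. This is only a
sketch to illustrate the volume and shape of the data described above (the
source/D0/I0 column names and the modulo mapping to 20,000 devices are my
approximation, and the rate source runs in real time rather than compressed
logical time), not the actual MockStreamingSource:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

// 20,000 devices emitting one event every 15 seconds ~= 1,333 events/second.
Dataset<Row> mockEvents = sparkSession.readStream()
        .format("rate")
        .option("rowsPerSecond", 1333)
        .load()                                                    // columns: timestamp, value
        .withColumn("source", functions.col("value").mod(20000))   // pseudo device id
        .withColumn("D0", functions.rand())                        // placeholder double payload
        .withColumn("I0", functions.col("value").mod(100));        // placeholder integer payload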

I initially started with a microbatch size of 500 events, and processing
performance was totally dismal because of disk IO. I then increased the
microbatch size and performance got better, but it is still very poor. The
microbatch size is now 13,334 events per batch, which corresponds to an
ingestion interval of 10 seconds. Smaller batches resulted in worse
performance.
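
(For clarity, the arithmetic behind these numbers, just restating the
scenario above:)

// 20,000 devices, one event per device every 15 seconds:
double eventsPerSecond = 20000.0 / 15.0;                      // ~1,333.3 events/s
long eventsPerBatch = (long) Math.ceil(eventsPerSecond * 10); // 10 s ingestion interval -> 13,334 events/batch
long totalEvents = 20000L * (long) ((11.5 * 60) / 15);        // 46 events per device -> 920,000 events total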

But even with microbatches of 13,334 events, performance is poor because of
the excessive disk IO generated by Spark. Just ingesting data generated
intra-app takes the program physical time equal to 40% of window size +
watermark (i.e. roughly 4.8 minutes of wall-clock time).

Using strace, I measured that the checkpoint directory for the stream
writer receives the following numbers of Linux system calls:

create/open file = 60,500 calls
mkdir = 57,000
readlink = 59,000
unlink = 41,900
rename = 14,700
execve of readlink = 353,000 (incl. repetitive searches for the readlink
executable in 6 different locations)
execve of chmod = 340,620 (incl. repetitive searches for the chmod
executable in 6 different locations)

In addition, the Spark local directory received:

create/open file = 55,000 calls
unlink = 13,800
stat = 42,000

That's for a mere 920,000 small events (each event Row is 600 bytes in the
Java heap, so roughly 550 MB of data in total).

I also tried trigger(...) to see whether it could improve anything, but it
just made things worse.

Spark version 2.4.6.

Is this an expected amount of disk IO for Spark, or am I doing something
wrong, and is there a way to avoid Spark generating such an amount of disk
IO?
