Hi Jungtaek,

Thanks for your response.

*> you'd want to dive inside the checkpoint directory and have separate
numbers per top-subdirectory*

All the checkpoint store numbers are solely for the subdirectory set by
option("checkpointLocation", .. checkpoint dir for writer ... )

Other subdirectories are empty or nearly-empty.

*> First of all, you'd want to divide these numbers by the number of
micro-batches, as file creations in checkpoint directory would occur
similarly per micro-batch*

There were 69 microbatches, each containing 13,334 events.

Event's Row object size in Java heap is 620 bytes, thus the total amount of
data in a microbatch (in terms of aggregate Java-heap objects sizes) is 8.3
MB.

Average number of system calls per microbatch was:

For query (writer) checkpoint directory:

create/open file = 877
mkdir = 826
readlink = 855
unlink = 607
rename = 213
execve readlink = 5116
execve chmod = 4937

For Spark local directory:

create/open file = 797
unlink = 200
mmap = 197
stat = 2391

(The number for local.stat in the previous message was incorrect).

Physical processing time per microbatch was 3.4 seconds.

That's to store a mere 8.3 MB of uncompressed (Java-heap) data!

Most created "delta" files have file size in the order of 1 KB or less.
"Snapshot" files are several KB in size.
One would think that the tiny size of created files is one key factor in
dismal performance. It causes a very high number of system calls and also
hugely fragments actual data IO.

As a result, using iostat, typical disk write rate was observed only ~ 100
KB/s.
(Read rate was near-zero, presumably because all data was in Linux block
cache.)

Average CPU usage when ingesting data was in the order of 600% (i.e. 6
cores busy), I presume chiefly for serialization/deserialization, even
though Kryo was enabled. But the machine has 16 cores (VCPUs), so the most
immediate limiting factor must have been not CPU saturation but IO latency
(unless there is some obscure setting limiting the number of
reading/writing threads). The latency arising, fundamentally, out of very
large number of tiny files.

Is there a way to control the size of checkpoint "delta" and "snapshot"
files Spark creates, to make them larger?
And the same also for the files in Spark local directory?

* * *

The numbers for checkpoint directory are, of course, captured when it was
set to a local drive (or Lustre/NFS.).

For HDFS there are obviously no local file system calls for the checkpoint
store, as HDFS does not present itself as an OS-level file system.
Nevertheless the name of checkpoint directory was transmitted over HDFS
connection socket 1,675 times per microbatch, so the number of high-level
HDFS file operations must have been at least that high.

* * *

On a related note, for 920,000 events Spark made 700,000 attempts to
execute chmod or readlink program, i.e. to launch an external subprocess
with an executable in order to perform a file operation. Those 900,000
attempts actually represent 150,000 cycles, and in each cycle Spark tried
to launch the program from 6 different locations (/usr/local/sbin ->
/usr/local/bin -> /usr/sbin -> /usr/bin -> /sbin -> /bin),  until it
finally finds it in the last. But then on the next cycle Spark/Hadoop does
not re-use the knowledge of a previously found utility location, and
repeats the search from the very start causing useless file system search
operations over and over again.

This may or may not matter when HDFS is used for checkpoint store
(depending on how HDFS server implements the calls), but it does matter
when a file system like Lustre or NFS is used for checkpoint storage.
(Not to mention spawning readlink and chmod does not seem like a bright
idea in the first place, although perhaps there might be a reason why
Hadoop layer does it this way).

Thanks,
Sergey

On Mon, Oct 5, 2020 at 5:45 AM Jungtaek Lim <kabhwan.opensou...@gmail.com>
wrote:

> First of all, you'd want to divide these numbers by the number of
> micro-batches, as file creations in checkpoint directory would occur
> similarly per micro-batch.
> Second, you'd want to dive inside the checkpoint directory and have
> separate numbers per top-subdirectory.
>
> After that we can see whether the value would make sense or not.
>
> Regarding file I/O issues on SS, two issues I know about are:
> 1) If you use streaming aggregation, it unnecessarily creates a temporary
> file for both read and write on the state store, while the file is only
> needed for writing. That makes the number of file creations to be 2x. The
> patch is proposed under SPARK-30294 [1].
>
> 2) Spark leverages HDFS API which is configured to create crc file per
> file by default. (So you'll have 2x files than expected.) There's a bug in
> HDFS API (HADOOP-16255 [2]) which missed to handle crc files during rename
> (in short of how checkpoint works in SS, temp file is atomically renamed to
> be the final file), and as a workaround (SPARK-28025 [3]) Spark tries to
> delete the crc file which two additional operations (exist -> delete) may
> occur per crc file.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 1. https://issues.apache.org/jira/browse/SPARK-30294
> 2. https://issues.apache.org/jira/browse/HADOOP-16255
> 3. https://issues.apache.org/jira/browse/SPARK-28025
>
>
> On Sun, Oct 4, 2020 at 10:08 PM Sergey Oboguev <obog...@gmail.com> wrote:
>
>> I am trying to run a Spark structured streaming program simulating basic
>> scenario of ingesting events and calculating aggregates on a window with
>> watermark, and I am observing an inordinate amount of disk IO Spark
>> performs.
>>
>> The basic structure of the program is like this:
>>
>> sparkSession = SparkSession.builder()
>>                            .appName(....)
>>                            .master("local[*]")
>>                            .config("spark.executor.memory", "8g")
>>                            .config("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>>                            .config("spark.kryoserializer.buffer", "8m")
>>                            .config("spark.local.dir", ...local
>> directory...)
>>                            .getOrCreate();
>>
>> sparkSession.sparkContext().setCheckpointDir(... checkpoint dir for the
>> app ...);
>>
>> dataset = sparkSession.readStream()
>>                       .option("checkpointLocation", ... checkpoint dir
>> for source ...)
>>                       .format(MockStreamingSource.class.getName())
>>                       .load();
>>
>> Dataset<Row> ds = dataset
>>                       .withWatermark("timestamp", "10 minutes")
>>                       .groupBy(
>>
>> functions.window(functions.col("timestamp"), "2 minutes"),
>>                               functions.col("source"))
>>                       .agg(
>>                               functions.avg("D0").as("AVG_D0"),
>>                               functions.avg("I0").as("AVG_I0"));
>>
>> DataStreamWriter<Row> dsw = ds.writeStream()
>>                               // .trigger(Trigger.ProcessingTime("1
>> minute"))
>>                               .option("checkpointLocation", .. checkpoint
>> dir for writer ... );
>>
>> dsw.outputMode(OutputMode.Append())
>>    .format("console")
>>    .option("truncate", "false")
>>    .option("numRows", Integer.MAX_VALUE)
>>    .start()
>>    .awaitTermination();
>>
>>
>> MockStreamingSource is just that -- a source intended to provide a
>> simulated input. It generates microbatches of mock events and sends them to
>> the app. In the testing scenario, the source simulates 20,000 devices each
>> sending an event every 15 seconds for 11.5 minutes of logical time (just
>> under 12 minutes of window size + watermark), for a total number of 920,000
>> events.
>>
>> I initially started with microbatch sized to 500 events, and processing
>> performance was totally dismal because of disk IO. I then increased
>> microbatch size and performance got better, but still very poor. Microbatch
>> size now is 13,334 events per batch, this corresponds to ingestion interval
>> of 10 seconds. Smaller batches resulted in worse performance.
>>
>> But even with microbatch sized 13,334 event performance is poor because
>> of excessive disk IO generated by Spark.
>> Just ingesting data generated intra-app takes the program physical time
>> equal to 40% of window size + watermark.
>>
>> Using strace, I measured that checkpoint directory for the stream writer
>> receives the following number of Linux system calls:
>>
>> create/open file = 60,500 calls
>> mkdir = 57,000
>> readlink = 59,000
>> unlink = 41,900
>> rename = 14,700
>> execve readlink=353,000 (incl. repetitive searches of readlink executable
>> in 6 different locations)
>> execve chmod=340,620 (incl. repetitive searches of chmod executable in 6
>> different locations)
>>
>> In addition, Spark local directory received:
>>
>> create/open file = 55,000 calls
>> unlink = 13,800
>> stat = 42,000
>>
>> That's for mere 920,000 of small events (each event Row is 600 bytes when
>> in Java heap).
>>
>> I also tried trigger(...) to see whether it can improve anything, but it
>> just made things worse.
>>
>> Spark version 2.4.6.
>>
>> Is this an expected amount of disk IO for Spark, or am I doing something
>> wrong and there is a way to avoid Spark generating such an amount of disk
>> IO?
>>
>

Reply via email to