Re: Excessive disk IO with Spark structured streaming

2020-11-05 Thread Jungtaek Lim
FYI, SPARK-30294 has been merged and will be available in Spark 3.1.0. It
reduces the number of temp files for the state store by half when you use
streaming aggregation.

1. https://issues.apache.org/jira/browse/SPARK-30294

On Thu, Oct 8, 2020 at 11:55 AM Jungtaek Lim wrote:

> I can't spend too much time explaining these one by one. I strongly
> encourage you to do a deep-dive instead of just looking around, since you
> want to know the "details" - that's how open source works.
>
> I'll go through a general explanation instead of replying inline; I'd
> probably write a blog post if there's no existing doc (I guess there should
> be one) rather than putting more time in here.
>
> In short, the reason Spark has to create these files "per micro-batch" is
> to ensure fault-tolerance. For example, if the query fails at batch 5 and
> you rerun the query, it should rerun batch 5. How?
>
> Spark must know the offsets the query read for batch 4, and preferably the
> offsets it read for batch 5 - those are the offsets/commits. State stores
> the accumulated values of stateful operations. Same here - Spark must be
> able to read the state as of batch 4 so that it can calculate the new
> accumulated values for batch 5. In addition, a partition defines the
> maximum parallelism (partitions aren't aware of each other, and they
> shouldn't be), hence the state for each partition has to be stored
> individually.
>
> Storing 4 files per partition per micro-batch (in the end we'll only have
> "2" files, but here I count temp files and crc files, as we are talking
> about the performance aspect) is the thing I already explained - I agree
> it's not ideal, e.g. I submitted the PR for SPARK-30294 [1], which reduces
> the number of files by half. We could probably propose that Hadoop skip
> creating CRC files (I'm not sure it can be done simply as of now), but Spark
> is conservative about upgrading dependency versions, so the fix might not be
> available soon even if we addressed it right away.
>
> As you've found here, it's super important to find the right value for the
> number of shuffle partitions. The state is partitioned by a hash function,
> so the right value strongly depends on the group key. If the cardinality of
> the group key is low, the right number of shuffle partitions will probably
> be fairly small. Unfortunately, once the query has run you can't change the
> number of shuffle partitions, as Spark has no feature for migrating state
> when the number of partitions changes. Either you need to predict the
> overall cardinality at a specific time and set the right value up front, or
> try a 3rd-party state tool [2]. (DISCLAIMER: I'm the author.)
>
> 1. https://issues.apache.org/jira/browse/SPARK-30294
> 2. https://github.com/HeartSaVioR/spark-state-tools
>
>
> On Wed, Oct 7, 2020 at 11:16 PM Sergey Oboguev  wrote:
>
>> Hi Jungtaek,
>>
>> *> I meant the subdirectory inside the directory you're providing as
>> "checkpointLocation", as there're several directories in that directory...*
>>
>> There are two:
>>
>> *my-spark-checkpoint-dir/MainApp*
>> created by sparkSession.sparkContext().setCheckpointDir(... checkpoint dir for the app ...)
>> contains only empty subdir with GUID name
>>
>> *my-spark-checkpoint-dir/writer*
>> created by ds.writeStream().option("checkpointLocation", ... checkpoint dir for writer ...)
>> contains all the files
>>
>> Within the latter ("writer") there are four subdirectories: commits,
>> metadata, offsets, state.
>>
>> Breakdown of file creations within them, per 69 microbatches (when
>> shuffle partition count = 200) is:
>>
>> commits = 136
>> metadata = 0
>> offsets = 138
>> state = 56232
>>
>> (Creation is identified by strace record for "openat" system call with
>> O_CREAT flag and file path in the corresponding directory.)
>>
>> When shuffle partition count is 10, breakdown of file creations within
>> them, per 69 microbatches, is:
>>
>> commits = 136
>> metadata = 0
>> offsets = 138
>> state = 2760
>>
>> *> The size of the delta file heavily depends on your stateful operation
>> and data in each micro-batch. delta file only captures the "changes" of
>> state in specific micro-batch, so there're cases you'll have very tiny
>> delta files, e.g. cardinality of grouped key is small (hence cardinality of
>> KVs is also small), small amount of inputs are provided per micro-batch,
>> the overall size of aggregated row is small, there's skew on grouped key
>> (hence some partitions get no input or small inputs), etc.*
>>
>>
>> In my case there is no key in the Row object (unless the bucketized
>> "timestamp" for 2-min windows buckets becomes a key), and the microbatch is
>> large enough: the whole problem is that Spark does not want to save the
>> microbatch as a single file. Even after I reduce the number of shuffle
>> partitions (see below), the number of files per microbatch remains
>> significantly larger than the number of shuffle partitions.
>>
>> ..
>>
>> When the number of shuffle partitions is 200, Spark creates 816 files
>> (per microbatch) in checkpoint store and 202 files in Spark local-dir.

Re: Excessive disk IO with Spark structured streaming

2020-10-07 Thread Jungtaek Lim
I can't spend too much time explaining these one by one. I strongly
encourage you to do a deep-dive instead of just looking around, since you
want to know the "details" - that's how open source works.

I'll go through a general explanation instead of replying inline; I'd
probably write a blog post if there's no existing doc (I guess there should
be one) rather than putting more time in here.

In short, the reason Spark has to create these files "per micro-batch" is
to ensure fault-tolerance. For example, if the query fails at batch 5 and
you rerun the query, it should rerun batch 5. How?

Spark must know the offsets the query read for batch 4, and preferably the
offsets it read for batch 5 - those are the offsets/commits. State stores
the accumulated values of stateful operations. Same here - Spark must be
able to read the state as of batch 4 so that it can calculate the new
accumulated values for batch 5. In addition, a partition defines the maximum
parallelism (partitions aren't aware of each other, and they shouldn't be),
hence the state for each partition has to be stored individually.

Storing 4 files per partition per micro-batch (in the end we'll only have
"2" files, but here I count temp files and crc files, as we are talking
about the performance aspect) is the thing I already explained - I agree
it's not ideal, e.g. I submitted the PR for SPARK-30294 [1], which reduces
the number of files by half. We could probably propose that Hadoop skip
creating CRC files (I'm not sure it can be done simply as of now), but Spark
is conservative about upgrading dependency versions, so the fix might not be
available soon even if we addressed it right away.

As you've found here, it's super important to find the right value for the
number of shuffle partitions. The state is partitioned by a hash function,
so the right value strongly depends on the group key. If the cardinality of
the group key is low, the right number of shuffle partitions will probably
be fairly small. Unfortunately, once the query has run you can't change the
number of shuffle partitions, as Spark has no feature for migrating state
when the number of partitions changes. Either you need to predict the
overall cardinality at a specific time and set the right value up front, or
try a 3rd-party state tool [2]. (DISCLAIMER: I'm the author.)

1. https://issues.apache.org/jira/browse/SPARK-30294
2. https://github.com/HeartSaVioR/spark-state-tools
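
To make the shuffle-partitions point concrete, here is a minimal sketch in
the Java API used elsewhere in this thread; the value 10 is only an example
and has to be chosen based on your key cardinality:

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("my-streaming-app")   // placeholder name
        .master("local[*]")
        // Must be set before the query first runs; the state layout is tied to
        // this number and cannot be migrated to a different partition count later.
        .config("spark.sql.shuffle.partitions", "10")
        .getOrCreate();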


On Wed, Oct 7, 2020 at 11:16 PM Sergey Oboguev  wrote:

> Hi Jungtaek,
>
> *> I meant the subdirectory inside the directory you're providing as
> "checkpointLocation", as there're several directories in that directory...*
>
> There are two:
>
> *my-spark-checkpoint-dir/MainApp*
> created by sparkSession.sparkContext().setCheckpointDir(... checkpoint dir for the app ...)
> contains only empty subdir with GUID name
>
> *my-spark-checkpoint-dir/writer*
> created by ds.writeStream().option("checkpointLocation", ... checkpoint dir for writer ...)
> contains all the files
>
> Within the latter ("writer") there are four subdirectories: commits,
> metadata, offsets, state.
>
> Breakdown of file creations within them, per 69 microbatches (when shuffle
> partition count = 200) is:
>
> commits = 136
> metadata = 0
> offsets = 138
> state = 56232
>
> (Creation is identified by strace record for "openat" system call with
> O_CREAT flag and file path in the corresponding directory.)
>
> When shuffle partition count is 10, breakdown of file creations within
> them, per 69 microbatches, is:
>
> commits = 136
> metadata = 0
> offsets = 138
> state = 2760
>
> *> The size of the delta file heavily depends on your stateful operation
> and data in each micro-batch. delta file only captures the "changes" of
> state in specific micro-batch, so there're cases you'll have very tiny
> delta files, e.g. cardinality of grouped key is small (hence cardinality of
> KVs is also small), small amount of inputs are provided per micro-batch,
> the overall size of aggregated row is small, there's skew on grouped key
> (hence some partitions get no input or small inputs), etc.*
>
>
> In my case there is no key in the Row object (unless the bucketized
> "timestamp" for 2-min windows buckets becomes a key), and the microbatch is
> large enough: the whole problem is that Spark does not want to save the
> microbatch as a single file. Even after I reduce the number of shuffle
> partitions (see below), the number of files per microbatch remains
> significantly larger than the number of shuffle partitions.
>
> ..
>
> When the number of shuffle partitions is 200, Spark creates 816 files (per
> microbatch) in checkpoint store and 202 files in Spark local-dir.
>
> Of checkpoint files: 24 per microbatch are snapshot files, and 788 are
> delta files.
> The same per microbatch per partition: 0.12 snapshot files, 4 delta files.
> Of local-dir files: 200 temp_shuffle files per microbatch (as expected)
> and 2 other files (shuffle.data+shuffle.index).
>
> If I reduce the number of shuffle partitions, two things happen:
> - Throughput of a single pipeline improves.

Re: Excessive disk IO with Spark structured streaming

2020-10-07 Thread Sergey Oboguev
 Hi Jungtaek,

*> I meant the subdirectory inside the directory you're providing as
"checkpointLocation", as there're several directories in that directory...*

There are two:

*my-spark-checkpoint-dir/MainApp*
created by sparkSession.sparkContext().setCheckpointDir(... checkpoint dir for the app ...)
contains only empty subdir with GUID name

*my-spark-checkpoint-dir/writer*
created by ds.writeStream().option("checkpointLocation", ... checkpoint dir for writer ...)
contains all the files
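
(For reference, a minimal sketch of the two settings side by side, using the
paths above as literal placeholders and the ds dataset from the original
program:)

// Creates my-spark-checkpoint-dir/MainApp (holding only the empty GUID-named subdir)
sparkSession.sparkContext().setCheckpointDir("my-spark-checkpoint-dir/MainApp");

// Creates my-spark-checkpoint-dir/writer with commits/, metadata, offsets/ and state/
DataStreamWriter<Row> dsw = ds.writeStream()
        .option("checkpointLocation", "my-spark-checkpoint-dir/writer");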

Within the latter ("writer") there are four subdirectories: commits,
metadata, offsets, state.

Breakdown of file creations within them, per 69 microbatches (when shuffle
partition count = 200) is:

commits = 136
metadata = 0
offsets = 138
state = 56232

(Creation is identified by strace record for "openat" system call with
O_CREAT flag and file path in the corresponding directory.)

When shuffle partition count is 10, breakdown of file creations within
them, per 69 microbatches, is:

commits = 136
metadata = 0
offsets = 138
state = 2760
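
(As a cross-check, these totals come out to roughly 4 state-file creations
per partition per microbatch: 56,232 / 69 ~ 815, which is about 200
partitions x 4, and 2,760 / 69 = 40, which is exactly 10 partitions x 4.)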

*> The size of the delta file heavily depends on your stateful operation
and data in each micro-batch. delta file only captures the "changes" of
state in specific micro-batch, so there're cases you'll have very tiny
delta files, e.g. cardinality of grouped key is small (hence cardinality of
KVs is also small), small amount of inputs are provided per micro-batch,
the overall size of aggregated row is small, there's skew on grouped key
(hence some partitions get no input or small inputs), etc.*


In my case there is no key in the Row object (unless the bucketized
"timestamp" for 2-min windows buckets becomes a key), and the microbatch is
large enough: the whole problem is that Spark does not want to save the
microbatch as a single file. Even after I reduce the number of shuffle
partitions (see below), the number of files per microbatch remains
significantly larger than the number of shuffle partitions.

..

When the number of shuffle partitions is 200, Spark creates 816 files (per
microbatch) in checkpoint store and 202 files in Spark local-dir.

Of checkpoint files: 24 per microbatch are snapshot files, and 788 are
delta files.
The same per microbatch per partition: 0.12 snapshot files, 4 delta files.
Of local-dir files: 200 temp_shuffle files per microbatch (as expected) and
2 other files (shuffle.data+shuffle.index).

If I reduce the number of shuffle partitions, two things happen:
- Throughput of a single pipeline improves.
- CPU usage by the pipeline is reduced (allowing a single node to co-run a
larger number of pipelines).
Most of the improvements are gained by the time the number of partitions is
reduced to 5-10.
Going below that, further improvements are marginal.

When reducing the number of shuffle partitions from 200 to 10, physical
latency of data ingestion into the checkpoint is reduced 1.9 times, and CPU
usage is reduced 2.6 times.

When reducing the number of shuffle partitions from 200 to 5, physical
latency of data ingestion into the checkpoint is reduced 2.1 times, and CPU
usage is reduced 4.5 times.

Still, latency remains high, because the number of created files per
microbatch remains high.

..

With 5 shuffle partitions, Spark creates 23.9 files (per microbatch) in
checkpoint store and 6.9 files in Spark local-dir.
Of checkpoint files: 0.15 per microbatch are snapshot files, and 19.7 are
delta files.
Of local-dir files: 4.93 temp_shuffle files per microbatch (as expected)
and 2 other files (shuffle.data+shuffle.index).

Why would Spark need to create 20 delta files per microbatch, or to put it
another way: 4 delta files per microbatch per shuffle partition?

One could guess this is due to the changing "timestamp", but that does not
bear out. In my produced stream (69 microbatches) there are only 46 distinct
values for the timestamp, consecutively increasing from the first timestamp
to the last. Thus many microbatches will have just one timestamp value; on
average, a microbatch will have 1.5 distinct timestamp values. It would, of
course, be terribly wrong for Spark to use the raw timestamp value as a key:
in the real world almost every event would have a unique timestamp, so the
number of files required to save state keyed by timestamp would be insane,
and hopefully Spark does not attempt that. But perhaps it uses the index of
the 2-minute window bucket as a key. If so, there are only 6 distinct values
over the whole event set (I have the window size set to 2 minutes, the
watermark to 10 minutes, and the event set spans 11.5 minutes). Thus 90% of
microbatches will fall wholly within just one window bucket, and 10% will
span two buckets. So why 4 delta files per microbatch per shuffle partition?

..

For a complete picture, if I run the test with the shuffle partition count
set to 1, then:
Spark creates 8 files (per microbatch) in the checkpoint store and 3 files
in Spark local-dir.
Of the checkpoint files: 0.03 per microbatch are snapshot files (only 2
snapshot files in the whole run), and there are 4 delta files per microbatch.
Of 

Re: Excessive disk IO with Spark structured streaming

2020-10-05 Thread Jungtaek Lim
Replied inline.

On Tue, Oct 6, 2020 at 6:07 AM Sergey Oboguev  wrote:

> Hi Jungtaek,
>
> Thanks for your response.
>
> *> you'd want to dive inside the checkpoint directory and have separate
> numbers per top-subdirectory*
>
> All the checkpoint store numbers are solely for the subdirectory set by
> option("checkpointLocation", .. checkpoint dir for writer ... )
>

>
> Other subdirectories are empty or nearly-empty.
>

I meant the subdirectories inside the directory you're providing as
"checkpointLocation", as there are several directories in there, and they
exist for different purposes. It'd be nice if we could determine whether the
issue spans all of these directories or is specific to one of them.


>
> *> First of all, you'd want to divide these numbers by the number of
> micro-batches, as file creations in checkpoint directory would occur
> similarly per micro-batch*
>
> There were 69 microbatches, each containing 13,334 events.
>
> Event's Row object size in Java heap is 620 bytes, thus the total amount
> of data in a microbatch (in terms of aggregate Java-heap objects sizes) is
> 8.3 MB.
>
> Average number of system calls per microbatch was:
>
> For query (writer) checkpoint directory:
>
> create/open file = 877
> mkdir = 826
> readlink = 855
> unlink = 607
> rename = 213
> execve readlink = 5116
> execve chmod = 4937
>
> For Spark local directory:
>
> create/open file = 797
> unlink = 200
> mmap = 197
> stat = 2391
>
> (The number for local.stat in the previous message was incorrect).
>
> Physical processing time per microbatch was 3.4 seconds.
>
> That's to store a mere 8.3 MB of uncompressed (Java-heap) data!
>
> Most created "delta" files have file size in the order of 1 KB or less.
> "Snapshot" files are several KB in size.
> One would think that the tiny size of created files is one key factor in
> dismal performance. It causes a very high number of system calls and also
> hugely fragments actual data IO.
>

The size of a delta file depends heavily on your stateful operation and the
data in each micro-batch. A delta file only captures the "changes" to the
state in a specific micro-batch, so there are cases where you'll have very
tiny delta files, e.g. the cardinality of the grouped key is small (hence
the cardinality of KVs is also small), only a small amount of input is
provided per micro-batch, the overall size of the aggregated row is small,
there's skew in the grouped key (hence some partitions get no input or very
little input), etc.

The snapshot files will get bigger over time, as each one contains all of
the state KVs as of a specific micro-batch, so you may not want to worry
about those being small.


>
> As a result, using iostat, typical disk write rate was observed only ~ 100
> KB/s.
> (Read rate was near-zero, presumably because all data was in Linux block
> cache.)
>
> Average CPU usage when ingesting data was in the order of 600% (i.e. 6
> cores busy), I presume chiefly for serialization/deserialization, even
> though Kryo was enabled. But the machine has 16 cores (VCPUs), so the most
> immediate limiting factor must have been not CPU saturation but IO latency
> (unless there is some obscure setting limiting the number of
> reading/writing threads). The latency arising, fundamentally, out of very
> large number of tiny files.
>
> Is there a way to control the size of checkpoint "delta" and "snapshot"
> files Spark creates, to make them larger?
>

Unfortunately no - they're used for the fault-tolerance guarantee (stateful
exactly-once) per micro-batch. Every stateful operation has to write a delta
file per shuffle partition (spark.sql.shuffle.partitions) per micro-batch.

The default number of shuffle partitions is 200, hence in each microbatch
the query will create 200 files for each state store by default. (You can
reduce this value before the streaming query first starts, so that's a thing
you can tweak.) In reality you still need to multiply it by 4, as there's
also a crc file per file if the HDFS API treats the filesystem as a checksum
filesystem, and Spark creates two files (read/write) for streaming
aggregation. (I hope SPARK-30294 will address the latter - after that we no
longer need to multiply by 2 for the read/write purpose.)
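
(Concretely, with the defaults and one streaming aggregation that is roughly
200 partitions x 2 files (read + write) x 2 (data + .crc) = 800 state files
per micro-batch, before counting the periodic snapshot files and the
per-batch offset/commit files - which lines up with the ~816 files per
micro-batch reported elsewhere in this thread.)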

The only way for now to have fewer small files is to increase the
micro-batch interval, which brings another concern: batch size (and output
size) and output latency. That is a downside compared to what pure streaming
frameworks provide - there, a longer checkpoint interval only affects the
amount of data to reprocess on failure. If you expect end-to-end
exactly-once then output latency is also affected, but that's an option end
users can choose to tolerate.
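
As a minimal sketch of that knob, reusing the ds dataset and writer from the
original program in this thread (the interval value and path are only
examples):

import org.apache.spark.sql.streaming.Trigger;

DataStreamWriter<Row> dsw = ds.writeStream()
        // Fewer, larger micro-batches mean fewer checkpoint files per unit of
        // data, at the cost of higher output latency.
        .trigger(Trigger.ProcessingTime("1 minute"))
        .option("checkpointLocation", "/path/to/checkpoint/dir/for/writer");  // placeholder path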

Probably micro-batch execution could also decouple the micro-batch interval
from the checkpoint interval to provide flexibility - say, I can tolerate
reprocessing up to 10 minutes of data when a failure occurs, but due to
output latency requirements I want a micro-batch interval of 30 seconds. (In
other words, checkpoint once per roughly 20 micro-batches.)

Re: Excessive disk IO with Spark structured streaming

2020-10-05 Thread Sergey Oboguev
 Hi Jungtaek,

Thanks for your response.

*> you'd want to dive inside the checkpoint directory and have separate
numbers per top-subdirectory*

All the checkpoint store numbers are solely for the subdirectory set by
option("checkpointLocation", .. checkpoint dir for writer ... )

Other subdirectories are empty or nearly-empty.

*> First of all, you'd want to divide these numbers by the number of
micro-batches, as file creations in checkpoint directory would occur
similarly per micro-batch*

There were 69 microbatches, each containing 13,334 events.

An event's Row object size in the Java heap is 620 bytes, so the total
amount of data in a microbatch (in terms of aggregate Java-heap object
sizes) is 8.3 MB.
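(13,334 events x 620 bytes ~ 8.27 MB.)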

Average number of system calls per microbatch was:

For query (writer) checkpoint directory:

create/open file = 877
mkdir = 826
readlink = 855
unlink = 607
rename = 213
execve readlink = 5116
execve chmod = 4937

For Spark local directory:

create/open file = 797
unlink = 200
mmap = 197
stat = 2391

(The number for local.stat in the previous message was incorrect).

Physical processing time per microbatch was 3.4 seconds.

That's to store a mere 8.3 MB of uncompressed (Java-heap) data!

Most created "delta" files have file size in the order of 1 KB or less.
"Snapshot" files are several KB in size.
One would think that the tiny size of the created files is a key factor in
the dismal performance: it causes a very high number of system calls and
also hugely fragments the actual data IO.

As a result, the typical disk write rate observed with iostat was only ~100
KB/s.
(Read rate was near-zero, presumably because all data was in Linux block
cache.)

Average CPU usage when ingesting data was in the order of 600% (i.e. 6
cores busy), I presume chiefly for serialization/deserialization, even
though Kryo was enabled. But the machine has 16 cores (VCPUs), so the most
immediate limiting factor must have been not CPU saturation but IO latency
(unless there is some obscure setting limiting the number of
reading/writing threads). That latency arises, fundamentally, from the very
large number of tiny files.

Is there a way to control the size of checkpoint "delta" and "snapshot"
files Spark creates, to make them larger?
And the same also for the files in Spark local directory?

* * *

The numbers for the checkpoint directory are, of course, captured when it
was set to a local drive (or to Lustre/NFS).

For HDFS there are obviously no local file system calls for the checkpoint
store, as HDFS does not present itself as an OS-level file system.
Nevertheless, the name of the checkpoint directory was transmitted over the
HDFS connection socket 1,675 times per microbatch, so the number of
high-level HDFS file operations must have been at least that high.

* * *

On a related note, for 920,000 events Spark made 700,000 attempts to execute
the chmod or readlink program, i.e. to launch an external subprocess in
order to perform a file operation. Those attempts represent about 150,000
search cycles, and in each cycle Spark tried to launch the program from 6
different locations (/usr/local/sbin -> /usr/local/bin -> /usr/sbin ->
/usr/bin -> /sbin -> /bin), until it finally found it in the last one. But
on the next cycle Spark/Hadoop does not reuse the knowledge of the
previously found utility location, and repeats the search from the very
start, causing useless file system search operations over and over again.

This may or may not matter when HDFS is used as the checkpoint store
(depending on how the HDFS server implements the calls), but it does matter
when a file system like Lustre or NFS is used for checkpoint storage.
(Not to mention that spawning readlink and chmod does not seem like a bright
idea in the first place, although perhaps there is a reason why the Hadoop
layer does it this way.)

Thanks,
Sergey

On Mon, Oct 5, 2020 at 5:45 AM Jungtaek Lim wrote:

> First of all, you'd want to divide these numbers by the number of
> micro-batches, as file creations in checkpoint directory would occur
> similarly per micro-batch.
> Second, you'd want to dive inside the checkpoint directory and have
> separate numbers per top-subdirectory.
>
> After that we can see whether the value would make sense or not.
>
> Regarding file I/O issues on SS, two issues I know about are:
> 1) If you use streaming aggregation, it unnecessarily creates a temporary
> file for both read and write on the state store, while the file is only
> needed for writing. That makes the number of file creations to be 2x. The
> patch is proposed under SPARK-30294 [1].
>
> 2) Spark leverages HDFS API which is configured to create crc file per
> file by default. (So you'll have 2x files than expected.) There's a bug in
> HDFS API (HADOOP-16255 [2]) which missed to handle crc files during rename
> (in short of how checkpoint works in SS, temp file is atomically renamed to
> be the final file), and as a workaround (SPARK-28025 [3]) Spark tries to
> delete the crc file, so two additional operations (exist -> delete) may occur per crc file.

Re: Excessive disk IO with Spark structured streaming

2020-10-05 Thread Jungtaek Lim
First of all, you'd want to divide these numbers by the number of
micro-batches, as file creations in the checkpoint directory occur similarly
per micro-batch.
Second, you'd want to dive inside the checkpoint directory and get separate
numbers per top-level subdirectory.

After that we can see whether the values make sense or not.

Regarding file I/O issues on SS, two issues I know about are:
1) If you use streaming aggregation, it unnecessarily creates a temporary
file for both read and write on the state store, while the file is only
needed for writing. That makes the number of file creations 2x. The patch is
proposed under SPARK-30294 [1].

2) Spark leverages the HDFS API, which is configured to create a crc file
per file by default. (So you'll have 2x more files than expected.) There's a
bug in the HDFS API (HADOOP-16255 [2]) which misses handling crc files
during rename (in short, this is how checkpointing works in SS: a temp file
is atomically renamed to become the final file), and as a workaround
(SPARK-28025 [3]) Spark tries to delete the crc file, so two additional
operations (exist -> delete) may occur per crc file.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-30294
2. https://issues.apache.org/jira/browse/HADOOP-16255
3. https://issues.apache.org/jira/browse/SPARK-28025


On Sun, Oct 4, 2020 at 10:08 PM Sergey Oboguev  wrote:

> I am trying to run a Spark structured streaming program simulating a basic
> scenario of ingesting events and calculating aggregates on a window with a
> watermark, and I am observing an inordinate amount of disk IO performed by
> Spark.
>
> The basic structure of the program is like this:
>
> sparkSession = SparkSession.builder()
>         .appName()
>         .master("local[*]")
>         .config("spark.executor.memory", "8g")
>         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>         .config("spark.kryoserializer.buffer", "8m")
>         .config("spark.local.dir", ...local directory...)
>         .getOrCreate();
>
> sparkSession.sparkContext().setCheckpointDir(... checkpoint dir for the app ...);
>
> dataset = sparkSession.readStream()
>         .option("checkpointLocation", ... checkpoint dir for source ...)
>         .format(MockStreamingSource.class.getName())
>         .load();
>
> Dataset<Row> ds = dataset
>         .withWatermark("timestamp", "10 minutes")
>         .groupBy(
>             functions.window(functions.col("timestamp"), "2 minutes"),
>             functions.col("source"))
>         .agg(
>             functions.avg("D0").as("AVG_D0"),
>             functions.avg("I0").as("AVG_I0"));
>
> DataStreamWriter<Row> dsw = ds.writeStream()
>         // .trigger(Trigger.ProcessingTime("1 minute"))
>         .option("checkpointLocation", ... checkpoint dir for writer ...);
>
> dsw.outputMode(OutputMode.Append())
>         .format("console")
>         .option("truncate", "false")
>         .option("numRows", Integer.MAX_VALUE)
>         .start()
>         .awaitTermination();
>
>
> MockStreamingSource is just that -- a source intended to provide a
> simulated input. It generates microbatches of mock events and sends them to
> the app. In the testing scenario, the source simulates 20,000 devices each
> sending an event every 15 seconds for 11.5 minutes of logical time (just
> under 12 minutes of window size + watermark), for a total of 920,000
> events.
>
> I initially started with microbatches sized at 500 events, and processing
> performance was totally dismal because of disk IO. I then increased the
> microbatch size and performance got better, but it is still very poor. The
> microbatch size is now 13,334 events per batch, which corresponds to an
> ingestion interval of 10 seconds. Smaller batches resulted in worse
> performance.
>
> But even with microbatches of 13,334 events, performance is poor because of
> the excessive disk IO generated by Spark.
> Just ingesting the data generated intra-app takes the program physical time
> equal to 40% of (window size + watermark).
>
> Using strace, I measured that checkpoint directory for the stream writer
> receives the following number of Linux system calls:
>
> create/open file = 60,500 calls
> mkdir = 57,000
> readlink = 59,000
> unlink = 41,900
> rename = 14,700
> execve readlink=353,000 (incl. repetitive searches of readlink executable
> in 6 different locations)
> execve chmod=340,620 (incl. repetitive searches of chmod executable in 6
> different locations)
>
> In addition, Spark local directory received:
>
> create/open file = 55,000 calls
> unlink = 13,800
> stat = 42,000
>
> That's for a mere 920,000 small events (each event Row is 600 bytes when
> in Java