To summarize, this was an issue of selecting serialized representations for large, ultra-sparse matrices. Thanks again for sharing your feedback with us.

1) In-memory representation: In CSR, every non-zero requires 12 bytes, which is 240MB in your case. The overall memory consumption, however, depends on the distribution of non-zeros: in CSR, each block with at least one non-zero requires 4KB for row pointers. Assuming a uniform distribution (the worst case), this gives us 80GB, which is likely the problem here. Each empty block would incur an overhead of only 44 bytes, but under the worst-case assumption there are no empty blocks left. We do not use COO for checkpoints because it would slow down subsequent operations.
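
For concreteness, here is a minimal Python sketch of that back-of-envelope estimate (assuming the default 1000x1000 blocking; the 12-byte and row-pointer figures are the ones quoted above, so treat this as an illustration rather than the exact internal accounting):

"""
# Rough in-memory CSR estimate for FK (2e7 x 1e6, 2e7 non-zeros).
rows, cols, nnz = 2e7, 1e6, 2e7
blksize = 1000                                    # assumed default block size

values_bytes = nnz * 12                           # 12 bytes per non-zero
num_blocks = (rows / blksize) * (cols / blksize)  # 2e7 blocks in total
rowptr_bytes = num_blocks * (blksize + 1) * 4     # ~4KB of row pointers per non-empty block
                                                  # (worst case: uniform nnz, no empty blocks)

print(f"values:       {values_bytes / 1e6:.0f} MB")  # ~240 MB
print(f"row pointers: {rowptr_bytes / 1e9:.0f} GB")  # ~80 GB
"""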

2) Serialized/on-disk representation: For sparse datasets that are expected to exceed aggregate memory, we used to use a serialized representation (with storage level MEM_AND_DISK_SER), which keeps blocks in sparse, ultra-sparse, or empty form. In this form, ultra-sparse blocks require 9 + 16*nnz bytes and empty blocks require 9 bytes. With this representation selected, your dataset should therefore easily fit in aggregate memory. Also note that the chkpoint is only a transformation that persists the RDD; the subsequent operation then pulls the data into memory.
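
For comparison, the same worst-case distribution under the serialized representation (again just a rough sketch, reusing the block counts and per-block figures quoted above):

"""
# Serialized ultra-sparse blocks: 9 + 16*nnz bytes each; empty blocks: 9 bytes.
num_blocks = 2e7            # 20,000 x 1,000 blocks of size 1000 x 1000
nnz_per_block = 1           # uniform worst case: one non-zero per block

serialized_bytes = num_blocks * (9 + 16 * nnz_per_block)
print(f"serialized size: {serialized_bytes / 1e6:.0f} MB")  # ~500 MB, far below aggregate memory
"""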

At a high level, this was a bug. We missed ultra-sparse representations when introducing an improvement that converts sparse matrices from MCSR to CSR format on checkpoints, which eliminated the need for a serialized storage level. I just delivered a fix: we now store such ultra-sparse matrices in serialized form again, which should significantly reduce the memory pressure.

Regards,
Matthias

On 5/3/2017 9:38 AM, Mingyang Wang wrote:
Hi all,

I was playing with a super sparse matrix FK, 2e7 by 1e6, with only one
non-zero value in each row, that is, 2e7 non-zero values in total.

With driver memory of 1GB and executor memory of 100GB, I found that the HOP
"Spark chkpoint", which is used to pin the FK matrix in memory, is really
expensive, as it invokes lots of disk operations.

FK is stored in binary format with 24 blocks; each block is ~45MB, and the
total is ~1GB.

For example, with the script as

"""
FK = read($FK)
print("Sum of FK = " + sum(FK))
"""

things worked fine, and it took ~8s.

While with the script as

"""
FK = read($FK)
if (1 == 1) {}
print("Sum of FK = " + sum(FK))
"""

things changed. It took ~92s, and I observed lots of disk spills in the logs.
Based on the stats from the Spark UI, it seems the materialized FK requires
54GB of storage and thus introduces disk operations.

I was wondering, is this the expected behavior of a super sparse matrix?


Regards,
Mingyang
