Quick update: the poor runtime of scenario (3) is now fixed in master. The
causes were an unnecessary shuffle and load imbalance for Spark rexpand
operations with a small input vector and a large, ultra-sparse output matrix.
Thanks for pointing this out, Mingyang.

Regards,
Matthias


On Mon, May 8, 2017 at 3:09 PM, Matthias Boehm <mboe...@googlemail.com>
wrote:

> OK, thanks for sharing - I'll have a look later this week.
>
> Regards,
> Matthias
>
> On Mon, May 8, 2017 at 2:20 PM, Mingyang Wang <miw...@eng.ucsd.edu> wrote:
>
>> Hi Matthias,
>>
>> With a driver memory of 10GB, all operations were executed on CP, and I did
>> observe that the version that reads FK as a vector and then converts it was
>> faster: it took 8.337s (6.246s in GC), while the version that reads FK as a
>> matrix took 31.680s (26.256s in GC).
>>
>> Regarding the distributed caching, I have re-run all scripts with the
>> following Spark configuration:
>>
>>     --driver-memory 1G \
>>     --executor-memory 100G \
>>     --executor-cores 20 \
>>     --num-executors 1 \
>>     --conf spark.driver.maxResultSize=0 \
>>     --conf spark.rpc.message.maxSize=128 \
>>
>> It seems that both the checkpointed version (2) and the vector version (3)
>> have problems.
>>
>> 1) Sum of FK in matrix form
>> ```
>> FK = read($FK)
>> print("Sum of FK = " + sum(FK))
>> ```
>> Worked as expected. Took 8.786s.
>>
>>
>> 2) Sum of FK in matrix form, with checkpoints
>> ```
>> FK = read($FK)
>> if (1 == 1) {}
>> print("Sum of FK = " + sum(FK))
>> ```
>> It took 89.731s, with detailed stats shown below. (The empty if-block forces
>> a statement-block cut, so FK gets a Spark checkpoint, i.e., the sp_chkpoint
>> instruction below.)
>>
>> 17/05/08 13:15:00 INFO api.ScriptExecutorUtils: SystemML Statistics:
>> Total elapsed time:             91.619 sec.
>> Total compilation time:         1.889 sec.
>> Total execution time:           89.731 sec.
>> Number of compiled Spark inst:  2.
>> Number of executed Spark inst:  2.
>> Cache hits (Mem, WB, FS, HDFS): 1/0/0/0.
>> Cache writes (WB, FS, HDFS):    0/0/0.
>> Cache times (ACQr/m, RLS, EXP): 0.000/0.001/0.000/0.000 sec.
>> HOP DAGs recompiled (PRED, SB): 0/0.
>> HOP DAGs recompile time:        0.000 sec.
>> Spark ctx create time (lazy):   0.895 sec.
>> Spark trans counts (par,bc,col):0/0/0.
>> Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
>> Total JIT compile time:         5.001 sec.
>> Total JVM GC count:             8.
>> Total JVM GC time:              0.161 sec.
>> Heavy hitter instructions (name, time, count):
>> -- 1)   sp_uak+         89.349 sec      1
>> -- 2)   sp_chkpoint     0.381 sec       1
>> -- 3)   ==      0.001 sec       1
>> -- 4)   +       0.000 sec       1
>> -- 5)   print   0.000 sec       1
>> -- 6)   castdts         0.000 sec       1
>> -- 7)   createvar       0.000 sec       3
>> -- 8)   rmvar   0.000 sec       7
>> -- 9)   assignvar       0.000 sec       1
>> -- 10)  cpvar   0.000 sec       1
>>
>>
>> 3) Sum of FK in vector form
>> ```
>> FK_colvec = read($FK_colvec)
>> FK = table(seq(1,nrow(FK_colvec)), FK_colvec, nrow(FK_colvec), 1e6)
>> print("Sum of FK = " + sum(FK))
>> ```
>> Things really went wrong. It took ~10 mins.
>>
>> 17/05/08 13:26:36 INFO api.ScriptExecutorUtils: SystemML Statistics:
>> Total elapsed time:             605.688 sec.
>> Total compilation time:         1.857 sec.
>> Total execution time:           603.832 sec.
>> Number of compiled Spark inst:  2.
>> Number of executed Spark inst:  2.
>> Cache hits (Mem, WB, FS, HDFS): 1/0/0/0.
>> Cache writes (WB, FS, HDFS):    0/0/0.
>> Cache times (ACQr/m, RLS, EXP): 0.000/0.000/0.000/0.000 sec.
>> HOP DAGs recompiled (PRED, SB): 0/1.
>> HOP DAGs recompile time:        0.002 sec.
>> Spark ctx create time (lazy):   0.858 sec.
>> Spark trans counts (par,bc,col):0/0/0.
>> Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
>> Total JIT compile time:         3.682 sec.
>> Total JVM GC count:             5.
>> Total JVM GC time:              0.064 sec.
>> Heavy hitter instructions (name, time, count):
>> -- 1)   sp_uak+         603.447 sec     1
>> -- 2)   sp_rexpand      0.381 sec       1
>> -- 3)   createvar       0.002 sec       3
>> -- 4)   rmvar   0.000 sec       5
>> -- 5)   +       0.000 sec       1
>> -- 6)   print   0.000 sec       1
>> -- 7)   castdts         0.000 sec       1
>>
>> Also, the executor log showed some disk spilling:
>>
>> 17/05/08 13:20:00 INFO ExternalSorter: Thread 109 spilling in-memory
>> map of 33.8 GB to disk (1 time so far)
>> 17/05/08 13:20:20 INFO ExternalSorter: Thread 116 spilling in-memory
>> map of 31.2 GB to disk (1 time so far)
>>
>> ...
>>
>> 17/05/08 13:24:50 INFO ExternalAppendOnlyMap: Thread 116 spilling
>> in-memory map of 26.9 GB to disk (1 time so far)
>> 17/05/08 13:25:08 INFO ExternalAppendOnlyMap: Thread 109 spilling
>> in-memory map of 26.6 GB to disk (1 time so far)
>>
>>
>>
>> Regards,
>> Mingyang
>>
>> On Sat, May 6, 2017 at 9:12 PM Matthias Boehm <mboe...@googlemail.com>
>> wrote:
>>
>> > Yes, even with the previous patch for improved memory efficiency of
>> > ultra-sparse matrices in MCSR format, there is still some unnecessary
>> > overhead that leads to garbage collection. For this reason, I would
>> > recommend reading it as a vector and converting it in memory to an
>> > ultra-sparse matrix. I also just pushed a minor performance improvement
>> > for reading ultra-sparse matrices, but the major bottleneck still exists.
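>> >
>> > A minimal sketch of this read-as-vector-and-convert approach (the variable
>> > names are placeholders, and 1e6 is the FK column count from this thread):
>> >
>> > ```
>> > FK_colvec = read($FK_colvec)   # n x 1 column vector of foreign-key IDs
>> > # expand to an n x 1e6 ultra-sparse matrix with one non-zero per row
>> > FK = table(seq(1, nrow(FK_colvec)), FK_colvec, nrow(FK_colvec), 1e6)
>> > print("Sum of FK = " + sum(FK))
>> > ```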
>> >
>> > The core issue is that we can't read these ultra-sparse matrices directly
>> > into a CSR representation, because CSR does not allow for efficient
>> > incremental construction (with unordered inputs and multi-threaded read).
>> > However, I created SYSTEMML-1587 to solve this in the general case. The
>> > idea is to read ultra-sparse matrices into thread-local COO deltas and
>> > finally merge them into a CSR representation. The initial results are very
>> > promising, and it's safe because the temporary memory requirements are
>> > covered by the MCSR estimate, but it will take a while because I want to
>> > introduce this consistently for all readers (single-/multi-threaded, all
>> > formats).
>> >
>> > In contrast to the read issue, I was not able to reproduce the described
>> > performance issue with distributed caching. Could you please double-check
>> > that this test also used the current master build, and perhaps share the
>> > detailed setup again (e.g., number of executors, data distribution, etc.)?
>> > Thanks.
>> >
>> > Regards,
>> > Matthias
>> >
>> >
>> > On Thu, May 4, 2017 at 9:55 PM, Mingyang Wang <miw...@eng.ucsd.edu>
>> > wrote:
>> >
>> > > Out of curiosity, I increased the driver memory to 10GB, and then all
>> > > operations were executed on CP. It took 37.166s, of which JVM GC took
>> > > 30.534s. I was wondering whether this is the expected behavior?
>> > >
>> > > Total elapsed time: 38.093 sec.
>> > > Total compilation time: 0.926 sec.
>> > > Total execution time: 37.166 sec.
>> > > Number of compiled Spark inst: 0.
>> > > Number of executed Spark inst: 0.
>> > > Cache hits (Mem, WB, FS, HDFS): 0/0/0/1.
>> > > Cache writes (WB, FS, HDFS): 0/0/0.
>> > > Cache times (ACQr/m, RLS, EXP): 30.400/0.000/0.001/0.000 sec.
>> > > HOP DAGs recompiled (PRED, SB): 0/0.
>> > > HOP DAGs recompile time: 0.000 sec.
>> > > Spark ctx create time (lazy): 0.000 sec.
>> > > Spark trans counts (par,bc,col):0/0/0.
>> > > Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
>> > > Total JIT compile time: 22.302 sec.
>> > > Total JVM GC count: 11.
>> > > Total JVM GC time: 30.534 sec.
>> > > Heavy hitter instructions (name, time, count):
>> > > -- 1) uak+ 37.166 sec 1
>> > > -- 2) == 0.001 sec 1
>> > > -- 3) + 0.000 sec 1
>> > > -- 4) print 0.000 sec 1
>> > > -- 5) rmvar 0.000 sec 5
>> > > -- 6) createvar 0.000 sec 1
>> > > -- 7) assignvar 0.000 sec 1
>> > > -- 8) cpvar 0.000 sec 1
>> > >
>> > > Regards,
>> > > Mingyang
>> > >
>> > > On Thu, May 4, 2017 at 9:48 PM Mingyang Wang <miw...@eng.ucsd.edu>
>> > > wrote:
>> > >
>> > > > Hi Matthias,
>> > > >
>> > > > Thanks for the patch.
>> > > >
>> > > > I have re-run the experiment and observed that there was indeed no
>> > > > more memory pressure, but it still took ~90s for this simple script.
>> > > > I was wondering what the bottleneck is in this case?
>> > > >
>> > > >
>> > > > Total elapsed time: 94.800 sec.
>> > > > Total compilation time: 1.826 sec.
>> > > > Total execution time: 92.974 sec.
>> > > > Number of compiled Spark inst: 2.
>> > > > Number of executed Spark inst: 2.
>> > > > Cache hits (Mem, WB, FS, HDFS): 1/0/0/0.
>> > > > Cache writes (WB, FS, HDFS): 0/0/0.
>> > > > Cache times (ACQr/m, RLS, EXP): 0.000/0.000/0.000/0.000 sec.
>> > > > HOP DAGs recompiled (PRED, SB): 0/0.
>> > > > HOP DAGs recompile time: 0.000 sec.
>> > > > Spark ctx create time (lazy): 0.860 sec.
>> > > > Spark trans counts (par,bc,col):0/0/0.
>> > > > Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
>> > > > Total JIT compile time: 3.498 sec.
>> > > > Total JVM GC count: 5.
>> > > > Total JVM GC time: 0.064 sec.
>> > > > Heavy hitter instructions (name, time, count):
>> > > > -- 1) sp_uak+ 92.597 sec 1
>> > > > -- 2) sp_chkpoint 0.377 sec 1
>> > > > -- 3) == 0.001 sec 1
>> > > > -- 4) print 0.000 sec 1
>> > > > -- 5) + 0.000 sec 1
>> > > > -- 6) castdts 0.000 sec 1
>> > > > -- 7) createvar 0.000 sec 3
>> > > > -- 8) rmvar 0.000 sec 7
>> > > > -- 9) assignvar 0.000 sec 1
>> > > > -- 10) cpvar 0.000 sec 1
>> > > >
>> > > > Regards,
>> > > > Mingyang
>> > > >
>> > > > On Wed, May 3, 2017 at 8:54 AM Matthias Boehm <mboe...@googlemail.com>
>> > > > wrote:
>> > > >
>> > > >> To summarize, this was an issue of selecting serialized
>> > > >> representations for large ultra-sparse matrices. Thanks again for
>> > > >> sharing your feedback with us.
>> > > >>
>> > > >> 1) In-memory representation: In CSR, every non-zero requires 12 bytes
>> > > >> - that is 240MB in your case. The overall memory consumption, however,
>> > > >> depends on the distribution of non-zeros: in CSR, each block with at
>> > > >> least one non-zero requires 4KB for row pointers. Assuming a uniform
>> > > >> distribution (the worst case), this gives us 80GB, which is likely the
>> > > >> problem here. Every empty block would have an overhead of 44 bytes,
>> > > >> but under the worst-case assumption there are no empty blocks left. We
>> > > >> do not use COO for checkpoints because it would slow down subsequent
>> > > >> operations.
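>> > > >>
>> > > >> As a rough sanity check of these numbers (assuming the default
>> > > >> 1000x1000 block size; the snippet below is just illustrative
>> > > >> arithmetic, not part of any script in this thread):
>> > > >>
>> > > >> ```
>> > > >> nnz = 2e7                                # one non-zero per row
>> > > >> nblocks = (2e7 / 1000) * (1e6 / 1000)    # 2e7 blocks of 1000x1000
>> > > >> print("CSR cells:        " + (nnz * 12 / 1e9) + " GB")       # ~0.24 GB
>> > > >> print("CSR row pointers: " + (nblocks * 4000 / 1e9) + " GB") # ~80 GB if no block is empty
>> > > >> ```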
>> > > >>
>> > > >> 2) Serialized/on-disk representation: For sparse datasets that are
>> > > >> expected to exceed aggregate memory, we used to use a serialized
>> > > >> representation (with storage level MEM_AND_DISK_SER) which uses
>> > > >> sparse, ultra-sparse, or empty block representations. In this form,
>> > > >> ultra-sparse blocks require 9 + 16*nnz bytes and empty blocks require
>> > > >> 9 bytes. Therefore, with this representation selected, your dataset
>> > > >> should easily fit in aggregate memory. Also, note that chkpoint is
>> > > >> only a transformation that persists the RDD; the subsequent operation
>> > > >> then pulls the data into memory.
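>> > > >>
>> > > >> Under the same worst-case assumption (every block holds exactly one
>> > > >> non-zero), a quick estimate of the serialized size, again just as
>> > > >> illustrative arithmetic:
>> > > >>
>> > > >> ```
>> > > >> nblocks = 2e7
>> > > >> print("Serialized ultra-sparse: " + (nblocks * (9 + 16*1) / 1e9) + " GB")  # ~0.5 GB
>> > > >> ```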
>> > > >>
>> > > >> At a high level, this was a bug: we missed ultra-sparse
>> > > >> representations when introducing an improvement that stores sparse
>> > > >> matrices (in MCSR format) in CSR format on checkpoints, which
>> > > >> eliminated the need for a serialized storage level. I just delivered
>> > > >> a fix: such ultra-sparse matrices are now stored in serialized form
>> > > >> again, which should significantly reduce the memory pressure.
>> > > >>
>> > > >> Regards,
>> > > >> Matthias
>> > > >>
>> > > >> On 5/3/2017 9:38 AM, Mingyang Wang wrote:
>> > > >> > Hi all,
>> > > >> >
>> > > >> > I was playing with a super-sparse matrix FK, 2e7 by 1e6, with only
>> > > >> > one non-zero value in each row, i.e., 2e7 non-zero values in total.
>> > > >> >
>> > > >> > With a driver memory of 1GB and an executor memory of 100GB, I found
>> > > >> > that the HOP "Spark chkpoint", which is used to pin the FK matrix in
>> > > >> > memory, is really expensive, as it invokes lots of disk operations.
>> > > >> >
>> > > >> > FK is stored in binary format as 24 blocks, each ~45MB, i.e., ~1GB
>> > > >> > in total.
>> > > >> >
>> > > >> > For example, with the script as
>> > > >> >
>> > > >> > """
>> > > >> > FK = read($FK)
>> > > >> > print("Sum of FK = " + sum(FK))
>> > > >> > """
>> > > >> >
>> > > >> > things worked fine, and it took ~8s.
>> > > >> >
>> > > >> > While with the script as
>> > > >> >
>> > > >> > """
>> > > >> > FK = read($FK)
>> > > >> > if (1 == 1) {}
>> > > >> > print("Sum of FK = " + sum(FK))
>> > > >> > """
>> > > >> >
>> > > >> > things changed. It took ~92s, and I observed lots of disk spills in
>> > > >> > the logs. Based on the stats from the Spark UI, it seems the
>> > > >> > materialized FK requires >54GB of storage and thus introduces disk
>> > > >> > operations.
>> > > >> >
>> > > >> > I was wondering, is this the expected behavior for such a
>> > > >> > super-sparse matrix?
>> > > >> >
>> > > >> >
>> > > >> > Regards,
>> > > >> > Mingyang
>> > > >> >
>> > > >>
>> > > >
>> > >
>> >
>>
>
>
