Re: Questions about the Compositions of Execution Time
That's awesome! Can I take it that, when utilizing these super-sparse permutation matrices, it is usually better to store them as column vectors and then dynamically expand them via table()? Currently, all such FK matrices are stored as sparse matrices in binary format. Also, as the pmm operator only supports selection, I want to confirm that if FK were to be used multiple times, say, in an iterative algorithm, it would still be better to use dynamic expansion in each iteration rather than materializing it beforehand (which is simply to reduce read overhead), right? My cluster has been temporarily down, so unfortunately I cannot compare these scenarios right now. Best, Mingyang On Fri, Apr 21, 2017 at 12:30 AM Matthias Boehm wrote: > Hi Mingyang, > > just out of curiosity, I did a quick experiment with the discussed > alternative formulation for scenario 1 with the following script > > R = read($1) > S = read($2) > FK = read($3) > > wS = Rand(rows=ncol(S), cols=1, min=0, max=1, pdf="uniform") > wR = Rand(rows=ncol(R), cols=1, min=0, max=1, pdf="uniform") > temp = S %*% wS + table(seq(1,nrow(FK)),FK,nrow(FK),1e6) %*% (R %*% wR) > if(1==1){} > print(sum(temp)) > > and after two additional improvements (SYSTEMML-1550 and SYSTEMML-1551), I > got the following - now reasonable - results: > > Total elapsed time: 7.928 sec. > Total compilation time: 1.802 sec. > Total execution time: 6.126 sec. > > Number of compiled MR Jobs: 0. > Number of executed MR Jobs: 0. > Cache hits (Mem, WB, FS, HDFS): 7/0/0/3. > Cache writes (WB, FS, HDFS): 5/0/0. > Cache times (ACQr/m, RLS, EXP): 2.621/0.001/0.511/0.000 sec. > > HOP DAGs recompiled (PRED, SB): 0/0. > HOP DAGs recompile time: 0.000 sec. > Total JIT compile time: 9.656 sec. > Total JVM GC count: 3. > Total JVM GC time: 1.601 sec.
> > Heavy hitter instructions (name, time, count): > -- 1) ba+* 3.052 sec 3 > -- 2) rexpand 2.790 sec 1 > -- 3) uak+ 0.169 sec 1 > -- 4) + 0.095 sec 1 > -- 5) rand 0.017 sec 2 > -- 6) print 0.001 sec 1 > -- 7) == 0.001 sec 1 > -- 8) createvar 0.000 sec 10 > -- 9) rmvar 0.000 sec 11 > > -- 10) assignvar 0.000 sec 1 > > There is still some potential because we should compile a permutation > matrix multiply (pmm) instead of materializing this intermediate, but this > pmm operator currently only supports selection but not permutation matrices. > Thanks again for catching this performance issue. > > Regards, > Matthias > > On Thu, Apr 20, 2017 at 11:44 AM, Matthias Boehm > wrote: > >> 1) Understanding execution plans: Our local bufferpool reads matrices in >> a lazy manner on the first singlenode, i.e., CP, operation that tries to >> pin the matrix into memory. Similarly, distributed matrices are read into >> aggregated memory on the first Spark instruction. Hence, you can >> differentiate these different scenarios by following the data dependencies, >> i.e., what kind of instructions use the particular matrix. Spark checkpoint >> instructions are a good indicator too, but there are special cases where >> they will not exist. >> >> 2) Forcing computation: I typically use 'if(1==1){}' to create a >> statement block cut (and thus a DAG cut) and subsequently simply a >> 'print(sum(temp))' because we apply most algebraic rewrites only within the >> scope of individual statement blocks. >> >> 3) Permutation matrices: If FK has a single entry of value 1 per row, you >> could store it as a column vector with FK2 = rowIndexMax(FK) and >> subsequently reconstruct it via FK = table(seq(1,nrow(FK2)), FK2, >> nrow(FK2), N), for which we will compile a dedicated operator that does row >> expansions. You don't necessarily need the last two arguments, which only >> ensure padding and thus matching dimensions for the subsequent matrix >> multiplication.
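The column-vector encoding and table()-based reconstruction that Matthias describes can be sketched in NumPy (sizes, seeds, and variable names here are illustrative, and table()/rowIndexMax() are emulated with plain array operations rather than SystemML's actual implementations):

```python
import numpy as np

rng = np.random.default_rng(1)
n_rows, n_cols, N = 6, 4, 10      # N: padded column count for the later %*%

# FK: an ultra-sparse matrix with exactly one 1 per row.
FK = np.zeros((n_rows, n_cols))
FK[np.arange(n_rows), rng.integers(0, n_cols, size=n_rows)] = 1.0

# FK2 = rowIndexMax(FK): 1-based column index of each row's maximum.
FK2 = FK.argmax(axis=1) + 1

# table(seq(1,nrow(FK2)), FK2, nrow(FK2), N): rebuild FK, padded to N columns.
FK_rebuilt = np.zeros((n_rows, N))
FK_rebuilt[np.arange(n_rows), FK2 - 1] = 1.0

assert np.array_equal(FK_rebuilt[:, :n_cols], FK)   # original recovered
assert not FK_rebuilt[:, n_cols:].any()             # padding stays zero

# Multiplying by such a matrix is just a gather of the vector's entries,
# which is why re-expanding from the column vector on demand is cheap.
v = rng.random(N)
assert np.allclose(FK_rebuilt @ v, v[FK2 - 1])
```

The final assertion is the key point: the row-expansion operator never needs to pay for a general matrix multiply, since the product degenerates to an indexed read per row.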
>> >> >> Regards, >> Matthias >> >> On 4/20/2017 11:05 AM, Mingyang Wang wrote: >> >>> Hi Matthias, >>> >>> Thanks for your thorough explanations! And I have some other questions. >>> >>> 1. I am curious about the behaviors of the read operation within >>> createvar. >>> How can I differentiate whether the inputs are loaded in the driver >>> memory >>> or loaded in executors? Can I assume the inputs are loaded in executors >>> if >>> a Spark checkpoint instruction is invoked? >>> >>> 2. I am also curious how you put a sum operation in a different DAG. >>> Currently, I put a "print one entry" instruction within a for loop; is it >>> sufficient to trigger the whole matrix multiplication without some >>> shortcuts like a dot product between a row and a column? At least, from >>> the >>> HOP explains, the whole matrix multiplication is scheduled. >>> >>> 3. About generating a "specific" sparse matrix in SystemML. Say, I need a >>> sparse matrix of 200,000,000 x 10,000,000 and there is exactly one >>> non-zero >>> value in each row (the p
Re: Questions about the Compositions of Execution Time
Hi Matthias, Thanks for your thorough explanations! And I have some other questions. 1. I am curious about the behaviors of the read operation within createvar. How can I differentiate whether the inputs are loaded in the driver memory or loaded in executors? Can I assume the inputs are loaded in executors if a Spark checkpoint instruction is invoked? 2. I am also curious how you put a sum operation in a different DAG. Currently, I put a "print one entry" instruction within a for loop; is it sufficient to trigger the whole matrix multiplication without some shortcuts like a dot product between a row and a column? At least, from the HOP explains, the whole matrix multiplication is scheduled. 3. About generating a "specific" sparse matrix in SystemML. Say, I need a sparse matrix of 200,000,000 x 10,000,000 and there is exactly one non-zero value in each row (the position could be random). Is there any efficient way to do it? Currently, I am generating such a matrix externally in text format, and it cannot be easily converted to binary format with a simple read/write script (it took quite a long time and failed). Regards, Mingyang On Thu, Apr 20, 2017 at 2:08 AM Matthias Boehm wrote: > Hi Mingyang, > > thanks for the questions - this is very valuable feedback. I was able to > reproduce your performance issue on scenario 1 and I have a patch, which > I'll push to master tomorrow after more thorough testing. Below are the > details and the answers to your questions: > > 1) Expected performance and bottlenecks: In general, for these single > operation scripts, the read is indeed the expected bottleneck. However, > excessive GC is usually an indicator for internal performance issues that > can be addressed. Let's discuss the scenarios individually: > > a) Script 1 (in-memory operations): Given the mentioned data sizes, the > inputs are read into the driver and all operations are executed as > singlenode, in-memory operations.
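Regarding question 3, one way to sidestep the slow text-format round trip (outside SystemML) is to construct the matrix directly in a compressed sparse row representation, which costs only O(rows) since each row holds exactly one entry. A scaled-down SciPy sketch, where the function name and sizes are illustrative assumptions:

```python
import numpy as np
from scipy.sparse import csr_matrix

def one_nnz_per_row(n_rows, n_cols, seed=42):
    """Ultra-sparse matrix with exactly one 1 per row, built directly in CSR.

    No per-cell text parsing: just one random column index per row,
    a values array of ones, and a trivial row pointer."""
    rng = np.random.default_rng(seed)
    cols = rng.integers(0, n_cols, size=n_rows)   # random position per row
    data = np.ones(n_rows)
    indptr = np.arange(n_rows + 1)                # row i spans [i, i+1)
    return csr_matrix((data, cols, indptr), shape=(n_rows, n_cols))

# Scaled-down stand-in for the 200,000,000 x 10,000,000 case.
M = one_nnz_per_row(1000, 500)
assert M.nnz == 1000
assert (M.getnnz(axis=1) == 1).all()
```

The same three flat arrays (values, column indices, row pointer) could then be serialized in whatever binary block format the target system expects, avoiding the expensive text read entirely.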
However, typically we read binary > matrices at 1GB/s and perform these matrix-vector operations at peak memory > bandwidth, i.e., 16-64GB/s on a single node. > > The problem in your scenario is the read of the ultra-sparse matrix FK, > which has a sparsity of 10^-6, i.e., roughly a single cell per row. In my > environment the stats looked as follows: > > Total elapsed time: 48.274 sec. > Total compilation time: 1.957 sec. > Total execution time: 46.317 sec. > Number of compiled MR Jobs: 0. > Number of executed MR Jobs: 0. > Cache hits (Mem, WB, FS, HDFS): 6/0/0/3. > Cache writes (WB, FS, HDFS): 4/0/0. > Cache times (ACQr/m, RLS, EXP): 45.078/0.001/0.005/0.000 sec. > HOP DAGs recompiled (PRED, SB): 0/0. > HOP DAGs recompile time: 0.000 sec. > Total JIT compile time: 9.24 sec. > Total JVM GC count: 23. > Total JVM GC time: 35.181 sec. > Heavy hitter instructions (name, time, count): > -- 1) ba+* 45.927 sec 3 > -- 2) uak+ 0.228 sec 1 > -- 3) + 0.138 sec 1 > -- 4) rand 0.023 sec 2 > -- 5) print 0.001 sec 1 > -- 6) == 0.001 sec 1 > -- 7) createvar 0.000 sec 9 > -- 8) rmvar 0.000 sec 10 > -- 9) assignvar 0.000 sec 1 > -- 10) cpvar 0.000 sec 1 > > With the patch (that essentially leverages our CSR instead of MCSR sparse > format for temporarily read blocks in order to reduce the size overhead and > allow for efficient reuse), the execution time improved to the following > > Total elapsed time: 14.860 sec. > Total compilation time: 1.922 sec. > Total execution time: 12.938 sec. > Number of compiled MR Jobs: 0. > Number of executed MR Jobs: 0. > Cache hits (Mem, WB, FS, HDFS): 6/0/0/3. > Cache writes (WB, FS, HDFS): 4/0/0. > Cache times (ACQr/m, RLS, EXP): 10.227/0.001/0.006/0.000 sec. > HOP DAGs recompiled (PRED, SB): 0/0. > HOP DAGs recompile time: 0.000 sec. > Total JIT compile time: 7.529 sec. > Total JVM GC count: 6. > Total JVM GC time: 4.174 sec.
> Heavy hitter instructions (name, time, count): > -- 1) ba+* 12.442 sec 3 > -- 2) uak+ 0.380 sec 1 > -- 3) + 0.097 sec 1 > -- 4) rand 0.018 sec 2 > -- 5) == 0.001 sec 1 > -- 6) print 0.000 sec 1 > -- 7) createvar 0.000 sec 9 > -- 8) rmvar 0.000 sec 10 > -- 9) cpvar 0.000 sec 1 > -- 10) assignvar 0.000 sec 1 > > b) Script 2 (distributed operations): This scenario looks as expected. > However, the stats output can be a little misleading due to Spark's lazy > evaluation. Since the read and matrix-vector multiplication are just > transformations, the collect action then triggers the entire pipeline and > accordingly shows up as the heavy hitter. Again, here are the stats from my > environment (where I used a sum in a different DAG to trigger compute): > >
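The lazy-evaluation effect described above - transformations defer all work, so the action that finally pulls results through absorbs the pipeline's cost in the stats - can be mimicked with plain Python generators. This is only an analogy, not Spark or SystemML code:

```python
log = []

def read(n):                 # stand-in for a lazy distributed read
    for i in range(n):
        log.append("read")
        yield float(i)

def mapmm(xs):               # stand-in for a matrix-vector transformation
    for x in xs:
        log.append("mapmm")
        yield 2.0 * x

pipeline = mapmm(read(3))    # transformations only: no work done yet
assert log == []

total = sum(pipeline)        # the action (like uak+) pulls everything through
assert total == 6.0
assert log.count("read") == 3 and log.count("mapmm") == 3
```

Just as here the entire cost lands on sum(), in the stats below the uak+ instruction is charged for the read and the matrix-vector multiply it triggers.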
Re: Questions about the Compositions of Execution Time
Hi Mingyang, thanks for the questions - this is very valuable feedback. I was able to reproduce your performance issue on scenario 1 and I have a patch, which I'll push to master tomorrow after more thorough testing. Below are the details and the answers to your questions: 1) Expected performance and bottlenecks: In general, for these single operation scripts, the read is indeed the expected bottleneck. However, excessive GC is usually an indicator for internal performance issues that can be addressed. Let's discuss the scenarios individually: a) Script 1 (in-memory operations): Given the mentioned data sizes, the inputs are read into the driver and all operations are executed as singlenode, in-memory operations. However, typically we read binary matrices at 1GB/s and perform these matrix-vector operations at peak memory bandwidth, i.e., 16-64GB/s on a single node. The problem in your scenario is the read of the ultra-sparse matrix FK, which has a sparsity of 10^-6, i.e., roughly a single cell per row. In my environment the stats looked as follows: Total elapsed time: 48.274 sec. Total compilation time: 1.957 sec. Total execution time: 46.317 sec. Number of compiled MR Jobs: 0. Number of executed MR Jobs: 0. Cache hits (Mem, WB, FS, HDFS): 6/0/0/3. Cache writes (WB, FS, HDFS): 4/0/0. Cache times (ACQr/m, RLS, EXP): 45.078/0.001/0.005/0.000 sec. HOP DAGs recompiled (PRED, SB): 0/0. HOP DAGs recompile time: 0.000 sec. Total JIT compile time: 9.24 sec. Total JVM GC count: 23. Total JVM GC time: 35.181 sec.
Heavy hitter instructions (name, time, count): -- 1) ba+* 45.927 sec 3 -- 2) uak+ 0.228 sec 1 -- 3) + 0.138 sec 1 -- 4) rand 0.023 sec 2 -- 5) print 0.001 sec 1 -- 6) == 0.001 sec 1 -- 7) createvar 0.000 sec 9 -- 8) rmvar 0.000 sec 10 -- 9) assignvar 0.000 sec 1 -- 10) cpvar 0.000 sec 1 With the patch (that essentially leverages our CSR instead of MCSR sparse format for temporarily read blocks in order to reduce the size overhead and allow for efficient reuse), the execution time improved to the following Total elapsed time: 14.860 sec. Total compilation time: 1.922 sec. Total execution time: 12.938 sec. Number of compiled MR Jobs: 0. Number of executed MR Jobs: 0. Cache hits (Mem, WB, FS, HDFS): 6/0/0/3. Cache writes (WB, FS, HDFS): 4/0/0. Cache times (ACQr/m, RLS, EXP): 10.227/0.001/0.006/0.000 sec. HOP DAGs recompiled (PRED, SB): 0/0. HOP DAGs recompile time: 0.000 sec. Total JIT compile time: 7.529 sec. Total JVM GC count: 6. Total JVM GC time: 4.174 sec. Heavy hitter instructions (name, time, count): -- 1) ba+* 12.442 sec 3 -- 2) uak+ 0.380 sec 1 -- 3) + 0.097 sec 1 -- 4) rand 0.018 sec 2 -- 5) == 0.001 sec 1 -- 6) print 0.000 sec 1 -- 7) createvar 0.000 sec 9 -- 8) rmvar 0.000 sec 10 -- 9) cpvar 0.000 sec 1 -- 10) assignvar 0.000 sec 1 b) Script 2 (distributed operations): This scenario looks as expected. However, the stats output can be a little misleading due to Spark's lazy evaluation. Since the read and matrix-vector multiplication are just transformations, the collect action then triggers the entire pipeline and accordingly shows up as the heavy hitter. Again, here are the stats from my environment (where I used a sum in a different DAG to trigger compute): Total elapsed time: 62.681 sec. Total compilation time: 1.790 sec. Total execution time: 60.891 sec. Number of compiled Spark inst: 2. Number of executed Spark inst: 2. Cache hits (Mem, WB, FS, HDFS): 1/0/0/1. Cache writes (WB, FS, HDFS): 1/0/0. Cache times (ACQr/m, RLS, EXP): 26.323/0.001/0.004/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/1. HOP DAGs recompile time: 0.005 sec. Spark ctx create time (lazy): 33.687 sec. Spark trans counts (par,bc,col): 0/1/1. Spark trans times (par,bc,col): 0.000/0.011/26.322 secs. Total JIT compile time: 19.571 sec. Total JVM GC count: 12. Total JVM GC time: 0.536 sec. Heavy hitter instructions (name, time, count): -- 1) sp_chkpoint 34.272 sec 1 -- 2) uak+ 26.474 sec 1 -- 3) sp_mapmm 0.026 sec 1 -- 4) rand 0.023 sec 1 -- 5) rmvar 0.011 sec 7 -- 6) == 0.001 sec 1 -- 7) print 0.000 sec 1 -- 8) createvar 0.000 sec 4 -- 9) assignvar 0.000 sec 1 -- 10) cpvar 0.000 sec 1 Note that 33s out of the 62s are required for Spark context creation (allocating and initializing the YARN containers for executors). The pipeline is then triggered by the sum (uak+, i.e., unary aggregate kahan plus), which includes the collect. Furthermore, there is