Hi Anthony,

Thanks for helping to debug this issue. There are no limits other than the dimensions and number of non-zeros being of type long. It sounds more like an issue with converting special cases of ultra-sparse matrices. I'll try to reproduce it and give an update as soon as I know more. In the meantime, could you please (a) also provide the stacktrace of calling dataFrameToBinaryBlock with SystemML 1.0, and (b) try calling your IJV conversion script via spark-submit to rule out that this issue is API-related? Thanks.
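
For orientation, here is a rough back-of-the-envelope sketch of the sizes involved, using the approximate figures quoted further down in this thread (1.9e8 rows, 7e4 columns, 2.4e9 non-zeros). The block size of 1000 and the object name `MatrixSizeCheck` are assumptions made only for illustration. This is plain arithmetic, not a diagnosis of the error.

```scala
object MatrixSizeCheck {
  // Approximate figures quoted in this thread.
  val rows: Long = 190000000L  // 1.9e8
  val cols: Long = 70000L      // 7e4
  val nnz: Long  = 2400000000L // 2.4e9

  // Assumption: square blocks of 1000 x 1000, used only for illustration.
  val blockSize: Long = 1000L

  // Ceiling division for positive longs.
  def ceilDiv(a: Long, b: Long): Long = (a + b - 1) / b

  // Fraction of cells that are non-zero.
  val sparsity: Double = nnz.toDouble / (rows.toDouble * cols.toDouble)

  // Number of blocks along each dimension and in total.
  val rowBlocks: Long = ceilDiv(rows, blockSize)
  val colBlocks: Long = ceilDiv(cols, blockSize)
  val totalBlocks: Long = rowBlocks * colBlocks

  // Average number of non-zeros per block.
  val avgNnzPerBlock: Double = nnz.toDouble / totalBlocks.toDouble

  // Whether the total non-zero count is too large for a 32-bit int.
  val nnzExceedsInt: Boolean = nnz > Int.MaxValue.toLong

  def main(args: Array[String]): Unit = {
    println(s"sparsity         = $sparsity")
    println(s"blocks           = $rowBlocks x $colBlocks = $totalBlocks")
    println(s"avg nnz / block  = $avgNnzPerBlock")
    println(s"nnz > Int.MaxValue: $nnzExceedsInt")
  }
}
```

Under these assumptions the total non-zero count does not fit into a 32-bit int, and each block holds only a few hundred non-zeros on average, which is consistent with describing the matrix as ultra-sparse.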

Regards,
Matthias

On 12/24/2017 1:40 AM, Anthony Thomas wrote:
Okay, thanks for the suggestions - I upgraded to 1.0 and tried providing
dimensions and block sizes to dataFrameToBinaryBlock, both without success. I
additionally wrote out the matrix to HDFS in IJV format and am still
getting the same error when calling "read()" directly in the DML. However,
I created a 1% sample of the original data in IJV format and SystemML was
able to read the smaller file without any issue. This would seem to suggest
that either there is some corruption in the full file or I'm running into
some limit. The matrix is on the larger side: 1.9e8 rows by 7e4 cols with
2.4e9 nonzero values, but this seems like it should be well within the
limits of what SystemML/Spark can handle. I also checked for obvious data
errors (e.g., the file not being 1-indexed or containing blank lines). In
case it's helpful, the stacktrace from reading the data from HDFS in IJV
format is attached. Thanks again for your help - I really appreciate it.

 00:24:18 WARN TaskSetManager: Lost task 30.0 in stage 0.0 (TID 126, 10.11.10.13, executor 0): java.lang.ArrayIndexOutOfBoundsException
        at java.lang.System.arraycopy(Native Method)
        at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.shiftRightByN(SparseBlockCOO.java:594)
        at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.set(SparseBlockCOO.java:323)
        at org.apache.sysml.runtime.matrix.data.MatrixBlock.mergeIntoSparse(MatrixBlock.java:1790)
        at org.apache.sysml.runtime.matrix.data.MatrixBlock.merge(MatrixBlock.java:1736)
        at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:627)
        at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:596)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1037)
        at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
        at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
        at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
        at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
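
The checks for obvious data errors mentioned above (indices that are not 1-based, blank lines) could be sketched per line roughly as follows. This assumes each IJV line is a whitespace-separated "row col value" triple with 1-based indices; the object and method names are hypothetical and not part of SystemML.

```scala
import scala.util.Try

object IjvCheck {
  // Returns None if the line looks like a valid 1-based "i j v" triple
  // within the declared dimensions, otherwise Some(reason).
  def problem(line: String, rows: Long, cols: Long): Option[String] = {
    val trimmed = line.trim
    if (trimmed.isEmpty) {
      Some("blank line")
    } else {
      val parts = trimmed.split("\\s+")
      if (parts.length != 3) {
        Some(s"expected 3 fields, found ${parts.length}")
      } else {
        // Parse all three fields; any failure yields None.
        val parsed = for {
          i <- Try(parts(0).toLong).toOption
          j <- Try(parts(1).toLong).toOption
          v <- Try(parts(2).toDouble).toOption
        } yield (i, j, v)

        parsed match {
          case None =>
            Some("non-numeric field")
          case Some((i, j, _)) if i < 1 || j < 1 =>
            Some("index below 1")
          case Some((i, j, _)) if i > rows || j > cols =>
            Some("index outside the declared dimensions")
          case Some(_) =>
            None
        }
      }
    }
  }
}
```

On the full file, a function like this could be mapped over the lines of the text file in Spark and the offending lines counted or sampled, which would help separate "corruption in the file" from "some limit" as the cause.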

Anthony


On Sat, Dec 23, 2017 at 4:27 AM, Matthias Boehm <mboe...@gmail.com> wrote:

Given the line numbers from the stacktrace, it seems that you use a rather
old version of SystemML. Hence, I would recommend upgrading to SystemML
1.0 or at least 0.15 first.

If the error persists or you're not able to upgrade, please try calling
dataFrameToBinaryBlock with explicitly provided matrix characteristics
(dimensions and block sizes). The issue you've shown usually originates from
incorrect metadata (e.g., negative number of columns or block sizes), which
prevents the sparse rows from growing to the necessary sizes.
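
As a small illustration of the kind of metadata problem described above, a pre-flight check before conversion might look like the following sketch. The case class `Meta` is a hypothetical stand-in for the matrix characteristics, not the SystemML class, and treating every non-positive value as suspicious is an assumption made for this example.

```scala
object MetaCheck {
  // Hypothetical stand-in for matrix characteristics; NOT the SystemML class.
  final case class Meta(rows: Long, cols: Long, rowsPerBlock: Int, colsPerBlock: Int)

  // Lists every field that is zero or negative (i.e., unknown or invalid
  // under the assumption stated above).
  def issues(m: Meta): List[String] =
    List(
      "rows"         -> m.rows,
      "cols"         -> m.cols,
      "rowsPerBlock" -> m.rowsPerBlock.toLong,
      "colsPerBlock" -> m.colsPerBlock.toLong
    ).collect {
      case (name, value) if value <= 0 => s"$name = $value is not positive"
    }
}
```

For example, `MetaCheck.issues(MetaCheck.Meta(190000000L, -1L, 1000, 1000))` reports only the column count, while fully specified metadata yields an empty list.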

Regards,
Matthias

On 12/22/2017 10:42 PM, Anthony Thomas wrote:

Hi Matthias,

Thanks for the help! In response to your questions:

   1. Sorry - this was a typo: the correct schema is: [y: int, features:
   vector] - the column "features" was created using Spark's VectorAssembler
   and the underlying type is an org.apache.spark.ml.linalg.SparseVector.
   Calling x.schema results in: org.apache.spark.sql.types.StructType =
   StructType(StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true)
   2. "y" converts fine - it appears the only issue is with X. The script
   still crashes when running "print(sum(X))". The full stack trace is
   attached at the end of the message.
   3. Unfortunately, the error persists when calling
   RDDConverterUtils.dataFrameToBinaryBlock directly.
   4. Also just in case this matters: I'm packaging the script into a jar
   using SBT assembly and submitting via spark-submit.

Here's an updated script:

        val input_df = spark.read.parquet(inputPath)
        val x = input_df.select(featureNames)
        val y = input_df.select("y")
        val meta_x = new MatrixMetadata(DF_VECTOR)
        val meta_y = new MatrixMetadata(DF_DOUBLES)

        val script_x = dml("print(sum(X))").in("X", x, meta_x)
        println("Reading X")
        val res_x = ml.execute(script_x)

Here is the output of the runtime plan generated by SystemML:

# EXPLAIN (RUNTIME):
# Memory Budget local/remote = 76459MB/?MB/?MB/?MB
# Degree of Parallelism (vcores) local/remote = 24/?
PROGRAM ( size CP/SP = 3/0 )
--MAIN PROGRAM
----GENERIC (lines 1-2) [recompile=false]
------CP uak+ X.MATRIX.DOUBLE _Var0.SCALAR.STRING 24
------CP print _Var0.SCALAR.STRING.false _Var1.SCALAR.STRING
------CP rmvar _Var0 _Var1

And the resulting stack trace:

17/12/22 21:27:20 WARN TaskSetManager: Lost task 3.0 in stage 7.0 (TID 205, 10.11.10.12, executor 3): java.lang.ArrayIndexOutOfBoundsException: 0
    at org.apache.sysml.runtime.matrix.data.SparseRow.append(SparseRow.java:215)
    at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.append(SparseBlockMCSR.java:253)
    at org.apache.sysml.runtime.matrix.data.MatrixBlock.appendValue(MatrixBlock.java:663)
    at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1076)
    at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1008)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

17/12/22 21:27:21 ERROR TaskSetManager: Task 19 in stage 7.0 failed 4 times; aborting job
Exception in thread "main" org.apache.sysml.api.mlcontext.MLContextException: Exception when executing script
    at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:311)
    at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:280)
    at SystemMLMLAlgorithms$$anonfun$main$1.apply$mcVI$sp(systemml_ml_algorithms.scala:63)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at SystemMLMLAlgorithms$.main(systemml_ml_algorithms.scala:60)
    at SystemMLMLAlgorithms.main(systemml_ml_algorithms.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.sysml.api.mlcontext.MLContextException: Exception occurred while executing runtime program
    at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:390)
    at org.apache.sysml.api.mlcontext.ScriptExecutor.execute(ScriptExecutor.java:298)
    at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:303)
    ... 14 more
Caused by: org.apache.sysml.runtime.DMLRuntimeException: org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in program block generated from statement block between lines 1 and 2 -- Error evaluating instruction: CP°uak+°X·MATRIX·DOUBLE°_Var0·SCALAR·STRING°24
    at org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:130)
    at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:388)
    ... 16 more
...



On Fri, Dec 22, 2017 at 5:48 AM, Matthias Boehm <mboe...@gmail.com> wrote:

Well, let's do the following to figure this out:

1) If the schema is indeed [label: Integer, features: SparseVector],
please change the third line to val y = input_data.select("label").

2) For debugging, I would recommend using a simple script like
"print(sum(X));" and converting X and y separately to isolate the
problem.

3) If it's still failing, it would be helpful to know (a) whether it's an
issue of converting X, y, or both, as well as (b) the full stacktrace.

4) As a workaround, you might also call our internal converter directly
via:
RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, containsID, isVector),
where jsc is the Java Spark context, df is the dataset, mc are the matrix
characteristics (if unknown, simply use new MatrixCharacteristics()),
containsID indicates whether the dataset contains a column "__INDEX" with
the row indexes, and isVector indicates whether the passed dataset contains
vectors or basic types such as double.


Regards,
Matthias


On 12/22/2017 12:00 AM, Anthony Thomas wrote:

Hi SystemML folks,

I'm trying to pass some data from Spark to a DML script via the MLContext
API. The data is derived from a parquet file containing a dataframe with
the schema: [label: Integer, features: SparseVector]. I am doing the
following:

        val input_data = spark.read.parquet(inputPath)
        val x = input_data.select("features")
        val y = input_data.select("y")
        val x_meta = new MatrixMetadata(DF_VECTOR)
        val y_meta = new MatrixMetadata(DF_DOUBLES)
        val script = dmlFromFile(s"${script_path}/script.dml").
                in("X", x, x_meta).
                in("Y", y, y_meta)
        ...

However, this results in an error from SystemML:
java.lang.ArrayIndexOutOfBoundsException: 0
I'm guessing this has something to do with SparkML being zero-indexed and
SystemML being 1-indexed. Is there something I should be doing differently
here? Note that I also tried converting the dataframe to a CoordinateMatrix
and then creating an RDD[String] in IJV format. That too resulted in an
ArrayIndexOutOfBoundsException. I'm guessing there's something simple
I'm doing wrong here, but I haven't been able to figure out exactly what.
Please let me know if you need more information (I can send along the full
error stacktrace if that would be helpful)!
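
To make the indexing question above concrete, here is a minimal sketch of turning a zero-based (row, column, value) entry into a one-based IJV text line. It assumes the source entries are zero-based and that the IJV line is a space-separated "row col value" triple; the object and method names are hypothetical.

```scala
object ToIjv {
  // Converts a zero-based (row, col, value) entry into a one-based
  // "i j v" text line by shifting both indices up by one.
  def line(row: Long, col: Long, value: Double): String = {
    require(row >= 0 && col >= 0, "expected zero-based, non-negative indices")
    s"${row + 1} ${col + 1} $value"
  }
}
```

For instance, `ToIjv.line(0L, 0L, 2.5)` gives the string `1 1 2.5`. If the entries were already one-based, applying this shift would instead push the largest indices past the matrix dimensions, so it is worth confirming which convention the source data uses first.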

Thanks,

Anthony




