Thanks Matthias - unfortunately I'm still running into an
ArrayIndexOutOfBoundsException, both when reading the file as IJV and when
calling dataFrameToBinaryBlock. Just to confirm, I downloaded and compiled
the latest version using:

git clone https://github.com/apache/systemml
cd systemml
mvn clean package

mvn -version
Apache Maven 3.3.9
Maven home: /usr/share/maven
Java version: 1.8.0_151, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-oracle/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.4.0-103-generic", arch: "amd64", family: "unix"

I have a simple driver script written in Scala that calls the API methods.
I compile the script using SBT (version 1.0.4) and submit it using
spark-submit (Spark 2.2.0). Here's how I'm calling the methods:

        val x = spark.read.parquet(inputPath).select(featureNames)
        // as far as I know, 1024x1024 is the default block size in SystemML?
        val mc = new MatrixCharacteristics(199563535L, 71403L, 1024, 1024, 2444225947L)
        println("Reading Direct")
        val xrdd = RDDConverterUtils.dataFrameToBinaryBlock(jsc, x, mc, false, true)
        xrdd.count
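
In case it helps narrow things down, here's the kind of sanity check I can also
run on the DataFrame before converting - just a rough sketch, assuming x holds
the single ml.linalg vector column ("features") mentioned earlier in the thread
and that 71403 is the intended number of columns:

        // Sketch of a pre-conversion check (assumes a single vector column):
        // every vector should report the same size (the declared number of
        // columns) and no populated index should reach or exceed that size.
        import org.apache.spark.ml.linalg.Vector
        val sizes = x.rdd.map(_.getAs[Vector](0).size).distinct().collect()
        val maxIdx = x.rdd.map { r =>
          val v = r.getAs[Vector](0).toSparse
          if (v.indices.isEmpty) -1 else v.indices.max
        }.max()
        println(s"vector sizes: ${sizes.mkString(",")}, max populated index: $maxIdx")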

Here is the stacktrace from calling dataFrameToBinaryBlock:

java.lang.ArrayIndexOutOfBoundsException: 0
        at org.apache.sysml.runtime.matrix.data.SparseRowVector.append(SparseRowVector.java:196)
        at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.append(SparseBlockMCSR.java:267)
        at org.apache.sysml.runtime.matrix.data.MatrixBlock.appendValue(MatrixBlock.java:685)
        at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1067)
        at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:999)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

And here is the stacktrace from calling "read()" directly:

java.lang.ArrayIndexOutOfBoundsException: 2
        at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.sort(SparseBlockCOO.java:399)
        at org.apache.sysml.runtime.matrix.data.MatrixBlock.mergeIntoSparse(MatrixBlock.java:1784)
        at org.apache.sysml.runtime.matrix.data.MatrixBlock.merge(MatrixBlock.java:1687)
        at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:627)
        at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:596)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1037)
        at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
        at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
        at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
        at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Best,

Anthony


On Sun, Dec 24, 2017 at 3:14 AM, Matthias Boehm <mboe...@gmail.com> wrote:

> Thanks again for catching this issue, Anthony - this IJV reblock issue with
> large ultra-sparse matrices is now fixed in master. It likely did not show
> up on the 1% sample because the data was small enough to be read directly
> into the driver.
>
> However, the dataFrameToBinaryBlock might be another issue that I could
> not reproduce yet, so it would be very helpful if you could give it another
> try. Thanks.
>
> Regards,
> Matthias
>
>
> On 12/24/2017 9:57 AM, Matthias Boehm wrote:
>
>> Hi Anthony,
>>
>> thanks for helping to debug this issue. There are no limits other than
>> the dimensions and number of non-zeros being of type long. It sounds
>> more like an issue of converting special cases of ultra-sparse
>> matrices. I'll try to reproduce this issue and give an update as soon as
>> I know more. In the meantime, could you please (a) also provide the
>> stacktrace of calling dataFrameToBinaryBlock with SystemML 1.0, and (b)
>> try calling your IJV conversion script via spark-submit to exclude that
>> this issue is API-related? Thanks.
>>
>> Regards,
>> Matthias
>>
>> On 12/24/2017 1:40 AM, Anthony Thomas wrote:
>>
>>> Okay, thanks for the suggestions - I upgraded to 1.0 and tried providing
>>> dimensions and blocksizes to dataFrameToBinaryBlock, both without success.
>>> I additionally wrote out the matrix to HDFS in IJV format and am still
>>> getting the same error when calling "read()" directly in the DML. However,
>>> I created a 1% sample of the original data in IJV format and SystemML was
>>> able to read the smaller file without any issue. This would seem to suggest
>>> that either there is some corruption in the full file or I'm running into
>>> some limit. The matrix is on the larger side: 1.9e8 rows by 7e4 cols with
>>> 2.4e9 nonzero values, but this seems like it should be well within the
>>> limits of what SystemML/Spark can handle. I also checked for obvious data
>>> errors (e.g., the file not being 1-indexed, or containing blank lines). In
>>> case it's helpful, the stacktrace from reading the data from HDFS in IJV
>>> format is attached. Thanks again for your help - I really appreciate it.
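>>>
>>> For reference, the kind of check I ran for data errors was roughly along
>>> these lines (just a sketch - ijvPath stands in for the actual HDFS path):
>>>
>>>         // Sketch of the IJV sanity check (ijvPath is a placeholder):
>>>         // indices should be 1-based and within the declared dimensions,
>>>         // and there should be no blank lines.
>>>         val ijv = spark.sparkContext.textFile(ijvPath)
>>>         val parsed = ijv.filter(_.trim.nonEmpty).map { line =>
>>>           val t = line.trim.split("\\s+"); (t(0).toLong, t(1).toLong)
>>>         }
>>>         val (minI, minJ) = parsed.reduce((a, b) =>
>>>           (math.min(a._1, b._1), math.min(a._2, b._2)))
>>>         val (maxI, maxJ) = parsed.reduce((a, b) =>
>>>           (math.max(a._1, b._1), math.max(a._2, b._2)))
>>>         println(s"blank lines: ${ijv.filter(_.trim.isEmpty).count}")
>>>         println(s"row indexes: $minI..$maxI, col indexes: $minJ..$maxJ")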
>>>
>>>  00:24:18 WARN TaskSetManager: Lost task 30.0 in stage 0.0 (TID 126,
>>> 10.11.10.13, executor 0): java.lang.ArrayIndexOutOfBoundsException
>>>         at java.lang.System.arraycopy(Native Method)
>>>         at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.shiftRightByN(SparseBlockCOO.java:594)
>>>         at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.set(SparseBlockCOO.java:323)
>>>         at org.apache.sysml.runtime.matrix.data.MatrixBlock.mergeIntoSparse(MatrixBlock.java:1790)
>>>         at org.apache.sysml.runtime.matrix.data.MatrixBlock.merge(MatrixBlock.java:1736)
>>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:627)
>>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:596)
>>>         at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1037)
>>>         at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
>>>         at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
>>>         at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
>>>         at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
>>>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>>         at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>         at java.lang.Thread.run(Thread.java:748)
>>>
>>> Anthony
>>>
>>>
>>> On Sat, Dec 23, 2017 at 4:27 AM, Matthias Boehm <mboe...@gmail.com>
>>> wrote:
>>>
>>>> Given the line numbers from the stacktrace, it seems that you are using a
>>>> rather old version of SystemML. Hence, I would recommend upgrading to
>>>> SystemML 1.0 or at least 0.15 first.
>>>>
>>>> If the error persists or you're not able to upgrade, please try to call
>>>> dataFrameToBinaryBlock with provided matrix characteristics of dimensions
>>>> and blocksizes. The issue you've shown usually originates from incorrect
>>>> metadata (e.g., negative number of columns or block sizes), which prevents
>>>> the sparse rows from growing to the necessary sizes.
>>>>
>>>> Regards,
>>>> Matthias
>>>>
>>>> On 12/22/2017 10:42 PM, Anthony Thomas wrote:
>>>>
>>>>> Hi Matthias,
>>>>>
>>>>> Thanks for the help! In response to your questions:
>>>>>
>>>>>    1. Sorry - this was a typo: the correct schema is: [y: int, features:
>>>>>    vector] - the column "features" was created using Spark's
>>>>>    VectorAssembler and the underlying type is an
>>>>>    org.apache.spark.ml.linalg.SparseVector. Calling x.schema results in:
>>>>>    org.apache.spark.sql.types.StructType =
>>>>>    StructType(StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true)
>>>>>    2. "y" converts fine - it appears the only issue is with X. The script
>>>>>    still crashes when running "print(sum(X))". The full stack trace is
>>>>>    attached at the end of the message.
>>>>>    3. Unfortunately, the error persists when calling
>>>>>    RDDConverterUtils.dataFrameToBinaryBlock directly.
>>>>>    4. Also, just in case this matters: I'm packaging the script into a jar
>>>>>    using SBT assembly and submitting via spark-submit.
>>>>>
>>>>> Here's an updated script:
>>>>>
>>>>>         val input_df = spark.read.parquet(inputPath)
>>>>>         val x = input_df.select(featureNames)
>>>>>         val y = input_df.select("y")
>>>>>         val meta_x = new MatrixMetadata(DF_VECTOR)
>>>>>         val meta_y = new MatrixMetadata(DF_DOUBLES)
>>>>>
>>>>>         val script_x = dml("print(sum(X))").in("X", x, meta_x)
>>>>>         println("Reading X")
>>>>>         val res_x = ml.execute(script_x)
>>>>>
>>>>> Here is the output of the runtime plan generated by SystemML:
>>>>>
>>>>> # EXPLAIN (RUNTIME):
>>>>> # Memory Budget local/remote = 76459MB/?MB/?MB/?MB
>>>>> # Degree of Parallelism (vcores) local/remote = 24/?
>>>>> PROGRAM ( size CP/SP = 3/0 )
>>>>> --MAIN PROGRAM
>>>>> ----GENERIC (lines 1-2) [recompile=false]
>>>>> ------CP uak+ X.MATRIX.DOUBLE _Var0.SCALAR.STRING 24
>>>>> ------CP print _Var0.SCALAR.STRING.false _Var1.SCALAR.STRING
>>>>> ------CP rmvar _Var0 _Var1
>>>>>
>>>>> And the resulting stack trace:
>>>>>
>>>>> 7/12/22 21:27:20 WARN TaskSetManager: Lost task 3.0 in stage 7.0 (TID 205,
>>>>> 10.11.10.12, executor 3): java.lang.ArrayIndexOutOfBoundsException: 0
>>>>>     at org.apache.sysml.runtime.matrix.data.SparseRow.append(SparseRow.java:215)
>>>>>     at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.append(SparseBlockMCSR.java:253)
>>>>>     at org.apache.sysml.runtime.matrix.data.MatrixBlock.appendValue(MatrixBlock.java:663)
>>>>>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1076)
>>>>>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1008)
>>>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>>>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>>>>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> 17/12/22 21:27:21 ERROR TaskSetManager: Task 19 in stage 7.0 failed 4
>>>>> times; aborting job
>>>>> Exception in thread "main" org.apache.sysml.api.mlcontext.MLContextException:
>>>>> Exception when executing script
>>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:311)
>>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:280)
>>>>>     at SystemMLMLAlgorithms$$anonfun$main$1.apply$mcVI$sp(systemml_ml_algorithms.scala:63)
>>>>>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>>>>>     at SystemMLMLAlgorithms$.main(systemml_ml_algorithms.scala:60)
>>>>>     at SystemMLMLAlgorithms.main(systemml_ml_algorithms.scala)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
>>>>>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>>>>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>>>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>>>>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>> Caused by: org.apache.sysml.api.mlcontext.MLContextException: Exception
>>>>> occurred while executing runtime program
>>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:390)
>>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.execute(ScriptExecutor.java:298)
>>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:303)
>>>>>     ... 14 more
>>>>> Caused by: org.apache.sysml.runtime.DMLRuntimeException:
>>>>> org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in
>>>>> program block generated from statement block between lines 1 and 2 -- Error
>>>>> evaluating instruction: CP°uak+°X·MATRIX·DOUBLE°_Var0·SCALAR·STRING°24
>>>>>     at org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:130)
>>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:388)
>>>>>     ... 16 more
>>>>> ...
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Dec 22, 2017 at 5:48 AM, Matthias Boehm <mboe...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> well, let's do the following to figure this out:
>>>>>>
>>>>>> 1) If the schema is indeed [label: Integer, features: SparseVector],
>>>>>> please change the third line to val y = input_data.select("label").
>>>>>>
>>>>>> 2) For debugging, I would recommend using a simple script like
>>>>>> "print(sum(X));" and trying to convert X and y separately to isolate the
>>>>>> problem.
>>>>>>
>>>>>> 3) If it's still failing, it would be helpful to know (a) if it's an
>>>>>> issue of converting X, y, or both, as well as (b) the full stacktrace.
>>>>>>
>>>>>> 4) As a workaround you might also call our internal converter directly via
>>>>>> RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, containsID, isVector),
>>>>>> where jsc is the Java Spark context, df is the dataset, mc are the matrix
>>>>>> characteristics (if unknown, simply use new MatrixCharacteristics()),
>>>>>> containsID indicates if the dataset contains a column "__INDEX" with the
>>>>>> row indexes, and isVector indicates if the passed dataset contains vectors
>>>>>> or basic types such as double.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Matthias
>>>>>>
>>>>>>
>>>>>> On 12/22/2017 12:00 AM, Anthony Thomas wrote:
>>>>>>
>>>>>>> Hi SystemML folks,
>>>>>>>
>>>>>>> I'm trying to pass some data from Spark to a DML script via the
>>>>>>> MLContext
>>>>>>> API. The data is derived from a parquet file containing a
>>>>>>> dataframe with
>>>>>>> the schema: [label: Integer, features: SparseVector]. I am doing the
>>>>>>> following:
>>>>>>>
>>>>>>>         val input_data = spark.read.parquet(inputPath)
>>>>>>>         val x = input_data.select("features")
>>>>>>>         val y = input_data.select("y")
>>>>>>>         val x_meta = new MatrixMetadata(DF_VECTOR)
>>>>>>>         val y_meta = new MatrixMetadata(DF_DOUBLES)
>>>>>>>         val script = dmlFromFile(s"${script_path}/script.dml").
>>>>>>>                 in("X", x, x_meta).
>>>>>>>                 in("Y", y, y_meta)
>>>>>>>         ...
>>>>>>>
>>>>>>> However, this results in an error from SystemML:
>>>>>>> java.lang.ArrayIndexOutOfBoundsException: 0
>>>>>>> I'm guessing this has something to do with Spark ML being zero-indexed
>>>>>>> and SystemML being 1-indexed. Is there something I should be doing
>>>>>>> differently here? Note that I also tried converting the dataframe to a
>>>>>>> CoordinateMatrix and then creating an RDD[String] in IJV format. That
>>>>>>> too resulted in "ArrayIndexOutOfBoundsExceptions." I'm guessing there's
>>>>>>> something simple I'm doing wrong here, but I haven't been able to figure
>>>>>>> out exactly what. Please let me know if you need more information (I can
>>>>>>> send along the full error stacktrace if that would be helpful)!
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Anthony
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>
