Hey Matthias,

Just wanted to confirm that patch above works for me - I'm now able to pass
a dataframe of sparse vectors to a DML script without issue. Sorry for the
slow confirmation on this - I've been out of the office for the last couple
weeks. Thanks for your help debugging this!

Best,

Anthony

On Mon, Dec 25, 2017 at 5:35 AM, Matthias Boehm <mboe...@gmail.com> wrote:

> OK, that was very helpful - I just pushed two additional fixes which should
> resolve these issues. The underlying cause was an incorrect sparse row
> preallocation (intended to reduce GC overhead), which resulted in resizing
> issues for initial sizes of zero. These two patches fix the underlying issues,
> make both MCSR and COO more robust for such ultra-sparse cases, and improve
> the performance of converting ultra-sparse matrices. Thanks again for your
> help, Anthony.
>
> As a side note: our default block size is 1000 but converting to 1024 is
> fine if you also set 'sysml.defaultblocksize' to 1024; otherwise there will
> be an unnecessary reblock (with shuffle) from block size 1024 to 1000 on
> the first access of this input.
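>
> For illustration, a minimal sketch of the two options (the dimensions and
> non-zero count are taken from your driver snippet; the setConfigProperty
> call is an assumption and may differ depending on your MLContext version):
>
>         // Option 1: keep the default 1000x1000 blocks in the matrix characteristics
>         val mc = new MatrixCharacteristics(199563535L, 71403L, 1000, 1000, 2444225947L)
>
>         // Option 2 (sketch): keep 1024x1024 blocks, but align the SystemML config,
>         // e.g. via the MLContext instance 'ml'
>         ml.setConfigProperty("sysml.defaultblocksize", "1024")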
>
> Regards,
> Matthias
>
>
> On 12/25/2017 3:07 AM, Anthony Thomas wrote:
>
>> Thanks Matthias - unfortunately I'm still running into an
>> ArrayIndexOutOfBounds exception both when reading the file as IJV and when
>> calling dataFrameToBinaryBlock. Just to confirm: I downloaded and compiled
>> the latest version using:
>>
>> git clone https://github.com/apache/systemml
>> cd systemml
>> mvn clean package
>>
>> mvn -version
>> Apache Maven 3.3.9
>> Maven home: /usr/share/maven
>> Java version: 1.8.0_151, vendor: Oracle Corporation
>> Java home: /usr/lib/jvm/java-8-oracle/jre
>> Default locale: en_US, platform encoding: UTF-8
>> OS name: "linux", version: "4.4.0-103-generic", arch: "amd64", family: "unix"
>>
>> I have a simple driver script written in Scala which calls the API
>> methods.
>> I compile the script using SBT (version 1.0.4) and submit using
>> spark-submit (spark version 2.2.0). Here's how I'm calling the methods:
>>
>>         val x = spark.read.parquet(inputPath).select(featureNames)
>>         val mc = new MatrixCharacteristics(199563535L, 71403L, 1024, 1024,
>>           2444225947L) // as far as I know 1024x1024 is default block size in sysml?
>>         println("Reading Direct")
>>         val xrdd = RDDConverterUtils.dataFrameToBinaryBlock(jsc, x, mc, false, true)
>>         xrdd.count
>>
>> here is the stacktrace from calling dataFrameToBinaryBlock:
>>
>> java.lang.ArrayIndexOutOfBoundsException: 0
>>         at org.apache.sysml.runtime.matrix.data.SparseRowVector.append(SparseRowVector.java:196)
>>         at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.append(SparseBlockMCSR.java:267)
>>         at org.apache.sysml.runtime.matrix.data.MatrixBlock.appendValue(MatrixBlock.java:685)
>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1067)
>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:999)
>>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>         at java.lang.Thread.run(Thread.java:748)
>>
>> and here is the stacktrace from calling "read()" directly:
>>
>> java.lang.ArrayIndexOutOfBoundsException: 2
>>         at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.sort(SparseBlockCOO.java:399)
>>         at org.apache.sysml.runtime.matrix.data.MatrixBlock.mergeIntoSparse(MatrixBlock.java:1784)
>>         at org.apache.sysml.runtime.matrix.data.MatrixBlock.merge(MatrixBlock.java:1687)
>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:627)
>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:596)
>>         at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1037)
>>         at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
>>         at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
>>         at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
>>         at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
>>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>         at java.lang.Thread.run(Thread.java:748)
>>
>> Best,
>>
>> Anthony
>>
>>
>> On Sun, Dec 24, 2017 at 3:14 AM, Matthias Boehm <mboe...@gmail.com>
>> wrote:
>>
>>> Thanks again for catching this issue Anthony - this IJV reblock issue with
>>> large ultra-sparse matrices is now fixed in master. It likely did not show
>>> up on the 1% sample because the data was small enough to read it directly
>>> into the driver.
>>>
>>> However, the dataFrameToBinaryBlock path might be a separate issue that I
>>> have not been able to reproduce yet, so it would be very helpful if you
>>> could give it another try. Thanks.
>>>
>>> Regards,
>>> Matthias
>>>
>>>
>>> On 12/24/2017 9:57 AM, Matthias Boehm wrote:
>>>
>>>> Hi Anthony,
>>>>
>>>> thanks for helping to debug this issue. There are no limits other than
>>>> the dimensions and number of non-zeros being of type long. It sounds
>>>> more like an issue with converting special cases of ultra-sparse
>>>> matrices. I'll try to reproduce this issue and give an update as soon as
>>>> I know more. In the meantime, could you please (a) also provide the
>>>> stacktrace of calling dataFrameToBinaryBlock with SystemML 1.0, and (b)
>>>> try calling your IJV conversion script via spark-submit to rule out that
>>>> this issue is API-related? Thanks.
>>>>
>>>> Regards,
>>>> Matthias
>>>>
>>>> On 12/24/2017 1:40 AM, Anthony Thomas wrote:
>>>>
>>>>> Okay, thanks for the suggestions - I upgraded to 1.0 and tried providing
>>>>> dimensions and block sizes to dataFrameToBinaryBlock, both without success.
>>>>> I additionally wrote out the matrix to HDFS in IJV format and am still
>>>>> getting the same error when calling "read()" directly in the DML. However,
>>>>> I created a 1% sample of the original data in IJV format and SystemML was
>>>>> able to read the smaller file without any issue. This would seem to suggest
>>>>> that either there is some corruption in the full file or I'm running into
>>>>> some limit. The matrix is on the larger side: 1.9e8 rows by 7e4 cols with
>>>>> 2.4e9 nonzero values, but this seems like it should be well within the
>>>>> limits of what SystemML/Spark can handle. I also checked for obvious data
>>>>> errors (e.g., the file not being 1-indexed or containing blank lines; a
>>>>> sketch of those checks is below). In case it's helpful, the stacktrace from
>>>>> reading the data from HDFS in IJV format is attached. Thanks again for your
>>>>> help - I really appreciate it.
>>>>>
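>>>>> As an aside, a minimal sketch of those IJV sanity checks (hypothetical;
>>>>> 'ijvPath' is a placeholder for the HDFS path of the exported matrix):
>>>>>
>>>>>         // count blank lines and find the smallest row/column index (should be >= 1)
>>>>>         val ijv = spark.sparkContext.textFile(ijvPath)
>>>>>         val numBlank = ijv.filter(_.trim.isEmpty).count()
>>>>>         val (minI, minJ) = ijv.filter(_.trim.nonEmpty)
>>>>>           .map(_.trim.split("\\s+"))
>>>>>           .map(t => (t(0).toLong, t(1).toLong))
>>>>>           .reduce((a, b) => (math.min(a._1, b._1), math.min(a._2, b._2)))
>>>>>         println(s"blank lines: $numBlank, min row index: $minI, min col index: $minJ")
>>>>>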
>>>>> 00:24:18 WARN TaskSetManager: Lost task 30.0 in stage 0.0 (TID 126,
>>>>> 10.11.10.13, executor 0): java.lang.ArrayIndexOutOfBoundsException
>>>>>         at java.lang.System.arraycopy(Native Method)
>>>>>         at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.shiftRightByN(SparseBlockCOO.java:594)
>>>>>         at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.set(SparseBlockCOO.java:323)
>>>>>         at org.apache.sysml.runtime.matrix.data.MatrixBlock.mergeIntoSparse(MatrixBlock.java:1790)
>>>>>         at org.apache.sysml.runtime.matrix.data.MatrixBlock.merge(MatrixBlock.java:1736)
>>>>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:627)
>>>>>         at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:596)
>>>>>         at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1037)
>>>>>         at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
>>>>>         at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
>>>>>         at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
>>>>>         at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>>>>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
>>>>>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> Anthony
>>>>>
>>>>>
>>>>> On Sat, Dec 23, 2017 at 4:27 AM, Matthias Boehm <mboe...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Given the line numbers from the stacktrace, it seems that you are using a
>>>>>> rather old version of SystemML. Hence, I would recommend upgrading to
>>>>>> SystemML 1.0, or at least 0.15, first.
>>>>>>
>>>>>> If the error persists or you're not able to upgrade, please try to call
>>>>>> dataFrameToBinaryBlock with explicitly provided matrix characteristics
>>>>>> (dimensions and block sizes). The issue you've shown usually originates
>>>>>> from incorrect metadata (e.g., a negative number of columns or block
>>>>>> sizes), which prevents the sparse rows from growing to the necessary
>>>>>> sizes.
>>>>>>
>>>>>> Regards,
>>>>>> Matthias
>>>>>>
>>>>>> On 12/22/2017 10:42 PM, Anthony Thomas wrote:
>>>>>>
>>>>>> Hi Matthias,
>>>>>>
>>>>>>>
>>>>>>> Thanks for the help! In response to your questions:
>>>>>>>
>>>>>>>    1. Sorry - this was a typo: the correct schema is: [y: int, features:
>>>>>>>    vector] - the column "features" was created using Spark's VectorAssembler
>>>>>>>    and the underlying type is an org.apache.spark.ml.linalg.SparseVector.
>>>>>>>    Calling x.schema results in: org.apache.spark.sql.types.StructType =
>>>>>>>    StructType(StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true)
>>>>>>>    2. "y" converts fine - it appears the only issue is with X. The script
>>>>>>>    still crashes when running "print(sum(X))". The full stack trace is
>>>>>>>    attached at the end of the message.
>>>>>>>    3. Unfortunately, the error persists when calling
>>>>>>>    RDDConverterUtils.dataFrameToBinaryBlock directly.
>>>>>>>    4. Also, just in case this matters: I'm packaging the script into a jar
>>>>>>>    using SBT assembly and submitting via spark-submit.
>>>>>>>
>>>>>>> Here's an updated script:
>>>>>>>
>>>>>>>         val input_df = spark.read.parquet(inputPath)
>>>>>>>         val x = input_df.select(featureNames)
>>>>>>>         val y = input_df.select("y")
>>>>>>>         val meta_x = new MatrixMetadata(DF_VECTOR)
>>>>>>>         val meta_y = new MatrixMetadata(DF_DOUBLES)
>>>>>>>
>>>>>>>         val script_x = dml("print(sum(X))").in("X", x, meta_x)
>>>>>>>         println("Reading X")
>>>>>>>         val res_x = ml.execute(script_x)
>>>>>>>
>>>>>>> Here is the output of the runtime plan generated by SystemML:
>>>>>>>
>>>>>>> # EXPLAIN (RUNTIME):
>>>>>>> # Memory Budget local/remote = 76459MB/?MB/?MB/?MB
>>>>>>> # Degree of Parallelism (vcores) local/remote = 24/?
>>>>>>> PROGRAM ( size CP/SP = 3/0 )
>>>>>>> --MAIN PROGRAM
>>>>>>> ----GENERIC (lines 1-2) [recompile=false]
>>>>>>> ------CP uak+ X.MATRIX.DOUBLE _Var0.SCALAR.STRING 24
>>>>>>> ------CP print _Var0.SCALAR.STRING.false _Var1.SCALAR.STRING
>>>>>>> ------CP rmvar _Var0 _Var1
>>>>>>>
>>>>>>> And the resulting stack trace:
>>>>>>>
>>>>>>> 17/12/22 21:27:20 WARN TaskSetManager: Lost task 3.0 in stage 7.0 (TID 205,
>>>>>>> 10.11.10.12, executor 3): java.lang.ArrayIndexOutOfBoundsException: 0
>>>>>>>     at org.apache.sysml.runtime.matrix.data.SparseRow.append(SparseRow.java:215)
>>>>>>>     at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.append(SparseBlockMCSR.java:253)
>>>>>>>     at org.apache.sysml.runtime.matrix.data.MatrixBlock.appendValue(MatrixBlock.java:663)
>>>>>>>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1076)
>>>>>>>     at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1008)
>>>>>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>>>>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>>>>>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>>>>>>>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>> 17/12/22 21:27:21 ERROR TaskSetManager: Task 19 in stage 7.0 failed 4
>>>>>>> times; aborting job
>>>>>>> Exception in thread "main" org.apache.sysml.api.mlcontext.MLContextException:
>>>>>>> Exception when executing script
>>>>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:311)
>>>>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:280)
>>>>>>>     at SystemMLMLAlgorithms$$anonfun$main$1.apply$mcVI$sp(systemml_ml_algorithms.scala:63)
>>>>>>>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>>>>>>>     at SystemMLMLAlgorithms$.main(systemml_ml_algorithms.scala:60)
>>>>>>>     at SystemMLMLAlgorithms.main(systemml_ml_algorithms.scala)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
>>>>>>>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>>>>>>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>>>>>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>>>>>>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>>>> Caused by: org.apache.sysml.api.mlcontext.MLContextException: Exception
>>>>>>> occurred while executing runtime program
>>>>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:390)
>>>>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.execute(ScriptExecutor.java:298)
>>>>>>>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:303)
>>>>>>>     ... 14 more
>>>>>>> Caused by: org.apache.sysml.runtime.DMLRuntimeException:
>>>>>>> org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in
>>>>>>> program block generated from statement block between lines 1 and 2 -- Error
>>>>>>> evaluating instruction: CP°uak+°X·MATRIX·DOUBLE°_Var0·SCALAR·STRING°24
>>>>>>>     at org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:130)
>>>>>>>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:388)
>>>>>>>     ... 16 more
>>>>>>> ...
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Dec 22, 2017 at 5:48 AM, Matthias Boehm <mboe...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> well, let's do the following to figure this out:
>>>>>>>>
>>>>>>>> 1) If the schema is indeed [label: Integer, features: SparseVector],
>>>>>>>> please change the third line to val y = input_data.select("label").
>>>>>>>>
>>>>>>>> 2) For debugging, I would recommend using a simple script like
>>>>>>>> "print(sum(X));" and trying to convert X and y separately to isolate the
>>>>>>>> problem.
>>>>>>>>
>>>>>>>> 3) If it's still failing, it would be helpful to know (a) whether it's an
>>>>>>>> issue of converting X, y, or both, as well as (b) the full stacktrace.
>>>>>>>>
>>>>>>>> 4) As a workaround, you might also call our internal converter directly via
>>>>>>>> RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, containsID, isVector),
>>>>>>>> where jsc is the Java spark context, df is the dataset, mc is the matrix
>>>>>>>> characteristics (if unknown, simply use new MatrixCharacteristics()),
>>>>>>>> containsID indicates whether the dataset contains a column "__INDEX" with
>>>>>>>> the row indexes, and isVector indicates whether the passed dataset contains
>>>>>>>> vectors or basic types such as double (see the sketch after this list).
>>>>>>>>
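>>>>>>>> A minimal sketch of that call, for illustration (imports inferred from the
>>>>>>>> package names in the stacktraces; 'spark' is the SparkSession and 'x' the
>>>>>>>> vector DataFrame, so treat this as an untested outline):
>>>>>>>>
>>>>>>>>         import org.apache.spark.api.java.JavaSparkContext
>>>>>>>>         import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils
>>>>>>>>         import org.apache.sysml.runtime.matrix.MatrixCharacteristics
>>>>>>>>
>>>>>>>>         val jsc = new JavaSparkContext(spark.sparkContext)
>>>>>>>>         val mc  = new MatrixCharacteristics()   // dimensions/nnz unknown
>>>>>>>>         // containsID = false (no "__INDEX" column), isVector = true (vector column)
>>>>>>>>         val xBlocks = RDDConverterUtils.dataFrameToBinaryBlock(jsc, x, mc, false, true)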
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>> On 12/22/2017 12:00 AM, Anthony Thomas wrote:
>>>>>>>>
>>>>>>>> Hi SystemML folks,
>>>>>>>>
>>>>>>>>
>>>>>>>>> I'm trying to pass some data from Spark to a DML script via the MLContext
>>>>>>>>> API. The data is derived from a parquet file containing a dataframe with
>>>>>>>>> the schema: [label: Integer, features: SparseVector]. I am doing the
>>>>>>>>> following:
>>>>>>>>>
>>>>>>>>>         val input_data = spark.read.parquet(inputPath)
>>>>>>>>>         val x = input_data.select("features")
>>>>>>>>>         val y = input_data.select("y")
>>>>>>>>>         val x_meta = new MatrixMetadata(DF_VECTOR)
>>>>>>>>>         val y_meta = new MatrixMetadata(DF_DOUBLES)
>>>>>>>>>         val script = dmlFromFile(s"${script_path}/script.dml").
>>>>>>>>>                 in("X", x, x_meta).
>>>>>>>>>                 in("Y", y, y_meta)
>>>>>>>>>         ...
>>>>>>>>>
>>>>>>>>> However, this results in an error from SystemML:
>>>>>>>>> java.lang.ArrayIndexOutOfBoundsException: 0
>>>>>>>>> I'm guessing this has something to do with Spark ML being zero-indexed and
>>>>>>>>> SystemML being 1-indexed. Is there something I should be doing differently
>>>>>>>>> here? Note that I also tried converting the dataframe to a CoordinateMatrix
>>>>>>>>> and then creating an RDD[String] in IJV format. That too resulted in
>>>>>>>>> "ArrayIndexOutOfBoundsExceptions." I'm guessing there's something simple
>>>>>>>>> I'm doing wrong here, but I haven't been able to figure out exactly what.
>>>>>>>>> Please let me know if you need more information (I can send along the full
>>>>>>>>> error stacktrace if that would be helpful)!
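>>>>>>>>>
>>>>>>>>> One way to do that IJV conversion looks roughly like the following sketch
>>>>>>>>> (hypothetical; it assumes the "features" column deserializes to
>>>>>>>>> org.apache.spark.ml.linalg.SparseVector and shifts Spark's 0-based indices
>>>>>>>>> to 1-based for SystemML):
>>>>>>>>>
>>>>>>>>>         import org.apache.spark.ml.linalg.SparseVector
>>>>>>>>>
>>>>>>>>>         val ijv = input_data.select("features").rdd
>>>>>>>>>           .zipWithIndex()                          // 0-based row index
>>>>>>>>>           .flatMap { case (row, i) =>
>>>>>>>>>             val v = row.getAs[SparseVector](0)
>>>>>>>>>             v.indices.zip(v.values).map { case (j, value) =>
>>>>>>>>>               s"${i + 1} ${j + 1} $value"          // 1-based "i j v" line
>>>>>>>>>             }
>>>>>>>>>           }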
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Anthony
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>
