Great - I'm glad to hear that. Thanks again for catching these issues, Anthony.
Regards,
Matthias

On Wed, Jan 10, 2018 at 11:09 AM, Anthony Thomas <ahtho...@eng.ucsd.edu> wrote:
> Hey Matthias,
>
> Just wanted to confirm that the patch above works for me - I'm now able to pass a dataframe of sparse vectors to a DML script without issue. Sorry for the slow confirmation on this - I've been out of the office for the last couple of weeks. Thanks for your help debugging this!
>
> Best,
>
> Anthony
>
> On Mon, Dec 25, 2017 at 5:35 AM, Matthias Boehm <mboe...@gmail.com> wrote:
> >
> > OK, that was very helpful - I just pushed two additional fixes which should resolve these issues. The underlying cause was an incorrect sparse-row preallocation (intended to reduce GC overhead), which resulted in resizing issues for initial sizes of zero. These two patches fix the underlying issues, make both MCSR and COO more robust for such ultra-sparse cases, and improve the performance of converting ultra-sparse matrices. Thanks again for your help, Anthony.
> >
> > As a side note: our default block size is 1000, but converting to 1024 is fine if you also set 'sysml.defaultblocksize' to 1024; otherwise there will be an unnecessary reblock (with shuffle) from block size 1024 to 1000 on the first access of this input.
> >
> > Regards,
> > Matthias
> >
> > On 12/25/2017 3:07 AM, Anthony Thomas wrote:
> >
> >> Thanks Matthias - unfortunately I'm still running into an ArrayIndexOutOfBounds exception, both when reading the file as IJV and when calling dataFrameToBinaryBlock. Just to confirm: I downloaded and compiled the latest version using:
> >>
> >> git clone https://github.com/apache/systemml
> >> cd systemml
> >> mvn clean package
> >>
> >> mvn -version
> >> Apache Maven 3.3.9
> >> Maven home: /usr/share/maven
> >> Java version: 1.8.0_151, vendor: Oracle Corporation
> >> Java home: /usr/lib/jvm/java-8-oracle/jre
> >> Default locale: en_US, platform encoding: UTF-8
> >> OS name: "linux", version: "4.4.0-103-generic", arch: "amd64", family: "unix"
> >>
> >> I have a simple driver script written in Scala which calls the API methods. I compile the script using SBT (version 1.0.4) and submit using spark-submit (Spark version 2.2.0). Here's how I'm calling the methods:
> >>
> >> val x = spark.read.parquet(inputPath).select(featureNames)
> >> val mc = new MatrixCharacteristics(199563535L, 71403L, 1024, 1024, 2444225947L) // as far as I know, 1024x1024 is the default block size in sysml?
> >> println("Reading Direct")
> >> val xrdd = RDDConverterUtils.dataFrameToBinaryBlock(jsc, x, mc, false, true)
> >> xrdd.count
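A minimal sketch of the block-size point quoted above, assuming the SystemML 1.0 MLContext API; the setConfigProperty call and the reuse of the dimensions from the snippet above are assumptions for illustration, not code from the thread. Either describe the input with the default 1000x1000 blocks, or keep 1024x1024 blocks and raise sysml.defaultblocksize to match, so the first access does not trigger a reblock with shuffle:

  import org.apache.sysml.api.mlcontext.MLContext
  import org.apache.sysml.runtime.matrix.MatrixCharacteristics

  val ml = new MLContext(spark)
  // Option 1: declare the input with SystemML's default 1000x1000 block size.
  val mcDefault = new MatrixCharacteristics(199563535L, 71403L, 1000, 1000, 2444225947L)
  // Option 2: keep 1024x1024 blocks and align the configuration (assumes
  // setConfigProperty is available on MLContext in this SystemML version).
  ml.setConfigProperty("sysml.defaultblocksize", "1024")
  val mc1024 = new MatrixCharacteristics(199563535L, 71403L, 1024, 1024, 2444225947L)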
> >> println("Reading Direct") > >> val xrdd = RDDConverterUtils.dataFrameToBinaryBlock(jsc, x, mc, > >> false, true) > >> xrdd.count > >> > >> here is the stacktrace from calling dataFrameToBinaryBlock: > >> > >> java.lang.ArrayIndexOutOfBoundsException: 0 > >> at > >> org.apache.sysml.runtime.matrix.data.SparseRowVector.append( > >> SparseRowVector.java:196) > >> at > >> org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.append( > >> SparseBlockMCSR.java:267) > >> at > >> org.apache.sysml.runtime.matrix.data.MatrixBlock.appendValue > >> (MatrixBlock.java:685) > >> at > >> org.apache.sysml.runtime.instructions.spark.utils.RDDConvert > >> erUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java: > 1067) > >> at > >> org.apache.sysml.runtime.instructions.spark.utils.RDDConvert > >> erUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:999) > >> at > >> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply( > >> JavaRDDLike.scala:186) > >> at > >> org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply( > >> JavaRDDLike.scala:186) > >> at > >> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$ > >> apply$23.apply(RDD.scala:797) > >> at > >> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$ > >> apply$23.apply(RDD.scala:797) > >> at > >> org.apache.spark.rdd.MapPartitionsRDD.compute( > MapPartitionsRDD.scala:38) > >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala: > >> 323) > >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > >> at > >> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap > >> Task.scala:96) > >> at > >> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap > >> Task.scala:53) > >> at org.apache.spark.scheduler.Task.run(Task.scala:108) > >> at > >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) > >> at > >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool > >> Executor.java:1149) > >> at > >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo > >> lExecutor.java:624) > >> at java.lang.Thread.run(Thread.java:748) > >> > >> and here is the stacktrace from calling "read()" directly: > >> > >> java.lang.ArrayIndexOutOfBoundsException: 2 > >> at > >> org.apache.sysml.runtime.matrix.data.SparseBlockCOO.sort( > >> SparseBlockCOO.java:399) > >> at > >> org.apache.sysml.runtime.matrix.data.MatrixBlock.mergeIntoSp > >> arse(MatrixBlock.java:1784) > >> at > >> org.apache.sysml.runtime.matrix.data.MatrixBlock.merge(Matri > >> xBlock.java:1687) > >> at > >> org.apache.sysml.runtime.instructions.spark.utils.RDDAggrega > >> teUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:627) > >> at > >> org.apache.sysml.runtime.instructions.spark.utils.RDDAggrega > >> teUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:596) > >> at > >> org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFuncti > >> on2$1.apply(JavaPairRDD.scala:1037) > >> at > >> org.apache.spark.util.collection.ExternalSorter$$anonfun$5. > >> apply(ExternalSorter.scala:189) > >> at > >> org.apache.spark.util.collection.ExternalSorter$$anonfun$5. 
> >> at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
> >> at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
> >> at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
> >> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> >> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> >> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> >> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> >> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> at java.lang.Thread.run(Thread.java:748)
> >>
> >> Best,
> >>
> >> Anthony
> >>
> >> On Sun, Dec 24, 2017 at 3:14 AM, Matthias Boehm <mboe...@gmail.com> wrote:
> >>
> >>> Thanks again for catching this issue, Anthony - this IJV reblock issue with large ultra-sparse matrices is now fixed in master. It likely did not show up on the 1% sample because the data was small enough to read it directly into the driver.
> >>>
> >>> However, the dataFrameToBinaryBlock might be another issue that I could not reproduce yet, so it would be very helpful if you could give it another try. Thanks.
> >>>
> >>> Regards,
> >>> Matthias
> >>>
> >>> On 12/24/2017 9:57 AM, Matthias Boehm wrote:
> >>>
> >>>> Hi Anthony,
> >>>>
> >>>> thanks for helping to debug this issue. There are no limits other than the dimensions and number of non-zeros being of type long. It sounds more like an issue of converting special cases of ultra-sparse matrices. I'll try to reproduce this issue and give an update as soon as I know more. In the meantime, could you please (a) also provide the stacktrace of calling dataFrameToBinaryBlock with SystemML 1.0, and (b) try calling your IJV conversion script via spark-submit to rule out that this issue is API-related? Thanks.
> >>>>
> >>>> Regards,
> >>>> Matthias
> >>>>
> >>>> On 12/24/2017 1:40 AM, Anthony Thomas wrote:
> >>>>
> >>>>> Okay, thanks for the suggestions - I upgraded to 1.0 and tried providing dimensions and block sizes to dataFrameToBinaryBlock, both without success. I additionally wrote out the matrix to HDFS in IJV format and am still getting the same error when calling "read()" directly in the DML. However, I created a 1% sample of the original data in IJV format and SystemML was able to read the smaller file without any issue. This would seem to suggest that either there is some corruption in the full file or I'm running into some limit. The matrix is on the larger side: 1.9e8 rows by 7e4 cols with 2.4e9 nonzero values, but this seems like it should be well within the limits of what SystemML/Spark can handle. I also checked for obvious data errors (e.g., the file not being 1-indexed, or containing blank lines). In case it's helpful, the stacktrace from reading the data from HDFS in IJV format is attached. Thanks again for your help - I really appreciate it.
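For data-error checks like the ones described above, a quick Spark-side pass over the IJV file can verify that the indices are 1-based and within the declared dimensions. This is only a sketch; the HDFS path is a hypothetical placeholder, and the expected dimensions are the ones reported in this thread:

  // Sanity-check an IJV (text) file: indices should be 1-based and within bounds.
  val ijv = spark.sparkContext.textFile("hdfs:/tmp/X.ijv")
    .filter(_.trim.nonEmpty)
    .map { line =>
      val Array(i, j, v) = line.trim.split("\\s+")
      (i.toLong, j.toLong, v.toDouble)
    }
  println(s"row indices: ${ijv.map(_._1).min()} .. ${ijv.map(_._1).max()} (expect 1 .. 199563535)")
  println(s"col indices: ${ijv.map(_._2).min()} .. ${ijv.map(_._2).max()} (expect 1 .. 71403)")
  println(s"nnz: ${ijv.count()} (expect 2444225947)")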
> >>>>>
> >>>>> 00:24:18 WARN TaskSetManager: Lost task 30.0 in stage 0.0 (TID 126, 10.11.10.13, executor 0): java.lang.ArrayIndexOutOfBoundsException
> >>>>> at java.lang.System.arraycopy(Native Method)
> >>>>> at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.shiftRightByN(SparseBlockCOO.java:594)
> >>>>> at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.set(SparseBlockCOO.java:323)
> >>>>> at org.apache.sysml.runtime.matrix.data.MatrixBlock.mergeIntoSparse(MatrixBlock.java:1790)
> >>>>> at org.apache.sysml.runtime.matrix.data.MatrixBlock.merge(MatrixBlock.java:1736)
> >>>>> at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:627)
> >>>>> at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:596)
> >>>>> at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1037)
> >>>>> at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
> >>>>> at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
> >>>>> at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
> >>>>> at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
> >>>>> at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
> >>>>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> >>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> >>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> >>>>> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> >>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> >>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >>>>> at java.lang.Thread.run(Thread.java:748)
> >>>>>
> >>>>> Anthony
> >>>>>
> >>>>> On Sat, Dec 23, 2017 at 4:27 AM, Matthias Boehm <mboe...@gmail.com> wrote:
> >>>>>
> >>>>>> Given the line numbers in the stacktrace, it seems that you are using a rather old version of SystemML. Hence, I would recommend upgrading to SystemML 1.0, or at least 0.15, first.
> >>>>>>
> >>>>>> If the error persists or you're not able to upgrade, please try calling dataFrameToBinaryBlock with explicitly provided matrix characteristics (dimensions and block sizes). The issue you've shown usually originates from incorrect metadata (e.g., a negative number of columns or block sizes), which prevents the sparse rows from growing to the necessary sizes.
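To make the suggestion above concrete, here is a sketch of calling the converter with explicit matrix characteristics. The dimensions, non-zero count, and block sizes are the values reported elsewhere in this thread, and x is assumed to be the DataFrame of vectors from the earlier snippets:

  import org.apache.spark.api.java.JavaSparkContext
  import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils
  import org.apache.sysml.runtime.matrix.MatrixCharacteristics

  val jsc = new JavaSparkContext(spark.sparkContext)
  // rows, cols, rows-per-block, cols-per-block, non-zeros
  val mc = new MatrixCharacteristics(199563535L, 71403L, 1000, 1000, 2444225947L)
  // containsID = false (no "__INDEX" column), isVector = true (ml.linalg vector column)
  val xBlocks = RDDConverterUtils.dataFrameToBinaryBlock(jsc, x, mc, false, true)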
> >>>>>>
> >>>>>> Regards,
> >>>>>> Matthias
> >>>>>>
> >>>>>> On 12/22/2017 10:42 PM, Anthony Thomas wrote:
> >>>>>>
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> Thanks for the help! In response to your questions:
> >>>>>>>
> >>>>>>> 1. Sorry - this was a typo: the correct schema is [y: int, features: vector] - the column "features" was created using Spark's VectorAssembler and the underlying type is an org.apache.spark.ml.linalg.SparseVector. Calling x.schema results in: org.apache.spark.sql.types.StructType = StructType(StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true))
> >>>>>>> 2. "y" converts fine - it appears the only issue is with X. The script still crashes when running "print(sum(X))". The full stack trace is attached at the end of the message.
> >>>>>>> 3. Unfortunately, the error persists when calling RDDConverterUtils.dataFrameToBinaryBlock directly.
> >>>>>>> 4. Also, just in case this matters: I'm packaging the script into a jar using SBT assembly and submitting via spark-submit.
> >>>>>>>
> >>>>>>> Here's an updated script:
> >>>>>>>
> >>>>>>> val input_df = spark.read.parquet(inputPath)
> >>>>>>> val x = input_df.select(featureNames)
> >>>>>>> val y = input_df.select("y")
> >>>>>>> val meta_x = new MatrixMetadata(DF_VECTOR)
> >>>>>>> val meta_y = new MatrixMetadata(DF_DOUBLES)
> >>>>>>>
> >>>>>>> val script_x = dml("print(sum(X))").in("X", x, meta_x)
> >>>>>>> println("Reading X")
> >>>>>>> val res_x = ml.execute(script_x)
> >>>>>>>
> >>>>>>> Here is the output of the runtime plan generated by SystemML:
> >>>>>>>
> >>>>>>> # EXPLAIN (RUNTIME):
> >>>>>>> # Memory Budget local/remote = 76459MB/?MB/?MB/?MB
> >>>>>>> # Degree of Parallelism (vcores) local/remote = 24/?
> >>>>>>> PROGRAM ( size CP/SP = 3/0 )
> >>>>>>> --MAIN PROGRAM
> >>>>>>> ----GENERIC (lines 1-2) [recompile=false]
> >>>>>>> ------CP uak+ X.MATRIX.DOUBLE _Var0.SCALAR.STRING 24
> >>>>>>> ------CP print _Var0.SCALAR.STRING.false _Var1.SCALAR.STRING
> >>>>>>> ------CP rmvar _Var0 _Var1
> >>>>>>>
> >>>>>>> And the resulting stack trace:
> >>>>>>>
> >>>>>>> 17/12/22 21:27:20 WARN TaskSetManager: Lost task 3.0 in stage 7.0 (TID 205, 10.11.10.12, executor 3): java.lang.ArrayIndexOutOfBoundsException: 0
> >>>>>>> at org.apache.sysml.runtime.matrix.data.SparseRow.append(SparseRow.java:215)
> >>>>>>> at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.append(SparseBlockMCSR.java:253)
> >>>>>>> at org.apache.sysml.runtime.matrix.data.MatrixBlock.appendValue(MatrixBlock.java:663)
> >>>>>>> at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1076)
> >>>>>>> at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1008)
> >>>>>>> at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
> >>>>>>> at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
> >>>>>>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
> >>>>>>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
> >>>>>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> >>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> >>>>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> >>>>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> >>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> >>>>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> >>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >>>>>>> at java.lang.Thread.run(Thread.java:748)
> >>>>>>>
> >>>>>>> 17/12/22 21:27:21 ERROR TaskSetManager: Task 19 in stage 7.0 failed 4 times; aborting job
> >>>>>>> Exception in thread "main" org.apache.sysml.api.mlcontext.MLContextException: Exception when executing script
> >>>>>>> at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:311)
> >>>>>>> at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:280)
> >>>>>>> at SystemMLMLAlgorithms$$anonfun$main$1.apply$mcVI$sp(systemml_ml_algorithms.scala:63)
> >>>>>>> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
> >>>>>>> at SystemMLMLAlgorithms$.main(systemml_ml_algorithms.scala:60)
> >>>>>>> at SystemMLMLAlgorithms.main(systemml_ml_algorithms.scala)
> >>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
> >>>>>>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
> >>>>>>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> >>>>>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> >>>>>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
> >>>>>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> >>>>>>> Caused by: org.apache.sysml.api.mlcontext.MLContextException: Exception occurred while executing runtime program
> >>>>>>> at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:390)
> >>>>>>> at org.apache.sysml.api.mlcontext.ScriptExecutor.execute(ScriptExecutor.java:298)
> >>>>>>> at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:303)
> >>>>>>> ... 14 more
> >>>>>>> Caused by: org.apache.sysml.runtime.DMLRuntimeException: org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in program block generated from statement block between lines 1 and 2 -- Error evaluating instruction: CP°uak+°X·MATRIX·DOUBLE°_Var0·SCALAR·STRING°24
> >>>>>>> at org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:130)
> >>>>>>> at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:388)
> >>>>>>> ... 16 more
> >>>>>>> ...
> >>>>>>>
> >>>>>>> On Fri, Dec 22, 2017 at 5:48 AM, Matthias Boehm <mboe...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Well, let's do the following to figure this out:
> >>>>>>>>
> >>>>>>>> 1) If the schema is indeed [label: Integer, features: SparseVector], please change the third line to val y = input_data.select("label").
> >>>>>>>>
> >>>>>>>> 2) For debugging, I would recommend using a simple script like "print(sum(X));" and trying to convert X and y separately to isolate the problem.
> >>>>>>>>
> >>>>>>>> 3) If it's still failing, it would be helpful to know (a) whether it's an issue of converting X, y, or both, as well as (b) the full stacktrace.
> >>>>>>>>
> >>>>>>>> 4) As a workaround, you might also call our internal converter directly via RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, containsID, isVector), where jsc is the Java spark context, df is the dataset, mc is the matrix characteristics (if unknown, simply use new MatrixCharacteristics()), containsID indicates whether the dataset contains a column "__INDEX" with the row indexes, and isVector indicates whether the passed dataset contains vectors or basic types such as double.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Matthias
> >>>>>>>>
> >>>>>>>> On 12/22/2017 12:00 AM, Anthony Thomas wrote:
> >>>>>>>>
> >>>>>>>>> Hi SystemML folks,
> >>>>>>>>>
> >>>>>>>>> I'm trying to pass some data from Spark to a DML script via the MLContext API. The data is derived from a parquet file containing a dataframe with the schema [label: Integer, features: SparseVector]. I am doing the following:
> >>>>>>>>>
> >>>>>>>>> val input_data = spark.read.parquet(inputPath)
> >>>>>>>>> val x = input_data.select("features")
> >>>>>>>>> val y = input_data.select("y")
> >>>>>>>>> val x_meta = new MatrixMetadata(DF_VECTOR)
> >>>>>>>>> val y_meta = new MatrixMetadata(DF_DOUBLES)
> >>>>>>>>> val script = dmlFromFile(s"${script_path}/script.dml").
> >>>>>>>>>     in("X", x, x_meta).
> >>>>>>>>>     in("Y", y, y_meta)
> >>>>>>>>> ...
> >>>>>>>>>
> >>>>>>>>> However, this results in an error from SystemML: java.lang.ArrayIndexOutOfBoundsException: 0
> >>>>>>>>> I'm guessing this has something to do with Spark ML being 0-indexed and SystemML being 1-indexed. Is there something I should be doing differently here? Note that I also tried converting the dataframe to a CoordinateMatrix and then creating an RDD[String] in IJV format. That too resulted in "ArrayIndexOutOfBoundsExceptions."
> >>>>>>>>> I'm guessing there's something simple I'm doing wrong here, but I haven't been able to figure out exactly what. Please let me know if you need more information (I can send along the full error stacktrace if that would be helpful)!
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Anthony
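On the 0-based vs. 1-based point raised in the first message: SystemML's text (IJV) format expects 1-based row and column indices, while Spark's CoordinateMatrix entries are conventionally 0-based, so an RDD[String] built from those entries needs a +1 shift on both indices. A sketch with a hypothetical CoordinateMatrix named coo and a hypothetical output path:

  import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
  import org.apache.spark.rdd.RDD

  // Convert 0-based CoordinateMatrix entries to 1-based "i j v" lines for SystemML.
  def toIJV(coo: CoordinateMatrix): RDD[String] =
    coo.entries.map(e => s"${e.i + 1} ${e.j + 1} ${e.value}")

  // toIJV(coo).saveAsTextFile("hdfs:/tmp/X.ijv")  // hypothetical path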