Thanks Matthias - unfortunately I'm still running into an ArrayIndexOutOfBoundsException both when reading the file as IJV and when calling dataFrameToBinaryBlock. Just to confirm: I downloaded and compiled the latest version using:
git clone https://github.com/apache/systemml
cd systemml
mvn clean package

mvn -version
Apache Maven 3.3.9
Maven home: /usr/share/maven
Java version: 1.8.0_151, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-oracle/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.4.0-103-generic", arch: "amd64", family: "unix"

I have a simple driver script written in Scala which calls the API methods. I compile the script using SBT (version 1.0.4) and submit using spark-submit (Spark version 2.2.0). Here's how I'm calling the methods:

val x = spark.read.parquet(inputPath).select(featureNames)
val mc = new MatrixCharacteristics(199563535L, 71403L, 1024, 1024, 2444225947L) // as far as I know 1024x1024 is default block size in sysml?
println("Reading Direct")
val xrdd = RDDConverterUtils.dataFrameToBinaryBlock(jsc, x, mc, false, true)
xrdd.count
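For completeness, jsc above is just the Java wrapper around the session's SparkContext. A minimal sketch of that setup, assuming the existing SparkSession "spark" (the last line is an optional sanity check of the blocksize SystemML is actually configured with - if I'm reading the config API right, it can be queried via ConfigurationManager):

import org.apache.spark.api.java.JavaSparkContext
import org.apache.sysml.conf.ConfigurationManager

// wrap the session's SparkContext for SystemML's Java-based converter API
val jsc = new JavaSparkContext(spark.sparkContext)
// cross-check the assumed 1024x1024 blocking against the configured default
println("configured blocksize: " + ConfigurationManager.getBlocksize())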
Here is the stacktrace from calling dataFrameToBinaryBlock:

java.lang.ArrayIndexOutOfBoundsException: 0
at org.apache.sysml.runtime.matrix.data.SparseRowVector.append(SparseRowVector.java:196)
at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.append(SparseBlockMCSR.java:267)
at org.apache.sysml.runtime.matrix.data.MatrixBlock.appendValue(MatrixBlock.java:685)
at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1067)
at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:999)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

And here is the stacktrace from calling "read()" directly:

java.lang.ArrayIndexOutOfBoundsException: 2
at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.sort(SparseBlockCOO.java:399)
at org.apache.sysml.runtime.matrix.data.MatrixBlock.mergeIntoSparse(MatrixBlock.java:1784)
at org.apache.sysml.runtime.matrix.data.MatrixBlock.merge(MatrixBlock.java:1687)
at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:627)
at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:596)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1037)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Best,
Anthony

On Sun, Dec 24, 2017 at 3:14 AM, Matthias Boehm <mboe...@gmail.com> wrote:

> Thanks again for catching this issue Anthony - this IJV reblock issue with
> large ultra-sparse matrices is now fixed in master. It likely did not show
> up on the 1% sample because the data was small enough to read it directly
> into the driver.
>
> However, the dataFrameToBinaryBlock might be another issue that I could
> not reproduce yet, so it would be very helpful if you could give it another
> try. Thanks.
>
> Regards,
> Matthias
>
> On 12/24/2017 9:57 AM, Matthias Boehm wrote:
>
>> Hi Anthony,
>>
>> thanks for helping to debug this issue. There are no limits other than
>> the dimensions and number of non-zeros being of type long. It sounds
>> more like an issue of converting special cases of ultra-sparse
>> matrices. I'll try to reproduce this issue and give an update as soon as
>> I know more. In the meantime, could you please (a) also provide the
>> stacktrace of calling dataFrameToBinaryBlock with SystemML 1.0, and (b)
>> try calling your IJV conversion script via spark-submit to exclude that
>> this issue is API-related? Thanks.
>>
>> Regards,
>> Matthias
>>
>> On 12/24/2017 1:40 AM, Anthony Thomas wrote:
>>
>>> Okay thanks for the suggestions - I upgraded to 1.0 and tried providing
>>> dimensions and blocksizes to dataFrameToBinaryBlock, both without
>>> success. I additionally wrote out the matrix to HDFS in IJV format and
>>> am still getting the same error when calling "read()" directly in the
>>> DML. However, I created a 1% sample of the original data in IJV format
>>> and SystemML was able to read the smaller file without any issue. This
>>> would seem to suggest that either there is some corruption in the full
>>> file or I'm running into some limit. The matrix is on the larger side:
>>> 1.9e8 rows by 7e4 cols with 2.4e9 nonzero values, but this seems like
>>> it should be well within the limits of what SystemML/Spark can handle.
>>> I also checked for obvious data errors (e.g., the file not being
>>> 1-indexed or containing blank lines). In case it's helpful, the
>>> stacktrace from reading the data from HDFS in IJV format is attached.
>>> Thanks again for your help - I really appreciate it.
>>>
>>> 00:24:18 WARN TaskSetManager: Lost task 30.0 in stage 0.0 (TID 126,
>>> 10.11.10.13, executor 0): java.lang.ArrayIndexOutOfBoundsException
>>> at java.lang.System.arraycopy(Native Method)
>>> at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.shiftRightByN(SparseBlockCOO.java:594)
>>> at org.apache.sysml.runtime.matrix.data.SparseBlockCOO.set(SparseBlockCOO.java:323)
>>> at org.apache.sysml.runtime.matrix.data.MatrixBlock.mergeIntoSparse(MatrixBlock.java:1790)
>>> at org.apache.sysml.runtime.matrix.data.MatrixBlock.merge(MatrixBlock.java:1736)
>>> at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:627)
>>> at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$MergeBlocksFunction.call(RDDAggregateUtils.java:596)
>>> at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1037)
>>> at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
>>> at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
>>> at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
>>> at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>> at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
>>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> Anthony
>>>
>>> On Sat, Dec 23, 2017 at 4:27 AM, Matthias Boehm <mboe...@gmail.com> wrote:
>>>
>>>> Given the line numbers from the stacktrace, it seems that you use a
>>>> rather old version of SystemML. Hence, I would recommend to upgrade to
>>>> SystemML 1.0 or at least 0.15 first.
>>>>
>>>> If the error persists or you're not able to upgrade, please try to call
>>>> dataFrameToBinaryBlock with provided matrix characteristics of
>>>> dimensions and blocksizes. The issue you've shown usually originates
>>>> from incorrect meta data (e.g., negative number of columns or block
>>>> sizes), which prevents the sparse rows from growing to the necessary
>>>> sizes.
>>>>
>>>> Regards,
>>>> Matthias
>>>>
>>>> On 12/22/2017 10:42 PM, Anthony Thomas wrote:
>>>>
>>>>> Hi Matthias,
>>>>>
>>>>> Thanks for the help! In response to your questions:
>>>>>
>>>>> 1. Sorry - this was a typo: the correct schema is: [y: int, features:
>>>>> vector] - the column "features" was created using Spark's
>>>>> VectorAssembler and the underlying type is an
>>>>> org.apache.spark.ml.linalg.SparseVector. Calling x.schema results in:
>>>>> org.apache.spark.sql.types.StructType =
>>>>> StructType(StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true))
>>>>> 2. "y" converts fine - it appears the only issue is with X. The script
>>>>> still crashes when running "print(sum(X))". The full stack trace is
>>>>> attached at the end of the message.
>>>>> 3. Unfortunately, the error persists when calling
>>>>> RDDConverterUtils.dataFrameToBinaryBlock directly.
>>>>> 4. Also just in case this matters: I'm packaging the script into a jar
>>>>> using SBT assembly and submitting via spark-submit.
>>>>>
>>>>> Here's an updated script:
>>>>>
>>>>> val input_df = spark.read.parquet(inputPath)
>>>>> val x = input_df.select(featureNames)
>>>>> val y = input_df.select("y")
>>>>> val meta_x = new MatrixMetadata(DF_VECTOR)
>>>>> val meta_y = new MatrixMetadata(DF_DOUBLES)
>>>>>
>>>>> val script_x = dml("print(sum(X))").in("X", x, meta_x)
>>>>> println("Reading X")
>>>>> val res_x = ml.execute(script_x)
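>>>>>
>>>>> For reference, the analogous per-input check for y - which, as noted
>>>>> above, completes without error - would look like the following (a
>>>>> minimal sketch mirroring script_x, assuming the same ml and dml
>>>>> imports):
>>>>>
>>>>> // same pattern as script_x above, but converting only y
>>>>> val script_y = dml("print(sum(Y))").in("Y", y, meta_y)
>>>>> println("Reading Y")
>>>>> val res_y = ml.execute(script_y)
>>>>>
>>>>> The runtime plan and stacktrace below are from the script_x run.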
>>>>>
>>>>> Here is the output of the runtime plan generated by SystemML:
>>>>>
>>>>> # EXPLAIN (RUNTIME):
>>>>> # Memory Budget local/remote = 76459MB/?MB/?MB/?MB
>>>>> # Degree of Parallelism (vcores) local/remote = 24/?
>>>>> PROGRAM ( size CP/SP = 3/0 )
>>>>> --MAIN PROGRAM
>>>>> ----GENERIC (lines 1-2) [recompile=false]
>>>>> ------CP uak+ X.MATRIX.DOUBLE _Var0.SCALAR.STRING 24
>>>>> ------CP print _Var0.SCALAR.STRING.false _Var1.SCALAR.STRING
>>>>> ------CP rmvar _Var0 _Var1
>>>>>
>>>>> And the resulting stack trace:
>>>>>
>>>>> 17/12/22 21:27:20 WARN TaskSetManager: Lost task 3.0 in stage 7.0 (TID 205,
>>>>> 10.11.10.12, executor 3): java.lang.ArrayIndexOutOfBoundsException: 0
>>>>> at org.apache.sysml.runtime.matrix.data.SparseRow.append(SparseRow.java:215)
>>>>> at org.apache.sysml.runtime.matrix.data.SparseBlockMCSR.append(SparseBlockMCSR.java:253)
>>>>> at org.apache.sysml.runtime.matrix.data.MatrixBlock.appendValue(MatrixBlock.java:663)
>>>>> at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1076)
>>>>> at org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils$DataFrameToBinaryBlockFunction.call(RDDConverterUtils.java:1008)
>>>>> at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>>>> at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>>>>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>>>>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
>>>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> 17/12/22 21:27:21 ERROR TaskSetManager: Task 19 in stage 7.0 failed 4
>>>>> times; aborting job
>>>>> Exception in thread "main" org.apache.sysml.api.mlcontext.MLContextException:
>>>>> Exception when executing script
>>>>> at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:311)
>>>>> at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:280)
>>>>> at SystemMLMLAlgorithms$$anonfun$main$1.apply$mcVI$sp(systemml_ml_algorithms.scala:63)
>>>>> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>>>>> at SystemMLMLAlgorithms$.main(systemml_ml_algorithms.scala:60)
>>>>> at SystemMLMLAlgorithms.main(systemml_ml_algorithms.scala)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
>>>>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>>>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>> Caused by: org.apache.sysml.api.mlcontext.MLContextException: Exception
>>>>> occurred while executing runtime program
>>>>> at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:390)
>>>>> at org.apache.sysml.api.mlcontext.ScriptExecutor.execute(ScriptExecutor.java:298)
>>>>> at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:303)
>>>>> ... 14 more
>>>>> Caused by: org.apache.sysml.runtime.DMLRuntimeException:
>>>>> org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in
>>>>> program block generated from statement block between lines 1 and 2 -- Error
>>>>> evaluating instruction: CP°uak+°X·MATRIX·DOUBLE°_Var0·SCALAR·STRING°24
>>>>> at org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:130)
>>>>> at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:388)
>>>>> ... 16 more
>>>>> ...
>>>>>
>>>>> On Fri, Dec 22, 2017 at 5:48 AM, Matthias Boehm <mboe...@gmail.com> wrote:
>>>>>
>>>>>> well, let's do the following to figure this out:
>>>>>>
>>>>>> 1) If the schema is indeed [label: Integer, features: SparseVector],
>>>>>> please change the third line to val y = input_data.select("label").
>>>>>>
>>>>>> 2) For debugging, I would recommend to use a simple script like
>>>>>> "print(sum(X));" and try converting X and y separately to isolate the
>>>>>> problem.
>>>>>>
>>>>>> 3) If it's still failing, it would be helpful to know (a) if it's an
>>>>>> issue of converting X, y, or both, as well as (b) the full stacktrace.
>>>>>>
>>>>>> 4) As a workaround you might also call our internal converter directly via:
>>>>>> RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, containsID, isVector),
>>>>>> where jsc is the java spark context, df is the dataset, mc are matrix
>>>>>> characteristics (if unknown, simply use new MatrixCharacteristics()),
>>>>>> containsID indicates if the dataset contains a column "__INDEX" with the
>>>>>> row indexes, and isVector indicates if the passed dataset contains vectors
>>>>>> or basic types such as double.
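>>>>>>
>>>>>> A minimal sketch of that workaround call (variable names illustrative;
>>>>>> assuming a SparkSession "spark" and a dataset "df" with a single
>>>>>> vector-typed column and no "__INDEX" column):
>>>>>>
>>>>>> import org.apache.spark.api.java.JavaSparkContext
>>>>>> import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils
>>>>>> import org.apache.sysml.runtime.matrix.MatrixCharacteristics
>>>>>>
>>>>>> val jsc = new JavaSparkContext(spark.sparkContext)
>>>>>> // dimensions and blocksizes unknown here, so use the default characteristics
>>>>>> val mc = new MatrixCharacteristics()
>>>>>> // containsID=false: no "__INDEX" column; isVector=true: vector-typed column
>>>>>> val blocks = RDDConverterUtils.dataFrameToBinaryBlock(jsc, df, mc, false, true)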
>>>>>>
>>>>>> Regards,
>>>>>> Matthias
>>>>>>
>>>>>> On 12/22/2017 12:00 AM, Anthony Thomas wrote:
>>>>>>
>>>>>>> Hi SystemML folks,
>>>>>>>
>>>>>>> I'm trying to pass some data from Spark to a DML script via the MLContext
>>>>>>> API. The data is derived from a parquet file containing a dataframe with
>>>>>>> the schema: [label: Integer, features: SparseVector]. I am doing the
>>>>>>> following:
>>>>>>>
>>>>>>> val input_data = spark.read.parquet(inputPath)
>>>>>>> val x = input_data.select("features")
>>>>>>> val y = input_data.select("y")
>>>>>>> val x_meta = new MatrixMetadata(DF_VECTOR)
>>>>>>> val y_meta = new MatrixMetadata(DF_DOUBLES)
>>>>>>> val script = dmlFromFile(s"${script_path}/script.dml").
>>>>>>>   in("X", x, x_meta).
>>>>>>>   in("Y", y, y_meta)
>>>>>>> ...
>>>>>>>
>>>>>>> However, this results in an error from SystemML:
>>>>>>> java.lang.ArrayIndexOutOfBoundsException: 0
>>>>>>> I'm guessing this has something to do with SparkML being zero-indexed and
>>>>>>> SystemML being 1-indexed. Is there something I should be doing differently
>>>>>>> here? Note that I also tried converting the dataframe to a CoordinateMatrix
>>>>>>> and then creating an RDD[String] in IJV format. That too resulted in
>>>>>>> "ArrayIndexOutOfBoundsExceptions." I'm guessing there's something simple
>>>>>>> I'm doing wrong here, but I haven't been able to figure out exactly what.
>>>>>>> Please let me know if you need more information (I can send along the full
>>>>>>> error stacktrace if that would be helpful)!
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Anthony