[ https://issues.apache.org/jira/browse/SYSTEMML-1281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872451#comment-15872451 ]
Mike Dusenberry commented on SYSTEMML-1281:
-------------------------------------------

cc [~fschueler], [~acs_s], [~nakul02], [~niketanpansare], [~mboehm7], [~reinw...@us.ibm.com]

> OOM Error On Binary Write
> -------------------------
>
> Key: SYSTEMML-1281
> URL: https://issues.apache.org/jira/browse/SYSTEMML-1281
> Project: SystemML
> Issue Type: Bug
> Affects Versions: SystemML 0.13
> Reporter: Mike Dusenberry
> Priority: Blocker
>
> I'm running into the following heap space OOM error while attempting to save
> a large Spark DataFrame to a SystemML binary format via DML {{write}}
> statements.
> Script:
> {code}
> tr_sample_filename = os.path.join("data", "train_{}{}.parquet".format(size, "_grayscale" if grayscale else ""))
> val_sample_filename = os.path.join("data", "val_{}{}.parquet".format(size, "_grayscale" if grayscale else ""))
> train_df = sqlContext.read.load(tr_sample_filename)
> val_df = sqlContext.read.load(val_sample_filename)
> train_df, val_df
> # Note: Must use the row index column, or X may not
> # necessarily correspond correctly to Y
> X_df = train_df.select("__INDEX", "sample")
> X_val_df = val_df.select("__INDEX", "sample")
> y_df = train_df.select("__INDEX", "tumor_score")
> y_val_df = val_df.select("__INDEX", "tumor_score")
> X_df, X_val_df, y_df, y_val_df
> script = """
> # Scale images to [-1,1]
> X = X / 255
> X_val = X_val / 255
> X = X * 2 - 1
> X_val = X_val * 2 - 1
> # One-hot encode the labels
> num_tumor_classes = 3
> n = nrow(y)
> n_val = nrow(y_val)
> Y = table(seq(1, n), y, n, num_tumor_classes)
> Y_val = table(seq(1, n_val), y_val, n_val, num_tumor_classes)
> """
> outputs = ("X", "X_val", "Y", "Y_val")
> script = dml(script).input(X=X_df, X_val=X_val_df, y=y_df, y_val=y_val_df).output(*outputs)
> X, X_val, Y, Y_val = ml.execute(script).get(*outputs)
> X, X_val, Y, Y_val
> script = """
> write(X, "data/systemml/X_"+size+"_"+c+"_binary", format="binary")
> write(Y, "data/systemml/Y_"+size+"_"+c+"_binary", format="binary")
> write(X_val, "data/systemml/X_val_"+size+"_"+c+"_binary", format="binary")
> write(Y_val, "data/systemml/Y_val_"+size+"_"+c+"_binary", format="binary")
> """
> script = dml(script).input(X=X, X_val=X_val, Y=Y, Y_val=Y_val, size=size, c=c)
> ml.execute(script)
> {code}
> General error:
> {code}
> Caused by: org.apache.sysml.api.mlcontext.MLContextException: Exception occurred while executing runtime program
>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:371)
>     at org.apache.sysml.api.mlcontext.ScriptExecutor.execute(ScriptExecutor.java:292)
>     at org.apache.sysml.api.mlcontext.MLContext.execute(MLContext.java:293)
>     ... 12 more
> Caused by: org.apache.sysml.runtime.DMLRuntimeException: org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in program block generated from statement block between lines 1 and 11 -- Error evaluating instruction: CP°mvvar°X°¶_Var49¶°binaryblock
>     at org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:130)
>     at org.apache.sysml.api.mlcontext.ScriptExecutor.executeRuntimeProgram(ScriptExecutor.java:369)
>     ... 14 more
> Caused by: org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in program block generated from statement block between lines 1 and 11 -- Error evaluating instruction: CP°mvvar°X°¶_Var49¶°binaryblock
>     at org.apache.sysml.runtime.controlprogram.ProgramBlock.executeSingleInstruction(ProgramBlock.java:320)
>     at org.apache.sysml.runtime.controlprogram.ProgramBlock.executeInstructions(ProgramBlock.java:221)
>     at org.apache.sysml.runtime.controlprogram.ProgramBlock.execute(ProgramBlock.java:168)
>     at org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:123)
>     ... 15 more
> Caused by: org.apache.sysml.runtime.controlprogram.caching.CacheException: Move to data/systemml/X_256_3_binary failed.
>     at org.apache.sysml.runtime.controlprogram.caching.CacheableData.moveData(CacheableData.java:1329)
>     at org.apache.sysml.runtime.instructions.cp.VariableCPInstruction.processMoveInstruction(VariableCPInstruction.java:706)
>     at org.apache.sysml.runtime.instructions.cp.VariableCPInstruction.processInstruction(VariableCPInstruction.java:511)
>     at org.apache.sysml.runtime.controlprogram.ProgramBlock.executeSingleInstruction(ProgramBlock.java:290)
>     ... 18 more
> Caused by: org.apache.sysml.runtime.controlprogram.caching.CacheException: Export to data/systemml/X_256_3_binary failed.
>     at org.apache.sysml.runtime.controlprogram.caching.CacheableData.exportData(CacheableData.java:800)
>     at org.apache.sysml.runtime.controlprogram.caching.CacheableData.exportData(CacheableData.java:688)
>     at org.apache.sysml.runtime.controlprogram.caching.CacheableData.moveData(CacheableData.java:1315)
>     ... 21 more
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 269 in stage 40.0 failed 4 times, most recent failure: Lost task 269.3 in stage 40.0 (TID 61177, 9.30.110.145, executor 10): ExecutorLostFailure (executor 10 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1456)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1444)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1443)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1443)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>     at scala.Option.foreach(Option.scala:257)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1671)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1626)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1615)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2015)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2036)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1154)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1095)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1095)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1095)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1069)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1035)
>     at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:803)
>     at org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext.writeRDDtoHDFS(SparkExecutionContext.java:976)
>     at org.apache.sysml.runtime.controlprogram.caching.MatrixObject.writeBlobFromRDDtoHDFS(MatrixObject.java:558)
>     at org.apache.sysml.runtime.controlprogram.caching.CacheableData.exportData(CacheableData.java:796)
>     ... 23 more
> {code}
> Actual OOM error on one of many tasks:
> {code}
> java.lang.OutOfMemoryError: Java heap space
>     at org.apache.sysml.runtime.matrix.data.MatrixBlock.allocateDenseBlock(MatrixBlock.java:363)
>     at org.apache.sysml.runtime.matrix.data.MatrixBlock.copyDenseToDense(MatrixBlock.java:1308)
>     at org.apache.sysml.runtime.matrix.data.MatrixBlock.copy(MatrixBlock.java:1271)
>     at org.apache.sysml.runtime.matrix.data.MatrixBlock.copy(MatrixBlock.java:1249)
>     at org.apache.sysml.runtime.matrix.data.MatrixBlock.<init>(MatrixBlock.java:153)
>     at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$CreateBlockCombinerFunction.call(RDDAggregateUtils.java:260)
>     at org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils$CreateBlockCombinerFunction.call(RDDAggregateUtils.java:251)
>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
>     at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
>     at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
>     at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
>     at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>     at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
>     at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>     at org.apache.spark.scheduler.Task.run(Task.scala:113)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)