Repository: incubator-systemml
Updated Branches:
  refs/heads/master 35319ba70 -> 94fd4d741
Adding converter utils to extract specific columns from DataFrame

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/94fd4d74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/94fd4d74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/94fd4d74

Branch: refs/heads/master
Commit: 94fd4d7413490c66174336e00355ef12f7b56f47
Parents: 35319ba
Author: Niketan Pansare <npan...@us.ibm.com>
Authored: Wed Dec 9 15:06:01 2015 -0800
Committer: Niketan Pansare <npan...@us.ibm.com>
Committed: Wed Dec 9 15:06:01 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/api/MLContext.java |  3 ---
 .../spark/utils/RDDConverterUtilsExt.java    | 28 +++++++++++++++++---
 2 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/94fd4d74/src/main/java/org/apache/sysml/api/MLContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java index bcfc976..4b9ad8d 100644 --- a/src/main/java/org/apache/sysml/api/MLContext.java +++ b/src/main/java/org/apache/sysml/api/MLContext.java @@ -248,9 +248,6 @@ public class MLContext { /** * Register DataFrame as input. - * Current version doesnot support containsID=true. - * Note: for Spark 1.4.0 or higher, registerInput(varName, df.sort("ID").drop("ID"), true) = registerInput(varName, df, false) - * <p> * Marks the variable in the DML script as input variable. * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation * would have been created by reading a HDFS file.
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/94fd4d74/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java index 3a227a9..af4885a 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java @@ -206,13 +206,35 @@ public class RDDConverterUtilsExt throw new DMLRuntimeException("No column other than \"" + column + "\" present in the dataframe."); } - // Round about way to do in Java (not exposed in Spark 1.3.0): df = df.sort("ID").drop("ID"); + // Round about way to do in Java (not exposed in Spark 1.3.0): df = df.drop("ID"); return df.select(firstCol, scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList()); } + public static DataFrame projectColumns(DataFrame df, ArrayList<String> columns) throws DMLRuntimeException { + ArrayList<String> columnToSelect = new ArrayList<String>(); + for(int i = 1; i < columns.size(); i++) { + columnToSelect.add(columns.get(i)); + } + return df.select(columns.get(0), scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList()); + } + + public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc, + DataFrame df, MatrixCharacteristics mcOut, boolean containsID) throws DMLRuntimeException { + return dataFrameToBinaryBlock(sc, df, mcOut, containsID, null); + } + + public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc, + DataFrame df, MatrixCharacteristics mcOut, ArrayList<String> columns) throws DMLRuntimeException { + return dataFrameToBinaryBlock(sc, df, mcOut, false, columns); + 
} + public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc, - DataFrame df, MatrixCharacteristics mcOut, boolean containsID) + DataFrame df, MatrixCharacteristics mcOut, boolean containsID, ArrayList<String> columns) throws DMLRuntimeException { + if(columns != null) { + df = projectColumns(df, columns); + } + if(containsID) { df = dropColumn(df.sort("ID"), "ID"); } @@ -276,7 +298,7 @@ public class RDDConverterUtilsExt for(int i = 0; i < oldNumCols; i++) { fields[i] = arg0._1.get(i); } - fields[oldNumCols] = new Double(arg0._2); + fields[oldNumCols] = new Double(arg0._2 + 1); return RowFactory.create(fields); }