Repository: incubator-systemml Updated Branches: refs/heads/master abd19df94 -> 35319ba70
Adding ID support for converter utils vectorDataFrameToBinaryBlock. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/35319ba7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/35319ba7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/35319ba7 Branch: refs/heads/master Commit: 35319ba708e7ef3b03d7c916f51f975d53161663 Parents: abd19df Author: Niketan Pansare <npan...@us.ibm.com> Authored: Wed Dec 9 12:43:44 2015 -0800 Committer: Niketan Pansare <npan...@us.ibm.com> Committed: Wed Dec 9 12:43:44 2015 -0800 ---------------------------------------------------------------------- .../spark/utils/RDDConverterUtilsExt.java | 63 +++++++++++--------- 1 file changed, 36 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/35319ba7/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java index e64fb89..3a227a9 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java @@ -147,9 +147,7 @@ public class RDDConverterUtilsExt throws DMLRuntimeException { if(containsID) { - // Uncomment this when we move to Spark 1.4.0 or higher - // df = df.sort("ID").drop("ID"); - throw new DMLRuntimeException("containsID is not supported yet"); + inputDF = dropColumn(inputDF.sort("ID"), "ID"); } DataFrame df = inputDF.select(vectorColumnName); @@ -178,34 +176,45 @@ public class RDDConverterUtilsExt return out; } + /** + * 
Utility to support dropping columns for older Spark versions. + * @param df input dataframe + * @param column name of the column to drop + * @return dataframe without the given column + * @throws DMLRuntimeException if the column is absent or is the only column + */ + public static DataFrame dropColumn(DataFrame df, String column) throws DMLRuntimeException { + ArrayList<String> columnToSelect = new ArrayList<String>(); + String firstCol = null; + boolean colPresent = false; + for(String col : df.columns()) { + if(col.compareTo(column) == 0) { + colPresent = true; + } + else if(firstCol == null) { + firstCol = col; + } + else { + columnToSelect.add(col); + } + } + + if(!colPresent) { + throw new DMLRuntimeException("The column \"" + column + "\" is not present in the dataframe."); + } + else if(firstCol == null) { + throw new DMLRuntimeException("No column other than \"" + column + "\" present in the dataframe."); + } + + // Roundabout way to do in Java (not exposed in Spark 1.3.0): df = df.sort("ID").drop("ID"); + return df.select(firstCol, scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList()); + } + public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc, DataFrame df, MatrixCharacteristics mcOut, boolean containsID) throws DMLRuntimeException { if(containsID) { - ArrayList<String> columnToSelect = new ArrayList<String>(); - String firstCol = null; - boolean colIDPresent = false; - for(String col : df.columns()) { - if(col.compareTo("ID") == 0) { - colIDPresent = true; - } - else if(firstCol == null) { - firstCol = col; - } - else { - columnToSelect.add(col); - } - } - - if(!colIDPresent) { - throw new DMLRuntimeException("The column \"ID\" is not present in the dataframe."); - } - else if(firstCol == null) { - throw new DMLRuntimeException("No column other than \"ID\" present in the dataframe."); - } - - // Round about way to do in Java (not exposed in Spark 1.3.0): df = df.sort("ID").drop("ID"); - df = df.sort("ID").select(firstCol, scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList()); + df = 
dropColumn(df.sort("ID"), "ID"); } //determine unknown dimensions and sparsity if required