Repository: incubator-systemml
Updated Branches:
  refs/heads/master abd19df94 -> 35319ba70


Adding ID support for the converter util vectorDataFrameToBinaryBlock.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/35319ba7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/35319ba7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/35319ba7

Branch: refs/heads/master
Commit: 35319ba708e7ef3b03d7c916f51f975d53161663
Parents: abd19df
Author: Niketan Pansare <npan...@us.ibm.com>
Authored: Wed Dec 9 12:43:44 2015 -0800
Committer: Niketan Pansare <npan...@us.ibm.com>
Committed: Wed Dec 9 12:43:44 2015 -0800

----------------------------------------------------------------------
 .../spark/utils/RDDConverterUtilsExt.java       | 63 +++++++++++---------
 1 file changed, 36 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/35319ba7/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index e64fb89..3a227a9 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -147,9 +147,7 @@ public class RDDConverterUtilsExt
                        throws DMLRuntimeException {
                
                if(containsID) {
-                       // Uncomment this when we move to Spark 1.4.0 or higher 
-                       // df = df.sort("ID").drop("ID");
-                       throw new DMLRuntimeException("containsID is not supported yet");
+                       inputDF = dropColumn(inputDF.sort("ID"), "ID");
                }
                
                DataFrame df = inputDF.select(vectorColumnName);
@@ -178,34 +176,45 @@ public class RDDConverterUtilsExt
                return out;
        }
        
+       /**
+        * Utility to drop a column for older Spark versions where DataFrame.drop is not exposed (e.g. Spark 1.3.0).
+        * @param df input dataframe
+        * @param column name of the column to drop
+        * @return dataframe without the given column
+        * @throws DMLRuntimeException if the column is absent, or if no other column is present
+        */
+       public static DataFrame dropColumn(DataFrame df, String column) throws DMLRuntimeException {
+               ArrayList<String> columnToSelect = new ArrayList<String>();
+               String firstCol = null;
+               boolean colPresent = false;
+               for(String col : df.columns()) {
+                       if(col.compareTo(column) == 0) {
+                               colPresent = true;
+                       }
+                       else if(firstCol == null) {
+                               firstCol = col;
+                       }
+                       else {
+                               columnToSelect.add(col);
+                       }
+               }
+               
+               if(!colPresent) {
+                       throw new DMLRuntimeException("The column \"" + column + "\" is not present in the dataframe.");
+               }
+               else if(firstCol == null) {
+                       throw new DMLRuntimeException("No column other than \"" + column + "\" present in the dataframe.");
+               }
+               
+               // Roundabout way to do this in Java (drop not exposed in Spark 1.3.0): df = df.sort("ID").drop("ID");
+               return df.select(firstCol, scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList());
+       }
+       
        public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
                        DataFrame df, MatrixCharacteristics mcOut, boolean containsID)
                        throws DMLRuntimeException {
                if(containsID) {
-                       ArrayList<String> columnToSelect = new ArrayList<String>();
-                       String firstCol = null;
-                       boolean colIDPresent = false;
-                       for(String col : df.columns()) {
-                               if(col.compareTo("ID") == 0) {
-                                       colIDPresent = true;
-                               }
-                               else if(firstCol == null) {
-                                       firstCol = col;
-                               }
-                               else {
-                                       columnToSelect.add(col);
-                               }
-                       }
-                       
-                       if(!colIDPresent) {
-                               throw new DMLRuntimeException("The column \"ID\" is not present in the dataframe.");
-                       }
-                       else if(firstCol == null) {
-                               throw new DMLRuntimeException("No column other than \"ID\" present in the dataframe.");
-                       }
-                       
-                       // Round about way to do in Java (not exposed in Spark 1.3.0): df = df.sort("ID").drop("ID");
-                       df = df.sort("ID").select(firstCol, scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList());
+                       df = dropColumn(df.sort("ID"), "ID");
                }
                        
                //determine unknown dimensions and sparsity if required
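
----------------------------------------------------------------------

For reference, below is a minimal standalone sketch of the same column-drop trick used by dropColumn above. It assumes Spark 1.3.x, where DataFrame.drop(String) is not yet exposed; the class name DropColumnExample and the input file people.json are illustrative only, not part of this commit.

    import java.util.ArrayList;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;

    public class DropColumnExample {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("DropColumnExample").setMaster("local"));
            SQLContext sqlContext = new SQLContext(sc);

            // Hypothetical input: a dataframe with an "ID" column plus data columns.
            DataFrame df = sqlContext.jsonFile("people.json");

            // Rebuild the projection without "ID"; since drop() is unavailable,
            // the remaining columns are passed to the Scala select overload.
            // (The committed dropColumn additionally validates that the "ID"
            // column exists and that it is not the only column.)
            ArrayList<String> rest = new ArrayList<String>();
            String first = null;
            for(String col : df.columns()) {
                if(col.equals("ID"))
                    continue;
                if(first == null)
                    first = col;
                else
                    rest.add(col);
            }
            DataFrame withoutID = df.sort("ID").select(first,
                    scala.collection.JavaConversions.asScalaBuffer(rest).toList());
            withoutID.show();
        }
    }

With this helper in place, callers can pass containsID=true to dataFrameToBinaryBlock and vectorDataFrameToBinaryBlock: the "ID" column is used to sort the rows and is then dropped before the binary-block conversion.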
