Repository: incubator-systemml Updated Branches: refs/heads/master 301977fc3 -> abd19df94
Adding ID support for DataFrame required for MLContext. This change also works with older Spark 1.3.0. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/abd19df9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/abd19df9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/abd19df9 Branch: refs/heads/master Commit: abd19df94ecdb54cd764cce7793340711f858811 Parents: 301977f Author: Niketan Pansare <npan...@us.ibm.com> Authored: Wed Dec 9 12:22:02 2015 -0800 Committer: Niketan Pansare <npan...@us.ibm.com> Committed: Wed Dec 9 12:22:02 2015 -0800 ---------------------------------------------------------------------- .../spark/utils/RDDConverterUtilsExt.java | 29 ++++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/abd19df9/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java index 4b8fdde..e64fb89 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java @@ -24,6 +24,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Scanner; import org.apache.hadoop.io.Text; import org.apache.spark.Accumulator; @@ -65,6 +66,7 @@ import org.apache.sysml.runtime.util.UtilFunctions; * NOTE: These are experimental converter utils. Once thoroughly tested, they * can be moved to RDDConverterUtils. 
*/ +@SuppressWarnings("unused") public class RDDConverterUtilsExt { public enum RDDConverterTypes { @@ -180,9 +182,30 @@ public class RDDConverterUtilsExt DataFrame df, MatrixCharacteristics mcOut, boolean containsID) throws DMLRuntimeException { if(containsID) { - // Uncomment this when we move to Spark 1.4.0 or higher - // df = df.sort("ID").drop("ID"); - throw new DMLRuntimeException("containsID is not supported yet"); + ArrayList<String> columnToSelect = new ArrayList<String>(); + String firstCol = null; + boolean colIDPresent = false; + for(String col : df.columns()) { + if(col.compareTo("ID") == 0) { + colIDPresent = true; + } + else if(firstCol == null) { + firstCol = col; + } + else { + columnToSelect.add(col); + } + } + + if(!colIDPresent) { + throw new DMLRuntimeException("The column \"ID\" is not present in the dataframe."); + } + else if(firstCol == null) { + throw new DMLRuntimeException("No column other than \"ID\" present in the dataframe."); + } + + // Round about way to do in Java (not exposed in Spark 1.3.0): df = df.sort("ID").drop("ID"); + df = df.sort("ID").select(firstCol, scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList()); } //determine unknown dimensions and sparsity if required