[ https://issues.apache.org/jira/browse/SYSTEMML-1232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Deron Eriksson reassigned SYSTEMML-1232: ---------------------------------------- Assignee: Deron Eriksson > Migrate stringDataFrameToVectorDataFrame if needed > -------------------------------------------------- > > Key: SYSTEMML-1232 > URL: https://issues.apache.org/jira/browse/SYSTEMML-1232 > Project: SystemML > Issue Type: Task > Components: APIs > Reporter: Deron Eriksson > Assignee: Deron Eriksson > > Restore and migrate RDDConvererUtilsExt.stringDataFrameToVectorDataFrame > method to Spark 2 (mllib->ml Vector) if needed. > The RDDConvererUtilsExt.stringDataFrameToVectorDataFrame method was removed > by commit > https://github.com/apache/incubator-systemml/commit/578e595fdc506fb8a0c0b18c312fe420a406276d. > If this method is needed, migrate it to Spark 2. > Old method: > {code} > public static Dataset<Row> stringDataFrameToVectorDataFrame(SQLContext > sqlContext, Dataset<Row> inputDF) > throws DMLRuntimeException { > StructField[] oldSchema = inputDF.schema().fields(); > //create the new schema > StructField[] newSchema = new StructField[oldSchema.length]; > for(int i = 0; i < oldSchema.length; i++) { > String colName = oldSchema[i].name(); > newSchema[i] = DataTypes.createStructField(colName, new > VectorUDT(), true); > } > //converter > class StringToVector implements Function<Tuple2<Row, Long>, > Row> { > private static final long serialVersionUID = > -4733816995375745659L; > @Override > public Row call(Tuple2<Row, Long> arg0) throws > Exception { > Row oldRow = arg0._1; > int oldNumCols = oldRow.length(); > if (oldNumCols > 1) { > throw new DMLRuntimeException("The row > must have at most one column"); > } > // parse the various strings. i.e > // ((1.2,4.3, 3.4)) or (1.2, 3.4, 2.2) or (1.2 > 3.4) > // [[1.2,34.3, 1.2, 1.2]] or [1.2, 3.4] or [1.3 > 1.2] > Object [] fields = new Object[oldNumCols]; > ArrayList<Object> fieldsArr = new > ArrayList<Object>(); > for (int i = 0; i < oldRow.length(); i++) { > Object ci=oldRow.get(i); > if (ci instanceof String) { > String cis = (String)ci; > StringBuffer sb = new > StringBuffer(cis.trim()); > for (int nid=0; i < 2; i++) { > //remove two level nesting > if ((sb.charAt(0) == > '(' && sb.charAt(sb.length() - 1) == ')') || > > (sb.charAt(0) == '[' && sb.charAt(sb.length() - 1) == ']') > ) { > > sb.deleteCharAt(0); > > sb.setLength(sb.length() - 1); > } > } > //have the replace code > String ncis = "[" + > sb.toString().replaceAll(" *, *", ",") + "]"; > Vector v = Vectors.parse(ncis); > fieldsArr.add(v); > } else { > throw new > DMLRuntimeException("Only String is supported"); > } > } > Row row = > RowFactory.create(fieldsArr.toArray()); > return row; > } > } > //output DF > JavaRDD<Row> newRows = > inputDF.rdd().toJavaRDD().zipWithIndex().map(new StringToVector()); > // DataFrame outDF = sqlContext.createDataFrame(newRows, new > StructType(newSchema)); //TODO investigate why it doesn't work > Dataset<Row> outDF = sqlContext.createDataFrame(newRows.rdd(), > DataTypes.createStructType(newSchema)); > return outDF; > } > {code} > Note: the org.apache.spark.ml.linalg.Vectors.parse() method does not exist in > Spark 2. -- This message was sent by Atlassian JIRA (v6.3.15#6346)