[ 
https://issues.apache.org/jira/browse/SYSTEMML-1232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deron Eriksson reassigned SYSTEMML-1232:
----------------------------------------

    Assignee: Deron Eriksson

> Migrate stringDataFrameToVectorDataFrame if needed
> --------------------------------------------------
>
>                 Key: SYSTEMML-1232
>                 URL: https://issues.apache.org/jira/browse/SYSTEMML-1232
>             Project: SystemML
>          Issue Type: Task
>          Components: APIs
>            Reporter: Deron Eriksson
>            Assignee: Deron Eriksson
>
> Restore and migrate RDDConvererUtilsExt.stringDataFrameToVectorDataFrame 
> method to Spark 2 (mllib->ml Vector) if needed.
> The RDDConvererUtilsExt.stringDataFrameToVectorDataFrame method was removed 
> by commit 
> https://github.com/apache/incubator-systemml/commit/578e595fdc506fb8a0c0b18c312fe420a406276d.
>   If this method is needed, migrate it to Spark 2.
> Old method:
> {code}
>       public static Dataset<Row> stringDataFrameToVectorDataFrame(SQLContext 
> sqlContext, Dataset<Row> inputDF)
>                       throws DMLRuntimeException {
>               StructField[] oldSchema = inputDF.schema().fields();
>               //create the new schema
>               StructField[] newSchema = new StructField[oldSchema.length];
>               for(int i = 0; i < oldSchema.length; i++) {
>                       String colName = oldSchema[i].name();
>                       newSchema[i] = DataTypes.createStructField(colName, new 
> VectorUDT(), true);
>               }
>               //converter
>               class StringToVector implements Function<Tuple2<Row, Long>, 
> Row> {
>                       private static final long serialVersionUID = 
> -4733816995375745659L;
>                       @Override
>                       public Row call(Tuple2<Row, Long> arg0) throws 
> Exception {
>                               Row oldRow = arg0._1;
>                               int oldNumCols = oldRow.length();
>                               if (oldNumCols > 1) {
>                                       throw new DMLRuntimeException("The row 
> must have at most one column");
>                               }
>                               // parse the various strings. i.e
>                               // ((1.2,4.3, 3.4))  or (1.2, 3.4, 2.2) or (1.2 
> 3.4)
>                               // [[1.2,34.3, 1.2, 1.2]] or [1.2, 3.4] or [1.3 
> 1.2]
>                               Object [] fields = new Object[oldNumCols];
>                               ArrayList<Object> fieldsArr = new 
> ArrayList<Object>();
>                               for (int i = 0; i < oldRow.length(); i++) {
>                                       Object ci=oldRow.get(i);
>                                       if (ci instanceof String) {
>                                               String cis = (String)ci;
>                                               StringBuffer sb = new 
> StringBuffer(cis.trim());
>                                               for (int nid=0; i < 2; i++) { 
> //remove two level nesting
>                                                       if ((sb.charAt(0) == 
> '(' && sb.charAt(sb.length() - 1) == ')') ||
>                                                                       
> (sb.charAt(0) == '[' && sb.charAt(sb.length() - 1) == ']')
>                                                                       ) {
>                                                               
> sb.deleteCharAt(0);
>                                                               
> sb.setLength(sb.length() - 1);
>                                                       }
>                                               }
>                                               //have the replace code
>                                               String ncis = "[" + 
> sb.toString().replaceAll(" *, *", ",") + "]";
>                                               Vector v = Vectors.parse(ncis);
>                                               fieldsArr.add(v);
>                                       } else {
>                                               throw new 
> DMLRuntimeException("Only String is supported");
>                                       }
>                               }
>                               Row row = 
> RowFactory.create(fieldsArr.toArray());
>                               return row;
>                       }
>               }
>               //output DF
>               JavaRDD<Row> newRows = 
> inputDF.rdd().toJavaRDD().zipWithIndex().map(new StringToVector());
>               // DataFrame outDF = sqlContext.createDataFrame(newRows, new 
> StructType(newSchema)); //TODO investigate why it doesn't work
>               Dataset<Row> outDF = sqlContext.createDataFrame(newRows.rdd(),
>                               DataTypes.createStructType(newSchema));
>               return outDF;
>       }
> {code}
> Note: the org.apache.spark.ml.linalg.Vectors.parse() method does not exist in 
> Spark 2.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to