>>> DataFrame transformerdDF = df.withColumn(fieldName, newCol);
>>>
>>> org.apache.spark.sql.AnalysisException: resolved attribute(s) _c0#2
>>> missing from id#0,labelStr#1 in operator !Project [id#0,labelStr#1,_c0#2
>>> AS transformedByUDF#3];
I don't think it's a Spark bug; it's a bug in your application. I haven't
checked your code in detail, but from the error message, df's schema is
(id, labelStr) and it does not have a column _c0; that column only exists in
udfDF, the result of your SELECT. A Column can only be used with the
DataFrame it was resolved against, so you need to build the new column from
df itself.
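Something along these lines should work. This is an untested sketch based on
your snippet, assuming the Spark 1.6 Java API and reusing the MyUDF class and
the df and sqlContext from your test below:

    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.types.DataTypes;
    import static org.apache.spark.sql.functions.callUDF;

    // Register the UDF once, then apply it to a column of df itself.
    // callUDF() returns a Column that is resolved against df, so
    // withColumn() can append it without the "missing attribute" error.
    sqlContext.udf().register("myUDF", new MyUDF(), DataTypes.StringType);

    Column newCol = callUDF("myUDF", df.col("labelStr"));
    DataFrame transformedDF = df.withColumn("transformedByUDF", newCol);
    transformedDF.printSchema();
    transformedDF.show();

This also avoids the intermediate udfDF and its auto-generated _c0 column
name entirely.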
On Wed, Dec 23, 2015 at 4:24 AM, Andy Davidson <a...@santacruzintegration.com> wrote:

> Hi Jeff
>
> Just a reminder of the original Python code I am trying to port to Java. I
> think there is a bug in withColumn(). What do you think? Should I file a
> bug report?
>
> Also I have to write a lot of code, and go from a DataFrame to a
> JavaRDD<Row>.
>
>     def convertMultinomialLabelToBinary(dataFrame):
>         newColName = "binomialLabel"
>         binomial = udf(lambda labelStr: labelStr if (labelStr == "noise")
>                        else "signal", StringType())
>         ret = dataFrame.withColumn(newColName, binomial(dataFrame["label"]))
>         return ret
>
> DataFrame transformerdDF = df.withColumn(fieldName, newCol); raises
>
>     org.apache.spark.sql.AnalysisException: resolved attribute(s) _c0#2
>     missing from id#0,labelStr#1 in operator !Project [id#0,labelStr#1,_c0#2
>     AS transformedByUDF#3];
>         at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:37)
>         at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
>         at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:154)
>         at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:49)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:103)
>         at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:49)
>         at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
>         at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914)
>         at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
>         at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
>         at org.apache.spark.sql.DataFrame.select(DataFrame.scala:691)
>         at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1150)
>         at com.pws.fantasySport.ml.UDFTest.test(UDFTest.java:75)
>
>     @Test
>     public void test() {
>         logger.info("BEGIN");
>
>         DataFrame df = createData();
>         final String tableName = "myTable";
>         sqlContext.registerDataFrameAsTable(df, tableName);
>
>         logger.info("print schema");
>         df.printSchema();
>         logger.info("original data before we applied UDF");
>         df.show();
>
>         MyUDF udf = new MyUDF();
>         final String udfName = "myUDF";
>         sqlContext.udf().register(udfName, udf, DataTypes.StringType);
>
>         String fmt = "SELECT %s(%s) FROM %s";
>         String stmt = String.format(fmt, udfName, tableName + ".labelStr", tableName);
>         logger.info("AEDWIP stmt:{}", stmt);
>         DataFrame udfDF = sqlContext.sql(stmt);
>         Row[] results = udfDF.head(3);
>         for (Row row : results) {
>             logger.info("row returned by applying UDF {}", row);
>         }
>
>         logger.info("udfDF schema");
>         udfDF.printSchema();
>         logger.info("udfDF data");
>         udfDF.show();
>
>         final String fieldName = "transformedByUDF";
>         /*
>          * as() does not work
>          * https://issues.apache.org/jira/browse/SPARK-12483
>          * DataFrame niceUdfDF = udfDF.as(fieldName); // by default the col is '_c0'
>          * niceUdfDF.printSchema();
>          * logger.info("niceUdfDF data");
>          * niceUdfDF.show();
>          */
>
>         // get the _c0 column from udfDF and pass it to df.withColumn()
>         Column newCol = udfDF.col("_c0");
>
>         DataFrame transformerdDF = df.withColumn(fieldName, newCol);
>         logger.info("print schema after calling df.withColumn()");
>         transformerdDF.printSchema();
>         logger.info("show() after calling df.withColumn()");
>         transformerdDF.show();
>
>         logger.info("END");
>     }
>
>     DataFrame createData() {
>         Features f1 = new Features(1, category1);
>         Features f2 = new Features(2, category2);
>         ArrayList<Features> data = new ArrayList<Features>(2);
>         data.add(f1);
>         data.add(f2);
>         // JavaRDD<Features> rdd = javaSparkContext.parallelize(Arrays.asList(f1, f2)); // does not work
>         JavaRDD<Features> rdd = javaSparkContext.parallelize(data);
>         DataFrame df = sqlContext.createDataFrame(rdd, Features.class);
>         return df;
>     }
>
>     class MyUDF implements UDF1<String, String> {
>         @Override
>         public String call(String s) throws Exception {
>             logger.info("AEDWIP s:{}", s);
>             String ret = s.equalsIgnoreCase(category1) ? category1 : category3;
>             return ret;
>         }
>     }
>
>     public class Features implements Serializable {
>         private static final long serialVersionUID = 1L;
>         int id;
>         String labelStr;
>
>         Features(int id, String l) {
>             this.id = id;
>             this.labelStr = l;
>         }
>
>         public int getId() {
>             return id;
>         }
>
>         public void setId(int id) {
>             this.id = id;
>         }
>
>         public String getLabelStr() {
>             return labelStr;
>         }
>
>         public void setLabelStr(String labelStr) {
>             this.labelStr = labelStr;
>         }
>     }
>
> From: Andrew Davidson <a...@santacruzintegration.com>
> Date: Monday, December 21, 2015 at 7:47 PM
> To: Jeff Zhang <zjf...@gmail.com>
> Cc: "user @spark" <user@spark.apache.org>
> Subject: Re: trouble implementing Transformer and calling DataFrame.withColumn()
>
> Hi Jeff
>
> I took a look at Tokenizer.scala, UnaryTransformer.scala, and
> Transformer.scala. However, I can not figure out how to implement
> createTransformFunc() in Java 8.
>
> It would be nice to be able to use this transformer in my pipeline, but
> that is not required. The real problem is that I can not figure out how to
> create a Column I can pass to dataFrame.withColumn() in my Java code. Here
> is my original Python:
>
>     binomial = udf(lambda labelStr: labelStr if (labelStr == "noise")
>                    else "signal", StringType())
>     ret = dataFrame.withColumn(newColName, binomial(dataFrame["label"]))
>
> Any suggestions would be greatly appreciated.
>
> Andy
>
>     public class LabelToBinaryTransformer
>             extends UnaryTransformer<String, String, LabelToBinaryTransformer> {
>
>         private static final long serialVersionUID = 4202800448830968904L;
>         private final UUID uid = UUID.randomUUID();
>
>         @Override
>         public String uid() {
>             return uid.toString();
>         }
>
>         @Override
>         public Function1<String, String> createTransformFunc() {
>             // original Python:
>             // binomial = udf(lambda labelStr: labelStr if (labelStr == "noise")
>             //                else "signal", StringType())
>             // The Function1 interface is not easy to implement; it declares
>             // lots of functions.
>             ???
>         }
>
>         @Override
>         public DataType outputDataType() {
>             StringType ret = new StringType();
>             return ret;
>         }
>     }
>
> From: Jeff Zhang <zjf...@gmail.com>
> Date: Monday, December 21, 2015 at 6:43 PM
> To: Andrew Davidson <a...@santacruzintegration.com>
> Cc: "user @spark" <user@spark.apache.org>
> Subject: Re: trouble implementing Transformer and calling DataFrame.withColumn()
>
> In your case, I would suggest you extend UnaryTransformer, which is much
> easier.
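> For createTransformFunc(), one way out in Java is to extend
> scala.runtime.AbstractFunction1, which fills in everything in the Function1
> interface except apply(). A rough, untested sketch (Spark 1.6 APIs; the
> function class must also be Serializable so it can be shipped to executors):
>
>     import java.io.Serializable;
>     import java.util.UUID;
>     import org.apache.spark.ml.UnaryTransformer;
>     import org.apache.spark.ml.param.ParamMap;
>     import org.apache.spark.sql.types.DataType;
>     import org.apache.spark.sql.types.DataTypes;
>     import scala.Function1;
>     import scala.runtime.AbstractFunction1;
>
>     public class LabelToBinaryTransformer
>             extends UnaryTransformer<String, String, LabelToBinaryTransformer> {
>
>         private final String uid = "labelToBinary_" + UUID.randomUUID();
>
>         @Override
>         public String uid() {
>             return uid;
>         }
>
>         @Override
>         public Function1<String, String> createTransformFunc() {
>             return new BinomialFunc();
>         }
>
>         // AbstractFunction1 supplies compose(), andThen(), etc.; only
>         // apply() is left, which holds the body of the Python lambda.
>         private static class BinomialFunc
>                 extends AbstractFunction1<String, String>
>                 implements Serializable {
>             @Override
>             public String apply(String labelStr) {
>                 return "noise".equals(labelStr) ? labelStr : "signal";
>             }
>         }
>
>         @Override
>         public DataType outputDataType() {
>             return DataTypes.StringType; // the singleton, not new StringType()
>         }
>
>         @Override
>         public LabelToBinaryTransformer copy(ParamMap extra) {
>             return defaultCopy(extra);
>         }
>     }
>
> UnaryTransformer then gives you transform() and transformSchema() for free;
> wire the stage up with setInputCol("labelStr").setOutputCol("binomialLabel").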
> Yeah, I have to admit that there's no documentation about how to write a
> custom Transformer. I think we need to add that, since writing a custom
> Transformer is a very typical task in machine learning.
>
> On Tue, Dec 22, 2015 at 9:54 AM, Andy Davidson <a...@santacruzintegration.com> wrote:
>
>> I am trying to port the following Python function to Java 8. I would like
>> my Java implementation to implement Transformer so I can use it in a
>> pipeline.
>>
>> I am having a heck of a time trying to figure out how to create a Column
>> variable I can pass to DataFrame.withColumn(). As far as I know,
>> withColumn() is the only way to append a column to a data frame.
>>
>> Any comments or suggestions would be greatly appreciated.
>>
>> Andy
>>
>>     def convertMultinomialLabelToBinary(dataFrame):
>>         newColName = "binomialLabel"
>>         binomial = udf(lambda labelStr: labelStr if (labelStr == "noise")
>>                        else "signal", StringType())
>>         ret = dataFrame.withColumn(newColName, binomial(dataFrame["label"]))
>>         return ret
>>
>>     trainingDF2 = convertMultinomialLabelToBinary(trainingDF1)
>>
>>     public class LabelToBinaryTransformer extends Transformer {
>>
>>         private static final long serialVersionUID = 4202800448830968904L;
>>         private final UUID uid = UUID.randomUUID();
>>         public String inputCol;
>>         public String outputCol;
>>
>>         @Override
>>         public String uid() {
>>             return uid.toString();
>>         }
>>
>>         @Override
>>         public Transformer copy(ParamMap pm) {
>>             Params xx = defaultCopy(pm);
>>             return ???;
>>         }
>>
>>         @Override
>>         public DataFrame transform(DataFrame df) {
>>             MyUDF myUDF = new MyUDF(myUDF, null, null);
>>             Column c = df.col(inputCol);
>>             // ??? UDF apply does not take a col ???
>>             Column col = myUDF.apply(df.col(inputCol));
>>             DataFrame ret = df.withColumn(outputCol, col);
>>             return ret;
>>         }
>>
>>         @Override
>>         public StructType transformSchema(StructType arg0) {
>>             // ??? What is this function supposed to do ???
>>             // ??? Is this the type of the new output column ???
>>         }
>>
>>         class MyUDF extends UserDefinedFunction {
>>
>>             public MyUDF(Object f, DataType dataType, Seq<DataType> inputTypes) {
>>                 super(f, dataType, inputTypes);
>>                 // ??? Why do I have to implement this constructor ???
>>                 // ??? What are the arguments ???
>>             }
>>
>>             @Override
>>             public Column apply(scala.collection.Seq<Column> exprs) {
>>                 // ??? What do you do with a Scala Seq ???
>>                 return ???;
>>             }
>>         }
>>     }
>
> --
> Best Regards
>
> Jeff Zhang

--
Best Regards

Jeff Zhang
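P.S. On the transformSchema() question in the quoted Transformer skeleton:
transformSchema() maps the input schema to the output schema without touching
any rows, so a Pipeline can verify that its stages fit together before
running any job. A minimal untested sketch for the plain Transformer version,
assuming the Spark 1.6 API and the inputCol/outputCol fields from the
skeleton:

    import java.util.Arrays;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    @Override
    public StructType transformSchema(StructType schema) {
        // The output schema is the input schema plus the new string column.
        StructField[] in = schema.fields();
        StructField[] out = Arrays.copyOf(in, in.length + 1);
        out[in.length] =
                new StructField(outputCol, DataTypes.StringType, false, Metadata.empty());
        return new StructType(out);
    }

transform() itself does not need a UserDefinedFunction subclass at all; it
can reuse the callUDF() pattern shown at the top of the thread.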