Tomasz Belina created SPARK-29009:
-------------------------------------

             Summary: Returning pojo from udf not working
                 Key: SPARK-29009
                 URL: https://issues.apache.org/jira/browse/SPARK-29009
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.3
            Reporter: Tomasz Belina
It looks like Spark is unable to construct a row from a POJO returned by a UDF. Given the POJO:
{code:java}
public class SegmentStub {
    private int id;
    private Date statusDateTime;
    private int healthPointRatio;
    // all-args constructor, getters and setters omitted
    // (e.g. generated by Lombok, as the SegmentStub(id=1, ...)
    // toString in the stack trace below suggests)
}
{code}
the registration of the UDF:
{code:java}
public class ParseResultsUdf {

    public String registerUdf(SparkSession sparkSession) {
        Encoder<SegmentStub> encoder = Encoders.bean(SegmentStub.class);
        final StructType schema = encoder.schema();
        sparkSession.udf().register(UDF_NAME,
                (UDF2<String, String, SegmentStub>) (s, s2) ->
                        new SegmentStub(1, Date.valueOf(LocalDate.now()), 2),
                schema);
        return UDF_NAME;
    }
}
{code}
and the test code:
{code:java}
List<String[]> strings = Arrays.asList(new String[]{"one", "two"}, new String[]{"3", "4"});
JavaRDD<Row> rowJavaRDD = sparkContext.parallelize(strings).map(RowFactory::create);
StructType schema = DataTypes.createStructType(new StructField[]{
        DataTypes.createStructField("foe1", DataTypes.StringType, false),
        DataTypes.createStructField("foe2", DataTypes.StringType, false)
});
Dataset<Row> dataFrame = sparkSession.sqlContext().createDataFrame(rowJavaRDD, schema);
Seq<Column> columnSeq = new Set.Set2<>(col("foe1"), col("foe2")).toSeq();
dataFrame.select(callUDF(udfName, columnSeq)).show();
{code}
the select throws this exception:
{code:java}
Caused by: java.lang.IllegalArgumentException: The value (SegmentStub(id=1, statusDateTime=2019-09-06, healthPointRatio=2)) of the type (udf.SegmentStub) cannot be converted to struct<healthPointRatio:int,id:int,statusDateTime:date>
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
	... 21 more
{code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
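A possible workaround (a sketch, not a fix for the underlying issue): have the UDF return a {{Row}} laid out according to the bean schema instead of the POJO itself, since registering a UDF with an explicit {{DataType}} expects a value Catalyst can convert directly. Note that {{Encoders.bean}} orders fields alphabetically, as the {{struct<healthPointRatio:int,id:int,statusDateTime:date>}} in the exception shows, so the {{Row}} fields must be given in that order; the class and UDF name below are illustrative.
{code:java}
import java.sql.Date;
import java.time.LocalDate;

import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.StructType;

public class ParseResultsRowUdf {

    public String registerUdf(SparkSession sparkSession) {
        // Schema derived from the bean; fields are ordered alphabetically:
        // healthPointRatio, id, statusDateTime
        StructType schema = Encoders.bean(SegmentStub.class).schema();
        sparkSession.udf().register("parseResults", // illustrative name
                (UDF2<String, String, Row>) (s, s2) ->
                        // Row field order must match the schema above
                        RowFactory.create(2, 1, Date.valueOf(LocalDate.now())),
                schema);
        return "parseResults";
    }
}
{code}
With a generic {{Row}} the struct converter no longer sees an unconvertible bean instance, so the same {{select}} should complete instead of raising the {{IllegalArgumentException}}.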