Tomasz Belina created SPARK-29009:
-------------------------------------

             Summary: Returning pojo from udf not working
                 Key: SPARK-29009
                 URL: https://issues.apache.org/jira/browse/SPARK-29009
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.3
            Reporter: Tomasz Belina


 It looks like spark is unable to construct row from pojo returned from udf.

Give POJO:
{code:java}
public class SegmentStub {
    private int id;
    private Date statusDateTime;
    private int healthPointRatio;
}
{code}
Registration of the UDF:
{code:java}
public class ParseResultsUdf {

    public String registerUdf(SparkSession sparkSession) {
        Encoder<SegmentStub> encoder = Encoders.bean(SegmentStub.class);
        final StructType schema = encoder.schema();
        sparkSession.udf().register(UDF_NAME,
                (UDF2<String, String, SegmentStub>) (s, s2) -> new 
SegmentStub(1, Date.valueOf(LocalDate.now()), 2),
                schema
        );
        return UDF_NAME;
    }
}
{code}
Test code:
{code:java}
        List<String[]> strings = Arrays.asList(new String[]{"one", "two"},new 
String[]{"3", "4"});
        JavaRDD<Row> rowJavaRDD = 
sparkContext.parallelize(strings).map(RowFactory::create);

        StructType schema = DataTypes
                .createStructType(new StructField[] { 
DataTypes.createStructField("foe1", DataTypes.StringType, false),
                        DataTypes.createStructField("foe2", 
DataTypes.StringType, false) });


        Dataset<Row> dataFrame = 
sparkSession.sqlContext().createDataFrame(rowJavaRDD, schema);
        Seq<Column> columnSeq = new Set.Set2<>(col("foe1"), 
col("foe2")).toSeq();
        dataFrame.select(callUDF(udfName, columnSeq)).show();
{code}
 throws exception: 
{code:java}
Caused by: java.lang.IllegalArgumentException: The value (SegmentStub(id=1, 
statusDateTime=2019-09-06, healthPointRatio=2)) of the type (udf.SegmentStub) 
cannot be converted to struct<healthPointRatio:int,id:int,statusDateTime:date>
        at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
        at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
        at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
        at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
        ... 21 more
}
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to