[ 
https://issues.apache.org/jira/browse/SPARK-12823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171668#comment-16171668
 ] 

Simeon H.K. Fitch edited comment on SPARK-12823 at 9/19/17 1:30 PM:
--------------------------------------------------------------------

Here is a combined, runnable example, including an attempt at using SQL:

{code:java}
package examples

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object UDFSadness extends App {
  implicit val spark = SparkSession.builder()
    .master("local").appName(getClass.getName).getOrCreate()
  import spark.implicits._

  case class KV(key: Long, value: String)
  case class MyRow(kv: KV)

  val ds: Dataset[MyRow] = spark.createDataset(List(MyRow(KV(1L, "a")), 
MyRow(KV(5L, "b"))))

  val firstColumn = ds(ds.columns.head)

  // Works, but is not what we want (can't always use `map` over `select`)
  ds.map(_.kv.value).show
  // +-----+
  // |value|
  // +-----+
  // |    a|
  // |    b|
  // +-----+

  // This is what we want to be able to implement
  val udf1 = udf((row: MyRow) ⇒ row.kv.value)

  try {
    ds.select(udf1(firstColumn)).show
  }
  catch {
    case t: Throwable ⇒ t.printStackTrace()
    // Exception in thread "main" org.apache.spark.sql.AnalysisException:
    // cannot resolve 'UDF(kv)' due to data type mismatch: argument 1 requires
    // struct<kv:struct<key:bigint,value:string>> type, however,
    // '`kv`' is of struct<key:bigint,value:string> type.;;
  }

  // So let's try something of the form reported in the error
  val udf2 = udf((kv: KV) ⇒ kv.value)

  try {
    ds.select(udf2(firstColumn)).show
  }
  catch {
    case t: Throwable ⇒ //t.printStackTrace()
    // java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    // cannot be cast to examples.UDFSadness$KV
  }

  // What if it's a problem with the use of untyped columns?
  // Try the above again with typed columns.

  try {
    ds.select(udf1(firstColumn.as[MyRow])).show
  }
  catch {
    case t: Throwable ⇒ t.printStackTrace()
    // org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to 
data type
    // mismatch: argument 1 requires struct<kv:struct<key:bigint,value:string>> 
type,
    // however, '`kv`' is of struct<key:bigint,value:string> type.;;
  }

  try {
    ds.select(udf2(firstColumn.as[KV])).show
  }
  catch {
    case t: Throwable ⇒ t.printStackTrace()
    // java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    // cannot be cast to examples.UDFSadness$KV

  }

  // Let's see if we can use SQL as a back door.
  spark.sqlContext.udf.register("udf1", (row: MyRow) ⇒ row.kv.value)
  try {
    ds.createOrReplaceTempView("myKVs")
    spark.sql(s"select udf1($firstColumn) from myKVs").show
  }
  catch {
    case t: Throwable ⇒ t.printStackTrace()
    // org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to 
data
    // type mismatch: argument 1 requires 
struct<kv:struct<key:bigint,value:string>> type,
    // however, 'mykvs.`kv`' is of struct<key:bigint,value:string> type.; line 
1 pos 7;
  }

  // This is the unfortunate workaround. Note that `Row` is
  // `org.apache.spark.sql.Row`
  val udf3 = udf((row: Row) ⇒ row.getString(1))

  ds.select(udf3(firstColumn)).show
  //  +-------+
  //  |UDF(kv)|
  //  +-------+
  //  |      a|
  //  |      b|
  //  +-------+

}

{code}



was (Author: metasim):
Here is a combined, runnable example:


{code:java}

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object UDFSadness extends App {
  implicit val spark = SparkSession.builder()
    .master("local").appName(getClass.getName).getOrCreate()
  import spark.implicits._

  case class KV(key: Long, value: String)
  case class MyRow(kv: KV)

  val ds: Dataset[MyRow] = spark.createDataset(List(MyRow(KV(1L, "a")), 
MyRow(KV(5L, "b"))))

  val firstColumn = ds(ds.columns.head)

  // Works, but is not what we want (can't always use `map` over `select`)
  ds.map(_.kv.value).show
  // +-----+
  // |value|
  // +-----+
  // |    a|
  // |    b|
  // +-----+

  // This is what we want to be able to implement
  val udf1 = udf((row: MyRow) ⇒ row.kv.value)

  try {
    ds.select(udf1(firstColumn)).show
  }
  catch {
    case t: Throwable ⇒ t.printStackTrace()
    // Exception in thread "main" org.apache.spark.sql.AnalysisException:
    // cannot resolve 'UDF(kv)' due to data type mismatch: argument 1 requires
    // struct<kv:struct<key:bigint,value:string>> type, however,
    // '`kv`' is of struct<key:bigint,value:string> type.;;
  }

  // So let's try something of the form reported in the error
  val udf2 = udf((kv: KV) ⇒ kv.value)

  try {
    ds.select(udf2(firstColumn)).show
  }
  catch {
    case t: Throwable ⇒ //t.printStackTrace()
    // java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    // cannot be cast to examples.UDFSadness$KV
  }

  // What if it's a problem with the use of untyped columns?
  // Try the above again with typed columns.

  try {
    ds.select(udf1(firstColumn.as[MyRow])).show
  }
  catch {
    case t: Throwable ⇒ t.printStackTrace()
    // org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to 
data type
    // mismatch: argument 1 requires struct<kv:struct<key:bigint,value:string>> 
type,
    // however, '`kv`' is of struct<key:bigint,value:string> type.;;
  }

  try {
    ds.select(udf2(firstColumn.as[KV])).show
  }
  catch {
    case t: Throwable ⇒ t.printStackTrace()
    // java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    // cannot be cast to examples.UDFSadness$KV

  }

  // This is the unfortunate workaround. Note that `Row` is
  // `org.apache.spark.sql.Row`
  val udf3 = udf((row: Row) ⇒ row.getString(1))

  ds.select(udf3(firstColumn)).show
  //  +-------+
  //  |UDF(kv)|
  //  +-------+
  //  |      a|
  //  |      b|
  //  +-------+

}

{code}


> Cannot create UDF with StructType input
> ---------------------------------------
>
>                 Key: SPARK-12823
>                 URL: https://issues.apache.org/jira/browse/SPARK-12823
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.2
>            Reporter: Frank Rosner
>
> h5. Problem
> It is not possible to apply a UDF to a column that has a struct data type. 
> Two previous requests to the mailing list remained unanswered.
> h5. How-To-Reproduce
> {code}
> val sql = new org.apache.spark.sql.SQLContext(sc)
> import sql.implicits._
> case class KV(key: Long, value: String)
> case class Row(kv: KV)
> val df = sc.parallelize(List(Row(KV(1L, "a")), Row(KV(5L, "b")))).toDF
> val udf1 = org.apache.spark.sql.functions.udf((kv: KV) => kv.value)
> df.select(udf1(df("kv"))).show
> // java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
> to $line78.$read$$iwC$$iwC$KV
> val udf2 = org.apache.spark.sql.functions.udf((kv: (Long, String)) => kv._2)
> df.select(udf2(df("kv"))).show
> // org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(kv)' due to 
> data type mismatch: argument 1 requires struct<_1:bigint,_2:string> type, 
> however, 'kv' is of struct<key:bigint,value:string> type.;
> {code}
> h5. Mailing List Entries
> - 
> https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3CCACUahd8M=ipCbFCYDyein_=vqyoantn-tpxe6sq395nh10g...@mail.gmail.com%3E
> - https://www.mail-archive.com/user@spark.apache.org/msg43092.html
> h5. Possible Workaround
> If you create a {{UserDefinedFunction}} manually, not using the {{udf}} 
> helper functions, it works. See https://github.com/FRosner/struct-udf, which 
> exposes the {{UserDefinedFunction}} constructor (public from package 
> private). However, then you have to work with a {{Row}}, because it does not 
> automatically convert the row to a case class / tuple.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to