Hi MoTao,
What about broadcasting the model?
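
A rough sketch of what I mean, assuming the model is serializable. The Model class, its bias and initModel() below are hypothetical stand-ins for yours; only broadcast(), udf() and withColumn() are Spark API. The model is built once on the driver and each executor receives one copy.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object BroadcastModelExample {
  // Hypothetical stand-in for the real model. It must be Serializable,
  // because broadcast variables are serialized and shipped to executors.
  class Model(bias: Double) extends Serializable {
    def process(v: Double): Double = v + bias
  }

  // Hypothetical stand-in for the expensive initModel() from the question.
  def initModel(): Model = new Model(0.5)

  def withBias(spark: SparkSession, df: DataFrame): DataFrame = {
    // Build the model once on the driver and broadcast it.
    val modelBc = spark.sparkContext.broadcast(initModel())
    // The UDF captures only the broadcast handle; .value returns the
    // executor-local copy, so initModel() is not called per row.
    val func = udf((v: Double) => modelBc.value.process(v))
    df.withColumn("valueWithBias", func(col("value")))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("broadcast-model").getOrCreate()
    import spark.implicits._
    withBias(spark, Seq(1.0, 2.0, 3.0).toDF("value")).show()
    spark.stop()
  }
}
```

This keeps the original columns untouched, so no row copying is needed on your side. If the model cannot be serialized, broadcasting will not work as written.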

Cheers,
Ndjido.

> On 08 Aug 2016, at 11:00, MoTao <mo...@sensetime.com> wrote:
> 
> Hi all,
> 
> I'm trying to append a column to a df.
> I understand that the new column must be created by
> 1) using literals,
> 2) transforming an existing column in df,
> or 3) applying a udf over this df.
> 
> In my case, the column to be appended is created by processing each row,
> like
> 
> val df = Seq(1.0, 2.0, 3.0).toDF("value") // requires import spark.implicits._
> val func = udf { 
>  v: Double => {
>    val model = initModel()
>    model.process(v)
>  }
> }
> val df2 = df.withColumn("valueWithBias", func(col("value")))
> 
> This works fine. However, for performance reasons, I want to avoid
> calling initModel() for each row.
> So I came up with mapPartitions, like
> 
> val df = Seq(1.0, 2.0, 3.0).toDF("value") // requires import spark.implicits._
> val df2 = df.mapPartitions(rows => {
>  val model = initModel()  
>  rows.map(row => model.process(row.getAs[Double](0)))
> })
> val df3 = df.withColumn("valueWithBias", df2.col("value")) // FAIL
> 
> But this is wrong as a column of df2 *CANNOT* be appended to df.
> 
> The only solution I have found is to make mapPartitions return the whole
> row instead of just the new column
> (something like "row => Row.fromSeq(row.toSeq ++
> Array(model.process(...)))"),
> which requires a lot of copying as well.
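
For reference, the whole-row variant described above could look roughly like the sketch below. Model and initModel() are again hypothetical stand-ins, and the sketch goes through df.rdd and createDataFrame with an explicit schema rather than Dataset.mapPartitions, which is one possible choice and not necessarily what the original code did.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StructField}

object MapPartitionsExample {
  // Hypothetical stand-ins for the model and initModel() in the question.
  class Model(bias: Double) {
    def process(v: Double): Double = v + bias
  }
  def initModel(): Model = new Model(0.5)

  def appendWithMapPartitions(spark: SparkSession, df: DataFrame): DataFrame = {
    // Output schema: every original field plus the new column.
    val schema = df.schema.add(StructField("valueWithBias", DoubleType, nullable = false))
    val valueIdx = df.schema.fieldIndex("value")

    val rows = df.rdd.mapPartitions { iter =>
      val model = initModel() // created once per partition, on the executor
      // Each output row copies the input row and appends the model output.
      iter.map(row => Row.fromSeq(row.toSeq :+ model.process(row.getDouble(valueIdx))))
    }
    spark.createDataFrame(rows, schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("map-partitions").getOrCreate()
    import spark.implicits._
    appendWithMapPartitions(spark, Seq(1.0, 2.0, 3.0).toDF("value")).show()
    spark.stop()
  }
}
```

Here the model never leaves the executor, so it does not need to be serializable, at the cost of rebuilding every row.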
> 
> I wonder how to deal with this problem with as little overhead as possible?
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-generate-a-column-using-mapParition-and-then-add-it-back-to-the-df-tp27493.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
