Hi MoTao,

What about broadcasting the model?

Cheers,
Ndjido.
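A minimal sketch of that idea, assuming `initModel()` and its `process(Double): Double` method from your mail, and that the model is serializable (the names are taken from your example, not verified here). The model is built once on the driver, broadcast to the executors, and reused inside the udf instead of being re-initialized per row:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object BroadcastModelSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("broadcast-model").getOrCreate()
    import spark.implicits._

    val df = Seq(1.0, 2.0, 3.0).toDF("value")

    // Initialize once on the driver, ship one copy per executor.
    // Assumes initModel() returns a serializable model (from the original mail).
    val model = spark.sparkContext.broadcast(initModel())

    // The udf closes over the broadcast handle; model.value is the
    // already-initialized instance on each executor.
    val func = udf { v: Double => model.value.process(v) }
    val df2 = df.withColumn("valueWithBias", func(col("value")))
  }
}
```

Note this only helps if the model can be serialized; if it holds native resources, a per-partition lazy initialization (as in your mapPartitions attempt) is the usual fallback.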
> On 08 Aug 2016, at 11:00, MoTao <mo...@sensetime.com> wrote:
>
> Hi all,
>
> I'm trying to append a column to a DataFrame.
> I understand that the new column must be created by
> 1) using literals,
> 2) transforming an existing column of the df,
> or 3) generating it with a udf over this df.
>
> In my case, the column to be appended is created by processing each row,
> like:
>
> val df = Seq(1.0, 2.0, 3.0).toDF("value")
> val func = udf { v: Double =>
>   val model = initModel()
>   model.process(v)
> }
> val df2 = df.withColumn("valueWithBias", func(col("value")))
>
> This works fine. However, for performance reasons, I want to avoid
> calling initModel() for each row.
> So I came up with mapPartitions, like:
>
> val df = Seq(1.0, 2.0, 3.0).toDF("value")
> val df2 = df.mapPartitions { rows =>
>   val model = initModel()
>   rows.map(row => model.process(row.getAs[Double](0)))
> }
> val df3 = df.withColumn("valueWithBias", df2.col("value")) // FAIL
>
> But this is wrong, as a column of df2 *CANNOT* be appended to df.
>
> The only solution I have found is to force mapPartitions to return a whole
> row instead of just the new column
> (something like "row => Row.fromSeq(row.toSeq ++ Array(model.process(...)))"),
> which requires a lot of copying as well.
>
> I wonder how to deal with this problem with as little overhead as possible?
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-generate-a-column-using-mapParition-and-then-add-it-back-to-the-df-tp27493.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> ---------------------------------------------------------------------
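For the "return a whole row" workaround described above, a typed Dataset can avoid the manual Row.fromSeq copies: initialize the model once per partition and emit a case class carrying both the input and the processed value. This is a hedged sketch that again assumes `initModel()`/`process` from the original mail:

```scala
import org.apache.spark.sql.SparkSession

// Output schema for the typed mapPartitions: input value plus the processed one.
case class ValueWithBias(value: Double, valueWithBias: Double)

object PerPartitionModelSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("per-partition-model").getOrCreate()
    import spark.implicits._

    val ds = Seq(1.0, 2.0, 3.0).toDS()

    val df2 = ds.mapPartitions { values =>
      val model = initModel() // one initialization per partition, not per row
      values.map(v => ValueWithBias(v, model.process(v)))
    }
    // df2 has columns "value" and "valueWithBias"; no join back to the
    // original df is needed, since the input value travels with the result.
  }
}
```

The design point is that the output row is built once by the Dataset encoder from the case class, so there is no second pass copying `row.toSeq` by hand, and no attempt to stitch a column from one DataFrame onto another.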