Re: Re: Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-20 Thread
Hi Ryan,


The attachment is the event timeline on executors. They are always busy 
computing.

More executors would help, but adding them is beyond my control as a developer.


1. The bad performance could be caused by my implementation: "checkID" is a user-defined function, so it will not be pushed down.

2. To make the row group index work, I would need to sort the data by id, which means shuffling 50 TB of data. That is hardly practical (a cheaper variant is sketched below).
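
A cheaper middle ground, as a rough sketch only: sort within each partition (no cluster-wide shuffle) and ask ORC for a bloom filter on ID at write time, so each file still carries usable row-group statistics. Whether df.write.option(...) forwards these ORC properties depends on the Spark/ORC version in use (with a Hive table they can be set as TBLPROPERTIES instead); the paths are illustrative.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("orc-bloom-rewrite").getOrCreate()

// Rewrite the existing data with per-file ordering on ID and a bloom filter on ID.
// sortWithinPartitions only sorts locally, so there is no cluster-wide shuffle.
spark.read.orc("/path/to/whole/data")
  .sortWithinPartitions("ID")
  .write
  .option("orc.bloom.filter.columns", "ID")   // standard ORC table property
  .option("orc.bloom.filter.fpp", "0.05")     // bloom filter false-positive rate
  .orc("/path/to/whole/data_by_id")           // illustrative output path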


I'm currently testing HAR, but this discussion has given me a lot of insight into ORC.

Thanks for your help!



From: Ryan
Sent: 17 April 2017 16:48:47
To: 莫涛
Cc: user
Subject: Re: Re: Re: How to store 10M records in HDFS to speed up further filtering?

How about the event timeline on the executors? It seems adding more executors could help.

1. I found a JIRA (https://issues.apache.org/jira/browse/SPARK-11621) stating that predicate pushdown should work. And I think "only for matched ones the binary data is read" holds if a proper index is configured: a row group won't be read if the index shows the predicate cannot be satisfied (see the sketch after these points).

2. It is absolutely true that the performance gain depends on the id distribution...
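
A minimal sketch of the pushdown-friendly read, assuming the requested IDs fit comfortably in an IN predicate (paths and IDs are illustrative; for a very large ID list, a broadcast join against a small DataFrame of IDs may work better than isin):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("orc-ppd-read").getOrCreate()
import spark.implicits._

// ORC predicate pushdown is off by default in the Spark 2.0 line.
spark.conf.set("spark.sql.orc.filterPushdown", "true")

val requestedIds: Seq[String] = Seq("id-0001", "id-0002")  // illustrative IDs

// A plain Catalyst predicate (IN) can be pushed down to the ORC reader,
// unlike the checkID UDF from the original post.
val hits = spark.read.orc("/path/to/whole/data")
  .filter($"ID".isin(requestedIds: _*))
  .select($"ID", $"BINARY")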

On Mon, Apr 17, 2017 at 4:23 PM, 莫涛 <mo...@sensetime.com> wrote:

Hi Ryan,


The attachment is a screenshot of the Spark job, and this is the only stage in the job.

I've changed the partition size to 1 GB with "--conf spark.sql.files.maxPartitionBytes=1073741824".
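
For reference, the same setting can also be applied in code before the read; this is just the in-code equivalent of the --conf flag above:

spark.conf.set("spark.sql.files.maxPartitionBytes", "1073741824")  // 1 GB per input partition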


1. The Spark ORC reader seems not that smart: the input size is almost the whole dataset. I guess "only for matched ones the binary data is read" does not hold here, as ORC does not know the offset of each BINARY value, so it cannot simply seek past the ones that don't match.

2. I've tried ORC and it does skip partitions that have no hit. This could be a solution, but the performance depends on the distribution of the given ID list; in the worst case no partition can be skipped.


Mo Tao



____
From: Ryan <ryan.hd@gmail.com>
Sent: 17 April 2017 15:42:46
To: 莫涛
Cc: user
Subject: Re: Re: How to store 10M records in HDFS to speed up further filtering?

1. Per my understanding, for ORC files Spark should push down the filters, which means the id column is scanned in full but the binary data is read only for matching rows. I haven't dug into the Spark ORC reader, though.

2. ORC itself has a row group index and a bloom filter index; you may try configurations like 'orc.bloom.filter.columns' on the source table first. On the Spark side, with mapPartitions it's possible to build a sort of index for each partition (a sketch follows below).

And could you check how many tasks the filter stage has? Maybe there are too few partitions.
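
As a simplified sketch of the per-partition lookup idea: here the broadcast set of requested IDs plays the role of the lookup structure, and a bloom filter or search tree built over the partition's own IDs would slot into the same place. The schema and names are assumptions based on this thread.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("partition-lookup").getOrCreate()
import spark.implicits._

case class Record(ID: String, BINARY: Array[Byte])

// Ship the (small) requested ID list to every executor once.
val requested = spark.sparkContext.broadcast(Set("id-0001", "id-0002"))  // illustrative IDs

val hits = spark.read.orc("/path/to/whole/data")
  .as[Record]
  .mapPartitions { rows =>
    val ids = requested.value            // one in-memory lookup per partition
    rows.filter(r => ids.contains(r.ID))
  }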

On Mon, Apr 17, 2017 at 3:01 PM, 莫涛 <mo...@sensetime.com> wrote:

Hi Ryan,


1. "expected qps and response time for the filter request"

I expect that only the requested BINARY values are scanned instead of all records, so the response time should be roughly "10K * 5 MB / disk read speed", or a few times that.

In practice, our cluster has 30 SAS disks, and scanning all of the 10M * 5 MB data currently takes about 6 hours. It should come down to several minutes if only the requested records are read (a rough check is sketched after these points).


2. "build a search tree using ids within each partition to act like an index, 
or create a bloom filter to see if current partition would have any hit"

Sounds like the thing I'm looking for!

Could you kindly provide some links for reference? I found nothing in the Spark documentation about an index or bloom filter working inside a partition.
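
For what it's worth, a back-of-the-envelope check of the numbers in point 1, assuming roughly 150 MB/s of sequential read per SAS disk (an assumed figure, not a measurement):

val diskMBps  = 150.0                                   // assumed per-disk sequential read speed
val disks     = 30
val aggregate = diskMBps * disks                        // ~4,500 MB/s across the cluster
val targetedSeconds = 10000L * 5 / aggregate            // 10K records * 5 MB  => ~11 s ideal
val fullScanHours   = 10000000L * 5 / aggregate / 3600  // 10M records * 5 MB => ~3 h ideal

So the observed 6-hour full scan is within a small factor of the raw disk limit, and touching only the requested records should indeed bring the job down to minutes even with generous overhead.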


Thanks very much!


Mo Tao


From: Ryan <ryan.hd@gmail.com>
Sent: 17 April 2017 14:32:00
To: 莫涛
Cc: user
Subject: Re: How to store 10M records in HDFS to speed up further filtering?

You can build a search tree over the ids within each partition to act like an index, or create a bloom filter to check whether the current partition has any hit.

What are your expected QPS and response time for the filter request?


On Mon, Apr 17, 2017 at 2:23 PM, MoTao <mo...@sensetime.com> wrote:
Hi all,

I have 10M (ID, BINARY) records, and the size of each BINARY is 5 MB on average.
In my daily application, I need to pick out 10K BINARY values according to an ID list.
How should I store the whole dataset to make this filtering faster?

I'm using DataFrame in Spark 2.0.0 and I've tried a row-based format (Avro)
and a column-based format (ORC).
However, both of them require scanning almost ALL records, making the
filtering stage very, very slow.
The code block for filtering looks like:

val IDSet: Set[String] = ...
val checkID = udf { ID: String => IDSet(ID) }
spark.read.orc("/path/to/whole/data")
  .filter(checkID($"ID"))
  .select($"ID", $"BINARY")
  .write...

Thanks for any advice!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-10M-records-in-HDFS-to-speed-up-further-filtering-tp28605.html

Re: Re: Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-20 Thread
It's Hadoop Archive (HAR).


https://hadoop.apache.org/docs/r1.2.1/hadoop_archives.html



From: Alonso Isidoro Roman
Sent: 20 April 2017 17:03:33
To: 莫涛
Cc: Jörn Franke; user@spark.apache.org
Subject: Re: Re: Re: How to store 10M records in HDFS to speed up further filtering?

Forgive my ignorance, but what does HAR mean? Is it an acronym for highly available record?

Thanks

Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2017-04-20 10:58 GMT+02:00 莫涛 <mo...@sensetime.com>:

Hi Jörn,


HAR is a great idea!


For POC, I've archived 1M records and stored the id -> path mapping in text 
(for better readability).

Filtering 1K records takes only 2 minutes now (30 seconds to get the path list 
and 0.5 second per thread to read a record).

Such performance is exactly what I expected: "only the requested BINARY are 
scanned".

Moreover, HAR provides direct access to each record via hdfs shell commands.
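
A rough sketch of that lookup path, for anyone reproducing it: the archive name, layout, and the id<TAB>path mapping format are assumptions, and reading via the har:// scheme assumes Hadoop's HarFileSystem is available (it ships with hadoop-common).

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("har-lookup").getOrCreate()
import spark.implicits._

// 1. Resolve the requested IDs to paths using the small text mapping (id<TAB>path).
val wanted = Set("id-0001", "id-0002")                       // illustrative IDs
val paths = spark.read.textFile("/index/id_to_path.txt")     // illustrative mapping file
  .map { line => val a = line.split("\t"); (a(0), a(1)) }
  .filter(t => wanted.contains(t._1))
  .map(_._2)
  .collect()

// 2. Read each record straight out of the archive; the same paths also work with
//    "hdfs dfs -cat har:///archives/videos.har/<path>" from the shell.
val conf = new Configuration()
paths.par.foreach { p =>
  val uri = new URI(s"har:///archives/videos.har/$p")        // illustrative archive name
  val fs  = FileSystem.get(uri, conf)
  val in  = fs.open(new Path(uri))
  try { /* consume the binary record */ } finally { in.close() }
}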


Thank you very much!


From: Jörn Franke <jornfra...@gmail.com>
Sent: 17 April 2017 22:37:48
To: 莫涛
Cc: user@spark.apache.org
Subject: Re: Re: How to store 10M records in HDFS to speed up further filtering?

Yes, 5 MB is a difficult size: too small for HDFS, too big for Parquet/ORC.
Maybe you can put the data in a HAR and store (id, path) in ORC/Parquet.

On 17. Apr 2017, at 10:52, 莫涛 <mo...@sensetime.com> wrote:


Hi Jörn,


I do think a 5 MB column is odd, but I didn't have any better idea before asking this question. The binary data is a short video, and the maximum size is no more than 50 MB.


Hadoop archive sounds very interesting and I'll try it first to check whether 
filtering is fast on it.


To the best of my knowledge, HBase works best for records around hundreds of KB, and it requires extra work from the cluster administrator, so this would be the last option.


Thanks!


Mo Tao


From: Jörn Franke <jornfra...@gmail.com>
Sent: 17 April 2017 15:59:28
To: 莫涛
Cc: user@spark.apache.org
Subject: Re: How to store 10M records in HDFS to speed up further filtering?

You need to sort the data by id, otherwise a situation can occur where the index does not work. Aside from this, it sounds odd to store a 5 MB column in those formats; it will also not be very efficient.
What is in the 5 MB binary data?
You could use HAR or maybe Hbase to store this kind of data (if it does not get 
much larger than 5 MB).

> On 17. Apr 2017, at 08:23, MoTao <mo...@sensetime.com> wrote:
>
> Hi all,
>
> I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
> average.
> In my daily application, I need to filter out 10K BINARY according to an ID
> list.
> How should I store the whole data to make the filtering faster?
>
> I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
> and column-based format (orc).
> However, both of them require to scan almost ALL records, making the
> filtering stage very very slow.
> The code block for filtering looks like:
>
> val IDSet: Set[String] = ...
> val checkID = udf { ID: String => IDSet(ID) }
> spark.read.orc("/path/to/whole/data")
>  .filter(checkID($"ID"))
>  .select($"ID", $"BINARY")
>  .write...
>
> Thanks for any advice!
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-10M-records-in-HDFS-to-speed-up-further-filtering-tp28605.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>



Re: Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-20 Thread
Hi Jörn,


HAR is a great idea!


For POC, I've archived 1M records and stored the id -> path mapping in text 
(for better readability).

Filtering 1K records takes only 2 minutes now (30 seconds to get the path list 
and 0.5 second per thread to read a record).

Such performance is exactly what I expected: "only the requested BINARY are 
scanned".

Moreover, HAR provides direct access to each record via hdfs shell commands.


Thank you very much!


From: Jörn Franke
Sent: 17 April 2017 22:37:48
To: 莫涛
Cc: user@spark.apache.org
Subject: Re: Re: How to store 10M records in HDFS to speed up further filtering?

Yes, 5 MB is a difficult size: too small for HDFS, too big for Parquet/ORC.
Maybe you can put the data in a HAR and store (id, path) in ORC/Parquet.

On 17. Apr 2017, at 10:52, 莫涛 <mo...@sensetime.com> wrote:


Hi Jörn,


I do think a 5 MB column is odd, but I didn't have any better idea before asking this question. The binary data is a short video, and the maximum size is no more than 50 MB.


Hadoop archive sounds very interesting and I'll try it first to check whether 
filtering is fast on it.


To the best of my knowledge, HBase works best for records around hundreds of KB, and it requires extra work from the cluster administrator, so this would be the last option.


Thanks!


Mo Tao


From: Jörn Franke <jornfra...@gmail.com>
Sent: 17 April 2017 15:59:28
To: 莫涛
Cc: user@spark.apache.org
Subject: Re: How to store 10M records in HDFS to speed up further filtering?

You need to sort the data by id, otherwise a situation can occur where the index does not work. Aside from this, it sounds odd to store a 5 MB column in those formats; it will also not be very efficient.
What is in the 5 MB binary data?
You could use HAR or maybe Hbase to store this kind of data (if it does not get 
much larger than 5 MB).

> On 17. Apr 2017, at 08:23, MoTao <mo...@sensetime.com> wrote:
>
> Hi all,
>
> I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
> average.
> In my daily application, I need to filter out 10K BINARY according to an ID
> list.
> How should I store the whole data to make the filtering faster?
>
> I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
> and column-based format (orc).
> However, both of them require to scan almost ALL records, making the
> filtering stage very very slow.
> The code block for filtering looks like:
>
> val IDSet: Set[String] = ...
> val checkID = udf { ID: String => IDSet(ID) }
> spark.read.orc("/path/to/whole/data")
>  .filter(checkID($"ID"))
>  .select($"ID", $"BINARY")
>  .write...
>
> Thanks for any advice!
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-10M-records-in-HDFS-to-speed-up-further-filtering-tp28605.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-17 Thread
Hi Jörn,


I do think a 5 MB column is odd, but I didn't have any better idea before asking this question. The binary data is a short video, and the maximum size is no more than 50 MB.


Hadoop archive sounds very interesting and I'll try it first to check whether 
filtering is fast on it.


To the best of my knowledge, HBase works best for records around hundreds of KB, and it requires extra work from the cluster administrator, so this would be the last option.


Thanks!


Mo Tao


From: Jörn Franke
Sent: 17 April 2017 15:59:28
To: 莫涛
Cc: user@spark.apache.org
Subject: Re: How to store 10M records in HDFS to speed up further filtering?

You need to sort the data by id, otherwise a situation can occur where the index does not work. Aside from this, it sounds odd to store a 5 MB column in those formats; it will also not be very efficient.
What is in the 5 MB binary data?
You could use HAR or maybe Hbase to store this kind of data (if it does not get 
much larger than 5 MB).

> On 17. Apr 2017, at 08:23, MoTao  wrote:
>
> Hi all,
>
> I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
> average.
> In my daily application, I need to filter out 10K BINARY according to an ID
> list.
> How should I store the whole data to make the filtering faster?
>
> I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
> and column-based format (orc).
> However, both of them require to scan almost ALL records, making the
> filtering stage very very slow.
> The code block for filtering looks like:
>
> val IDSet: Set[String] = ...
> val checkID = udf { ID: String => IDSet(ID) }
> spark.read.orc("/path/to/whole/data")
>  .filter(checkID($"ID"))
>  .select($"ID", $"BINARY")
>  .write...
>
> Thanks for any advice!
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-10M-records-in-HDFS-to-speed-up-further-filtering-tp28605.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


Re: Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-17 Thread
Hi Ryan,


The attachment is a screenshot of the Spark job, and this is the only stage in the job.

I've changed the partition size to 1 GB with "--conf spark.sql.files.maxPartitionBytes=1073741824".


1. The Spark ORC reader seems not that smart: the input size is almost the whole dataset. I guess "only for matched ones the binary data is read" does not hold here, as ORC does not know the offset of each BINARY value, so it cannot simply seek past the ones that don't match.

2. I've tried ORC and it does skip partitions that have no hit. This could be a solution, but the performance depends on the distribution of the given ID list; in the worst case no partition can be skipped.


Mo Tao




From: Ryan
Sent: 17 April 2017 15:42:46
To: 莫涛
Cc: user
Subject: Re: Re: How to store 10M records in HDFS to speed up further filtering?

1. Per my understanding, for ORC files Spark should push down the filters, which means the id column is scanned in full but the binary data is read only for matching rows. I haven't dug into the Spark ORC reader, though.

2. ORC itself has a row group index and a bloom filter index; you may try configurations like 'orc.bloom.filter.columns' on the source table first. On the Spark side, with mapPartitions it's possible to build a sort of index for each partition.

And could you check how many tasks the filter stage has? Maybe there are too few partitions.

On Mon, Apr 17, 2017 at 3:01 PM, 莫涛 <mo...@sensetime.com> wrote:

Hi Ryan,


1. "expected qps and response time for the filter request"

I expect that only the requested BINARY values are scanned instead of all records, so the response time should be roughly "10K * 5 MB / disk read speed", or a few times that.

In practice, our cluster has 30 SAS disks, and scanning all of the 10M * 5 MB data currently takes about 6 hours. It should come down to several minutes if only the requested records are read.


2. "build a search tree using ids within each partition to act like an index, 
or create a bloom filter to see if current partition would have any hit"

Sounds like the thing I'm looking for!

Could you kindly provide some links for reference? I found nothing in the Spark documentation about an index or bloom filter working inside a partition.


Thanks very much!


Mo Tao

____
From: Ryan <ryan.hd@gmail.com>
Sent: 17 April 2017 14:32:00
To: 莫涛
Cc: user
Subject: Re: How to store 10M records in HDFS to speed up further filtering?

You can build a search tree over the ids within each partition to act like an index, or create a bloom filter to check whether the current partition has any hit.

What are your expected QPS and response time for the filter request?


On Mon, Apr 17, 2017 at 2:23 PM, MoTao <mo...@sensetime.com> wrote:
Hi all,

I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
average.
In my daily application, I need to filter out 10K BINARY according to an ID
list.
How should I store the whole data to make the filtering faster?

I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
and column-based format (orc).
However, both of them require to scan almost ALL records, making the
filtering stage very very slow.
The code block for filtering looks like:

val IDSet: Set[String] = ...
val checkID = udf { ID: String => IDSet(ID) }
spark.read.orc("/path/to/whole/data")
  .filter(checkID($"ID"))
  .select($"ID", $"BINARY")
  .write...

Thanks for any advice!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-10M-records-in-HDFS-to-speed-up-further-filtering-tp28605.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-17 Thread
Hi Ryan,


1. "expected qps and response time for the filter request"

I expect that only the requested BINARY values are scanned instead of all records, so the response time should be roughly "10K * 5 MB / disk read speed", or a few times that.

In practice, our cluster has 30 SAS disks, and scanning all of the 10M * 5 MB data currently takes about 6 hours. It should come down to several minutes if only the requested records are read.


2. "build a search tree using ids within each partition to act like an index, 
or create a bloom filter to see if current partition would have any hit"

Sounds like the thing I'm looking for!

Could you kindly provide some links for reference? I found nothing in the Spark documentation about an index or bloom filter working inside a partition.


Thanks very much!


Mo Tao


From: Ryan
Sent: 17 April 2017 14:32:00
To: 莫涛
Cc: user
Subject: Re: How to store 10M records in HDFS to speed up further filtering?

You can build a search tree over the ids within each partition to act like an index, or create a bloom filter to check whether the current partition has any hit.

What are your expected QPS and response time for the filter request?


On Mon, Apr 17, 2017 at 2:23 PM, MoTao <mo...@sensetime.com> wrote:
Hi all,

I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
average.
In my daily application, I need to filter out 10K BINARY according to an ID
list.
How should I store the whole data to make the filtering faster?

I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
and column-based format (orc).
However, both of them require to scan almost ALL records, making the
filtering stage very very slow.
The code block for filtering looks like:

val IDSet: Set[String] = ...
val checkID = udf { ID: String => IDSet(ID) }
spark.read.orc("/path/to/whole/data")
  .filter(checkID($"ID"))
  .select($"ID", $"BINARY")
  .write...

Thanks for any advice!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-10M-records-in-HDFS-to-speed-up-further-filtering-tp28605.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




Re: Re: how to generate a column using mapPartition and then add it back to the df?

2016-08-08 Thread
Hi Guha,

Thanks a lot!
This is exactly what I want, and I'll try to implement it.


MoTao

From: ayan guha
Sent: 8 August 2016 18:05:37
To: 莫涛
Cc: ndj...@gmail.com; user@spark.apache.org
Subject: Re: Re: how to generate a column using mapPartition and then add it back to the df?

Hi

I think you should change the initModel() function to getOrCreateModel() and create the model as a singleton object. You may want to refer to this link: http://stackoverflow.com/questions/30450763/spark-streaming-and-connection-pool-implementation
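
A minimal sketch of that singleton idea as it might look in a compiled Spark application; ModelHandle and the loading logic are hypothetical stand-ins for the JNI-backed classifier described below (a REPL-defined object behaves slightly differently):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

// One model instance per executor JVM, created lazily on first use.
object ModelHolder {
  trait ModelHandle { def process(v: Double): Double }          // hypothetical JNI wrapper
  @volatile private var model: ModelHandle = _
  private def initModel(path: String): ModelHandle = new ModelHandle {
    def process(v: Double): Double = v + 1.0                    // placeholder "bias"
  }
  def getOrCreate(path: String): ModelHandle = synchronized {
    if (model == null) model = initModel(path)                  // expensive load happens once
    model
  }
}

val spark = SparkSession.builder.appName("singleton-model").getOrCreate()
import spark.implicits._

val df = Seq(1.0, 2.0, 3.0).toDF("value")
val withBias = udf { v: Double => ModelHolder.getOrCreate("/local/model.bin").process(v) }  // illustrative path
val df2 = df.withColumn("valueWithBias", withBias(col("value")))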

On Mon, Aug 8, 2016 at 7:44 PM, 莫涛 <mo...@sensetime.com> wrote:
Hi Ndjido,

Thanks for your reply.

Yes, it would be a good idea if the model could be broadcast.

I'm working with a prebuilt library (on Linux, say classifier.so and classifier.h) that requires the model file to be on the local file system.
As I don't have access to the library code, I wrote a JNI wrapper around the classifier.
The model file can be sent to each executor efficiently with addFile and getFile.
But initModel() is still expensive, as it actually loads a local file into C++ heap memory, which is not serializable.

That's the reason I cannot broadcast the model, and I have to avoid loading it as much as possible.
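
Putting these pieces together, a rough sketch of addFile/SparkFiles plus mapPartitions, reusing the hypothetical ModelHolder sketched above and emitting both columns so nothing has to be appended across DataFrames; the model path is illustrative:

import org.apache.spark.SparkFiles
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("map-partitions-bias").getOrCreate()
import spark.implicits._

// Ship the model file once; every executor gets its own local copy.
spark.sparkContext.addFile("hdfs:///models/classifier.model")   // illustrative path

val df = Seq(1.0, 2.0, 3.0).toDF("value")

// Load the model at most once per executor and emit (value, valueWithBias) pairs,
// sidestepping the "column from another DataFrame" problem in the original post.
val df2 = df.mapPartitions { rows =>
  val model = ModelHolder.getOrCreate(SparkFiles.get("classifier.model"))
  rows.map { r =>
    val v = r.getAs[Double]("value")
    (v, model.process(v))
  }
}.toDF("value", "valueWithBias")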

Best


From: ndj...@gmail.com
Sent: 8 August 2016 17:16:27
To: 莫涛
Cc: user@spark.apache.org
Subject: Re: how to generate a column using mapPartition and then add it back to the df?


Hi MoTao,
What about broadcasting the model?

Cheers,
Ndjido.

> On 08 Aug 2016, at 11:00, MoTao <mo...@sensetime.com> wrote:
>
> Hi all,
>
> I'm trying to append a column to a df.
> I understand that the new column must be created by
> 1) using literals,
> 2) transforming an existing column in df,
> or 3) generated from udf over this df
>
> In my case, the column to be appended is created by processing each row,
> like
>
> val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> val func = udf {
>  v: Double => {
>val model = initModel()
>model.process(v)
>  }
> }
> val df2 = df.withColumn("valueWithBias", func(col("value")))
>
> This works fine. However, for performance reason, I want to avoid
> initModel() for each row.
> So I come with mapParitions, like
>
> val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> val df2 = df.mapPartitions(rows => {
>  val model = initModel()
>  rows.map(row => model.process(row.getAs[Double](0)))
> })
> val df3 = df.withColumn("valueWithBias", df2.col("value")) // FAIL
>
> But this is wrong as a column of df2 *CANNOT* be appended to df.
>
> The only solution I got is to force mapPartitions to return a whole row
> instead of the new column,
> ( Something like "row => Row.fromSeq(row.toSeq ++
> Array(model.process(...)))" )
> which requires a lot of copy as well.
>
> I wonder how to deal with this problem with as few overhead as possible?
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-generate-a-column-using-mapParition-and-then-add-it-back-to-the-df-tp27493.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>



--
Best Regards,
Ayan Guha


Re: how to generate a column using mapPartition and then add it back to the df?

2016-08-08 Thread
Hi Ndjido,

Thanks for your reply.

Yes, it would be a good idea if the model could be broadcast.

I'm working with a prebuilt library (on Linux, say classifier.so and classifier.h) that requires the model file to be on the local file system.
As I don't have access to the library code, I wrote a JNI wrapper around the classifier.
The model file can be sent to each executor efficiently with addFile and getFile.
But initModel() is still expensive, as it actually loads a local file into C++ heap memory, which is not serializable.

That's the reason I cannot broadcast the model, and I have to avoid loading it as much as possible.

Best


From: ndj...@gmail.com
Sent: 8 August 2016 17:16:27
To: 莫涛
Cc: user@spark.apache.org
Subject: Re: how to generate a column using mapPartition and then add it back to the df?


Hi MoTao,
What about broadcasting the model?

Cheers,
Ndjido.

> On 08 Aug 2016, at 11:00, MoTao  wrote:
>
> Hi all,
>
> I'm trying to append a column to a df.
> I understand that the new column must be created by
> 1) using literals,
> 2) transforming an existing column in df,
> or 3) generated from udf over this df
>
> In my case, the column to be appended is created by processing each row,
> like
>
> val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> val func = udf {
>  v: Double => {
>val model = initModel()
>model.process(v)
>  }
> }
> val df2 = df.withColumn("valueWithBias", func(col("value")))
>
> This works fine. However, for performance reason, I want to avoid
> initModel() for each row.
> So I come with mapParitions, like
>
> val df = spark.createDataFrame(Seq(1.0, 2.0, 3.0)).toDF("value")
> val df2 = df.mapPartitions(rows => {
>  val model = initModel()
>  rows.map(row => model.process(row.getAs[Double](0)))
> })
> val df3 = df.withColumn("valueWithBias", df2.col("value")) // FAIL
>
> But this is wrong as a column of df2 *CANNOT* be appended to df.
>
> The only solution I got is to force mapPartitions to return a whole row
> instead of the new column,
> ( Something like "row => Row.fromSeq(row.toSeq ++
> Array(model.process(...)))" )
> which requires a lot of copy as well.
>
> I wonder how to deal with this problem with as few overhead as possible?
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-generate-a-column-using-mapParition-and-then-add-it-back-to-the-df-tp27493.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>