Re: Keep state inside map function

2014-10-28 Thread Koert Kuipers
Doing cleanup in an iterator like that assumes the iterator always gets
fully read, which is not necessarily the case (for example, RDD.take does not).
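
For instance (a sketch; assuming the first partition holds more than one
element, the cleanup branch below never fires):

val mapped = rdd.mapPartitions { it =>
  it.map { x =>
    if (!it.hasNext) println("cleanup")  // skipped when the iterator is abandoned early
    x
  }
}
mapped.take(1)  // reads a single element, so the iterator is never exhausted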

Instead I would use mapPartitionsWithContext, in which case you can write a
function of the form:
 f: (TaskContext, Iterator[T]) => Iterator[U]

Now you can register a cleanup action with the task context, like this:
context.addTaskCompletionListener(new TaskCompletionListener {
  override def onTaskCompletion(context: TaskContext): Unit = doSomething()
})

And after that, proceed with the iterator transformation as usual.
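
Putting it together, a minimal sketch (openConnection and process are
hypothetical stand-ins for your own setup and per-record work):

import org.apache.spark.TaskContext
import org.apache.spark.util.TaskCompletionListener

rdd.mapPartitionsWithContext { (context: TaskContext, iter: Iterator[String]) =>
  val conn = openConnection()  // setup: runs once per partition
  context.addTaskCompletionListener(new TaskCompletionListener {
    // called when the task completes, even if the iterator is only partially read
    override def onTaskCompletion(context: TaskContext): Unit = conn.close()
  })
  iter.map(record => process(conn, record))  // lazy transformation, as usual
}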


On Thu, Jul 31, 2014 at 4:35 AM, Sean Owen  wrote:

> On Thu, Jul 31, 2014 at 2:11 AM, Tobias Pfeiffer  wrote:
> > rdd.mapPartitions { partition =>
> >   // Some setup code here
> >   val result = partition.map(yourfunction)
> >
> >   // Some cleanup code here
> >   result
> > }
>
> Yes, I realized that after I hit send. You definitely have to store
> and return the result from the mapping!
>
>
> > rdd.mapPartitions { partition =>
> >   if (!partition.isEmpty) {
> >     // Some setup code here
> >     partition.map { item =>
> >       val output = yourfunction(item)
> >       if (!partition.hasNext) {
> >         // Some cleanup code here
> >       }
> >       output
> >     }
> >   } else {
> >     // Return an empty Iterator of your return type
> >     Iterator.empty
> >   }
> > }
>
> Great point, yeah. If you knew the number of values were small you
> could collect them and process locally, but this is the right general
> way to do it.
>


Re: Keep state inside map function

2014-07-31 Thread Sean Owen
On Thu, Jul 31, 2014 at 2:11 AM, Tobias Pfeiffer  wrote:
> rdd.mapPartitions { partition =>
>   // Some setup code here
>   val result = partition.map(yourfunction)
>
>   // Some cleanup code here
>   result
> }

Yes, I realized that after I hit send. You definitely have to store
and return the result from the mapping!


> rdd.mapPartitions { partition =>
>   if (!partition.isEmpty) {
>     // Some setup code here
>     partition.map { item =>
>       val output = yourfunction(item)
>       if (!partition.hasNext) {
>         // Some cleanup code here
>       }
>       output
>     }
>   } else {
>     // Return an empty Iterator of your return type
>     Iterator.empty
>   }
> }

Great point, yeah. If you knew the number of values were small you
could collect them and process locally, but this is the right general
way to do it.
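
That alternative would look something like this (a sketch; forcing the
iterator into an array is only safe when a partition fits in memory):

rdd.mapPartitions { partition =>
  // Some setup code here
  val results = partition.map(yourfunction).toArray  // forces full evaluation
  // Some cleanup code here -- now it truly runs after all records are processed
  results.iterator
}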


Re: Keep state inside map function

2014-07-30 Thread Tobias Pfeiffer
Hi,

On Thu, Jul 31, 2014 at 2:23 AM, Sean Owen  wrote:
>
> ... you can run setup code before mapping a bunch of records, and
> after, like so:
>
> rdd.mapPartitions { partition =>
>   // Some setup code here
>   partition.map(yourfunction)
>   // Some cleanup code here
> }
>

Please be careful with that; it will not work as expected. First, it would
have to be

rdd.mapPartitions { partition =>
  // Some setup code here
  val result = partition.map(yourfunction)
  // Some cleanup code here
  result
}

because the function passed to mapPartitions() needs to return an
Iterator. But even if you do it like this, the cleanup code will run
*before* the processing takes place, because partition.map() is evaluated
lazily.
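
To see the wrong ordering (a quick sketch; the println calls only make the
evaluation order visible):

rdd.mapPartitions { partition =>
  println("setup")
  val result = partition.map { x => println(s"processing $x"); x }
  println("cleanup")  // prints immediately, *before* any "processing" line
  result              // elements are processed only once this is consumed
}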

One example of what actually works is:

rdd.mapPartitions { partition =>
  if (!partition.isEmpty) {
    // Some setup code here
    partition.map { item =>
      val output = yourfunction(item)
      if (!partition.hasNext) {
        // Some cleanup code here
      }
      output
    }
  } else {
    // Return an empty Iterator of your return type
    Iterator.empty
  }
}

That is not very pretty, but it is the only way I have found to actually get
teardown code to run after map() has done its work.

Tobias


Re: Keep state inside map function

2014-07-30 Thread Kevin
Thanks to you both for your input. Looks like I'll play with the
mapPartitions function to start porting MapReduce algorithms to Spark.


On Wed, Jul 30, 2014 at 1:23 PM, Sean Owen  wrote:

> Really, the analog of a Mapper is not map(), but mapPartitions(). Instead
> of:
>
> rdd.map(yourFunction)
>
> ... you can run setup code before mapping a bunch of records, and
> after, like so:
>
> rdd.mapPartitions { partition =>
>   // Some setup code here
>   partition.map(yourfunction)
>   // Some cleanup code here
> }
>
> You couldn't share state across Mappers, or Mappers and Reducers in
> Hadoop. (At least there was no direct way.) Same here. But you can
> maintain state across many map calls.
>
> On Wed, Jul 30, 2014 at 6:07 PM, Kevin  wrote:
> > Hi,
> >
> > Is it possible to maintain state inside a Spark map function? With Hadoop
> > MapReduce, Mappers and Reducers are classes that can have their own state
> > using instance variables. Can this be done with Spark? Are there any
> > examples?
> >
> > Most examples I have seen do a simple operation on the value passed into
> > the map function and then pass it along to the reduce function.
> >
> > Thanks in advance.
> >
> > -Kevin
>


Re: Keep state inside map function

2014-07-30 Thread Sean Owen
Really, the analog of a Mapper is not map(), but mapPartitions(). Instead of:

rdd.map(yourFunction)

... you can run setup code before mapping a bunch of records, and
after, like so:

rdd.mapPartitions { partition =>
  // Some setup code here
  partition.map(yourfunction)
  // Some cleanup code here
}

You couldn't share state across Mappers, or Mappers and Reducers in
Hadoop. (At least there was no direct way.) Same here. But you can
maintain state across many map calls.
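
For example, a sketch of per-partition state (the SimpleDateFormat is just a
stand-in for any expensive, reusable object such as a parser or connection):

rdd.mapPartitions { partition =>
  // Created once per partition and reused across every map call below,
  // much like an instance variable on a Hadoop Mapper.
  val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
  partition.map(line => format.parse(line).getTime)
}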

On Wed, Jul 30, 2014 at 6:07 PM, Kevin  wrote:
> Hi,
>
> Is it possible to maintain state inside a Spark map function? With Hadoop
> MapReduce, Mappers and Reducers are classes that can have their own state
> using instance variables. Can this be done with Spark? Are there any
> examples?
>
> Most examples I have seen do a simple operation on the value passed into the
> map function and then pass it along to the reduce function.
>
> Thanks in advance.
>
> -Kevin


Re: Keep state inside map function

2014-07-30 Thread aaronjosephs
Use mapPartitions to get functionality equivalent to a Hadoop Mapper.
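
Roughly, the Mapper lifecycle maps onto a single mapPartitions call like this
(a sketch; see Tobias's caveat above about where cleanup code can safely run):

rdd.mapPartitions { partition =>
  var seen = 0L               // ~ Mapper.setup(): per-partition state
  partition.map { record =>   // ~ Mapper.map(): called once per record
    seen += 1                 // state persists across map calls
    (record, seen)
  }
  // ~ Mapper.cleanup(): code placed here runs too early; see the caveat above
}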





Keep state inside map function

2014-07-30 Thread Kevin
Hi,

Is it possible to maintain state inside a Spark map function? With Hadoop
MapReduce, Mappers and Reducers are classes that can have their own state
using instance variables. Can this be done with Spark? Are there any
examples?

Most examples I have seen do a simple operation on the value passed into
the map function and then pass it along to the reduce function.

Thanks in advance.

-Kevin