Re: Keep state inside map function
Doing cleanup in an iterator like that assumes the iterator always gets
fully read, which is not necessarily the case (for example, RDD.take does
not). Instead I would use mapPartitionsWithContext, in which case you can
write a function of the form:

  f: (TaskContext, Iterator[T]) => Iterator[U]

Now you can register a cleanup with the task context, like this:

  context.addTaskCompletionListener(new TaskCompletionListener {
    override def onTaskCompletion(context: TaskContext): Unit = doSomething
  })

and after that proceed with an iterator transformation as usual.

On Thu, Jul 31, 2014 at 4:35 AM, Sean Owen wrote:
> On Thu, Jul 31, 2014 at 2:11 AM, Tobias Pfeiffer wrote:
> > rdd.mapPartitions { partition =>
> >   // Some setup code here
> >   val result = partition.map(yourfunction)
> >
> >   // Some cleanup code here
> >   result
> > }
>
> Yes, I realized that after I hit send. You definitely have to store
> and return the result from the mapping!
>
> > rdd.mapPartitions { partition =>
> >   if (!partition.isEmpty) {
> >
> >     // Some setup code here
> >     partition.map(item => {
> >       val output = yourfunction(item)
> >       if (!partition.hasNext) {
> >         // Some cleanup code here
> >       }
> >       output
> >     })
> >   } else {
> >     // return an empty Iterator of your return type
> >   }
> > }
>
> Great point, yeah. If you knew the number of values were small you
> could collect them and process locally, but this is the right general
> way to do it.
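To see why cleanup keyed to iterator exhaustion is fragile, here is a plain-Scala sketch (no Spark dependency; `withCleanup` and the values are made up for illustration). A consumer that stops early, the way RDD.take does, never pulls the last element, so the cleanup never fires:

```scala
object EarlyTerminationDemo {
  // Wraps an iterator so that `cleanup` runs only when the last element
  // is handed out -- the hasNext-based pattern discussed in this thread.
  def withCleanup[T](it: Iterator[T], cleanup: () => Unit): Iterator[T] =
    it.map { item =>
      if (!it.hasNext) cleanup() // fires only on full consumption
      item
    }

  def main(args: Array[String]): Unit = {
    var cleaned = false
    val partition = withCleanup(Iterator(1, 2, 3, 4, 5), () => cleaned = true)
    // Like RDD.take(2): stops after two elements, iterator never exhausted.
    val firstTwo = partition.take(2).toList
    println(s"took $firstTwo, cleanup ran: $cleaned")
    // prints: took List(1, 2), cleanup ran: false
  }
}
```

A task-completion listener avoids this because Spark invokes it when the task finishes, regardless of how much of the iterator was consumed.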
Re: Keep state inside map function
On Thu, Jul 31, 2014 at 2:11 AM, Tobias Pfeiffer wrote:
> rdd.mapPartitions { partition =>
>   // Some setup code here
>   val result = partition.map(yourfunction)
>
>   // Some cleanup code here
>   result
> }

Yes, I realized that after I hit send. You definitely have to store
and return the result from the mapping!

> rdd.mapPartitions { partition =>
>   if (!partition.isEmpty) {
>
>     // Some setup code here
>     partition.map(item => {
>       val output = yourfunction(item)
>       if (!partition.hasNext) {
>         // Some cleanup code here
>       }
>       output
>     })
>   } else {
>     // return an empty Iterator of your return type
>   }
> }

Great point, yeah. If you knew the number of values were small you
could collect them and process locally, but this is the right general
way to do it.
Re: Keep state inside map function
Hi,

On Thu, Jul 31, 2014 at 2:23 AM, Sean Owen wrote:
> ... you can run setup code before mapping a bunch of records, and
> after, like so:
>
> rdd.mapPartitions { partition =>
>   // Some setup code here
>   partition.map(yourfunction)
>   // Some cleanup code here
> }

Please be careful with that, it will not work as expected. First, it
would have to be

  rdd.mapPartitions { partition =>
    // Some setup code here
    val result = partition.map(yourfunction)
    // Some cleanup code here
    result
  }

because the function passed to mapPartitions() needs to return an
Iterator. And even if you write it like this, the cleanup code will run
*before* the processing takes place, because partition.map() is executed
lazily.

One example of what actually works is:

  rdd.mapPartitions { partition =>
    if (!partition.isEmpty) {
      // Some setup code here
      partition.map(item => {
        val output = yourfunction(item)
        if (!partition.hasNext) {
          // Some cleanup code here
        }
        output
      })
    } else {
      // return an empty Iterator of your return type
      Iterator.empty
    }
  }

That is not very pretty, but it is the only way I found to actually get
tearDown code run after map() is run.

Tobias
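The laziness trap above can be reproduced with plain Scala iterators, no Spark required (the event log and names below are invented for the demo). Iterator.map defers all work, so a "cleanup" statement placed after the map call executes before any element is processed:

```scala
object LazyMapDemo {
  // Records the order in which processing and cleanup actually happen
  // when following the broken setup/map/cleanup pattern.
  def brokenPattern(): List[String] = {
    val events = scala.collection.mutable.ListBuffer[String]()
    val partition = Iterator(1, 2, 3)

    // Lazy: nothing is processed yet, this just builds a new iterator.
    val result = partition.map { x => events += s"process $x"; x * 2 }

    events += "cleanup" // the "// Some cleanup code here" line: runs NOW

    result.toList // the mapping only executes here, after "cleanup"
    events.toList
  }

  def main(args: Array[String]): Unit =
    println(brokenPattern())
    // prints: List(cleanup, process 1, process 2, process 3)
}
```

The cleanup entry lands first in the log, which is exactly the behavior the message above warns about.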
Re: Keep state inside map function
Thanks to both of you for your input. Looks like I'll play with the
mapPartitions function to start porting MapReduce algorithms to Spark.

On Wed, Jul 30, 2014 at 1:23 PM, Sean Owen wrote:
> Really, the analog of a Mapper is not map(), but mapPartitions(). Instead
> of:
>
> rdd.map(yourFunction)
>
> ... you can run setup code before mapping a bunch of records, and
> after, like so:
>
> rdd.mapPartitions { partition =>
>   // Some setup code here
>   partition.map(yourfunction)
>   // Some cleanup code here
> }
>
> You couldn't share state across Mappers, or Mappers and Reducers, in
> Hadoop. (At least there was no direct way.) Same here. But you can
> maintain state across many map calls.
>
> On Wed, Jul 30, 2014 at 6:07 PM, Kevin wrote:
> > Hi,
> >
> > Is it possible to maintain state inside a Spark map function? With Hadoop
> > MapReduce, Mappers and Reducers are classes that can have their own state
> > using instance variables. Can this be done with Spark? Are there any
> > examples?
> >
> > Most examples I have seen perform a simple operation on the value passed
> > into the map function and then pass it along to the reduce function.
> >
> > Thanks in advance.
> >
> > -Kevin
Re: Keep state inside map function
Really, the analog of a Mapper is not map(), but mapPartitions(). Instead of:

  rdd.map(yourFunction)

... you can run setup code before mapping a bunch of records, and after,
like so:

  rdd.mapPartitions { partition =>
    // Some setup code here
    partition.map(yourfunction)
    // Some cleanup code here
  }

You couldn't share state across Mappers, or Mappers and Reducers, in
Hadoop. (At least there was no direct way.) Same here. But you can
maintain state across many map calls.

On Wed, Jul 30, 2014 at 6:07 PM, Kevin wrote:
> Hi,
>
> Is it possible to maintain state inside a Spark map function? With Hadoop
> MapReduce, Mappers and Reducers are classes that can have their own state
> using instance variables. Can this be done with Spark? Are there any
> examples?
>
> Most examples I have seen perform a simple operation on the value passed
> into the map function and then pass it along to the reduce function.
>
> Thanks in advance.
>
> -Kevin
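The "state across many map calls" point can be sketched with plain Scala, treating an ordinary iterator as a stand-in for one partition (the `Resource` class and names here are hypothetical, invented for the demo). Setup runs once per partition, and the resulting object is shared by every map call on that partition's records:

```scala
object PartitionStateDemo {
  // Stand-in for expensive per-partition setup, e.g. opening a connection.
  final class Resource {
    var callCount = 0 // instance state, like a Mapper's instance variable
  }

  // The kind of function you would pass to rdd.mapPartitions().
  def processPartition(partition: Iterator[Int]): Iterator[String] = {
    val resource = new Resource // setup: runs once per partition
    partition.map { x =>
      resource.callCount += 1 // state maintained across map calls
      s"record $x handled by call ${resource.callCount}"
    }
  }

  def main(args: Array[String]): Unit =
    processPartition(Iterator(10, 20, 30)).foreach(println)
}
```

In real Spark code the same function body goes inside `rdd.mapPartitions { partition => ... }`; the demo only swaps the RDD for a local iterator so it runs standalone.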
Re: Keep state inside map function
Use mapPartitions to get the equivalent functionality to Hadoop.
Keep state inside map function
Hi,

Is it possible to maintain state inside a Spark map function? With Hadoop
MapReduce, Mappers and Reducers are classes that can have their own state
using instance variables. Can this be done with Spark? Are there any
examples?

Most examples I have seen perform a simple operation on the value passed
into the map function and then pass it along to the reduce function.

Thanks in advance.

-Kevin