[ https://issues.apache.org/jira/browse/CRUNCH-73?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13460816#comment-13460816 ]

Kiyan Ahmadizadeh commented on CRUNCH-73:
-----------------------------------------

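The problem is that PipelineApp extends the Scala trait DelayedInit, which uses
final static references in its implementation. With DelayedInit, the body of a
PipelineApp object (such as the val divisor = ... line in the example below) is
handed to delayedInit instead of being run by the object's constructor, so, as
with scala.App, it only executes when main() is called. A closure that mentions
divisor reads it through the object's static reference rather than capturing
its value, so on a task JVM, where main() never runs, it sees the field's
default value.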
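The deferred-body behaviour can be seen without Hadoop. This is a minimal sketch assuming Scala 2.x, where scala.App mixes in DelayedInit; App stands in for PipelineApp here, and DivisorHolder and DelayedInitDemo are hypothetical names. Reading the fields without calling DivisorHolder.main yields the same defaults the report describes: null for a boxed Integer and 0 for a primitive Int.

```scala
// Scala 2.x only: scala.App mixes in DelayedInit, so the statements in this
// object's body are not run by its constructor. App stashes them and runs
// them only when DivisorHolder.main is invoked.
object DivisorHolder extends App {
  val boxedDivisor: Integer = Integer.valueOf(7)
  val primitiveDivisor: Int = 7
}

object DelayedInitDemo {
  // Reads both fields without calling DivisorHolder.main. Touching the object
  // runs its constructor but not the deferred body, roughly the situation on
  // a task JVM where the app's main() never executes.
  def readWithoutMain(): (Integer, Int) =
    (DivisorHolder.boxedDivisor, DivisorHolder.primitiveDivisor)

  def main(args: Array[String]): Unit = {
    val (boxed, primitive) = readWithoutMain()
    println(s"boxed = $boxed, primitive = $primitive") // boxed = null, primitive = 0
  }
}
```

Calling DivisorHolder.main(Array.empty[String]) before reading the fields would yield 7 for both, since that call is what runs the stashed body.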

Scoobi has hit this problem as well.  See this note in their user guide:  
http://nicta.github.com/scoobi/guide/Advanced%20Notes.html#Advanced+Notes

Since the ability to transfer side data in function closures is a big
usability win, I propose changing PipelineApp so that it no longer uses the
DelayedInit trait. Instead, clients would override the method

run(args: Array[String]): Unit

to implement their pipeline logic. While this adds a little boilerplate when
authoring a PipelineApp, I think the value of sending side data through
closures outweighs the cost.
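The proposed run(args) style can be sketched without Hadoop as well. This is a minimal sketch assuming Scala 2.12 or 2.13, where function literals are serializable; RunStylePipelineApp, SerializationCheck.roundTrip and this DivideApp are hypothetical names, not Scrunch's or the patch's API, and a plain Java serialization round trip stands in for shipping a function to a task. Because divisor is a local val inside run, the closure captures its value, and that value survives the round trip.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the proposed PipelineApp (not Scrunch's class):
// no DelayedInit, just an abstract run(args) that main() calls directly.
abstract class RunStylePipelineApp {
  def run(args: Array[String]): Unit
  def main(args: Array[String]): Unit = run(args)
}

// Hypothetical helper: a Java serialization round trip, standing in for
// shipping a function to a task JVM.
object SerializationCheck {
  def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

object DivideApp extends RunStylePipelineApp {
  // Records what the deserialized closure computed, so it can be checked.
  var shippedResult: Option[Int] = None

  override def run(args: Array[String]): Unit = {
    // divisor is a local val, so the closure captures its value (not a
    // reference to a field), and the value travels with the closure.
    val divisor = args(0).toInt
    val divide: Int => Int = n => n / divisor
    val remoteDivide = SerializationCheck.roundTrip(divide)
    shippedResult = Some(remoteDivide(84))
  }
}
```

For example, DivideApp.main(Array("4")) leaves shippedResult as Some(21). The only extra boilerplate compared with the DelayedInit version is the run signature.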

I've written a patch and verified that it works with a PipelineApp I've been
developing. I'm attaching the first version for review, and I'll attach another
version shortly that includes a test.
                
> Scrunch applications using PipelineApp do not properly serialize closures to 
> MapReduce tasks.
> ---------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-73
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-73
>             Project: Crunch
>          Issue Type: Bug
>          Components: Scrunch
>    Affects Versions: 0.4.0
>            Reporter: Kiyan Ahmadizadeh
>            Assignee: Kiyan Ahmadizadeh
>
> One of the great potential advantages of using Scala for writing MapReduce 
> pipelines is the ability to send side data as part of function closures, 
> rather than through Hadoop Configurations or the Distributed Cache.  As an 
> absurdly simple example, consider the following Scala PipelineApp that 
> divides all elements of a numeric PCollection by an arbitrary argument:
> object DivideApp extends PipelineApp {
>   val divisor = Integer.valueOf(args(0))
>   val nums = read(From.textFile("numbers.txt"))
>   val dividedNums = nums.map { n => n.toInt / divisor }
>   dividedNums.write(To.textFile("dividedNums"))
>   run()
> }
> Executing this PipelineApp fails. MapReduce tasks see null for divisor
> (or 0 if divisor is declared as a primitive numeric type). This indicates
> that something goes wrong when Scala function closures are serialized,
> causing free variables in a closure to take on their default JVM values.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
