Dear Wiki user, You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.
The "NativeMapReduce" page has been changed by Aniket Mokashi. http://wiki.apache.org/pig/NativeMapReduce?action=diff&rev1=9&rev2=10

#format wiki
#language en

=== Pig Plans ===

Logical Plan - The logical plan creates an LONative operator with an internal plan that consists of a store and a load operator. The store operator cannot be attached to X at this level, because that would store X at inputLocation for every plan that includes X, which is not intended. Although we could add an LOLoad operator for Y at this point, we delay this until the mapreduce plan and track it with the LONative operator. Since Y has a dataflow dependency on X, we connect the operators corresponding to these aliases in the logical plan.

{{{
X = ... ;
|
...
}}}

TypeCastInserter - This is a mandatory optimizer that adds a foreach and a cast operator after a load, so that a field read by the load can be converted to the required type. In the absence of this, we fail with a cast exception after the load is completed. Currently, we apply this optimizer to LOLoad and LOStream, as both can declare a schema with "AS". As the mapreduce clause corresponds to a load operation, this optimization is also applicable to the LONative operator.
A test case for this scenario is:

{{{
B = mapreduce 'mapreduce.jar' Store A into 'input' Load 'output' as (name:chararray, count:int) `wordcount input output`;
C = foreach B generate count+1;
}}}

Physical Plan - The logical plan is visited to convert the internal plan of the load/store combination into the corresponding physical plan operators; connections are maintained as per the logical plan.

{{{
...
}}}

MapReduce Plan - While compiling the mapreduce plan with the MRCompiler, we introduce a new MapReduceOper, NativeMapReduceOper, that tracks the presence of a native mapreduce job inside the plan. It also holds the required parameters and the jar name.

{{{
X = ... ;
|
|--- (POStore) Store X into 'inputLocation'

--------------- MR boundary -------------------------
Y = MapReduce ... ;
(NativeMapReduceOper)
    mymr.jar
    params
--------------- MR boundary -------------------------
Y = (POLoad) Load 'outputLocation'
|
...
}}}

Inside the JobControlCompiler's compile method, if we find the native mapreduce operator we run org.apache.hadoop.util.RunJar's main method with the specified parameters. We also make sure that all the dependencies of the job are obeyed for the native jobs.

=== Security Manager ===
The hadoop jar command is equivalent to invoking org.apache.hadoop.util.RunJar's main function with the required arguments. RunJar can internally invoke several levels of driver classes before executing the hadoop job (for example, hadoop-example.jar). To detect failure or success of the job, we need to capture the innermost exit value and return it to Pig. To achieve this, we install our own RunJarSecurityManager, which delegates security management to the current security manager and captures the innermost exit code.

=== Pig Stats ===

Pig Stats are populated by treating the native job as a single instance of a mapreduce job, and progress is reported under the same assumption. As the native job is not under the control of Pig, it is hard to capture any information about this job except for the exit code. Thus, the generated job id is a pseudo id and does not correspond to an actual hadoop job id. We mark the native jobs with a new ''FEATURE'' called ''NATIVE''.

== References ==

 1. <<Anchor(ref1)>> PIG-506, "Does pig need a NATIVE keyword?", https://issues.apache.org/jira/browse/PIG-506
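The exit-code capture described in the Security Manager section above can be sketched in Java as follows. This is a minimal illustration, not Pig's actual RunJarSecurityManager: the class and method names beyond the overridden SecurityManager hooks are hypothetical, and the delegation to the previously installed security manager is reduced to a permissive no-op here.

{{{
// Sketch of intercepting System.exit() calls made by RunJar's driver classes.
// Hypothetical helper methods (isExitInvoked, getExitCode); the real Pig class
// additionally delegates every check to the previously installed manager.
class RunJarSecurityManager extends SecurityManager {

    private int exitCode = 0;
    private boolean exitInvoked = false;

    @Override
    public void checkPermission(java.security.Permission perm) {
        // Permissive sketch: allow everything. The real implementation
        // forwards this to the security manager it replaced.
    }

    @Override
    public void checkExit(int status) {
        // Record only the first (innermost) System.exit() and veto it by
        // throwing, so control returns to Pig instead of ending the JVM.
        if (!exitInvoked) {
            exitInvoked = true;
            exitCode = status;
        }
        throw new SecurityException("System.exit(" + status + ") intercepted");
    }

    public boolean isExitInvoked() { return exitInvoked; }

    public int getExitCode() { return exitCode; }
}
}}}

In use, such a manager would be installed with System.setSecurityManager before calling RunJar's main and restored afterwards; the first captured status is the innermost exit code that Pig reports for the native job.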