Author: daijy Date: Thu Apr 1 22:38:14 2010 New Revision: 930118 URL: http://svn.apache.org/viewvc?rev=930118&view=rev Log: PIG-1336: Optimize POStore serialized into JobConf
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=930118&r1=930117&r2=930118&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Apr 1 22:38:14 2010 @@ -193,6 +193,8 @@ OPTIMIZATIONS BUG FIXES +PIG-1336: Optimize POStore serialized into JobConf (daijy) + PIG-1335: UDFFinder should find LoadFunc used by POCast (daijy) PIG-1307: when we spill the DefaultDataBag we are not setting the sized changed flag to be true. (breed via daijy) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=930118&r1=930117&r2=930118&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Apr 1 22:38:14 2010 @@ -425,9 +425,6 @@ public class JobControlCompiler{ LinkedList<POStore> mapStores = PlanHelper.getStores(mro.mapPlan); LinkedList<POStore> reduceStores = PlanHelper.getStores(mro.reducePlan); - conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores)); - conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores)); - for (POStore st: mapStores) { storeLocations.add(st); StoreFuncInterface sFunc = st.getStoreFunc(); @@ -448,11 +445,11 @@ public class JobControlCompiler{ POStore st; if (reduceStores.isEmpty()) { - st = mapStores.remove(0); + st = mapStores.get(0); mro.mapPlan.remove(st); } else { - st = reduceStores.remove(0); + st = reduceStores.get(0); mro.reducePlan.remove(st); } @@ -605,6 +602,13 @@ public class JobControlCompiler{ nwJob.setMapOutputKeyClass(NullablePartitionWritable.class); nwJob.setGroupingComparatorClass(PigGroupingPartitionWritableComparator.class); } + + // unset inputs for POStore, otherwise, map/reduce plan will be unnecessarily deserialized + for (POStore st: mapStores) { st.setInputs(null); st.setParentPlan(null);} + for (POStore st: reduceStores) { st.setInputs(null); st.setParentPlan(null);} + + conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores)); + conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores)); // Serialize the UDF specific context info. UDFContext.getUDFContext().serialize(conf);