Kai Londenberg created PIG-3212:
-----------------------------------

    Summary: Race Conditions in POSort and (Internal)SortedBag during Proactive Spill.
        Key: PIG-3212
        URL: https://issues.apache.org/jira/browse/PIG-3212
    Project: Pig
 Issue Type: Bug
   Reporter: Kai Londenberg
   Priority: Critical

The following bug exists in the latest release of Pig, 0.11.0.

While running some large jobs involving groups and sorts like this one:

{code}
events_by_user = GROUP events BY user_id;
sorted_events_by_user = FOREACH events_by_user {
    A = ORDER events BY ts, split_idx, line_num;
    GENERATE group, A;
}
{code}

I saw some strange behaviour: the script worked on small datasets, but on large datasets the results were sometimes not sorted correctly.

After a long debugging session, I tracked it down to at least one race condition.

The following partial stack trace shows how a proactive spill gets triggered on an InternalSortedBag. A spill in turn triggers a sort of that InternalSortedBag.

{code}
at org.apache.pig.data.SortedSpillBag.proactive_spill(SortedSpillBag.java:83)
at org.apache.pig.data.InternalSortedBag.spill(InternalSortedBag.java:455)
at org.apache.pig.impl.util.SpillableMemoryManager.handleNotification(SpillableMemoryManager.java:243)
at sun.management.NotificationEmitterSupport.sendNotification(NotificationEmitterSupport.java:138)
at sun.management.MemoryImpl.createNotification(MemoryImpl.java:171)
at sun.management.MemoryPoolImpl$PoolSensor.triggerAction(MemoryPoolImpl.java:272)
at sun.management.Sensor.trigger(Sensor.java:120)
{code}

At the same time, the same InternalSortedBag might be sorted or accessed within a POSort operation.
For example, via the following code path (line numbers might be off, since I had to add debug statements to diagnose this):

{code}
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort.getNext(POSort.java:346)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:492)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:582)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject.getNext(PORelationToExprProject.java:107)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:394)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:372)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:297)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:368)
at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.getNext(POSplit.java:214)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:465)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:433)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:413)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:257)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
{code}

The key point is that both operations try to compare and modify elements of the SortedBag simultaneously. This leads to all kinds of problems, most notably incorrectly sorted data.

The POSort.SortComparator that is passed as the comparison function to (Internal)SortedBag is not thread-safe: it works by attaching single input tuples to PhysicalOperators. These operators are part of POSort.sortPlans and are shared by every thread that accesses the (Internal)SortedBag.
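
To illustrate the comparator problem in isolation, here is a minimal Java sketch. It does not use any Pig classes: `KeyExtractor`, `SharedStateComparator`, `SpillRaceSketch` and `locked(...)` are hypothetical names invented for this example, and a one-element `long[]` stands in for a tuple. The sketch assumes only the pattern described above, namely a comparator that attaches its inputs to shared operator objects before reading a key back. `interleavedRead()` replays one bad interleaving deterministically, and `locked(...)` shows one possible mitigation (serializing each comparison); it is not necessarily how Pig should fix this.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

/** Hypothetical stand-in for a sort-plan operator that holds one attached input. */
class KeyExtractor {
    private long[] attached; // shared mutable state, like an attached tuple

    void attachInput(long[] tuple) { attached = tuple; }

    long getNext() { return attached[0]; }
}

/** Comparator that reuses the same extractors on every call (not thread-safe). */
class SharedStateComparator implements Comparator<long[]> {
    private final KeyExtractor left = new KeyExtractor();
    private final KeyExtractor right = new KeyExtractor();

    @Override
    public int compare(long[] a, long[] b) {
        left.attachInput(a);   // another thread may overwrite this...
        right.attachInput(b);
        return Long.compare(left.getNext(), right.getNext()); // ...before it is read
    }
}

class SpillRaceSketch {
    /** Replays one bad interleaving step by step on a single thread. */
    static long interleavedRead() {
        KeyExtractor shared = new KeyExtractor();
        shared.attachInput(new long[] {1}); // "reducer thread" attaches its tuple
        shared.attachInput(new long[] {9}); // "spill thread" attaches a different one
        return shared.getNext();            // "reducer thread" now reads 9, not 1
    }

    /** Wraps a comparator so that each compare() call runs under one lock. */
    static Comparator<long[]> locked(Comparator<long[]> inner) {
        Object lock = new Object();
        return (a, b) -> {
            synchronized (lock) {
                return inner.compare(a, b);
            }
        };
    }

    /** Two threads sort separate lists with the SAME comparator; true if both end up sorted. */
    static boolean sortConcurrently(Comparator<long[]> cmp) {
        List<List<long[]>> lists = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < 2; t++) {
            Random rnd = new Random(42 + t);
            List<long[]> data = new ArrayList<>();
            for (int i = 0; i < 50_000; i++) {
                data.add(new long[] {rnd.nextLong()});
            }
            lists.add(data);
            threads.add(new Thread(() -> data.sort(cmp)));
        }
        try {
            for (Thread th : threads) th.start();
            for (Thread th : threads) th.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        for (List<long[]> data : lists) {
            for (int i = 1; i < data.size(); i++) {
                if (data.get(i - 1)[0] > data.get(i)[0]) return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(interleavedRead());                                     // prints 9
        System.out.println(sortConcurrently(locked(new SharedStateComparator()))); // prints true
    }
}
```

Passing the unwrapped `new SharedStateComparator()` to `sortConcurrently` may return `false`, or one of the sorts may fail with an exception, depending on timing. Note that locking the comparator only covers the shared-operator half of the problem: in this sketch each thread sorts its own list, whereas in the report both threads touch the same bag, so the bag's contents would need to be guarded as well.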