[ https://issues.apache.org/jira/browse/PIG-3212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kai Londenberg updated PIG-3212:
--------------------------------

    Attachment: PIG-3212-p1.patch

I have attached a patch which should fix this problem. To clarify a bit more what caused it:

The root cause of all the problems here is that POSort.SortComparator is not thread safe, which is very hard to fix in a performant way, so this patch does not attempt that. A further problem is that the SpillableMemoryManager may call the spill method from its own thread at any time, and that InternalSortedBag itself is not sufficiently thread safe.

The approach I took in this patch is simple: first, synchronize modifications to InternalSortedBag correctly; second, prevent all modifications (including spills) once a read iterator has been started.

The last point is extremely important. POSort may create multiple InternalSortedBags one after another, and each of them is registered with the SpillableMemoryManager. Since the WeakReferences held by the SpillableMemoryManager are not cleared immediately once all other references to an old InternalSortedBag are gone, the "spill" method can still be called on an otherwise dead instance. That way, two different InternalSortedBag instances (one dead, one alive) could end up using the same POSort.SortComparator instance at the same time. (A sketch of this approach follows the quoted issue description below.)

> Race Conditions in POSort and (Internal)SortedBag during Proactive Spill.
> -------------------------------------------------------------------------
>
>                 Key: PIG-3212
>                 URL: https://issues.apache.org/jira/browse/PIG-3212
>             Project: Pig
>          Issue Type: Bug
>    Affects Versions: 0.11
>            Reporter: Kai Londenberg
>            Priority: Critical
>             Fix For: 0.12, 0.11.1
>
>         Attachments: PIG-3212-p1.patch
>
>
> The following bug exists in the latest release of Pig 0.11.0.
> While running some large jobs involving groups and sorts like these:
> {code}
> events_by_user = GROUP events BY user_id;
> sorted_events_by_user = FOREACH events_by_user {
>     A = ORDER events BY ts, split_idx, line_num;
>     GENERATE group, A;
> }
> {code}
> I got pretty strange behaviour: while this worked on small datasets, on large datasets the results were sometimes not sorted perfectly.
> So after a long debugging session, I tracked it down to at least one race condition:
> The following partial stack trace shows how a proactive spill gets triggered on an InternalSortedBag. A spill in turn triggers a sort of that InternalSortedBag.
> {code}
> at org.apache.pig.data.SortedSpillBag.proactive_spill(SortedSpillBag.java:83)
> at org.apache.pig.data.InternalSortedBag.spill(InternalSortedBag.java:455)
> at org.apache.pig.impl.util.SpillableMemoryManager.handleNotification(SpillableMemoryManager.java:243)
> at sun.management.NotificationEmitterSupport.sendNotification(NotificationEmitterSupport.java:138)
> at sun.management.MemoryImpl.createNotification(MemoryImpl.java:171)
> at sun.management.MemoryPoolImpl$PoolSensor.triggerAction(MemoryPoolImpl.java:272)
> at sun.management.Sensor.trigger(Sensor.java:120)
> {code}
> At the same time, the same InternalSortedBag might be sorted or accessed within a POSort operation.
> For example, using the following code path (line numbers might be off, I had to add debug statements to diagnose this):
> {code}
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort.getNext(POSort.java:346)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:492)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:582)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject.getNext(PORelationToExprProject.java:107)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:394)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:372)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:297)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:368)
> at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.getNext(POSplit.java:214)
> at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:465)
> at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:433)
> at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:413)
> at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:257)
> at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
> at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
> at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
> at org.apache.hadoop.mapred.Child.main(Child.java:170)
> {code}
> The key here is that both operations try to compare and modify elements of the SortedBag simultaneously. This leads to all kinds of problems, most notably incorrectly sorted data.
> The POSort.SortComparator that is passed as the comparison function to (Internal)SortedBag is not thread safe, since it works by attaching single input tuples to PhysicalOperators; these operators are in turn part of POSort.sortPlans and are re-used by every thread accessing the (Internal)SortedBag.
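To illustrate the two points of the approach, here is a minimal sketch in plain Java. This is not the actual PIG-3212-p1.patch; GuardedSortedBag, its fields and its methods are simplified, hypothetical names, and the real spill-file handling is elided. It only shows the idea of synchronizing all modifications and of freezing the bag (so spills become no-ops) once a read iterator has been handed out.

{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-in for a spillable sorted bag.
class GuardedSortedBag<T> {

    private final Comparator<T> comparator;      // e.g. a POSort.SortComparator, which is not thread safe
    private final List<T> contents = new ArrayList<T>();
    private boolean readStarted = false;          // once true, the bag is frozen

    GuardedSortedBag(Comparator<T> comparator) {
        this.comparator = comparator;
    }

    // Called from the pipeline thread that fills the bag.
    synchronized void add(T tuple) {
        if (readStarted) {
            throw new IllegalStateException("bag is already being read");
        }
        contents.add(tuple);
    }

    // Called asynchronously, e.g. from the memory manager's notification thread.
    synchronized long spill() {
        if (readStarted) {
            // A read iterator exists, or this instance is already dead:
            // refuse to sort/spill so we never race with POSort over the
            // shared comparator or the bag contents.
            return 0L;
        }
        Collections.sort(contents, comparator);   // safe: only one thread can be in here
        // ... write the sorted contents to a spill file and free memory here ...
        return contents.size();
    }

    // Once iteration starts, all later modifications (including spills) are blocked.
    synchronized Iterator<T> iterator() {
        readStarted = true;
        Collections.sort(contents, comparator);
        return contents.iterator();
    }
}
{code}

With a guard like this, a late spill() call arriving from the SpillableMemoryManager's notification thread against a bag that POSort is already iterating over (or against a dead instance whose WeakReference has not yet been cleared) simply returns without touching the shared comparator.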