[
https://issues.apache.org/jira/browse/PIG-3979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14028742#comment-14028742
]
Philip (flip) Kromer commented on PIG-3979:
-------------------------------------------
This is great. Using partial aggregation with TOP was causing constant spills
and terrible performance; this patch reduced the runtime by a factor of 10. I'd
prefer that most of the log messages be at DEBUG priority, though in this case
they were very helpful.
However, I also have some questions about whether there's more going on than
the patch currently addresses -- there are still pathological cases that cause
the SpillableMemoryManager to enter seppuku mode.
With a large heap size set, the initial FIRST_TIER_THRESHOLD of 20,000 records
is hit well before the memory-based sample target is, and so the code never
actually adjusts that threshold. Is that behavior desirable, or should the code
still include the test for (numRecsInRawMap >= NUM_RECS_TO_SAMPLE), as follows?
{code}
// Sampling gate: fire when either the memory-based sample target is
// reached (the patch's new test) or NUM_RECS_TO_SAMPLE records have been
// seen (the record-count test I'm suggesting should be kept as well).
if (!sizeReductionChecked && ((sampleSize >= sampleMem)
        || (numRecsInRawMap >= NUM_RECS_TO_SAMPLE))) {
    checkSizeReduction();
    sampleSize = 0;
}
if (!estimatedMemThresholds && ((sampleSize >= sampleMem)
        || (numRecsInRawMap >= NUM_RECS_TO_SAMPLE))) {
    estimateMemThresholds();
}
// ...
{code}
As long as estimateMemThresholds() hasn't been called, avgTupleSize keeps its
initial value of zero -- meaning that getMemorySize() returns zero as well. If
I'm reading this correctly, then with a large heap size this patch makes
getMemorySize() always return zero.
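To make that concrete, here's a condensed paraphrase of what I'm describing
(not actual Pig source -- I've collapsed the relevant POPartialAgg state into a
toy class, and the numRecsInRawMap value is made up):
{code}
// Paraphrased sketch, not actual Pig source.
class MemSizeSketch {
    int avgTupleSize = 0;        // only assigned by estimateMemThresholds()
    int numRecsInRawMap = 50000; // made-up value for illustration

    long getMemorySize() {
        // With a large heap, the memory-based sample never completes, so
        // estimateMemThresholds() never runs, avgTupleSize stays 0, and
        // this always returns 0 -- SpillableMemoryManager never sees the
        // operator's real footprint.
        return (long) avgTupleSize * numRecsInRawMap;
    }
}
{code}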
Lastly, I'm concerned there's still an interaction between POPartialAgg and
SpillableMemoryManager that this patch doesn't address. I'm not deeply familiar
with
what's going on, so read my questions as "I don't understand why X is" rather
than "I don't think X should be".
With a large JVM heap size, once the POPartialAgg does actually reach a certain
size, the SpillableMemoryManager goes into a GC hell of its own creation.
Here's a taste:
{code}
2014-06-11 21:07:23,248 4974 INFO o.a.p.b.h.e.m.PigGenericMapReduce$Map |
Aliases being processed per job phase (AliasName[line,offset]): M:
events[36,9],events[-1,-1],events_final_event_1[59,23],1-3[59,32] C:
events_final_event_1[59,23],1-3[59,32] R: events_final_event_1[59,23]
2014-06-11 21:07:24,432 6158 INFO o.a.p.b.h.e.p.r.POPartialAgg |
After reduction, processed map: 126; raw map: 0
2014-06-11 21:07:24,433 6159 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Observed reduction factor: from 10000 to 126 => 79.
2014-06-11 21:07:24,714 6440 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Getting mem limits; considering 1 POPArtialAgg objects.
2014-06-11 21:07:24,721 6447 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Estimated total tuples to buffer, based on 10000 tuples that took up 14954552
bytes: 191715
2014-06-11 21:07:24,721 6447 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Setting thresholds. Primary: 189288. Secondary: 2427
2014-06-11 21:07:24,827 6553 INFO o.a.p.i.u.SpillableMemoryManager |
first memory handler call- Usage threshold init = 5505024(5376K) used =
795475864(776831K) committed = 795607040(776960K) max = 1048576000(1024000K)
2014-06-11 21:07:24,832 6558 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Spill triggered by SpillableMemoryManager
2014-06-11 21:07:24,832 6558 INFO o.a.p.b.h.e.p.r.POPartialAgg |
In startSpill(), aggregating raw inputs. 13545 tuples.
2014-06-11 21:07:24,832 6558 INFO o.a.p.i.u.SpillableMemoryManager |
Spilled an estimate of 20423195 bytes from 1 objects. init = 5505024(5376K)
used = 795475864(776831K) committed = 795607040(776960K) max =
1048576000(1024000K)
2014-06-11 21:07:24,852 6578 INFO o.a.p.b.h.e.p.r.POPartialAgg |
processed inputs: 302 tuples.
2014-06-11 21:07:24,852 6578 INFO o.a.p.b.h.e.p.r.POPartialAgg |
In startSpill(), aggregating processed inputs. 302 tuples.
2014-06-11 21:07:24,855 6581 INFO o.a.p.b.h.e.p.r.POPartialAgg |
processed inputs: 301 tuples.
2014-06-11 21:07:24,928 6654 INFO o.a.p.b.h.e.p.r.POPartialAgg |
In spillResults(), processed map is empty -- done spilling.
2014-06-11 21:07:30,052 11778 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Aggregating 189289 raw records at first level.
2014-06-11 21:07:32,416 14142 INFO o.a.p.i.u.SpillableMemoryManager |
first memory handler call - Collection threshold init = 5505024(5376K) used =
645973328(630833K) committed = 795607040(776960K) max = 1048576000(1024000K)
2014-06-11 21:07:32,418 14144 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Spill triggered by SpillableMemoryManager
2014-06-11 21:07:32,418 14144 INFO o.a.p.b.h.e.p.r.POPartialAgg |
In startSpill(), aggregating raw inputs. 84343 tuples.
2014-06-11 21:07:32,418 14144 INFO o.a.p.i.u.SpillableMemoryManager |
hard invoking GC: accumulatedFreeSize 150120425 gcActivationSize 40000000
estimatedFreed 129697230 toFree: 383829328.
2014-06-11 21:07:33,049 14775 INFO o.a.p.i.u.SpillableMemoryManager |
Spilled an estimate of 129697230 bytes from 1 objects. init = 5505024(5376K)
used = 645973328(630833K) committed = 795607040(776960K) max =
1048576000(1024000K)
2014-06-11 21:07:33,051 14777 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Spill triggered by SpillableMemoryManager
...
... (Repeats several hundred times, causing a GC on each)
... (Three minutes later, it becomes able to carry on...)
...
2014-06-11 21:08:58,423 100149 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Spill triggered by SpillableMemoryManager
2014-06-11 21:08:58,423 100149 INFO o.a.p.i.u.SpillableMemoryManager |
hard invoking GC: accumulatedFreeSize 129706200 gcActivationSize 40000000
estimatedFreed 129706200 toFree: 262180472.
2014-06-11 21:08:58,697 100423 INFO o.a.p.i.u.SpillableMemoryManager |
Spilled an estimate of 129706200 bytes from 1 objects. init = 5505024(5376K)
used = 524324472(512035K) committed = 1048576000(1024000K) max =
1048576000(1024000K)
2014-06-11 21:08:58,761 100487 INFO o.a.p.b.h.e.p.r.POPartialAgg |
processed inputs: 3481 tuples.
2014-06-11 21:08:58,762 100488 INFO o.a.p.b.h.e.p.r.POPartialAgg |
In startSpill(), aggregating processed inputs. 3481 tuples.
2014-06-11 21:08:58,789 100515 INFO o.a.p.b.h.e.p.r.POPartialAgg |
processed inputs: 3480 tuples.
2014-06-11 21:08:58,916 100642 INFO o.a.p.b.h.e.p.r.POPartialAgg |
In spillResults(), processed map is empty -- done spilling.
2014-06-11 21:09:02,949 104675 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Aggregating 189289 raw records at first level.
2014-06-11 21:09:05,812 107538 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Spill triggered by SpillableMemoryManager
2014-06-11 21:09:05,813 107539 INFO o.a.p.b.h.e.p.r.POPartialAgg |
In startSpill(), aggregating raw inputs. 120387 tuples.
2014-06-11 21:09:05,813 107539 INFO o.a.p.i.u.SpillableMemoryManager |
hard invoking GC: accumulatedFreeSize 183538160 gcActivationSize 40000000
estimatedFreed 183538160 toFree: 404073608.
2014-06-11 21:09:06,717 108443 INFO o.a.p.i.u.SpillableMemoryManager |
Spilled an estimate of 183538160 bytes from 1 objects. init = 5505024(5376K)
used = 771075208(753003K) committed = 1048576000(1024000K) max =
1048576000(1024000K)
2014-06-11 21:09:06,717 108443 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Spill triggered by SpillableMemoryManager
2014-06-11 21:09:06,717 108443 INFO o.a.p.i.u.SpillableMemoryManager |
hard invoking GC: accumulatedFreeSize 183571050 gcActivationSize 40000000
estimatedFreed 183571050 toFree: 353267104.
2014-06-11 21:09:07,422 109148 INFO o.a.p.i.u.SpillableMemoryManager |
Spilled an estimate of 183571050 bytes from 1 objects. init = 5505024(5376K)
used = 615411104(600987K) committed = 1048576000(1024000K) max =
1048576000(1024000K)
...
... (and so forth)
...
{code}
There are two thresholds -- the user-adjustable pig.spill.gc.activation.size
and the hard-coded extraGCThresholdFraction -- that cause
SpillableMemoryManager to force a hard GC. Somehow, in a way that
SpillableMemoryManager doesn't expect, POPartialAgg either exceeds those
thresholds or lands oddly in the sort order of spillables, or something.
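My paraphrase of the forced-GC path, pieced together from the fields that
appear in the log output above (accumulatedFreeSize, gcActivationSize,
estimatedFreed) -- a sketch of my reading, not the actual SpillableMemoryManager
source:
{code}
// Paraphrased sketch, not actual Pig source.
class GcActivationSketch {
    long accumulatedFreeSize = 0;

    void afterRequestingSpill(long estimatedFreed, long gcActivationSize) {
        accumulatedFreeSize += estimatedFreed;
        if (accumulatedFreeSize > gcActivationSize) {
            // "hard invoking GC": a stop-the-world full collection.
            System.gc();
            accumulatedFreeSize = 0;
        }
        // If the spillable merely flagged a spill for later (as
        // POPartialAgg does -- see the first bullet below), nothing was
        // actually freed, heap usage stays high, and the handler trips
        // this same path on the very next notification.
    }
}
{code}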
When I disable both GC thresholds (by setting pig.spill.gc.activation.size to a
large value, and by manually changing the hard-coded extraGCThresholdFraction
in the code to match), the job finishes in under a minute:
{code}
2014-06-11 20:53:47,951 5265 INFO o.a.p.b.h.e.m.PigGenericMapReduce$Map |
Aliases being processed per job phase (AliasName[line,offset]): M:
events[36,9],events[-1,-1],events_final_event_1[59,23],1-3[59,32] C:
events_final_event_1[59,23],1-3[59,32] R: events_final_event_1[59,23]
2014-06-11 20:53:49,057 6371 INFO o.a.p.b.h.e.p.r.POPartialAgg |
After reduction, processed map: 126; raw map: 0
2014-06-11 20:53:49,058 6372 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Observed reduction factor: from 10000 to 126 => 79.
2014-06-11 20:53:49,256 6570 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Getting mem limits; considering 1 POPArtialAgg objects.
2014-06-11 20:53:49,266 6580 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Estimated total tuples to buffer, based on 10000 tuples that took up 14954552
bytes: 191715
2014-06-11 20:53:49,266 6580 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Setting thresholds. Primary: 189288. Secondary: 2427
2014-06-11 20:53:53,835 11149 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Aggregating 189289 raw records at first level.
2014-06-11 20:53:54,023 11337 INFO o.a.p.i.u.SpillableMemoryManager |
first memory handler call - Collection threshold init = 5505024(5376K) used =
661747360(646237K) committed = 795602944(776956K) max = 1048576000(1024000K)
2014-06-11 20:53:54,152 11466 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Spill triggered by SpillableMemoryManager
2014-06-11 20:53:54,153 11467 INFO o.a.p.i.u.SpillableMemoryManager |
Spilled an estimate of 283175425 bytes from 1 objects. init = 5505024(5376K)
used = 661747360(646237K) committed = 795602944(776956K) max =
1048576000(1024000K)
2014-06-11 20:53:54,391 11705 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Aggregating 2543 secondary records to be combined.
2014-06-11 20:53:54,435 11749 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Aggregating 2542 secondary records to be combined.
2014-06-11 20:53:54,435 11749 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Starting spill.
2014-06-11 20:53:54,435 11749 INFO o.a.p.b.h.e.p.r.POPartialAgg |
In startSpill(), aggregating processed inputs. 2542 tuples.
2014-06-11 20:53:54,466 11780 INFO o.a.p.b.h.e.p.r.POPartialAgg |
processed inputs: 2542 tuples.
2014-06-11 20:53:54,699 12013 INFO o.a.p.b.h.e.p.r.POPartialAgg |
In spillResults(), processed map is empty -- done spilling.
...
... about a dozen spills in all
...
2014-06-11 20:54:37,085 54399 INFO o.a.p.b.h.e.p.r.POPartialAgg |
In startSpill(), aggregating raw inputs. 129522 tuples.
2014-06-11 20:54:37,086 54400 INFO o.a.p.i.u.SpillableMemoryManager |
Spilled an estimate of 193614460 bytes from 1 objects. init = 5505024(5376K)
used = 568816808(555485K) committed = 1048576000(1024000K) max =
1048576000(1024000K)
2014-06-11 20:54:37,244 54558 INFO o.a.p.b.h.e.p.r.POPartialAgg |
processed inputs: 1616 tuples.
2014-06-11 20:54:37,245 54559 INFO o.a.p.b.h.e.p.r.POPartialAgg |
In startSpill(), aggregating processed inputs. 1616 tuples.
2014-06-11 20:54:37,251 54565 INFO o.a.p.b.h.e.p.r.POPartialAgg |
processed inputs: 1616 tuples.
2014-06-11 20:54:37,260 54574 INFO o.a.p.b.h.e.p.r.POPartialAgg |
In spillResults(), processed map is empty -- done spilling.
2014-06-11 20:54:41,159 58473 INFO o.a.p.b.h.e.p.r.POPartialAgg |
In startSpill(), aggregating raw inputs. 180461 tuples.
2014-06-11 20:54:41,486 58800 INFO o.a.p.b.h.e.p.r.POPartialAgg |
processed inputs: 2270 tuples.
2014-06-11 20:54:41,486 58800 INFO o.a.p.b.h.e.p.r.POPartialAgg |
In startSpill(), aggregating processed inputs. 2270 tuples.
2014-06-11 20:54:41,502 58816 INFO o.a.p.b.h.e.p.r.POPartialAgg |
processed inputs: 2270 tuples.
2014-06-11 20:54:41,502 58816 INFO o.a.p.b.h.e.p.r.POPartialAgg |
Spilling last bits.
2014-06-11 20:54:41,523 58837 INFO o.a.p.b.h.e.p.r.POPartialAgg |
In spillResults(), processed map is empty -- done spilling.
2014-06-11 20:54:41,528 58842 INFO o.a.h.m.MapTask |
Starting flush of map output
2014-06-11 20:54:41,528 58842 INFO o.a.h.m.MapTask |
Spilling map output
2014-06-11 20:54:41,528 58842 INFO o.a.h.m.MapTask |
bufstart = 0; bufend = 4240460; bufvoid = 471859200
2014-06-11 20:54:41,528 58842 INFO o.a.h.m.MapTask |
kvstart = 117964796(471859184); kvend = 117861448(471445792); length =
103349/29491200
2014-06-11 20:54:41,644 58958 INFO o.a.h.i.c.CodecPool |
Got brand-new compressor [.deflate]
2014-06-11 20:54:41,689 59003 INFO o.a.p.b.h.e.m.PigCombiner$Combine |
Aliases being processed per job phase (AliasName[line,offset]): M:
events[36,9],events[-1,-1],events_final_event_1[59,23],1-3[59,32] C:
events_final_event_1[59,23],1-3[59,32] R: events_final_event_1[59,23]
2014-06-11 20:54:43,913 61227 INFO o.a.h.m.MapTask |
Finished spill 0
2014-06-11 20:54:43,917 61231 INFO o.a.h.m.Task |
Task:attempt_1402538004132_0001_m_000000_0 is done. And is in the process of
committing
2014-06-11 20:54:43,922 61236 INFO o.a.h.m.Task |
Task 'attempt_1402538004132_0001_m_000000_0' done.
2014-06-11 20:54:44,023 61337 INFO o.a.h.m.i.MetricsSystemImpl |
Stopping MapTask metrics system...
2014-06-11 20:54:44,023 61337 INFO o.a.h.m.i.MetricsSystemImpl |
MapTask metrics system stopped.
2014-06-11 20:54:44,023 61337 INFO o.a.h.m.i.MetricsSystemImpl |
MapTask metrics system shutdown complete.
{code}
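For the record, the property half of that workaround looks like this (a sketch
of what I did; the value is in bytes and just has to be larger than anything
the manager could ever accumulate -- depending on your setup it may need to go
in pig.properties or on the command line rather than in a SET statement):
{code}
-- Push the GC-activation threshold out of reach (~100GB, in bytes).
SET pig.spill.gc.activation.size 107374182400;
{code}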
* POPartialAgg's spill() method is lazy -- it only sets the doSpill flag for a
later iteration (see the sketch after this list). Since the
SpillableMemoryManager action is synchronized, I think there's no way for the
spill to actually take place while the handler runs. If I understand this
right, the invokeGC can't ever help with a POPartialAgg spillable, since no
memory could possibly have been freed yet.
* Why is SpillableMemoryManager looping the way it is in the pathological case?
* Manually forcing a GC seems brutish. How confident are folks that this helps
compared to proper JVM gc tuning?
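Here's the sketch referenced in the first bullet -- my condensed paraphrase of
the lazy spill (not verbatim POPartialAgg source; the return value is my
shorthand for "nothing freed synchronously"):
{code}
// Paraphrased sketch, not actual Pig source: what spill() does when
// SpillableMemoryManager invokes it.
@Override
public long spill() {
    LOG.info("Spill triggered by SpillableMemoryManager");
    doSpill = true; // no map is drained here; the actual spill happens
                    // later, on the operator's own thread, the next time
                    // it pulls input
    return 0;       // so the manager's forced GC has nothing to reclaim
}
{code}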
> group all performance, garbage collection, and incremental aggregation
> ----------------------------------------------------------------------
>
> Key: PIG-3979
> URL: https://issues.apache.org/jira/browse/PIG-3979
> Project: Pig
> Issue Type: Improvement
> Components: impl
> Affects Versions: 0.12.0, 0.11.1
> Reporter: David Dreyfus
> Assignee: David Dreyfus
> Fix For: 0.14.0
>
> Attachments: PIG-3979-v1.patch, POPartialAgg.java
>
>
> I have a PIG statement similar to:
> summary = foreach (group data ALL) generate
> COUNT(data.col1), SUM(data.col2), SUM(data.col2)
> , Moments(col3)
> , Moments(data.col4)
> There are a couple of hundred columns.
> I set the following:
> SET pig.exec.mapPartAgg true;
> SET pig.exec.mapPartAgg.minReduction 3;
> SET pig.cachedbag.memusage 0.05;
> I found that when I ran this on a JVM with insufficient memory, the process
> eventually timed out because of an infinite garbage collection loop.
> The problem was invariant to the memusage setting.
> I solved the problem by making changes to:
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg.java
> Rather than reading in 10000 records to establish an estimate of the
> reduction, I make an estimate after reading in enough tuples to fill
> pig.cachedbag.memusage percent of Runtime.getRuntime().maxMemory().
> I also made a change to guarantee that at least one record is allowed in
> second-tier storage. In the current implementation, if the reduction is very
> high (e.g. 1000:1), the space in second-tier storage is zero.
> With these changes, I can summarize large data sets with small JVMs. I also
> find that setting pig.cachedbag.memusage to a small number such as 0.05
> results in much better garbage collection performance without reducing
> throughput. I suppose tuning GC would also solve a problem with excessive
> garbage collection.
> The performance is sweet.
--
This message was sent by Atlassian JIRA
(v6.2#6252)