[ 
https://issues.apache.org/jira/browse/TEZ-1923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajesh Balamohan updated TEZ-1923:
----------------------------------
    Attachment: TEZ-1923.2.patch

>>
Rajesh, I believe this will be triggered if there are parallel chunks being 
fetched without enough completing to hit the current merge condition?
>>

Yes, this is very easily reproduced with mem-to-mem merging (though such 
tight loops are possible even without mem-to-mem merging).

Agreed that the initial patch could end up triggering more spills to disk.  
Uploading a refined patch to address the following (a rough sketch of the 
intended trigger logic follows the list):
1. Fetchers would wait instead of getting into a tight loop.
2. IntermediateMemoryToMemoryMerger would start merging only when enough 
memory is available.
3. When mem-to-mem merging is enabled, it would additionally check for 
(usedMemory > memoryLimit).  If so, it would kick off a mem-to-disk merge to 
relieve the memory pressure and to avoid fetchers waiting indefinitely.
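The sketch below is only for illustration; the class, field, and method names 
are made up for this comment and are not the ones used in TEZ-1923.2.patch:
{code}
// Sketch only -- illustrative names, not the actual TEZ-1923.2.patch code.
class MergeTriggerSketch {
  private final long memoryLimit;
  private final long mergeThreshold;
  private final boolean memToMemEnabled;
  private long usedMemory;
  private long commitMemory;
  private boolean inMemMergeInProgress;

  MergeTriggerSketch(long memoryLimit, long mergeThreshold, boolean memToMemEnabled) {
    this.memoryLimit = memoryLimit;
    this.mergeThreshold = mergeThreshold;
    this.memToMemEnabled = memToMemEnabled;
  }

  // Fetchers reserve memory before pulling an output into memory.
  synchronized void reserve(long size) {
    usedMemory += size;
  }

  // Called once a fetched output has been committed into memory.
  synchronized void onOutputCommitted(long size) {
    commitMemory += size;
    if (memToMemEnabled && usedMemory > memoryLimit) {
      // Memory pressure: fall back to a mem-to-disk merge so memory is
      // released and waiting fetchers are eventually unblocked.
      startMemToDiskMerge();
    } else if (!inMemMergeInProgress && commitMemory >= mergeThreshold) {
      startInMemoryMerge();
    }
    // Otherwise no merge is started; in the patch, fetchers wait for memory
    // to be released instead of spinning in a tight loop on Status.WAIT.
  }

  private void startMemToDiskMerge() { /* hand segments to the disk merger */ }

  private void startInMemoryMerge() {
    inMemMergeInProgress = true; /* hand segments to the in-memory merger */
  }
}
{code}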

[~hitesh] - I can backport this to 0.5.4 after review.

> FetcherOrderedGrouped gets into infinite loop due to memory pressure
> --------------------------------------------------------------------
>
>                 Key: TEZ-1923
>                 URL: https://issues.apache.org/jira/browse/TEZ-1923
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-1923.1.patch, TEZ-1923.2.patch
>
>
> - Ran a comparatively large job (temp table creation) at 10 TB scale.
> - Turned on intermediate mem-to-mem 
> (tez.runtime.shuffle.memory-to-memory.enable=true and 
> tez.runtime.shuffle.memory-to-memory.segments=4)
> - Some reducers get lots of data and quickly get into an infinite loop:
> {code}
> 2015-01-07 02:36:56,644 INFO [fetcher [Map_1] #2] 
> orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned 
> Status.WAIT ...
> 2015-01-07 02:36:56,644 INFO [fetcher [Map_1] #2] 
> orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 3ms
> 2015-01-07 02:36:56,644 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for 
> url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true
>  sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,645 INFO [fetcher [Map_1] #2] 
> orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned 
> Status.WAIT ...
> 2015-01-07 02:36:56,645 INFO [fetcher [Map_1] #2] 
> orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 1ms
> 2015-01-07 02:36:56,645 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for 
> url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true
>  sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,647 INFO [fetcher [Map_1] #2] 
> orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned 
> Status.WAIT ...
> 2015-01-07 02:36:56,647 INFO [fetcher [Map_1] #2] 
> orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 2ms
> 2015-01-07 02:36:56,653 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for 
> url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true
>  sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,653 INFO [fetcher [Map_1] #2] 
> orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned 
> Status.WAIT ...
> 2015-01-07 02:36:56,653 INFO [fetcher [Map_1] #2] 
> orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 5ms
> 2015-01-07 02:36:56,654 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for 
> url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true
>  sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,654 INFO [fetcher [Map_1] #2] 
> orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned 
> Status.WAIT ...
> {code}
> Additional debug/patch statements revealed that InMemoryMerge is not invoked 
> when it should be, so it does not release memory back for the fetchers to 
> proceed.  The debug/patch messages are given below:
> {code}
> syslog_attempt_1420000126204_0201_1_01_000034_0:2015-01-07 02:05:48,332 INFO 
> [fetcher [Map_1] #2] orderedgrouped.MergeManager: 
> Patch..usedMemory=1551867234, memoryLimit=1073741824, commitMemory=883028388, 
> mergeThreshold=708669632  <<=== InMemoryMerge would be started in this case 
> as commitMemory >= mergeThreshold
> syslog_attempt_1420000126204_0201_1_01_000034_0:2015-01-07 02:05:52,900 INFO 
> [fetcher [Map_1] #2] orderedgrouped.MergeManager: 
> Patch..usedMemory=1273349784, memoryLimit=1073741824, commitMemory=347296632, 
> mergeThreshold=708669632 <<=== InMemoryMerge would *NOT* be started in this 
> case as commitMemory < mergeThreshold.  But the usedMemory is higher than 
> memoryLimit.  Fetchers would keep waiting indefinitely until memory is 
> released. InMemoryMerge will not kick in and not release memory.
> syslog_attempt_1420000126204_0201_1_01_000034_0:2015-01-07 02:05:53,163 INFO 
> [fetcher [Map_1] #1] orderedgrouped.MergeManager: 
> Patch..usedMemory=1191994052, memoryLimit=1073741824, commitMemory=523155206, 
> mergeThreshold=708669632 <<=== InMemoryMerge would *NOT* be started in this 
> case as commitMemory < mergeThreshold.  But the usedMemory is higher than 
> memoryLimit.  Fetchers would keep waiting indefinitely until memory is 
> released.  InMemoryMerge will not kick in and not release memory.
> {code}
> In MergeManager, in-memory merging is invoked only under the following condition:
> {code}
> if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold)
> {code}
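> As a quick illustration (not code from Tez), plugging the values from the 
> 02:05:52 log line above into this condition shows why no merge ever starts 
> even though memory is exhausted:
> {code}
> // Values taken from the 02:05:52,900 debug line above.
> long commitMemory   = 347296632L;
> long mergeThreshold = 708669632L;
> long usedMemory     = 1273349784L;
> long memoryLimit    = 1073741824L;
>
> boolean inMemoryMergeStarts = commitMemory >= mergeThreshold;  // false
> boolean overMemoryLimit     = usedMemory > memoryLimit;        // true
> // No merge is triggered, no memory is released, and fetchers keep getting
> // Status.WAIT -- the tight loop seen in the fetcher logs.
> {code}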
> Attaching the sample Hive command for reference:
> {code}
> $HIVE_HOME/bin/hive -hiveconf tez.runtime.io.sort.factor=200 --hiveconf 
> hive.tez.auto.reducer.parallelism=false --hiveconf 
> tez.am.heartbeat.interval-ms.max=20 --hiveconf tez.runtime.io.sort.mb=1200 
> --hiveconf tez.runtime.sort.threads=2 --hiveconf hive.tez.container.size=4096 
> --hiveconf tez.runtime.shuffle.memory-to-memory.enable=true --hiveconf 
> tez.runtime.shuffle.memory-to-memory.segments=4
> create table testData as select 
> ss_sold_date_sk,ss_sold_time_sk,ss_item_sk,ss_customer_sk,ss_quantity,ss_sold_date
>  from store_sales distribute by ss_sold_date;
> {code}


