[ 
https://issues.apache.org/jira/browse/TEZ-1923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14269939#comment-14269939
 ] 

Siddharth Seth commented on TEZ-1923:
-------------------------------------

{code}
+      if (usedMemory > memoryLimit) {
+        LOG.info("Starting inMemoryMerger's merge since usedMemory=" +
+            memoryLimit + " > memoryLimit=" + memoryLimit +
+            ". commitMemory=" + commitMemory + ", mergeThreshold=" + mergeThreshold);
+        startMemToDiskMerge();
+      }
{code}
At best, this attempts to start the memToDiskMerger - there's no guarantee
that a merge will actually run, since one may already be in progress. It also
doesn't wait for the MemToMemMerger to complete - which would free up some
memory and potentially trigger another merge based on thresholds. The
usedMemory seen at this point is determined by a race between the current
thread and the MemToMemMerger thread (whether the unconditional reserve has
been done yet or not). Meanwhile, Fetchers block in any case, since memory
isn't available. I think it's better to leave this section of the patch out,
and fix it in the MemToMem merger jiras.
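
To illustrate the first point, here is a minimal sketch of why a call like
startMemToDiskMerge() is only an attempt. The names below (MergeStartSketch,
Merger, tryStart, finish) are hypothetical and are not Tez's implementation;
they only model a merger that refuses to start while a merge is in progress.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class MergeStartSketch {

  /** Hypothetical stand-in for a merger that runs one merge at a time. */
  public static final class Merger {
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    /** Returns true only if this call actually started a merge. */
    public boolean tryStart() {
      return inProgress.compareAndSet(false, true);
    }

    /** Marks the current merge as finished so a new one can start. */
    public void finish() {
      inProgress.set(false);
    }
  }

  public static void main(String[] args) {
    Merger merger = new Merger();
    System.out.println(merger.tryStart()); // first request starts a merge
    System.out.println(merger.tryStart()); // second request is refused
    merger.finish();
    System.out.println(merger.tryStart()); // a new merge can start again
  }
}
```

A caller that ignores the return value cannot tell whether any memory will be
freed as a result of its request.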

{code}
+      if ((usedMemory + mergeOutputSize) > memoryLimit) {
+        LOG.info("Not enough memory to carry out mem-to-mem merging. usedMemory=" + usedMemory +
+            " > memoryLimit=" + memoryLimit);
+        return;
+      }
{code}
usedMemory may not be read correctly here - this check isn't made while
holding the main MergeManager lock, so updates from other threads may not be
visible. This could also be part of the MemToMemMerger fixes.
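
A minimal sketch of the locking concern, assuming a hypothetical MemoryBudget
class rather than the real MergeManager: every read and write of usedMemory
goes through the same monitor, so the size check sees a consistent value.

```java
public class MemoryBudget {
  private final long memoryLimit;
  private long usedMemory; // guarded by "this"

  public MemoryBudget(long memoryLimit) {
    this.memoryLimit = memoryLimit;
  }

  /** Adds to usedMemory while holding the lock. */
  public synchronized void reserve(long bytes) {
    usedMemory += bytes;
  }

  /** Subtracts from usedMemory while holding the lock. */
  public synchronized void unreserve(long bytes) {
    usedMemory -= bytes;
  }

  /** Inverse of the patch's "not enough memory" test, evaluated under the lock. */
  public synchronized boolean canFitMergeOutput(long mergeOutputSize) {
    return (usedMemory + mergeOutputSize) <= memoryLimit;
  }

  public static void main(String[] args) {
    MemoryBudget budget = new MemoryBudget(1000L);
    budget.reserve(900L);
    System.out.println(budget.canFitMergeOutput(200L)); // 1100 exceeds the limit
    System.out.println(budget.canFitMergeOutput(100L)); // 1000 fits exactly
  }
}
```

If a reservation follows the check, both would need to happen inside one
synchronized section; otherwise another thread can reserve in between.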

{code}merger.waitForShuffleToMergeMemory();{code}
Would this be a problem in terms of connection timeouts, since this wait
happens while the connection is established? This could be in the run() method
instead, similar to merger.waitForInMemoryMerge().
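
A rough sketch of the suggested ordering, with both waits done in run() before
any connection is opened. The Merger and Connector interfaces and the
FetcherSketch class are hypothetical; only the two wait method names are taken
from the patch and the existing code.

```java
import java.util.ArrayList;
import java.util.List;

public class FetcherSketch implements Runnable {

  /** Hypothetical view of the merge manager's blocking calls. */
  public interface Merger {
    void waitForInMemoryMerge() throws InterruptedException;
    void waitForShuffleToMergeMemory() throws InterruptedException;
  }

  /** Hypothetical stand-in for opening the connection and copying output. */
  public interface Connector {
    void connectAndFetch();
  }

  private final Merger merger;
  private final Connector connector;

  public FetcherSketch(Merger merger, Connector connector) {
    this.merger = merger;
    this.connector = connector;
  }

  @Override
  public void run() {
    try {
      // Block here, while no connection is held open.
      merger.waitForInMemoryMerge();
      merger.waitForShuffleToMergeMemory();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return;
    }
    // Only connect once memory is expected to be available.
    connector.connectAndFetch();
  }

  /** Runs one fetch cycle with recording stubs and returns the call order. */
  public static List<String> demo() {
    final List<String> calls = new ArrayList<>();
    Merger merger = new Merger() {
      @Override public void waitForInMemoryMerge() {
        calls.add("waitForInMemoryMerge");
      }
      @Override public void waitForShuffleToMergeMemory() {
        calls.add("waitForShuffleToMergeMemory");
      }
    };
    new FetcherSketch(merger, () -> calls.add("connectAndFetch")).run();
    return calls;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```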

> FetcherOrderedGrouped gets into infinite loop due to memory pressure
> --------------------------------------------------------------------
>
>                 Key: TEZ-1923
>                 URL: https://issues.apache.org/jira/browse/TEZ-1923
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-1923.1.patch, TEZ-1923.2.patch
>
>
> - Ran a comparatively large job (temp table creation) at 10 TB scale.
> - Turned on intermediate mem-to-mem 
> (tez.runtime.shuffle.memory-to-memory.enable=true and 
> tez.runtime.shuffle.memory-to-memory.segments=4)
> - Some reducers get lots of data and quickly get into an infinite loop
> {code}
> 2015-01-07 02:36:56,644 INFO [fetcher [Map_1] #2] 
> orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned 
> Status.WAIT ...
> 2015-01-07 02:36:56,644 INFO [fetcher [Map_1] #2] 
> orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 3ms
> 2015-01-07 02:36:56,644 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for 
> url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true
>  sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,645 INFO [fetcher [Map_1] #2] 
> orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned 
> Status.WAIT ...
> 2015-01-07 02:36:56,645 INFO [fetcher [Map_1] #2] 
> orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 1ms
> 2015-01-07 02:36:56,645 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for 
> url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true
>  sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,647 INFO [fetcher [Map_1] #2] 
> orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned 
> Status.WAIT ...
> 2015-01-07 02:36:56,647 INFO [fetcher [Map_1] #2] 
> orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 2ms
> 2015-01-07 02:36:56,653 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for 
> url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true
>  sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,653 INFO [fetcher [Map_1] #2] 
> orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned 
> Status.WAIT ...
> 2015-01-07 02:36:56,653 INFO [fetcher [Map_1] #2] 
> orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 5ms
> 2015-01-07 02:36:56,654 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for 
> url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true
>  sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,654 INFO [fetcher [Map_1] #2] 
> orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned 
> Status.WAIT ...
> {code}
> Additional debug/patch statements revealed that InMemoryMerge is not invoked 
> appropriately and does not release memory back for fetchers to proceed. 
> Example debug/patch messages are given below:
> {code}
> syslog_attempt_1420000126204_0201_1_01_000034_0:2015-01-07 02:05:48,332 INFO 
> [fetcher [Map_1] #2] orderedgrouped.MergeManager: 
> Patch..usedMemory=1551867234, memoryLimit=1073741824, commitMemory=883028388, 
> mergeThreshold=708669632  <<=== InMemoryMerge would be started in this case 
> as commitMemory >= mergeThreshold
> syslog_attempt_1420000126204_0201_1_01_000034_0:2015-01-07 02:05:52,900 INFO 
> [fetcher [Map_1] #2] orderedgrouped.MergeManager: 
> Patch..usedMemory=1273349784, memoryLimit=1073741824, commitMemory=347296632, 
> mergeThreshold=708669632 <<=== InMemoryMerge would *NOT* be started in this 
> case as commitMemory < mergeThreshold.  But the usedMemory is higher than 
> memoryLimit.  Fetchers would keep waiting indefinitely until memory is 
> released. InMemoryMerge will not kick in and not release memory.
> syslog_attempt_1420000126204_0201_1_01_000034_0:2015-01-07 02:05:53,163 INFO 
> [fetcher [Map_1] #1] orderedgrouped.MergeManager: 
> Patch..usedMemory=1191994052, memoryLimit=1073741824, commitMemory=523155206, 
> mergeThreshold=708669632 <<=== InMemoryMerge would *NOT* be started in this 
> case as commitMemory < mergeThreshold.  But the usedMemory is higher than 
> memoryLimit.  Fetchers would keep waiting indefinitely until memory is 
> released.  InMemoryMerge will not kick in and not release memory.
> {code}
> In MergeManager, in-memory merging is invoked under the following condition:
> {code}
> if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold)
> {code}
> Attaching the sample hive command just for reference
> {code}
> $HIVE_HOME/bin/hive -hiveconf tez.runtime.io.sort.factor=200 --hiveconf 
> hive.tez.auto.reducer.parallelism=false --hiveconf 
> tez.am.heartbeat.interval-ms.max=20 --hiveconf tez.runtime.io.sort.mb=1200 
> --hiveconf tez.runtime.sort.threads=2 --hiveconf hive.tez.container.size=4096 
> --hiveconf tez.runtime.shuffle.memory-to-memory.enable=true --hiveconf 
> tez.runtime.shuffle.memory-to-memory.segments=4
> create table testData as select 
> ss_sold_date_sk,ss_sold_time_sk,ss_item_sk,ss_customer_sk,ss_quantity,ss_sold_date
>  from store_sales distribute by ss_sold_date;
> {code}
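
The stall in the description can be checked by hand with the values from the
quoted debug messages. This is a minimal sketch: the merge-start condition is
the one quoted from MergeManager, while the class name, the helper methods,
and the simplified rule that fetchers wait whenever usedMemory exceeds
memoryLimit are assumptions for illustration.

```java
public class StallCheck {

  /** The merge-start condition quoted in the issue description. */
  public static boolean mergeWouldStart(boolean mergeInProgress,
                                        long commitMemory,
                                        long mergeThreshold) {
    return !mergeInProgress && commitMemory >= mergeThreshold;
  }

  /** Simplified assumption: fetchers wait while usedMemory exceeds the limit. */
  public static boolean fetchersWait(long usedMemory, long memoryLimit) {
    return usedMemory > memoryLimit;
  }

  public static void main(String[] args) {
    // Values from the second debug message (02:05:52,900).
    long usedMemory = 1273349784L;
    long memoryLimit = 1073741824L;
    long commitMemory = 347296632L;
    long mergeThreshold = 708669632L;

    // Stalled: fetchers are waiting for memory, but no merge starts to free it.
    boolean stalled = fetchersWait(usedMemory, memoryLimit)
        && !mergeWouldStart(false, commitMemory, mergeThreshold);
    System.out.println("stalled=" + stalled);
  }
}
```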



