[ https://issues.apache.org/jira/browse/TEZ-1923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14269939#comment-14269939 ]
Siddharth Seth commented on TEZ-1923:
-------------------------------------

{code}
+    if (usedMemory > memoryLimit) {
+      LOG.info("Starting inMemoryMerger's merge since usedMemory=" +
+          memoryLimit + " > memoryLimit=" + memoryLimit +
+          ". commitMemory=" + commitMemory + ", mergeThreshold=" + mergeThreshold);
+      startMemToDiskMerge();
+    }
{code}
This will, at best, attempt to start the memToDiskMerger - there's no guarantee that it'll actually run, since one may already be in progress. It ends up not waiting for the MemToMemMerger to complete - which would free up some memory - and potentially trigger another merge based on thresholds. The usedMemory at this point will be determined by a race between the current thread and the memtomemmerge thread (whether the unconditional reserve has been done yet or not). Meanwhile, Fetchers block in any case, since memory isn't available. I think it's better to leave this section of the patch out, to be fixed in the MemToMem merger JIRAs.

{code}
+    if ((usedMemory + mergeOutputSize) > memoryLimit) {
+      LOG.info("Not enough memory to carry out mem-to-mem merging. usedMemory=" + usedMemory +
+          " > memoryLimit=" + memoryLimit);
+      return;
+    }
{code}
usedMemory may not be visible correctly here, since the check isn't inside the main MergeManager lock. This could also be part of the MemToMemMerger fixes.

{code}merger.waitForShuffleToMergeMemory();{code}
Would this be a problem in terms of connection timeouts, since this wait happens while the connection is established? This could be in the run() method instead, similar to merger.waitForInMemoryMerge().

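To illustrate the visibility concern about usedMemory, here is a minimal sketch of one way a check like the one in the second snippet could be made under the same lock that guards the counter. This is not the MergeManager code and not what the patch does: the class MemoryBudgetSketch and all of its methods are hypothetical, and a plain Java monitor stands in for "the main MergeManager lock".

```java
// Hypothetical sketch only; names do not come from Tez.
final class MemoryBudgetSketch {
    private final long memoryLimit;
    private long usedMemory; // guarded by "this"

    MemoryBudgetSketch(long memoryLimit) {
        this.memoryLimit = memoryLimit;
    }

    // Unconditional reservation, e.g. by a fetcher that has already been granted memory.
    synchronized void reserve(long bytes) {
        usedMemory += bytes;
    }

    // Memory handed back, e.g. once a merge has consumed its inputs.
    synchronized void release(long bytes) {
        usedMemory -= bytes;
    }

    // Check and reserve in one step while holding the lock, so the value of
    // usedMemory that is compared is the value that is updated.
    synchronized boolean tryReserveForMerge(long mergeOutputSize) {
        if (usedMemory + mergeOutputSize > memoryLimit) {
            return false; // not enough room for the merge output
        }
        usedMemory += mergeOutputSize;
        return true;
    }

    synchronized long usedMemory() {
        return usedMemory;
    }

    public static void main(String[] args) {
        MemoryBudgetSketch budget = new MemoryBudgetSketch(100);
        budget.reserve(60);
        System.out.println("merge of 50 admitted: " + budget.tryReserveForMerge(50));
        System.out.println("merge of 40 admitted: " + budget.tryReserveForMerge(40));
        System.out.println("usedMemory: " + budget.usedMemory());
    }
}
```

Because the comparison and the update happen inside one synchronized method, a concurrent reserve() cannot slip in between them. Whether this shape fits the real MergeManager locking is an open question for the MemToMemMerger fixes.
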
> FetcherOrderedGrouped gets into infinite loop due to memory pressure
> --------------------------------------------------------------------
>
>                 Key: TEZ-1923
>                 URL: https://issues.apache.org/jira/browse/TEZ-1923
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-1923.1.patch, TEZ-1923.2.patch
>
>
> - Ran a comparatively large job (temp table creation) at 10 TB scale.
> - Turned on intermediate mem-to-mem (tez.runtime.shuffle.memory-to-memory.enable=true and tez.runtime.shuffle.memory-to-memory.segments=4)
> - Some reducers get lots of data and quickly get into an infinite loop
> {code}
> 2015-01-07 02:36:56,644 INFO [fetcher [Map_1] #2] orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned Status.WAIT ...
> 2015-01-07 02:36:56,644 INFO [fetcher [Map_1] #2] orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 3ms
> 2015-01-07 02:36:56,644 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,645 INFO [fetcher [Map_1] #2] orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned Status.WAIT ...
> 2015-01-07 02:36:56,645 INFO [fetcher [Map_1] #2] orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 1ms
> 2015-01-07 02:36:56,645 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,647 INFO [fetcher [Map_1] #2] orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned Status.WAIT ...
> 2015-01-07 02:36:56,647 INFO [fetcher [Map_1] #2] orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 2ms
> 2015-01-07 02:36:56,653 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,653 INFO [fetcher [Map_1] #2] orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned Status.WAIT ...
> 2015-01-07 02:36:56,653 INFO [fetcher [Map_1] #2] orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 5ms
> 2015-01-07 02:36:56,654 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,654 INFO [fetcher [Map_1] #2] orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned Status.WAIT ...
> {code}
> Additional debug/patch statements revealed that InMemoryMerge is not invoked appropriately and is not releasing memory back for fetchers to proceed. For example, the debug/patch messages are given below:
> {code}
> syslog_attempt_1420000126204_0201_1_01_000034_0:2015-01-07 02:05:48,332 INFO [fetcher [Map_1] #2] orderedgrouped.MergeManager: Patch..usedMemory=1551867234, memoryLimit=1073741824, commitMemory=883028388, mergeThreshold=708669632
>   <<=== InMemoryMerge would be started in this case as commitMemory >= mergeThreshold
> syslog_attempt_1420000126204_0201_1_01_000034_0:2015-01-07 02:05:52,900 INFO [fetcher [Map_1] #2] orderedgrouped.MergeManager: Patch..usedMemory=1273349784, memoryLimit=1073741824, commitMemory=347296632, mergeThreshold=708669632
>   <<=== InMemoryMerge would *NOT* be started in this case as commitMemory < mergeThreshold. But the usedMemory is higher than memoryLimit. Fetchers would keep waiting indefinitely until memory is released. InMemoryMerge will not kick in and not release memory.
> syslog_attempt_1420000126204_0201_1_01_000034_0:2015-01-07 02:05:53,163 INFO [fetcher [Map_1] #1] orderedgrouped.MergeManager: Patch..usedMemory=1191994052, memoryLimit=1073741824, commitMemory=523155206, mergeThreshold=708669632
>   <<=== InMemoryMerge would *NOT* be started in this case as commitMemory < mergeThreshold. But the usedMemory is higher than memoryLimit. Fetchers would keep waiting indefinitely until memory is released. InMemoryMerge will not kick in and not release memory.
> {code}
> In MergeManager, in-memory merging is invoked under the following condition:
> {code}
> if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold)
> {code}
> Attaching the sample hive command just for reference:
> {code}
> $HIVE_HOME/bin/hive -hiveconf tez.runtime.io.sort.factor=200 --hiveconf hive.tez.auto.reducer.parallelism=false --hiveconf tez.am.heartbeat.interval-ms.max=20 --hiveconf tez.runtime.io.sort.mb=1200 --hiveconf tez.runtime.sort.threads=2 --hiveconf hive.tez.container.size=4096 --hiveconf tez.runtime.shuffle.memory-to-memory.enable=true --hiveconf tez.runtime.shuffle.memory-to-memory.segments=4
> create table testData as select ss_sold_date_sk,ss_sold_time_sk,ss_item_sk,ss_customer_sk,ss_quantity,ss_sold_date from store_sales distribute by ss_sold_date;
> {code}
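
To make the stall in the description concrete, the sketch below plugs the three logged samples into the quoted trigger condition. The class and method names are hypothetical and this is not the MergeManager implementation. Modeling "the fetcher must wait" as usedMemory > memoryLimit is an assumption taken from the log annotations, and the sketch assumes no merge is already in progress when each sample is taken.

```java
// Hypothetical sketch only; names do not come from Tez.
final class MergeTriggerSketch {
    // The trigger quoted in the report: start an in-memory merge only when none
    // is running and commitMemory has reached mergeThreshold.
    static boolean wouldStartMerge(boolean mergeInProgress, long commitMemory, long mergeThreshold) {
        return !mergeInProgress && commitMemory >= mergeThreshold;
    }

    // Assumption: a fetcher is told to wait whenever usedMemory exceeds memoryLimit.
    static boolean fetcherMustWait(long usedMemory, long memoryLimit) {
        return usedMemory > memoryLimit;
    }

    // Stalled: fetchers wait for memory, but nothing is started that would free it.
    static boolean isStalled(boolean mergeInProgress, long usedMemory, long commitMemory,
                             long memoryLimit, long mergeThreshold) {
        return fetcherMustWait(usedMemory, memoryLimit)
                && !wouldStartMerge(mergeInProgress, commitMemory, mergeThreshold);
    }

    public static void main(String[] args) {
        long memoryLimit = 1073741824L;   // from the logged "Patch.." lines
        long mergeThreshold = 708669632L; // from the logged "Patch.." lines
        // {usedMemory, commitMemory} for the three logged samples
        long[][] samples = {
            {1551867234L, 883028388L},
            {1273349784L, 347296632L},
            {1191994052L, 523155206L},
        };
        for (long[] s : samples) {
            System.out.println("usedMemory=" + s[0] + ", commitMemory=" + s[1]
                    + ", stalled=" + isStalled(false, s[0], s[1], memoryLimit, mergeThreshold));
        }
    }
}
```

Under these assumptions only the first sample triggers a merge. The other two leave fetchers waiting with no merge scheduled, which matches the repeated Status.WAIT lines above.
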