[ 
https://issues.apache.org/jira/browse/IGNITE-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15566271#comment-15566271
 ] 

Ivan Veselovsky commented on IGNITE-4037:
-----------------------------------------

Key classes that implement spilling in Hadoop on the Map and Reduce 
sides (version 2.7.2): 
{code}
org.apache.hadoop.mapred.MapTask
org.apache.hadoop.mapreduce.Partitioner
org.apache.hadoop.mapred.MapTask#runNewMapper

org.apache.hadoop.mapred.MapTask.NewOutputCollector
org.apache.hadoop.mapreduce.MRJobConfig#MAP_OUTPUT_COLLECTOR_CLASS_ATTR
org.apache.hadoop.mapred.MapTask.MapOutputBuffer -- default collector implementation.

org.apache.hadoop.util.IndexedSorter -- sorting interface.

org.apache.hadoop.mapred.MapTask.MapOutputBuffer#spillThread, 
org.apache.hadoop.mapred.MapTask.MapOutputBuffer.SpillThread
org.apache.hadoop.mapred.MapTask.MapOutputBuffer#sortAndSpill

org.apache.hadoop.mapred.SpillRecord
org.apache.hadoop.mapreduce.task.reduce.Shuffle

org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput
org.apache.hadoop.mapreduce.task.reduce.Fetcher

pull: org.apache.hadoop.mapreduce.task.reduce.Fetcher#copyFromHost

org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler, 
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl

org.apache.hadoop.mapreduce.task.reduce.Fetcher#copyMapOutput
org.apache.hadoop.mapreduce.task.reduce.Fetcher#copyFromHost
{code}

In Ignite we have 
{code}org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage{code}
 to transfer data between nodes.
The data in these messages is sent from, and stored into, 
{code}org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimapBase{code}-based
 structures. 
The main classes involved are:
{code}
org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList
org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimap

org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage
org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleJob#msgs

send:    
org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleJob#collectUpdatesAndSend
receive: 
org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleJob#onShuffleMessage
 
{code}

As the simplest solution (the path of minimal changes), we can try to implement disk 
spilling transparently: preserve the interface of 
{code}org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimapBase{code}
, but spill to and read from disk in the background. 
The configuration should be similar to Hadoop's: (1) the maximum in-memory buffer 
size, and (2) the fill percentage at which the buffer is spilled to disk, typically 80% by 
default.
This should be implemented similarly on both the Map and Reduce sides. 
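
To illustrate the two settings, here is a minimal sketch of a buffer that spills once a fill threshold is crossed. On the map side, Hadoop exposes the corresponding settings as mapreduce.task.io.sort.mb and mapreduce.map.sort.spill.percent. Everything in the sketch is hypothetical: SpillingBuffer, its methods, and the length-prefixed file format are not Ignite or Hadoop classes, and the spill here is synchronous for brevity, whereas the proposal above (like Hadoop's SpillThread) would do it in the background.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch only; not an Ignite or Hadoop class. */
class SpillingBuffer implements AutoCloseable {
    /** Fill level (in bytes) at which in-memory records are spilled. */
    private final long spillThresholdBytes;

    /** Records currently held in memory. */
    private final List<byte[]> inMemory = new ArrayList<>();

    /** Single temp file that receives all spills (an assumption of this sketch). */
    private final Path spillFile;

    private long usedBytes;
    private long spilledRecords;
    private int spillCount;

    SpillingBuffer(long maxBufferBytes, double spillPercent) throws IOException {
        this.spillThresholdBytes = (long)(maxBufferBytes * spillPercent);
        this.spillFile = Files.createTempFile("spill-", ".bin");
    }

    /** Adds a record and spills synchronously once the threshold is reached. */
    void add(byte[] rec) throws IOException {
        inMemory.add(rec);
        usedBytes += rec.length;

        if (usedBytes >= spillThresholdBytes)
            spill();
    }

    /** Appends all in-memory records to the spill file as (length, bytes) pairs. */
    private void spill() throws IOException {
        try (DataOutputStream out = new DataOutputStream(
            Files.newOutputStream(spillFile, StandardOpenOption.APPEND))) {
            for (byte[] rec : inMemory) {
                out.writeInt(rec.length);
                out.write(rec);
            }
        }

        spilledRecords += inMemory.size();
        spillCount++;
        inMemory.clear();
        usedBytes = 0;
    }

    /** Returns spilled records (in spill order) followed by in-memory ones. */
    List<byte[]> readAll() throws IOException {
        List<byte[]> res = new ArrayList<>();

        try (DataInputStream in = new DataInputStream(Files.newInputStream(spillFile))) {
            for (long i = 0; i < spilledRecords; i++) {
                byte[] rec = new byte[in.readInt()];
                in.readFully(rec);
                res.add(rec);
            }
        }

        res.addAll(inMemory);
        return res;
    }

    int spillCount() { return spillCount; }

    long usedBytes() { return usedBytes; }

    @Override public void close() throws IOException {
        Files.deleteIfExists(spillFile);
    }
}
```

With new SpillingBuffer(100, 0.8), the threshold is 80 bytes, so adding ten 10-byte records triggers one spill after the eighth record and leaves two records in memory. A real implementation would also need to keep spilled runs sorted and merge them on read, which this sketch does not attempt.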


> High memory consumption when executing TeraSort Hadoop example
> --------------------------------------------------------------
>
>                 Key: IGNITE-4037
>                 URL: https://issues.apache.org/jira/browse/IGNITE-4037
>             Project: Ignite
>          Issue Type: Bug
>    Affects Versions: 1.6
>            Reporter: Ivan Veselovsky
>            Assignee: Ivan Veselovsky
>             Fix For: 1.7
>
>
> When executing TeraSort Hadoop example, we observe high memory consumption 
> that frequently leads to cluster malfunction.
> The problem can be reproduced in a unit test, even with 1 node and with an 
> input data set as small as 100 MB. 
> Dump analysis shows that memory is taken up by various queues: 
> org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopExecutorService#queue
> and the task queue of 
> org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker#evtProcSvc.
> Since objects stored in these queues hold byte arrays of significant size, 
> memory is consumed very fast.
> It looks like the real cause of the problem is that some tasks are blocked.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
