[ 
https://issues.apache.org/jira/browse/STORM-1006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14993547#comment-14993547
 ] 

Sachin Pasalkar commented on STORM-1006:
----------------------------------------

We are no longer using CoordinatedBolt, but I still don't understand why a 
user should have to care about setting up the unique id. Shouldn't the system 
take care of that? Don't you think get(index) is a very risky call?

> Storm is not garbage collecting the messages (causing memory hit)
> -----------------------------------------------------------------
>
>                 Key: STORM-1006
>                 URL: https://issues.apache.org/jira/browse/STORM-1006
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka
>    Affects Versions: 0.9.3
>            Reporter: Sachin Pasalkar
>
> We read a whole file of around 5 MB into memory and send it through Kafka 
> to Storm. The next bolt performs an operation on the file and sends out a 
> tuple to the following bolt. After profiling, we found that the file (the 
> bytes of the file) does not get garbage collected. On further investigation, 
> we found that the 
> backtype.storm.coordination.CoordinatedBolt.CoordinatedOutputCollector.emit(String,
>  Collection<Tuple>, List<Object>) API takes the first object and uses it for 
> tracking :(. Can you confirm the reason behind this? Is there any way we can 
> send a different unique id as the first element in the list, or have the 
> unique id of the tuple used as the indicator?
> For the time being, we have changed the schema assigned to KafkaSpout so 
> that it parses the file and sends out a list of values.
> If you look at the CoordinatedBolt code below, "Object id = 
> tuple.getValue(0);" takes the first element of the tuple instead of the id of 
> the tuple. This "id" is then saved in the _tracked map (a TimeCacheMap). In 
> our case, the 0th element is the file's byte data. It stays in the _tracked 
> map until the tuple tree is complete. As we are processing a huge amount of 
> data, we run into an OutOfMemory issue.
> Code:
> public void execute(Tuple tuple) {
>         Object id = tuple.getValue(0);  // first tuple value becomes the tracking key
>         TrackingInfo track;
>         TupleType type = getTupleType(tuple);
>         synchronized(_tracked) {
>             track = _tracked.get(id);
>             if(track==null) {
>                 track = new TrackingInfo();
>                 if(_idStreamSpec==null) track.receivedId = true;
>                 _tracked.put(id, track);
>             }
>         }
>         if(type==TupleType.ID) {
>             synchronized(_tracked) {
>                 track.receivedId = true;
>             }
>             checkFinishId(tuple, type);
>         } else if(type==TupleType.COORD) {
>             int count = (Integer) tuple.getValue(1);
>             synchronized(_tracked) {
>                 track.reportCount++;
>                 track.expectedTupleCount+=count;
>             }
>             checkFinishId(tuple, type);
>         } else {
>             synchronized(_tracked) {
>                 _delegate.execute(tuple);
>             }
>         }
>     }
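
To make the effect described above concrete, here is a minimal sketch that does not use Storm at all. It only models a tracking map keyed by whatever sits at index 0 of the emitted values, as "Object id = tuple.getValue(0);" does. The class name TrackingKeySketch, its helper methods, and the "file-0001" id are hypothetical and chosen for illustration; the size estimate is deliberately rough.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TrackingKeySketch {

    // Models "Object id = tuple.getValue(0)": the first value is the key,
    // whatever it happens to be.
    static Object trackingKey(List<Object> values) {
        return values.get(0);
    }

    // Very rough size estimate of a key; only byte[] and String are handled.
    static long approxBytes(Object key) {
        if (key instanceof byte[]) {
            return ((byte[]) key).length;
        }
        if (key instanceof String) {
            return ((String) key).length() * 2L;
        }
        return 0L;
    }

    // Total estimated size of everything the map keeps reachable as keys.
    static long retainedBytes(Map<Object, Integer> tracked) {
        long total = 0L;
        for (Object key : tracked.keySet()) {
            total += approxBytes(key);
        }
        return total;
    }

    public static void main(String[] args) {
        byte[] file = new byte[5 * 1024 * 1024]; // stand-in for the 5 MB file

        // Case 1: the payload is the first value, so the map holds the file bytes.
        Map<Object, Integer> payloadFirst = new HashMap<>();
        payloadFirst.put(trackingKey(Arrays.<Object>asList(file)), 0);

        // Case 2: a small id is emitted first, so the map holds only the id.
        Map<Object, Integer> idFirst = new HashMap<>();
        idFirst.put(trackingKey(Arrays.<Object>asList("file-0001", file)), 0);

        System.out.println("payload first: " + retainedBytes(payloadFirst) + " bytes held by keys");
        System.out.println("id first:      " + retainedBytes(idFirst) + " bytes held by keys");
    }
}
```

In the second case the file bytes are still carried in the values, but the map no longer keeps them reachable after the downstream bolt is done with them. That is the idea behind sending a separate unique id as the first element.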



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
