[ https://issues.apache.org/jira/browse/STORM-1006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sachin Pasalkar updated STORM-1006:
-----------------------------------
    Description: 
We are reading a whole file (around 5 MB) into memory, which is sent through Kafka 
to Storm. In the next bolt, we perform an operation on the file and send a tuple 
out to the next bolt. After profiling, we found that the file (the file's bytes) 
does not get garbage collected. On further investigation, we found that the 
backtype.storm.coordination.CoordinatedBolt.CoordinatedOutputCollector.emit(String,
 Collection<Tuple>, List<Object>) API takes the first object of the tuple and uses 
it for tracking. Can you confirm the reason behind this? Is there a way to send a 
different unique id as the first element of the list, or to use the tuple's own 
unique id as the tracking key?

However, for the time being, we have changed the Scheme assigned to the KafkaSpout 
so that it parses the file and emits a list of values instead of the raw bytes.
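
For reference, below is a minimal sketch of that idea, assuming storm-kafka 0.9.x 
(where Scheme.deserialize takes a byte[]). The field names and the line-based 
parsing are hypothetical stand-ins for our actual logic:

import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class ParsedFileScheme implements Scheme {

    public List<Object> deserialize(byte[] ser) {
        String content;
        try {
            content = new String(ser, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
        // Parse the file here so the raw byte[] never becomes a tuple field.
        // A small unique id goes in position 0, since CoordinatedBolt keys
        // its _tracked map on the first field of the tuple.
        List<String> lines = Arrays.asList(content.split("\n"));
        return new Values(UUID.randomUUID().toString(), lines);
    }

    public Fields getOutputFields() {
        return new Fields("id", "lines");
    }
}

The spout would then be configured with 
spoutConfig.scheme = new SchemeAsMultiScheme(new ParsedFileScheme());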

In the CoordinatedBolt code below, "Object id = tuple.getValue(0);" takes the 
first element of the tuple instead of the tuple's own id. This id is then stored 
in the _tracked map (a TimeCacheMap). In our case the 0th element is the file's 
byte data, so it is held in the _tracked map until the tuple tree completes. As 
we are processing large payloads, we run into OutOfMemory errors.

Code:

public void execute(Tuple tuple) {
    Object id = tuple.getValue(0); // the first tuple field, not the tuple's own id, becomes the tracking key
    TrackingInfo track;
    TupleType type = getTupleType(tuple);
    synchronized(_tracked) {
        track = _tracked.get(id);
        if(track==null) {
            track = new TrackingInfo();
            if(_idStreamSpec==null) track.receivedId = true;
            _tracked.put(id, track); // the value at index 0 is retained here until the tuple tree completes
        }
    }

    if(type==TupleType.ID) {
        synchronized(_tracked) {
            track.receivedId = true;
        }
        checkFinishId(tuple, type);
    } else if(type==TupleType.COORD) {
        int count = (Integer) tuple.getValue(1);
        synchronized(_tracked) {
            track.reportCount++;
            track.expectedTupleCount+=count;
        }
        checkFinishId(tuple, type);
    } else {
        synchronized(_tracked) {
            _delegate.execute(tuple);
        }
    }
}
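
To make the retention concrete, here is a small self-contained sketch (not from 
the Storm code base) using backtype.storm.utils.TimeCacheMap, the structure 
behind _tracked in 0.9.x. The map holds a strong reference to its keys, so a 
5 MB byte[] used as the key stays reachable until the entry is removed or expires:

import backtype.storm.utils.TimeCacheMap;

public class KeyRetentionDemo {
    public static void main(String[] args) {
        // Entries live for at least 60 seconds before the cleaner evicts them.
        TimeCacheMap<Object, String> tracked = new TimeCacheMap<Object, String>(60);

        byte[] fileBytes = new byte[5 * 1024 * 1024]; // stand-in for the 5 MB file
        tracked.put(fileBytes, "trackingInfo");       // same pattern as _tracked.put(id, track)

        // Even after the caller drops its reference, the 5 MB array remains
        // strongly reachable through the map's key set and cannot be collected.
        fileBytes = null;
        System.gc();
    }
}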





> Storm is not garbage collecting the messages (causing memory hit)
> -----------------------------------------------------------------
>
>                 Key: STORM-1006
>                 URL: https://issues.apache.org/jira/browse/STORM-1006
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka
>    Affects Versions: 0.9.3
>            Reporter: Sachin Pasalkar



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
