[ https://issues.apache.org/jira/browse/STORM-1006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14993547#comment-14993547 ]
Sachin Pasalkar commented on STORM-1006:
----------------------------------------

We are no longer using CoordinatedBolt, but I still don't understand why a user should have to care about setting up a unique ID. Shouldn't the system take care of that? Don't you think get(index) is a very risky call?

> Storm is not garbage collecting the messages (causing memory hit)
> -----------------------------------------------------------------
>
>                 Key: STORM-1006
>                 URL: https://issues.apache.org/jira/browse/STORM-1006
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka
>    Affects Versions: 0.9.3
>            Reporter: Sachin Pasalkar
>
> We are reading a whole file of around 5 MB into memory and sending it through
> Kafka to Storm. In the next bolt, we perform an operation on the file and emit
> a tuple to the following bolt. After profiling, we found that the file (its
> bytes) does not get garbage collected. On further investigation, we found that
> the backtype.storm.coordination.CoordinatedBolt.CoordinatedOutputCollector.emit(String,
> Collection<Tuple>, List<Object>) API takes the first object and uses it for
> tracking :(. Can you confirm the reason behind this? Is there any way we can
> send a different unique ID as the first element of the list, or have the
> tuple's unique ID used as the tracking key instead?
> For the time being, however, we have changed the scheme assigned to the
> KafkaSpout so that it parses the file and emits a list of values.
> In the CoordinatedBolt code below, "Object id = tuple.getValue(0);" takes the
> first element of the tuple instead of the tuple's ID. This "id" is then saved
> in the _tracked map (a TimeCacheMap). In our case, the 0th element is the
> file's byte data, so it stays in the _tracked map until the tuple tree is
> complete. Because we are processing large amounts of data, we run into an
> OutOfMemoryError.
> Code:
>
>     public void execute(Tuple tuple) {
>         Object id = tuple.getValue(0); // field 0 (here, the file bytes) becomes the tracking key
>         TrackingInfo track;
>         TupleType type = getTupleType(tuple);
>         synchronized(_tracked) {
>             track = _tracked.get(id);
>             if(track==null) {
>                 track = new TrackingInfo();
>                 if(_idStreamSpec==null) track.receivedId = true;
>                 _tracked.put(id, track); // the key stays referenced until tracking for this id completes
>             }
>         }
>         if(type==TupleType.ID) {
>             synchronized(_tracked) {
>                 track.receivedId = true;
>             }
>             checkFinishId(tuple, type);
>         } else if(type==TupleType.COORD) {
>             int count = (Integer) tuple.getValue(1);
>             synchronized(_tracked) {
>                 track.reportCount++;
>                 track.expectedTupleCount+=count;
>             }
>             checkFinishId(tuple, type);
>         } else {
>             synchronized(_tracked) {
>                 _delegate.execute(tuple);
>             }
>         }
>     }

-- 
This message was sent by Atlassian JIRA
(v6.3.4#6332)
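To make the retention described in the report concrete, here is a minimal, self-contained Java sketch. It is not Storm code. It mimics the quoted pattern "Object id = tuple.getValue(0); _tracked.put(id, track);" with a hypothetical FirstFieldTracker class, a plain HashMap standing in for Storm's TimeCacheMap, and a List<Object> standing in for a tuple's values. When field 0 is the file's byte[] payload, the map key keeps the whole payload reachable for as long as the entry exists. When field 0 is a small unique ID (a UUID string here, purely as an illustrative assumption), the map only holds that ID. The file name "file.txt" is likewise hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class FirstFieldTrackingDemo {

    // Hypothetical stand-in for CoordinatedBolt's per-id bookkeeping.
    static class TrackingInfo {
        int reportCount = 0;
    }

    // Mimics "Object id = tuple.getValue(0); _tracked.put(id, track);".
    // A plain HashMap replaces Storm's TimeCacheMap for illustration only.
    static class FirstFieldTracker {
        final Map<Object, TrackingInfo> tracked = new HashMap<>();

        void execute(List<Object> values) {
            Object id = values.get(0); // whatever the user put first becomes the key
            tracked.computeIfAbsent(id, k -> new TrackingInfo());
        }

        // Sums the sizes of byte[] keys, i.e. payloads kept reachable by the map.
        long bytesPinnedByKeys() {
            long total = 0;
            for (Object key : tracked.keySet()) {
                if (key instanceof byte[]) {
                    total += ((byte[]) key).length;
                }
            }
            return total;
        }
    }

    // Builds a tuple-like value list; a byte[] argument stays a single field.
    static List<Object> tuple(Object... fields) {
        return Arrays.asList(fields);
    }

    public static void main(String[] args) {
        byte[] fileBytes = new byte[5 * 1024 * 1024]; // ~5 MB, as in the report

        // Payload in field 0: the whole payload is used as the map key.
        FirstFieldTracker payloadFirst = new FirstFieldTracker();
        payloadFirst.execute(tuple(fileBytes, "file.txt"));
        System.out.println("payload first: " + payloadFirst.bytesPinnedByKeys());

        // Short ID in field 0: only the ID string is used as the map key.
        FirstFieldTracker idFirst = new FirstFieldTracker();
        idFirst.execute(tuple(UUID.randomUUID().toString(), fileBytes));
        System.out.println("id first: " + idFirst.bytesPinnedByKeys());
    }
}
```

Running main prints "payload first: 5242880" and then "id first: 0". In the ID-first layout the payload is still part of the emitted values, but the tracking map no longer references it, so it can be collected once downstream processing releases it.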