I don't have a heap dump, but I can point out the code where we see the tuples
being cached.
The code below is from the execute(Tuple) method of the
backtype.storm.coordination.CoordinatedBolt class (storm-core 0.9.3). Note that
"Object id = tuple.getValue(0);" takes the first element of the tuple instead
of the tuple's ID. This value is then used as the key in the _tracked map,
which is a time-based cache (TimeCacheMap). In our case the 0th element is the
file's byte data, so it stays in the _tracked map until the tuple tree
completes. Since we are processing large amounts of data, we run into an
OutOfMemoryError.
    public void execute(Tuple tuple) {
        // Field 0 of the tuple (not the tuple's message ID) becomes the _tracked key.
        Object id = tuple.getValue(0);
        TrackingInfo track;
        TupleType type = getTupleType(tuple);
        synchronized(_tracked) {
            track = _tracked.get(id);
            if(track==null) {
                track = new TrackingInfo();
                if(_idStreamSpec==null) track.receivedId = true;
                _tracked.put(id, track);
            }
        }
        if(type==TupleType.ID) {
            synchronized(_tracked) {
                track.receivedId = true;
            }
            checkFinishId(tuple, type);
        } else if(type==TupleType.COORD) {
            int count = (Integer) tuple.getValue(1);
            synchronized(_tracked) {
                track.reportCount++;
                track.expectedTupleCount+=count;
            }
            checkFinishId(tuple, type);
        } else {
            synchronized(_tracked) {
                _delegate.execute(tuple);
            }
        }
    }
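To make the retention path concrete, here is a heavily simplified model of the lookup quoted above. SimplifiedTracker and its TrackingInfo are hypothetical stand-ins written for illustration, not Storm classes, and a plain HashMap replaces the time-based cache. It only shows that whatever object sits in field 0 (here, the file bytes) becomes a map key and stays reachable until its entry is removed:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified model of the tracking step in
// CoordinatedBolt.execute(). NOT Storm code; a HashMap stands in for the
// time-based cache, and there is no expiry.
class SimplifiedTracker {
    // Hypothetical stand-in for CoordinatedBolt's TrackingInfo.
    static final class TrackingInfo {
        int reportCount;
        int expectedTupleCount;
        boolean receivedId;
    }

    // Like _tracked: keyed by whatever object sits in field 0 of the tuple.
    private final Map<Object, TrackingInfo> tracked = new HashMap<>();

    // Mirrors "Object id = tuple.getValue(0);" followed by get/put.
    TrackingInfo track(List<Object> tupleValues) {
        Object id = tupleValues.get(0); // first field, not a message ID
        synchronized (tracked) {
            TrackingInfo info = tracked.get(id);
            if (info == null) {
                info = new TrackingInfo();
                tracked.put(id, info);
            }
            return info;
        }
    }

    // Mirrors dropping the entry once the batch for this key is finished.
    void finish(Object id) {
        synchronized (tracked) {
            tracked.remove(id);
        }
    }

    // True if this exact object instance is currently held as a key.
    boolean isRetainingKey(Object candidate) {
        synchronized (tracked) {
            for (Object key : tracked.keySet()) {
                if (key == candidate) {
                    return true;
                }
            }
            return false;
        }
    }
}
```

Because the map holds a strong reference to its key, the garbage collector cannot reclaim the byte array while the entry is present, no matter what happens to the tuple elsewhere in the topology.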
Let me know if you want more information from me :)
Thanks,
Sachin
From: Bobby Evans <[email protected]>
Reply-To: "[email protected]" <[email protected]>, Bobby Evans <[email protected]>
Date: Monday, 24 August 2015 6:50 pm
To: "[email protected]" <[email protected]>
Subject: Re: Getting a big memory hit
Do you have a heap dump or something that shows exactly which data structure
those tuples are being cached in? In most cases only the tuple ID should be
extracted from each tuple so it can be sent to the acker. Once that is done,
the tuple should be eligible for garbage collection.
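For comparison with the point above: Storm's acker tracks a tuple tree using 64-bit IDs combined with XOR, and the tree is complete when the running value returns to zero, so it never needs the payload. The sketch below illustrates only that idea; XorAckTracker is a hypothetical class, not Storm's acker implementation, which also has to handle timeouts and failures:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of XOR-based ack tracking; not Storm's acker code.
// Only small long values are stored, never the tuple payloads.
class XorAckTracker {
    // root (spout tuple) id -> running XOR of outstanding edge ids
    private final Map<Long, Long> pending = new HashMap<>();

    // A new edge id enters the tree (a tuple was emitted and anchored).
    void anchor(long rootId, long edgeId) {
        pending.merge(rootId, edgeId, (a, b) -> a ^ b);
    }

    // The edge was acked: XOR the same id again, cancelling it out.
    // Returns true when the running value is back to zero (tree complete).
    boolean ack(long rootId, long edgeId) {
        long value = pending.getOrDefault(rootId, 0L) ^ edgeId;
        if (value == 0L) {
            pending.remove(rootId);
            return true;
        }
        pending.put(rootId, value);
        return false;
    }

    int pendingTrees() {
        return pending.size();
    }
}
```

Each pending tree costs one map entry of two longs, independent of how large the tuples in that tree are.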
- Bobby
On Saturday, August 22, 2015 11:16 AM, Sachin Pasalkar
<[email protected]> wrote:
Hi,
We read a whole file (around 5 MB) into memory and send it through Kafka to
Storm. The next bolt performs an operation on the file and emits a tuple to the
bolt after it. After profiling, we found that the file (the file's bytes) is
not garbage collected. On further investigation we found that the
backtype.storm.coordination.CoordinatedBolt.CoordinatedOutputCollector.emit(String,
Collection<Tuple>, List<Object>) API takes the first object of the tuple and
uses it for tracking :(. Can you confirm the reason behind this? Is there any
way we can send a different unique ID as the first element in the list, or
have the tuple's own unique ID used as the tracking key instead?
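If the tracking key really is taken from field 0, one workaround along the lines of the question above is to put a small unique ID first and the file bytes after it. This appears to match the convention of Storm's DRPC builders, where the request ID is the first output field, though that should be checked against the Storm documentation. IdFirstValues is a hypothetical helper; in a real bolt the resulting list would be passed to the collector's emit call, with the output fields declared in the same order:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical helper: builds an emit value list whose field 0 is a small
// unique ID and whose field 1 is the large payload. If a component keys its
// tracking state on field 0, only the short ID string is retained there.
final class IdFirstValues {
    private IdFirstValues() {}

    static List<Object> of(String id, byte[] payload) {
        List<Object> values = new ArrayList<>(2);
        values.add(id);      // field 0: small tracking key
        values.add(payload); // field 1: the file bytes
        return values;
    }

    // Uses a random UUID string as the unique ID (an assumption; any
    // globally unique, compact value would do).
    static List<Object> withRandomId(byte[] payload) {
        return of(UUID.randomUUID().toString(), payload);
    }
}
```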
For the time being, however, we have changed the Scheme assigned to KafkaSpout
so that it parses the file and emits a list of values. Can you also explain why
a list of values is used instead of a map, given that we already declare the
output fields in the getOutputFields() API?
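One plausible reading of the list-versus-map design (an assumption, not Storm's documented rationale): the declared output fields name each position once per stream, so name-based access is just an index lookup, similar in spirit to Storm's Fields.fieldIndex(String) and Tuple.getValueByField(String). DeclaredFields below is a hypothetical stand-in, not Storm's Fields class, and the field names in the usage are made up:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of positional tuple values plus declared field names.
// The name -> index map is built once per stream from the declared fields,
// so each tuple only carries a compact list of values, not its own keys.
final class DeclaredFields {
    private final List<String> names;
    private final Map<String, Integer> index = new HashMap<>();

    DeclaredFields(String... names) {
        this.names = Arrays.asList(names.clone());
        for (int i = 0; i < names.length; i++) {
            if (index.put(names[i], i) != null) {
                throw new IllegalArgumentException("duplicate field: " + names[i]);
            }
        }
    }

    int fieldIndex(String name) {
        Integer i = index.get(name);
        if (i == null) {
            throw new IllegalArgumentException("unknown field: " + name);
        }
        return i;
    }

    // Name-based lookup over a positional value list.
    Object valueByField(List<Object> values, String name) {
        return values.get(fieldIndex(name));
    }

    int size() {
        return names.size();
    }
}
```

With this layout, a Scheme that parses a file into, say, ("fileName", "line", "lineNo") only has to emit the three values in declared order; downstream code can still ask for "line" by name.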
Thanks,
Sachin