While prototyping an application, I use a simple POJO (see below) to send data between bolts. In an earlier version of my bolt, I simply got the pojo using tuple.getValue() inside the execute() method, like:
    SimplePojo pojo = (SimplePojo) tuple.getValue(0);

But when I scale up and run many executors on the same worker, I run into a data issue: the pojo's values change partway through execute(). On closer inspection, when the same worker runs many executors of the two different bolts, the sequence looks something like this:

    t=1: Run Bolt_A with pojo_1
    t=2: Run Bolt_B with pojo_1, and run Bolt_A with pojo_2

In my topology, Bolt_B takes substantially longer to run. At t=2, when I printed the pojo's values at the beginning of Bolt_B's execute(), they were pojo_1's values. When I printed them again just before the end of Bolt_B's execute(), they were pojo_2's values. In other words, it was as if pojo pointed to a different SimplePojo partway through execute().

So I ended up adding a copy constructor to SimplePojo and creating a new SimplePojo inside execute(), like:

    SimplePojo sources = new SimplePojo((SimplePojo) tuple.getValue(0));

After that I no longer run into the data issue above. So my question is: should I always make a copy of the data in a Tuple inside a bolt?

Thanks,
B

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.KryoSerializable;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class SimplePojo implements KryoSerializable {
        private long id1;
        private long id2;
        private byte[] bin1;
        private byte[] bin2;

        public SimplePojo(SimplePojo source) {
            id1 = source.getId1();
            id2 = source.getId2();
            // Note: the byte arrays are copied by reference (shallow copy)
            bin1 = source.getBin1();
            bin2 = source.getBin2();
        }

        // getters
        public long getId1() { return id1; }
        public long getId2() { return id2; }
        public byte[] getBin1() { return bin1; }
        public byte[] getBin2() { return bin2; }

        ...

        @Override
        public void read(Kryo kryo, Input input) {
            id1 = input.readLong();
            id2 = input.readLong();
            // Each array is stored as a length prefix followed by its bytes
            int bin1Len = input.readInt();
            bin1 = input.readBytes(bin1Len);
            int bin2Len = input.readInt();
            bin2 = input.readBytes(bin2Len);
        }

        @Override
        public void write(Kryo kryo, Output output) {
            output.writeLong(id1);
            output.writeLong(id2);
            output.writeInt(bin1.length);
            output.write(bin1);
            output.writeInt(bin2.length);
            output.write(bin2);
        }
    }
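The stand-alone Java sketch below illustrates the symptom and what the copy constructor does and does not protect against. It assumes, as the timeline suggests, that the upstream bolt reuses one SimplePojo instance and that executors in the same worker receive the same object reference rather than a deserialized copy. CopyDemo and its cut-down Pojo class (one long, one byte[]) are hypothetical stand-ins, not Storm or SimplePojo code.

```java
import java.util.Arrays;

// Hypothetical sketch: a cut-down stand-in for SimplePojo, used only to show
// what happens when two bolts share one mutable object. Not Storm code.
public class CopyDemo {

    static final class Pojo {
        long id1;
        byte[] bin1;

        Pojo(long id1, byte[] bin1) {
            this.id1 = id1;
            this.bin1 = bin1;
        }

        // Shallow copy, like the copy constructor in the post:
        // the long is copied, but the array reference is shared.
        static Pojo shallowCopy(Pojo src) {
            return new Pojo(src.id1, src.bin1);
        }

        // Deep copy: also clones the array, so later in-place writes
        // to the source array do not show up in the copy.
        static Pojo deepCopy(Pojo src) {
            return new Pojo(src.id1, src.bin1 == null ? null : src.bin1.clone());
        }
    }

    public static void main(String[] args) {
        // Assumption: the upstream bolt reuses this one instance, and the
        // downstream bolt receives the same reference (no serialization).
        Pojo shared = new Pojo(1L, new byte[] {1, 1});

        Pojo alias = shared;                     // plain cast of tuple.getValue(0)
        Pojo shallow = Pojo.shallowCopy(shared); // copy taken at start of execute()
        Pojo deep = Pojo.deepCopy(shared);

        // Upstream overwrites the same instance with pojo_2's values,
        // writing into the existing array in place.
        shared.id1 = 2L;
        shared.bin1[0] = 2;
        shared.bin1[1] = 2;

        System.out.println("alias:   " + alias.id1 + " " + Arrays.toString(alias.bin1));
        System.out.println("shallow: " + shallow.id1 + " " + Arrays.toString(shallow.bin1));
        System.out.println("deep:    " + deep.id1 + " " + Arrays.toString(deep.bin1));
    }
}
```

Running it prints 2 [2, 2] for the alias, 1 [2, 2] for the shallow copy and 1 [1, 1] for the deep copy. Because the copy constructor above assigns bin1 and bin2 by reference, it behaves like shallowCopy: that is enough if the upstream code replaces the arrays (read() does, since Input.readBytes returns a new array), but not if it writes into them in place. Either way, a copy only helps if it is taken before the upstream overwrites the shared instance.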