In prototyping an application, I use a simple POJO (see below) to send data
between bolts. In an earlier version of my bolt, I simply got the POJO
using tuple.getValue() inside the execute() method, like:

SimplePojo pojo = (SimplePojo) tuple.getValue(0);

But when I scaled up and had many executors running on the same worker, I
ran into a data issue: the POJO's values changed within a single execute()
call. On closer inspection, it seems that when the same worker ran many
instances of the two different bolts, it would do something like:

t=1: Run Bolt_A with pojo_1
t=2: Run Bolt_B with pojo_1 and run Bolt_A with pojo_2.

In my topology, Bolt_B takes substantially longer to run. At t=2, when I
printed the POJO's values at the beginning of Bolt_B's execute(), they were
pojo_1's values. When I printed them again just before the end of Bolt_B's
execute(), they were pojo_2's values. In other words, it was as if pojo
pointed to a different SimplePojo partway through execute().
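A likely explanation, offered as an assumption since the upstream code isn't shown: as far as I know, Storm does not serialize tuples passed between executors in the same worker, so Bolt_B's tuple can hold a reference to the very object the emitting component still owns. If that component overwrites the object for its next tuple, a slow reader sees the values change mid-execute(). The sketch below reproduces this with plain Java threads; `SharedReferenceDemo`, `MutableHolder` and `runScenario()` are hypothetical names, and the latches only force the interleaving described above so the result is deterministic.

```java
import java.util.concurrent.CountDownLatch;

public class SharedReferenceDemo {
    // Hypothetical stand-in for SimplePojo: one mutable field is enough.
    static final class MutableHolder {
        volatile long id1;
        MutableHolder(long id1) { this.id1 = id1; }
    }

    // Returns {value read at the start of "execute()", value read at the end}.
    static long[] runScenario() {
        MutableHolder shared = new MutableHolder(1L);   // holds "pojo_1" values
        CountDownLatch firstReadDone = new CountDownLatch(1);
        CountDownLatch overwriteDone = new CountDownLatch(1);
        long[] observed = new long[2];

        // Plays a slow Bolt_B.execute(): reads the same object twice.
        Thread slowReader = new Thread(() -> {
            observed[0] = shared.id1;
            firstReadDone.countDown();
            try {
                overwriteDone.await();                  // the "long-running" middle part
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            observed[1] = shared.id1;
        });
        slowReader.start();

        try {
            // Plays an upstream component that reuses the instance for its next tuple.
            firstReadDone.await();
            shared.id1 = 2L;                            // overwrite in place with "pojo_2" values
            overwriteDone.countDown();
            slowReader.join();                          // join() makes observed[] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed;
    }

    public static void main(String[] args) {
        long[] r = runScenario();
        System.out.println("start of execute(): " + r[0]);
        System.out.println("end of execute():   " + r[1]);
    }
}
```

The reader never reassigns its reference; it simply sees new contents in the one shared object, which matches the symptom described above.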

So I ended up adding a copy constructor to SimplePojo and creating a new
SimplePojo inside execute(), like:

SimplePojo pojo = new SimplePojo((SimplePojo) tuple.getValue(0));

After that, I no longer encountered the data issue above. So my question
is: should I always make a copy of the data in a Tuple inside a bolt?
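On the copy itself: the copy constructor in SimplePojo copies id1 and id2 by value but assigns bin1 and bin2 by reference, so it is a shallow copy. That protects against the source object's fields being reassigned to new arrays, but not against the bytes inside a shared array being modified in place. The hypothetical `CopyDepthDemo` below contrasts a shallow copy with a deep copy made via `Arrays.copyOf`.

```java
import java.util.Arrays;

public class CopyDepthDemo {
    // Hypothetical cut-down version of SimplePojo with a single byte[] field.
    static final class Pojo {
        final long id1;
        final byte[] bin1;

        Pojo(long id1, byte[] bin1) {
            this.id1 = id1;
            this.bin1 = bin1;
        }

        // Shallow copy: the new object shares the same byte[] instance.
        static Pojo shallowCopy(Pojo src) {
            return new Pojo(src.id1, src.bin1);
        }

        // Deep copy: the new object gets its own copy of the byte[] contents.
        static Pojo deepCopy(Pojo src) {
            byte[] bytes = src.bin1 == null ? null : Arrays.copyOf(src.bin1, src.bin1.length);
            return new Pojo(src.id1, bytes);
        }
    }

    public static void main(String[] args) {
        Pojo original = new Pojo(1L, new byte[] {10, 20, 30});
        Pojo shallow = Pojo.shallowCopy(original);
        Pojo deep = Pojo.deepCopy(original);

        original.bin1[0] = 99; // modify the array contents in place

        System.out.println("shallow sees " + shallow.bin1[0]); // 99
        System.out.println("deep sees    " + deep.bin1[0]);    // 10
    }
}
```

A deep copy costs an extra allocation per tuple, so whether it is worth it depends on whether anything upstream can still modify the arrays after emitting.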

Thanks,
B



import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class SimplePojo implements KryoSerializable {
    private long id1;
    private long id2;
    private byte[] bin1;
    private byte[] bin2;

    // Copy constructor: id1/id2 are copied by value, but bin1/bin2 still
    // refer to the same arrays as the source (a shallow copy).
    public SimplePojo(SimplePojo source) {
        id1 = source.getId1();
        id2 = source.getId2();
        bin1 = source.getBin1();
        bin2 = source.getBin2();
    }

    // getters
    public long getId1() {
        return id1;
    }

    public long getId2() {
        return id2;
    }

    public byte[] getBin1() {
        return bin1;
    }

    public byte[] getBin2() {
        return bin2;
    }

    // ...

    // Reads fields in the same order write() emits them:
    // id1, id2, then each byte[] as an int length followed by its bytes.
    @Override
    public void read(Kryo kryo, Input input) {
        id1 = input.readLong();
        id2 = input.readLong();

        int bin1Len = input.readInt();
        bin1 = input.readBytes(bin1Len);

        int bin2Len = input.readInt();
        bin2 = input.readBytes(bin2Len);
    }

    @Override
    public void write(Kryo kryo, Output output) {
        output.writeLong(id1);
        output.writeLong(id2);

        output.writeInt(bin1.length);
        output.write(bin1);

        output.writeInt(bin2.length);
        output.write(bin2);
    }
}
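An alternative to copying in every consumer, again assuming the problem comes from an upstream component reusing one SimplePojo instance, is to build a new object for each emit and never touch it afterwards. In the sketch below, a `List` stands in for the collector keeping a reference (as a same-worker transfer would); `FreshInstancePerEmitDemo`, `Message` and `emitAll` are hypothetical names, not Storm API.

```java
import java.util.ArrayList;
import java.util.List;

public class FreshInstancePerEmitDemo {
    // Hypothetical mutable message type standing in for SimplePojo.
    static final class Message {
        long id1;
        Message(long id1) { this.id1 = id1; }
    }

    // The list plays the role of an emit that keeps a reference instead of
    // serializing, so later changes to an emitted object stay visible.
    static List<Long> emitAll(long[] ids, boolean reuseInstance) {
        List<Message> emitted = new ArrayList<>();
        Message reused = new Message(0L);
        for (long id : ids) {
            Message m;
            if (reuseInstance) {
                reused.id1 = id;        // overwrite the one shared instance
                m = reused;
            } else {
                m = new Message(id);    // a new object per emitted tuple
            }
            emitted.add(m);
        }
        // What downstream readers see once all emits have happened.
        List<Long> seen = new ArrayList<>();
        for (Message m : emitted) {
            seen.add(m.id1);
        }
        return seen;
    }

    public static void main(String[] args) {
        long[] ids = {1L, 2L, 3L};
        System.out.println("reused: " + emitAll(ids, true));  // [3, 3, 3]
        System.out.println("fresh:  " + emitAll(ids, false)); // [1, 2, 3]
    }
}
```

With a reused instance every reader ends up seeing the last values written; with a fresh instance per emit each reader keeps its own, and no copy is needed on the receiving side.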
