Hi Samza devs and users,
I wrote a custom Samza S3 consumer that downloads files from S3 and puts
their messages into a BlockingEnvelopeMap. It was straightforward because
there is a nice example to follow, filereader. I tried a small optimization
in the newBlockingQueue() method: since a Samza container is single-threaded,
I guessed that sharing a single queue would be fine. I added the following code:

    public S3Consumer(String systemName, Config config, MetricsRegistry registry) {
        queueSize = config.getInt("systems." + systemName + ".queue.size", 10000);
        bucket = config.get("systems." + systemName + ".bucket");
        prefix = config.get("systems." + systemName + ".prefix");
        queue = new LinkedBlockingQueue<>(queueSize);
        recordCounter = registry.newCounter(this.getClass().getName(), "processed_records");
    }

    @Override
    protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
        return queue; // single queue
    }

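To make the question concrete, here is a hypothetical, simplified model of the pattern as I understand it. It is not Samza's actual code: MiniEnvelopeMap, its methods, and the string partition names are made up for illustration. The assumption is that a BlockingEnvelopeMap-style buffer calls newBlockingQueue() once per registered partition, and that poll() drains each requested partition's queue and files everything it drained under that partition's key. With one shared queue, the first partition polled drains messages that belong to the others.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of a BlockingEnvelopeMap-style buffer.
// Not Samza's code: partitions are plain strings, messages are "partition:payload".
public class SharedQueueDemo {

    static class MiniEnvelopeMap {
        private final Map<String, BlockingQueue<String>> buffered = new HashMap<>();
        private final BlockingQueue<String> sharedQueue = new LinkedBlockingQueue<>();
        private final boolean useSharedQueue;

        MiniEnvelopeMap(boolean useSharedQueue) {
            this.useSharedQueue = useSharedQueue;
        }

        // Analogue of newBlockingQueue(): assumed to be invoked once per registered partition.
        BlockingQueue<String> newBlockingQueue() {
            return useSharedQueue ? sharedQueue : new LinkedBlockingQueue<String>();
        }

        // Analogue of register(): each partition keeps whatever queue it was given.
        void register(String partition) {
            buffered.putIfAbsent(partition, newBlockingQueue());
        }

        // Analogue of put(): the reader enqueues into the partition's queue.
        // Queues here are unbounded, so add() never fails.
        void put(String partition, String payload) {
            buffered.get(partition).add(partition + ":" + payload);
        }

        // Analogue of poll(): drain each requested partition's queue and
        // file everything drained under that partition's key.
        Map<String, List<String>> poll(Set<String> partitions) {
            Map<String, List<String>> result = new HashMap<>();
            for (String partition : partitions) {
                List<String> drained = new ArrayList<>();
                buffered.get(partition).drainTo(drained);
                if (!drained.isEmpty()) {
                    result.put(partition, drained);
                }
            }
            return result;
        }
    }

    // Registers p0 and p1, buffers one message for each, then polls both.
    static Map<String, List<String>> run(boolean useSharedQueue) {
        MiniEnvelopeMap map = new MiniEnvelopeMap(useSharedQueue);
        Set<String> partitions = new LinkedHashSet<>(Arrays.asList("p0", "p1"));
        for (String partition : partitions) {
            map.register(partition);
        }
        map.put("p0", "a");
        map.put("p1", "b");
        return map.poll(partitions);
    }

    // Counts messages returned under a key other than their own partition.
    static int misattributed(Map<String, List<String>> polled) {
        int count = 0;
        for (Map.Entry<String, List<String>> entry : polled.entrySet()) {
            for (String message : entry.getValue()) {
                if (!message.startsWith(entry.getKey() + ":")) {
                    count++;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        for (boolean shared : new boolean[] {false, true}) {
            Map<String, List<String>> polled = run(shared);
            System.out.println((shared ? "shared queue: " : "per-partition queues: ")
                    + polled + ", misattributed = " + misattributed(polled));
        }
    }
}
```

If the real class behaves like this model, messages could be reported under the wrong SystemStreamPartition and per-partition bookkeeping would no longer match what was actually read; I have not verified that this is what causes the loss.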
Unfortunately, I observed significant message loss with this single-queue
implementation. I suspected the queue might be dropping messages, so I
changed newBlockingQueue() to match the filereader implementation:

    @Override
    protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
        return new LinkedBlockingQueue<>(queueSize);
    }

After that change, no more messages were lost.
Do you have any idea why the single shared queue caused the loss?
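On my suspicion that the queue itself dropped messages: per the java.util.concurrent documentation, a bounded LinkedBlockingQueue never discards an element it has accepted. When it is full, offer() returns false without enqueuing, and put() blocks until space frees up, so a message disappears silently only if the producer uses offer() and ignores the result. A small standalone check (the class name BoundedQueueDemo is just for illustration):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedQueueDemo {

    // Fills a capacity-2 queue, then tries a non-blocking insert of a third message.
    static BlockingQueue<String> fillAndOverflow() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.offer("m1");
        queue.offer("m2");
        // The queue is full, so offer() returns false and "m3" is not enqueued.
        // A caller that ignores this return value loses "m3" without any error.
        // put("m3") would instead block until a consumer removes an element.
        boolean accepted = queue.offer("m3");
        System.out.println("m3 accepted: " + accepted);
        return queue;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = fillAndOverflow();
        System.out.println("contents: " + queue
                + ", remaining capacity: " + queue.remainingCapacity());
    }
}
```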
Thank you
Best, Jae