Hi Jae,

I don't think the messages are "lost". In your shared-queue implementation, they all end up in one partition instead.
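To make this concrete, here is a stripped-down model of the per-partition bookkeeping. It is a sketch, not Samza code. It assumes that BlockingEnvelopeMap calls newBlockingQueue() once for each partition it registers and that its poll drains each partition's queue in turn, filing the drained messages under that partition. The classes `EnvelopeMap` and `Envelope`, the method `run`, and the use of plain ints in place of SystemStreamPartition are all hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

public class SharedQueueDemo {

    // Hypothetical stand-in for IncomingMessageEnvelope: it only records its partition.
    static final class Envelope {
        final int partition;
        Envelope(int partition) { this.partition = partition; }
    }

    // Simplified model of BlockingEnvelopeMap's bookkeeping (an assumption, not the real
    // class): one queue per registered partition, obtained from a factory that plays
    // the role of newBlockingQueue().
    static final class EnvelopeMap {
        private final Map<Integer, BlockingQueue<Envelope>> buffered = new LinkedHashMap<>();
        private final Supplier<BlockingQueue<Envelope>> newBlockingQueue;

        EnvelopeMap(Supplier<BlockingQueue<Envelope>> newBlockingQueue) {
            this.newBlockingQueue = newBlockingQueue;
        }

        void register(int partition) {
            buffered.put(partition, newBlockingQueue.get());
        }

        void put(Envelope e) {
            buffered.get(e.partition).add(e);
        }

        // Drains each partition's queue and files the result under that partition.
        // Returns: partition key -> the partitions the drained envelopes really belong to.
        Map<Integer, List<Integer>> poll() {
            Map<Integer, List<Integer>> result = new LinkedHashMap<>();
            for (Map.Entry<Integer, BlockingQueue<Envelope>> entry : buffered.entrySet()) {
                List<Envelope> drained = new ArrayList<>();
                entry.getValue().drainTo(drained);
                if (!drained.isEmpty()) {
                    List<Integer> origins = new ArrayList<>();
                    for (Envelope e : drained) {
                        origins.add(e.partition);
                    }
                    result.put(entry.getKey(), origins);
                }
            }
            return result;
        }
    }

    // Registers partitions 0..partitions-1, buffers one message per partition, polls once.
    static Map<Integer, List<Integer>> run(boolean shareOneQueue, int partitions) {
        BlockingQueue<Envelope> shared = new LinkedBlockingQueue<>(10000);
        Supplier<BlockingQueue<Envelope>> factory = shareOneQueue
                ? () -> shared                              // same queue for every partition
                : () -> new LinkedBlockingQueue<>(10000);   // fresh queue per partition
        EnvelopeMap map = new EnvelopeMap(factory);
        for (int p = 0; p < partitions; p++) {
            map.register(p);
        }
        for (int p = 0; p < partitions; p++) {
            map.put(new Envelope(p));
        }
        return map.poll();
    }

    public static void main(String[] args) {
        // prints "shared queue:        {0=[0, 1, 2]}"
        System.out.println("shared queue:        " + run(true, 3));
        // prints "queue per partition: {0=[0], 1=[1], 2=[2]}"
        System.out.println("queue per partition: " + run(false, 3));
    }
}
```

With a shared queue, the first partition drained takes every buffered message, so all of them come back under partition 0 even though each envelope still names its own partition. Returning a fresh queue per partition keeps the grouping correct.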
If you check the code in BlockingEnvelopeMap at line 123 (https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java#L123), you'll see that BlockingEnvelopeMap expects newBlockingQueue() to return a separate queue for each partition. When every partition shares the same queue, all the messages in that queue are put into one partition.

Thanks,
Fang, Yan
yanfang...@gmail.com

On Fri, Jul 10, 2015 at 12:36 PM, Bae, Jae Hyeon <metac...@gmail.com> wrote:

> Hi Samza devs and users,
>
> I wrote a custom Samza S3 consumer which downloads files from S3 and puts
> messages in BlockingEnvelopeMap. It was straightforward because there's a
> nice example, filereader. I tried to optimize it a little with the
> newBlockingQueue() method, because I guessed that a single shared queue
> would be fine since the Samza container is single-threaded. I added the
> following code:
>
>     public S3Consumer(String systemName, Config config, MetricsRegistry registry) {
>         queueSize = config.getInt("systems." + systemName + ".queue.size", 10000);
>         bucket = config.get("systems." + systemName + ".bucket");
>         prefix = config.get("systems." + systemName + ".prefix");
>
>         queue = new LinkedBlockingQueue<>(queueSize);
>
>         recordCounter = registry.newCounter(this.getClass().getName(), "processed_records");
>     }
>
>     @Override
>     protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
>         return queue; // single queue
>     }
>
> Unfortunately, I observed significant message loss with this
> implementation. I suspected the queue might have dropped messages, so I
> changed the newBlockingQueue() implementation to match filereader:
>
>     @Override
>     protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
>         return new LinkedBlockingQueue<>(queueSize);
>     }
>
> After that, the message loss didn't happen again.
>
> Do you have any idea why it went wrong?
>
> Thank you.
> Best,
> Jae