I expected it should have not lost messages but it did. After I fixed overridden method, it was fixed.
Anyway, thanks a lot for responding. On Fri, Jul 10, 2015 at 2:11 PM, Yan Fang <yanfang...@gmail.com> wrote: > Hi Jae, > > I think the messages are not "lost", instead, they all go to one partition, > in your "shared queue" implementation. > > If you check the code in BlockingEnvelopeMap line 123 > < > https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java#L123 > > > , > it puts all the messages in the queue in one partition. > > Thanks, > > Fang, Yan > yanfang...@gmail.com > > On Fri, Jul 10, 2015 at 12:36 PM, Bae, Jae Hyeon <metac...@gmail.com> > wrote: > > > Hi Samza devs and users > > > > I wrote customized Samza S3 consumer which downloads files from S3 and > put > > messages in BlockedEnvelopeMap. It was straightforward because there's a > > nice example, filereader. I tried to a little optimize with > > newBlockingQueue() method because I guess that single queue shared could > be > > fine because Samza container is single threaded. I added the following > > code: > > > > > > public S3Consumer(String systemName, Config config, MetricsRegistry > > registry) { > > queueSize = config.getInt("systems." + systemName + > ".queue.size", > > 10000); > > bucket = config.get("systems." + systemName + ".bucket"); > > prefix = config.get("systems." + systemName + ".prefix"); > > > > queue = new LinkedBlockingQueue<>(queueSize); > > > > recordCounter = registry.newCounter(this.getClass().getName(), > > "processed_records"); > > } > > > > @Override > > protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() { > > return queue; // single queue > > } > > > > Unfortunately, I observed significant message loss with this > > implementation. I suspected its queue might have dropped messages, so I > > changed newBlockingQueue() implementation same as filereader. > > > > @Override > > protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() { > > return new LinkedBlockingQueue<>(queueSize); > > } > > > > Then, message loss didn't happen again. > > > > Do you have any idea why it went wrong? > > > > Thank you > > Best, Jae > > >