I expected it not to lose messages, but it did. After I fixed the
overridden method, the problem went away.
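
For anyone who finds this thread later, here is a minimal sketch of what I
understand happened, based on Yan's explanation below. It is not Samza code:
SharedQueueDemo, pollOnce and the partition names p0/p1/p2 are hypothetical
stand-ins, and I am assuming the buffer keeps one queue per registered
partition and drains them partition by partition. If newBlockingQueue()
hands back the same instance every time, the first partition drained takes
everything.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// Hypothetical stand-in for per-partition buffering; NOT the Samza implementation.
class SharedQueueDemo {

    // Registers three partitions with whatever queue the factory returns,
    // buffers one message per partition, then drains partition by partition.
    static Map<String, List<String>> pollOnce(Supplier<BlockingQueue<String>> queueFactory) {
        Map<String, BlockingQueue<String>> buffers = new LinkedHashMap<>();
        for (String partition : new String[] {"p0", "p1", "p2"}) {
            buffers.putIfAbsent(partition, queueFactory.get());
        }

        // One message intended for each partition.
        for (Map.Entry<String, BlockingQueue<String>> entry : buffers.entrySet()) {
            entry.getValue().add("msg-for-" + entry.getKey());
        }

        // Drain each partition's queue and report what was found under that partition.
        Map<String, List<String>> polled = new LinkedHashMap<>();
        for (Map.Entry<String, BlockingQueue<String>> entry : buffers.entrySet()) {
            List<String> drained = new ArrayList<>();
            entry.getValue().drainTo(drained);
            if (!drained.isEmpty()) {
                polled.put(entry.getKey(), drained);
            }
        }
        return polled;
    }

    public static void main(String[] args) {
        // Same instance returned on every call, like my first newBlockingQueue().
        BlockingQueue<String> shared = new LinkedBlockingQueue<>(10);
        System.out.println("shared:   " + pollOnce(() -> shared));
        // Fresh queue per call, like the filereader example.
        System.out.println("separate: " + pollOnce(() -> new LinkedBlockingQueue<>(10)));
    }
}
```

With the shared queue, all three messages come back under p0 and nothing
comes back for p1 or p2. With a fresh queue per call, each partition gets its
own message.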

Anyway, thanks a lot for responding.

On Fri, Jul 10, 2015 at 2:11 PM, Yan Fang <yanfang...@gmail.com> wrote:

> Hi Jae,
>
> I think the messages are not "lost"; instead, they all go to one partition
> in your "shared queue" implementation.
>
> If you check the code in BlockingEnvelopeMap line 123
> <https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java#L123>,
> it puts all the messages in the queue in one partition.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Fri, Jul 10, 2015 at 12:36 PM, Bae, Jae Hyeon <metac...@gmail.com>
> wrote:
>
> > Hi Samza devs and users
> >
> > I wrote a custom Samza S3 consumer which downloads files from S3 and puts
> > messages in a BlockingEnvelopeMap. It was straightforward because there's a
> > nice example, filereader. I tried to optimize it a little with the
> > newBlockingQueue() method, because I guessed that a single shared queue
> > would be fine since the Samza container is single threaded. I added the
> > following code:
> >
> >
> > public S3Consumer(String systemName, Config config, MetricsRegistry registry) {
> >     queueSize = config.getInt("systems." + systemName + ".queue.size", 10000);
> >     bucket = config.get("systems." + systemName + ".bucket");
> >     prefix = config.get("systems." + systemName + ".prefix");
> >
> >     queue = new LinkedBlockingQueue<>(queueSize);
> >
> >     recordCounter = registry.newCounter(this.getClass().getName(), "processed_records");
> > }
> >
> > @Override
> > protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
> >     return queue; // single queue
> > }
> >
> > Unfortunately, I observed significant message loss with this
> > implementation. I suspected its queue might have dropped messages, so I
> > changed the newBlockingQueue() implementation to match filereader's:
> >
> > @Override
> > protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
> >     return new LinkedBlockingQueue<>(queueSize);
> > }
> >
> > After that, the message loss did not happen again.
> >
> > Do you have any idea why it went wrong?
> >
> > Thank you
> > Best, Jae
> >
>
