Hi Jae,

I think the messages are not "lost", instead, they all go to one partition,
in your "shared queue" implementation.

If you check the code in BlockingEnvelopeMap line 123
it puts all the messages in the queue in one partition.


Fang, Yan

On Fri, Jul 10, 2015 at 12:36 PM, Bae, Jae Hyeon <metac...@gmail.com> wrote:

> Hi Samza devs and users
> I wrote customized Samza S3 consumer which downloads files from S3 and put
> messages in BlockedEnvelopeMap. It was straightforward because there's a
> nice example, filereader. I tried to a little optimize with
> newBlockingQueue() method because I guess that single queue shared could be
> fine because Samza container is single threaded. I added the following
> code:
> public S3Consumer(String systemName, Config config, MetricsRegistry
> registry) {
>         queueSize = config.getInt("systems." + systemName + ".queue.size",
> 10000);
>         bucket = config.get("systems." + systemName + ".bucket");
>         prefix = config.get("systems." + systemName + ".prefix");
>         queue = new LinkedBlockingQueue<>(queueSize);
>         recordCounter = registry.newCounter(this.getClass().getName(),
> "processed_records");
>     }
> @Override
>     protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
>         return queue; // single queue
>     }
> Unfortunately, I observed significant message loss with this
> implementation. I suspected its queue might have dropped messages, so I
> changed newBlockingQueue() implementation same as filereader.
> @Override
>     protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
>         return new LinkedBlockingQueue<>(queueSize);
>     }
> Then, message loss didn't happen again.
> Do you have any idea why it went wrong?
> Thank you
> Best, Jae

Reply via email to