Hi Jae,

I don't think the messages are "lost". Instead, with your "shared queue"
implementation, they all go to one partition.

If you check the code in BlockingEnvelopeMap, line 123
<https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java#L123>,
it puts all the messages in the queue into one partition.
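
Here is a minimal sketch of the effect. It is a simplified model, not the
actual BlockingEnvelopeMap code: plain strings stand in for
SystemStreamPartition and IncomingMessageEnvelope, the class name
SharedQueueSketch and the partition names are made up, and it assumes that
poll drains each registered partition's queue in turn.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-in for a per-partition buffer map; NOT Samza's actual code.
class SharedQueueSketch {

    // Registers three partitions, buffers one message for each, then polls.
    // Returns the messages handed back under each partition key.
    static Map<String, List<String>> run(boolean shareOneQueue) {
        BlockingQueue<String> shared = new LinkedBlockingQueue<>(100);
        Map<String, BlockingQueue<String>> buffers = new LinkedHashMap<>();

        // "register": one newBlockingQueue()-style call per partition.
        for (String partition : new String[] {"p0", "p1", "p2"}) {
            buffers.put(partition,
                shareOneQueue ? shared : new LinkedBlockingQueue<String>(100));
        }

        // "put": buffer one message into each partition's queue.
        for (Map.Entry<String, BlockingQueue<String>> e : buffers.entrySet()) {
            e.getValue().add("msg-for-" + e.getKey());
        }

        // "poll": drain each partition's queue in turn (assumed behavior).
        Map<String, List<String>> polled = new LinkedHashMap<>();
        for (Map.Entry<String, BlockingQueue<String>> e : buffers.entrySet()) {
            List<String> out = new ArrayList<>();
            e.getValue().drainTo(out);
            if (!out.isEmpty()) {
                polled.put(e.getKey(), out);
            }
        }
        return polled;
    }

    public static void main(String[] args) {
        // With one shared queue, the first partition polled receives everything.
        System.out.println("shared:   " + run(true));
        // With one queue per partition, each message stays with its partition.
        System.out.println("separate: " + run(false));
    }
}
```

In this model the shared queue returns all three messages under "p0" and
nothing under "p1" or "p2", which would look like message loss to anything
that counts per partition.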

Thanks,

Fang, Yan
yanfang...@gmail.com

On Fri, Jul 10, 2015 at 12:36 PM, Bae, Jae Hyeon <metac...@gmail.com> wrote:

> Hi Samza devs and users
>
> I wrote a customized Samza S3 consumer which downloads files from S3 and
> puts messages in BlockingEnvelopeMap. It was straightforward because there's
> a nice example, filereader. I tried to optimize it a little with the
> newBlockingQueue() method, because I guessed that a single shared queue would
> be fine since the Samza container is single threaded. I added the following
> code:
>
>
>     public S3Consumer(String systemName, Config config, MetricsRegistry registry) {
>         queueSize = config.getInt("systems." + systemName + ".queue.size", 10000);
>         bucket = config.get("systems." + systemName + ".bucket");
>         prefix = config.get("systems." + systemName + ".prefix");
>
>         queue = new LinkedBlockingQueue<>(queueSize);
>
>         recordCounter = registry.newCounter(this.getClass().getName(), "processed_records");
>     }
>
>     @Override
>     protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
>         return queue; // single queue
>     }
>
> Unfortunately, I observed significant message loss with this
> implementation. I suspected its queue might have dropped messages, so I
> changed the newBlockingQueue() implementation to match filereader's:
>
>     @Override
>     protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
>         return new LinkedBlockingQueue<>(queueSize);
>     }
>
> After that, the message loss did not happen again.
>
> Do you have any idea why it went wrong?
>
> Thank you
> Best, Jae
>
