Re: Producer stopped during leader switch

2016-11-01 Thread David Yu
That's good news. Thanks.

On Mon, Oct 31, 2016 at 9:59 PM Jagadish Venkatraman <jagadish1...@gmail.com>
wrote:

> Hey David,
>
> Apologies for the delayed response. There are a bunch of producer *send*
> issues that were fixed in Samza 0.11 (SAMZA-1028, SAMZA-1003). With Samza 0.11,
> there's improved resiliency and if retries fail, the exception will be
> propagated to your *StreamTask*, and will end up failing the container (if
> you don't swallow it).
>
> Thanks,
> Jagadish
>
>
>
> On Fri, Oct 28, 2016 at 10:12 AM, David Yu <david...@optimizely.com>
> wrote:
>
> > Hi,
> >
> > We recently experienced a Kafka broker crash. When a new broker was
> brought
> > up, we started seeing the following errors in Samza (0.10.1):
> >
> > WARN  o.a.k.c.producer.internals.Sender - Got error produce response with
> > correlation id 5199601 on topic-partition
> > session_key_partitioned_sessions-39, retrying (2147483646 attempts left).
> > Error: NOT_LEADER_FOR_PARTITION
> >
> > Is the Producer not able to detect the new broker/leader for that
> > partition?
> >
> > Thanks,
> > David
> >
>
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
>
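As a rough illustration of the behavior described above, here is a minimal
sketch (not code from this thread; the class, topic, and system names are
assumptions) of a StreamTask that lets producer send failures propagate
instead of swallowing them, so the container fails and Samza reprocesses from
the last checkpoint:

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class ForwardingTask implements StreamTask {
  // Hypothetical output stream; substitute the job's real system and topic.
  private static final SystemStream OUTPUT = new SystemStream("kafka", "output-topic");

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) throws Exception {
    try {
      collector.send(new OutgoingMessageEnvelope(OUTPUT, envelope.getKey(), envelope.getMessage()));
    } catch (Exception e) {
      // Swallowing the exception here would hide the send failure.
      // Rethrowing lets the container fail, so the messages are
      // reprocessed from the last checkpoint after a restart.
      throw e;
    }
  }
}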


Producer stopped during leader switch

2016-10-28 Thread David Yu
Hi,

We recently experienced a Kafka broker crash. When a new broker was brought
up, we started seeing the following errors in Samza (0.10.1):

WARN  o.a.k.c.producer.internals.Sender - Got error produce response with
correlation id 5199601 on topic-partition
session_key_partitioned_sessions-39, retrying (2147483646 attempts left).
Error: NOT_LEADER_FOR_PARTITION

Is the Producer not able to detect the new broker/leader for that partition?

Thanks,
David


Re: RecordTooLargeException recovery

2016-10-06 Thread David Yu
Xinyu,

Thanks for the answers. Those suggestions are helpful as well.

David

On Thu, Oct 6, 2016 at 12:48 PM xinyu liu <xinyuliu...@gmail.com> wrote:

> Hi, David,
>
> For your questions:
>
> 1) In this case Samza recovered but the changelog message was lost. In
> 0.10.1 KafkaSystemProducer has a race condition: there is a small chance the
> later send success might override the previous failure. The bug is fixed in
> the upcoming 0.11.0 release (SAMZA-1019). The fix allows you to catch the
> exception and then you can decide to ignore or rethrow it. In the latter
> case the container will fail and Samza will guarantee the message will be
> reprocessed after it restarts.
>
> 2) There are several ways that might help in your case: First you can turn
> on compression for your checkpoint stream. That usually saves about 20% -
> 30%. Second, you can also bump up the max.request.size for the producer. In
> this case you need to make sure the broker is also set up with the corresponding
> max message size. Last, you might also try to split the key into subkeys so
> the value will be smaller.
>
> Thanks,
> Xinyu
>
> On Thu, Oct 6, 2016 at 9:30 AM, David Yu <david...@optimizely.com> wrote:
>
> > Hi,
> >
> > Our Samza job (0.10.1) throws RecordTooLargeExceptions when flushing the
> KV
> > store change to the changelog topic, as well as sending outputs to Kafka.
> > We have two questions to this problem:
> >
> > 1. It seems that after the affected containers failed multiple times, the
> > job was able to recover and move on. This is a bit hard to understand.
> How
> > could this be recoverable? We were glad it actually did, but are
> > uncomfortable not knowing the reason behind it.
> > 2. What would be the best way to prevent this from happening? Since Samza
> > serde happens behind the scenes, there does not seem to be a good way to
> > find out the payload size in bytes before putting into the KV store. Any
> > suggestions on this?
> >
> > Thanks,
> > David
> >
>
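For reference, the knobs Xinyu mentions map to configuration roughly as
follows; the property names follow the Samza system-config and Kafka producer
conventions, and the values shown are illustrative assumptions, not
recommendations:

# Samza passes any producer.* property through to the Kafka producer.
systems.kafka.producer.compression.type=gzip
systems.kafka.producer.max.request.size=2097152

# The brokers (or the topic) must accept messages of at least that size,
# e.g. message.max.bytes on the broker / max.message.bytes on the topic.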


RecordTooLargeException recovery

2016-10-06 Thread David Yu
Hi,

Our Samza job (0.10.1) throws RecordTooLargeExceptions when flushing the KV
store change to the changelog topic, as well as sending outputs to Kafka.
We have two questions to this problem:

1. It seems that after the affected containers failed multiple times, the
job was able to recover and move on. This is a bit hard to understand. How
could this be recoverable? We were glad it actually did, but are
uncomfortable not knowing the reason behind it.
2. What would be the best way to prevent this from happening? Since Samza
serde happens behind the scenes, there does not seem to be a good way to
find out the payload size in bytes before putting into the KV store. Any
suggestions on this?

Thanks,
David


Re: Job coordinator stream and job redeployment

2016-08-25 Thread David Yu
Thanks for confirming, Jake :)

On Thu, Aug 25, 2016 at 11:18 AM Jacob Maes <jacob.m...@gmail.com> wrote:

> Hey David,
>
> Yes, the config is rewritten by the JobRunner each time the job is
> restarted as dictated by the "resetJobConfig" property (which defaults to
> true) here:
>
> https://github.com/apache/samza/blob/944dd02e1d00bcce59f1fcc33ecbb2a8acd95870/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala#L85
>
> The version parameter is not the version of the config, but the version of
> the coordinator stream protocol. It will only increment if there's a change
> to the way Samza reads/writes coordinator stream messages.
>
> -Jake
>
> On Thu, Aug 25, 2016 at 8:44 AM, David Yu <david...@optimizely.com> wrote:
>
> > After digging around a bit using kafka-console-consumer.sh, I'm able to
> > peek into the coordinator stream and see the config entries.
> >
> > Looks like a redeployment would just append the new configs to this
> topic.
> > And I assume the bootstrapping process will have to consume the entire
> > stream from the beginning to get a consolidated/up-to-date job model. Am
> I
> > understanding this correctly?
> >
> > Also, according to the doc, the keys of the coordinator stream are
> > ["<version>", "<type>", "<key>"]. From what I saw, the
> > <version> doesn't seem to change. It stays at 1 even after I
> > redeployed the job with new configs. I was assuming that it would get
> > incremented with every deployment/update.
> >
> > Thanks,
> > David
> >
> > On Wed, Aug 24, 2016 at 5:39 PM David Yu <david...@optimizely.com>
> wrote:
> >
> > > Hi,
> > >
> > > I'm trying to understand the role of the coordinator stream during a job
> > > redeployment.
> > >
> > > From the Samza documentation, I'm seeing the following about the
> > > coordinator stream:
> > >
> > > The Job Coordinator bootstraps configuration from the coordinator
> stream
> > > each time upon job start-up. It periodically catches up with any new
> data
> > > written to the coordinator stream and updates the Job Model.
> > >
> > > However, it is unclear to me how this will work with "--config-path"
> when
> > > we need to redeploy a job. Does the provided config first get
> persisted
> > to
> > > the coordinator stream, updating the previous model, or will it simply
> be
> > > ignored?
> > >
> > > Thanks,
> > > David
> > >
> >
>


Re: Job coordinator stream and job redeployment

2016-08-25 Thread David Yu
After digging around a bit using kafka-console-consumer.sh, I'm able to
peek into the coordinator stream and see the config entries.
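For anyone following along, a command roughly like this dumps those entries
(assuming the default coordinator-stream topic naming of
__samza_coordinator_<job-name>_<job-id>; adjust the ZooKeeper address and job
name for your setup):

kafka-console-consumer.sh --zookeeper localhost:2181 \
  --topic __samza_coordinator_my-job_1 --from-beginning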

Looks like a redeployment would just append the new configs to this topic.
And I assume the bootstrapping process will have to consume the entire
stream from the beginning to get a consolidated/up-to-date job model. Am I
understanding this correctly?

Also, according to the doc, the keys of the coordinator stream are
["<version>", "<type>", "<key>"]. From what I saw, the
<version> doesn't seem to change. It stays at 1 even after I
redeployed the job with new configs. I was assuming that it would get
incremented with every deployment/update.

Thanks,
David

On Wed, Aug 24, 2016 at 5:39 PM David Yu <david...@optimizely.com> wrote:

> Hi,
>
> I'm trying to understand the role of the coordinator stream during a job
> redeployment.
>
> From the Samza documentation, I'm seeing the following about the
> coordinator stream:
>
> The Job Coordinator bootstraps configuration from the coordinator stream
> each time upon job start-up. It periodically catches up with any new data
> written to the coordinator stream and updates the Job Model.
>
> However, it is unclear to me how this will work with "--config-path" when
> we need to redeploy a job. Does the provided config first get persisted to
> the coordinator stream, updating the previous model, or will it simply be
> ignored?
>
> Thanks,
> David
>


Job coordinator stream and job redeployment

2016-08-24 Thread David Yu
Hi,

I'm trying to understand the role of the coordinator stream during a job
redeployment.

From the Samza documentation, I'm seeing the following about the
coordinator stream:

The Job Coordinator bootstraps configuration from the coordinator stream
each time upon job start-up. It periodically catches up with any new data
written to the coordinator stream and updates the Job Model.

However, it is unclear to me how this will work with "--config-path" when
we need to redeploy a job. Does the provided config first get persisted to
the coordinator stream, updating the previous model, or will it simply be
ignored?

Thanks,
David


Re: Debug Samza consumer lag issue

2016-08-24 Thread David Yu
Makes sense. Thanks for the help, Jake!

On Wed, Aug 24, 2016 at 5:11 PM Jacob Maes <jacob.m...@gmail.com> wrote:

> We don't have any hard guidelines around that metric just because there are
> no hard rules that work for every job. For example, some jobs are very
> bursty and need to keep up with huge traffic ramp-ups even though they're
> underutilized the rest of the time.
>
> That said, yes, I have used that metric to determine whether a job has too
> much parallelism. But it was a job that had very stable throughput patterns
> and didn't have any major time spent in the window or commit methods, which
> could cause periodic spikes in utilization.
>
>
>
> On Wed, Aug 24, 2016 at 2:55 PM, David Yu <david...@optimizely.com> wrote:
>
> > Interesting.
> >
> > To me, "event-loop-utilization" looks like a good indicator that shows us
> > how busy the containers are. Is it safe to use this metric as a reference
> > when we need to scale out/in our job? For example, if I'm seeing around
> 0.3
> > utilization most of the time, maybe I can decrease the # of containers
> and
> > save some resources?
> >
> > Thanks,
> > David
> >
> > On Wed, Aug 24, 2016 at 1:27 PM Jacob Maes <jacob.m...@gmail.com> wrote:
> >
> > > >
> > > > Based on what you have described, the following should be true in
> > 0.10.1:
> > > > event-loop-ns = choose-ns + process-ns + window-ns (if necessary) +
> > > > commit-ns (if necessary)
> > >
> > > Yes, plus any time (e.g. due to an unlucky GC at just the right moment)
> > > that happens outside those timers.  And no "if necessary" for window or
> > > commit. There will be a small value for those methods even if they
> don't
> > do
> > > anything significant because the timer runs even for no-ops
> > >
> > > Since you're on 10.1, there's another useful metric
> > > "event-loop-utilization", which represents
> > > (process-ns+window-ns+commit-ns)/event-loop-ns
> > > (as you defined it). In other words, the proportion of time spent
> working
> > > vs waiting.
> > >
> > > On Wed, Aug 24, 2016 at 10:18 AM, David Yu <david...@optimizely.com>
> > > wrote:
> > >
> > > > Great. It all makes sense now.
> > > >
> > > > With the SSD fix, we also upgraded to 0.10.1. So we should see pretty
> > > > consistent process-ns (which we do).
> > > >
> > > > Based on what you have described, the following should be true in
> > 0.10.1:
> > > > event-loop-ns = choose-ns + process-ns + window-ns (if necessary) +
> > > > commit-ns (if necessary)
> > > >
> > > > Is this correct?
> > > > Thanks,
> > > > David
> > > >
> > > > On Wed, Aug 24, 2016 at 11:27 AM Jacob Maes <jacob.m...@gmail.com>
> > > wrote:
> > > >
> > > > > A couple other notes.
> > > > >
> > > > > Prior to Samza 10.1, the choose-ns was part of process-ns. So when
> > > > > choose-ns and process-ns are both high (around 10,000,000 == 10ms,
> > > which
> > > > is
> > > > > the default poll timeout), that usually means the task is caught
> up.
> > In
> > > > > Samza 10.1 the same is true if ONLY choose-ns is high. process-ns
> is
> > > > always
> > > > > the time spent in the process() method.
> > > > >
> > > > > Based on the above, the metric numbers you provided after the SSD
> fix
> > > all
> > > > > look reasonable. They're all sub-millisecond and since choose-ns
> and
> > > > > process-ns are low, it seems that the container is chewing through
> > > > messages
> > > > > at a good rate.
> > > > >
> > > > > So I would conclude that the SSD fix was probably the right one and
> > it
> > > > just
> > > > > took the job a while to catch up to the backlog of messages. Once
> it
> > > > caught
> > > > > up, the choose-ns and process-ns increased, which is normal when
> the
> > > > > processor is waiting for new messages.
> > > > >
> > > > > -Jake
> > > > >
> > > > > On Wed, Aug 24, 2016 at 9:05 AM, Jacob Maes <jacob.m...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hey David,
> > > > 

Re: Debug Samza consumer lag issue

2016-08-24 Thread David Yu
Interesting.

To me, "event-loop-utilization" looks like a good indicator that shows us
how busy the containers are. Is it safe to use this metric as a reference
when we need to scale out/in our job? For example, if I'm seeing around 0.3
utilization most of the time, maybe I can decrease the # of containers and
save some resources?

Thanks,
David

On Wed, Aug 24, 2016 at 1:27 PM Jacob Maes <jacob.m...@gmail.com> wrote:

> >
> > Based on what you have described, the following should be true in 0.10.1:
> > event-loop-ns = choose-ns + process-ns + window-ns (if necessary) +
> > commit-ns (if necessary)
>
> Yes, plus any time (e.g. due to an unlucky GC at just the right moment)
> that happens outside those timers.  And no "if necessary" for window or
> commit. There will be a small value for those methods even if they don't do
> anything significant because the timer runs even for no-ops
>
> Since you're on 10.1, there's another useful metric
> "event-loop-utilization", which represents
> (process-ns+window-ns+commit-ns)/event-loop-ns
> (as you defined it). In other words, the proportion of time spent working
> vs waiting.
>
> On Wed, Aug 24, 2016 at 10:18 AM, David Yu <david...@optimizely.com>
> wrote:
>
> > Great. It all makes sense now.
> >
> > With the SSD fix, we also upgraded to 0.10.1. So we should see pretty
> > consistent process-ns (which we do).
> >
> > Based on what you have described, the following should be true in 0.10.1:
> > event-loop-ns = choose-ns + process-ns + window-ns (if necessary) +
> > commit-ns (if necessary)
> >
> > Is this correct?
> > Thanks,
> > David
> >
> > On Wed, Aug 24, 2016 at 11:27 AM Jacob Maes <jacob.m...@gmail.com>
> wrote:
> >
> > > A couple other notes.
> > >
> > > Prior to Samza 10.1, the choose-ns was part of process-ns. So when
> > > choose-ns and process-ns are both high (around 10,000,000 == 10ms,
> which
> > is
> > > the default poll timeout), that usually means the task is caught up. In
> > > Samza 10.1 the same is true if ONLY choose-ns is high. process-ns is
> > always
> > > the time spent in the process() method.
> > >
> > > Based on the above, the metric numbers you provided after the SSD fix
> all
> > > look reasonable. They're all sub-millisecond and since choose-ns and
> > > process-ns are low, it seems that the container is chewing through
> > messages
> > > at a good rate.
> > >
> > > So I would conclude that the SSD fix was probably the right one and it
> > just
> > > took the job a while to catch up to the backlog of messages. Once it
> > caught
> > > up, the choose-ns and process-ns increased, which is normal when the
> > > processor is waiting for new messages.
> > >
> > > -Jake
> > >
> > > On Wed, Aug 24, 2016 at 9:05 AM, Jacob Maes <jacob.m...@gmail.com>
> > wrote:
> > >
> > > > Hey David,
> > > >
> > > > Answering the most recent question first, since it's also the
> easiest.
> > > :-)
> > > >
> > > > Is choose-ns the total number of ms used to choose a message from the
> > > input
> > > >> stream? What are some gating factors (e.g. serialization?) for this
> > > >> metric?
> > > >
> > > > It's the amount of time the event loop spent getting new messages
> for
> > > > process(). It includes deserialization time and poll time which we
> > added
> > > > new metrics for, in Samza 10.1. Typically deserialization time is
> > pretty
> > > > consistent, so when you see a spike in choose-ns, it's usually
> because
> > > the
> > > > event loop is waiting for new messages. The two most common cases
> when
> > > it's
> > > > waiting are:
> > > > 1. There are no new messages in the topic partition. This is good
> > because
> > > > it means the processor is caught up.
> > > > 2. The consumer is slow and/or the buffer isn't large enough so the
> > > > BrokerProxy isn't able to keep enough messages buffered to keep the
> > event
> > > > loop busy. This is uncommon because the buffer is defaulted to 50,000
> > > > messages, which should be plenty. But if it happens, it's bad. To
> > control
> > > > this behavior, see the following properties in the config table (
> > > > http://samza.apache.org/learn/documentation/0.10/jobs/
> > > > configuration-table.html)

Re: Debug Samza consumer lag issue

2016-08-24 Thread David Yu
Great. It all makes sense now.

With the SSD fix, we also upgraded to 0.10.1. So we should see pretty
consistent process-ns (which we do).

Based on what you have described, the following should be true in 0.10.1:
event-loop-ns = choose-ns + process-ns + window-ns (if necessary) +
commit-ns (if necessary)

Is this correct?
Thanks,
David

On Wed, Aug 24, 2016 at 11:27 AM Jacob Maes <jacob.m...@gmail.com> wrote:

> A couple other notes.
>
> Prior to Samza 10.1, the choose-ns was part of process-ns. So when
> choose-ns and process-ns are both high (around 10,000,000 == 10ms, which is
> the default poll timeout), that usually means the task is caught up. In
> Samza 10.1 the same is true if ONLY choose-ns is high. process-ns is always
> the time spent in the process() method.
>
> Based on the above, the metric numbers you provided after the SSD fix all
> look reasonable. They're all sub-millisecond and since choose-ns and
> process-ns are low, it seems that the container is chewing through messages
> at a good rate.
>
> So I would conclude that the SSD fix was probably the right one and it just
> took the job a while to catch up to the backlog of messages. Once it caught
> up, the choose-ns and process-ns increased, which is normal when the
> processor is waiting for new messages.
>
> -Jake
>
> On Wed, Aug 24, 2016 at 9:05 AM, Jacob Maes <jacob.m...@gmail.com> wrote:
>
> > Hey David,
> >
> > Answering the most recent question first, since it's also the easiest.
> :-)
> >
> > Is choose-ns the total number of ms used to choose a message from the
> input
> >> stream? What are some gating factors (e.g. serialization?) for this
> >> metric?
> >
> > It's the amount of time the event loop spent getting new messages for
> > process(). It includes deserialization time and poll time which we added
> > new metrics for, in Samza 10.1. Typically deserialization time is pretty
> > consistent, so when you see a spike in choose-ns, it's usually because
> the
> > event loop is waiting for new messages. The two most common cases when
> it's
> > waiting are:
> > 1. There are no new messages in the topic partition. This is good because
> > it means the processor is caught up.
> > 2. The consumer is slow and/or the buffer isn't large enough so the
> > BrokerProxy isn't able to keep enough messages buffered to keep the event
> > loop busy. This is uncommon because the buffer is defaulted to 50,000
> > messages, which should be plenty. But if it happens, it's bad. To control
> > this behavior, see the following properties in the config table (
> > http://samza.apache.org/learn/documentation/0.10/jobs/
> > configuration-table.html)
> > systems.system-name.samza.fetch.threshold
> > task.poll.interval.ms
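For reference, in the job properties those two settings look something like
this; the values are illustrative only, 50,000 being the default buffer size
mentioned above:

systems.kafka.samza.fetch.threshold=50000
task.poll.interval.ms=100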
> >
> >
> >
> > On Wed, Aug 24, 2016 at 8:52 AM, David Yu <david...@optimizely.com>
> wrote:
> >
> >> More updates:
> >> 1. process-envelopes rate finally stabilized and converged. Consumer lag
> >> is
> >> down to zero.
> >> 2. avg choose-ns across containers dropped over time
> >> <https://www.dropbox.com/s/z4iiilvd7c1wrjc/Screenshot%202016
> >> -08-24%2010.46.22.png?dl=0>,
> >> which I assume is a good thing.
> >>
> >> My question:
> >> Is choose-ns the total number of ms used to choose a message from the
> >> input
> >> stream? What are some gating factors (e.g. serialization?) for this
> >> metric?
> >>
> >> Thanks,
> >> David
> >>
> >> On Wed, Aug 24, 2016 at 12:34 AM David Yu <david...@optimizely.com>
> >> wrote:
> >>
> >> > Some metric updates:
> >> > 1. We started seeing some containers with a higher choose-ns
> >> > <https://www.dropbox.com/s/06n3awdwn8ntfxd/Screenshot%202016
> >> -08-24%2000.26.07.png?dl=0>.
> >> > Not sure what would be the cause of this.
> >> > 2. We are seeing very different process-envelopes values across
> >> containers
> >> > <https://www.dropbox.com/s/n1wxtngquv607nb/Screenshot%202016
> >> -08-24%2000.21.05.png?dl=0>
> >> > .
> >> >
> >> >
> >> >
> >> > On Tue, Aug 23, 2016 at 5:56 PM David Yu <david...@optimizely.com>
> >> wrote:
> >> >
> >> >> Hi, Jake,
> >> >>
> >> >> Thanks for your suggestions. Some of my answers inline:
> >> >>
> >> >> 1.
> >> >> On Tue, Aug 23, 2016 at 11:53 AM Jacob Maes <jacob.m...@gmail.com>
> >> w

Re: Debug Samza consumer lag issue

2016-08-24 Thread David Yu
More updates:
1. process-envelopes rate finally stabilized and converged. Consumer lag is
down to zero.
2. avg choose-ns across containers dropped over time
<https://www.dropbox.com/s/z4iiilvd7c1wrjc/Screenshot%202016-08-24%2010.46.22.png?dl=0>,
which I assume is a good thing.

My question:
Is choose-ns the total number of ms used to choose a message from the input
stream? What are some gating factors (e.g. serialization?) for this metric?

Thanks,
David

On Wed, Aug 24, 2016 at 12:34 AM David Yu <david...@optimizely.com> wrote:

> Some metric updates:
> 1. We started seeing some containers with a higher choose-ns
> <https://www.dropbox.com/s/06n3awdwn8ntfxd/Screenshot%202016-08-24%2000.26.07.png?dl=0>.
> Not sure what would be the cause of this.
> 2. We are seeing very different process-envelopes values across containers
> <https://www.dropbox.com/s/n1wxtngquv607nb/Screenshot%202016-08-24%2000.21.05.png?dl=0>
> .
>
>
>
> On Tue, Aug 23, 2016 at 5:56 PM David Yu <david...@optimizely.com> wrote:
>
>> Hi, Jake,
>>
>> Thanks for your suggestions. Some of my answers inline:
>>
>> 1.
>> On Tue, Aug 23, 2016 at 11:53 AM Jacob Maes <jacob.m...@gmail.com> wrote:
>>
>>> Hey David,
>>>
>>> A few initial thoughts/questions:
>>>
>>>
>>>1. Is this job using RocksDB to store the aggregations? If so, is it
>>>running on a machine with SSDs? We've seen a few performance issues
>>> related
>>>to RocksDB.
>>>   1. Not running on SSD causes slowness on disk
>>
>>  - [David] This definitely pointed me to the right direction in my
>> investigation. We do use RocksDB and just realized that our YARN cluster is
>> using c3.xlarge EC2 instances and is using a mixture of EBS and local SSD
>> storage. After digging around, we noticed that some containers were
>> persisting their KV stores in SSD while others were using EBS. We just
> >> updated our YARN config to use SSD only before redeploying our jobs. So far
>> everything looks good. Will report back on the performance update.
>>
>>>   2. Prior to Samza 10.1, samza would excessively flush the store to
>>>   disk, causing RocksDB compaction issues (stalls) - SAMZA-957
>>>
>> - [David] We did notice that the log cleaner thread died on one of our
>> brokers. Not sure if this was the same problem you pointed out. Following
>> errors are logged:
>>
>> 2016-08-15 10:00:56,475 ERROR kafka.log.LogCleaner:
>> [kafka-log-cleaner-thread-0], Error due to
>>
>> java.lang.IllegalArgumentException: requirement failed: 5865800 messages
>> in segment session-store-2.0-tickets-changelog-9/09548937.log
>> but offset map can fit only 5033164. You can increase
>> log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
>>
>> at scala.Predef$.require(Predef.scala:219)
>>
>> We had to cleanup the changelog topic and restart the broker to bring
>> back the cleaner thread.
>>
>>>   3. When the RocksDB store is used as a queue, the iterator can
>>> suffer
>>>   performance issues due to RocksDBs tombstoning. (
>>>
>>> https://github.com/facebook/rocksdb/wiki/Implement-Queue-Service-Using-RocksDB
>>>   )
>>>
> >> - [David] We use RocksDB to keep track of open sessions and use
>> sessionId (a random hash) as the key. In that sense, this does not sound
>> like a queue. But we do iterate and delete closed sessions during windowing
>> on a minute by minute basis.
>>
>>2. Is the "messages-behind-high-watermark" metric non-zero?
>>>
>> -[David] Yes.
>>
>>>3. The SamzaContainerMetrics might be useful too. Particularly
>>>"choose-ns" and "commit-ns"
>>>
>> -[David] We are seeing the following from one of the containers (after
>> the SSD fix mentioned above):
>> choose-ns=61353
>> commit-ns=306328 (what does this metric indicate? Is this in ms?)
>> process-ns=248260
>> window-ns=150717
>>
>>>4. The only time I've personally seen slowness on the producer is if
>>>it's configured for acks="all". What is the producer config from the
>>> log?
>>>
>> - [David] We did not override this. So should be the default value (1?).
>>
>>5. The window time is high, but since it's only called once per minute,
>>>it looks like it only represents 1% of the event loop utilization. So
>>> I
>>>don't think that's a smoking gun.
>>>
>>> -Ja

Re: Debug Samza consumer lag issue

2016-08-23 Thread David Yu
Some metric updates:
1. We started seeing some containers with a higher choose-ns
<https://www.dropbox.com/s/06n3awdwn8ntfxd/Screenshot%202016-08-24%2000.26.07.png?dl=0>.
Not sure what would be the cause of this.
2. We are seeing very different process-envelopes values across containers
<https://www.dropbox.com/s/n1wxtngquv607nb/Screenshot%202016-08-24%2000.21.05.png?dl=0>
.



On Tue, Aug 23, 2016 at 5:56 PM David Yu <david...@optimizely.com> wrote:

> Hi, Jake,
>
> Thanks for your suggestions. Some of my answers inline:
>
> 1.
> On Tue, Aug 23, 2016 at 11:53 AM Jacob Maes <jacob.m...@gmail.com> wrote:
>
>> Hey David,
>>
>> A few initial thoughts/questions:
>>
>>
>>1. Is this job using RocksDB to store the aggregations? If so, is it
>>running on a machine with SSDs? We've seen a few performance issues
>> related
>>to RocksDB.
>>   1. Not running on SSD causes slowness on disk
>
>  - [David] This definitely pointed me to the right direction in my
> investigation. We do use RocksDB and just realized that our YARN cluster is
> using c3.xlarge EC2 instances and is using a mixture of EBS and local SSD
> storage. After digging around, we noticed that some containers were
> persisting their KV stores in SSD while others were using EBS. We just
> updated our YARN config to use SSD only before redeploying our jobs. So far
> everything looks good. Will report back on the performance update.
>
>>   2. Prior to Samza 10.1, samza would excessively flush the store to
>>   disk, causing RocksDB compaction issues (stalls) - SAMZA-957
>>
> - [David] We did notice that the log cleaner thread died on one of our
> brokers. Not sure if this was the same problem you pointed out. Following
> errors are logged:
>
> 2016-08-15 10:00:56,475 ERROR kafka.log.LogCleaner:
> [kafka-log-cleaner-thread-0], Error due to
>
> java.lang.IllegalArgumentException: requirement failed: 5865800 messages
> in segment session-store-2.0-tickets-changelog-9/09548937.log
> but offset map can fit only 5033164. You can increase
> log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
>
> at scala.Predef$.require(Predef.scala:219)
>
> We had to cleanup the changelog topic and restart the broker to bring back
> the cleaner thread.
>
>>   3. When the RocksDB store is used as a queue, the iterator can
>> suffer
>>   performance issues due to RocksDBs tombstoning. (
>>
>> https://github.com/facebook/rocksdb/wiki/Implement-Queue-Service-Using-RocksDB
>>   )
>>
> - [David] We use RocksDB to keep track of open sessions and use
> sessionId (a random hash) as the key. In that sense, this does not sound
> like a queue. But we do iterate and delete closed sessions during windowing
> on a minute by minute basis.
>
>2. Is the "messages-behind-high-watermark" metric non-zero?
>>
> -[David] Yes.
>
>>3. The SamzaContainerMetrics might be useful too. Particularly
>>"choose-ns" and "commit-ns"
>>
> -[David] We are seeing the following from one of the containers (after the
> SSD fix mentioned above):
> choose-ns=61353
> commit-ns=306328 (what does this metric indicate? Is this in ms?)
> process-ns=248260
> window-ns=150717
>
>>4. The only time I've personally seen slowness on the producer is if
>>it's configured for acks="all". What is the producer config from the
>> log?
>>
> - [David] We did not override this. So should be the default value (1?).
>
>5. The window time is high, but since it's only called once per minute,
>>it looks like it only represents 1% of the event loop utilization. So I
>>don't think that's a smoking gun.
>>
>> -Jake
>>
>> On Tue, Aug 23, 2016 at 9:18 AM, David Yu <david...@optimizely.com>
>> wrote:
>>
>> > Dear Samza guys,
>> >
>> > We are here for some debugging suggestions on our Samza job (0.10.0),
>> which
>> > lags behind on consumption after running for a couple of hours,
>> regardless
>> > of the number of containers allocated (currently 5).
>> >
>> > Briefly, the job aggregates events into sessions (in Avro) during
>> process()
>> > and emits snapshots of the open sessions using window() every minute.
>> This
>> > graph
>> > <https://www.dropbox.com/s/utywr1j5eku0ec0/Screenshot%
>> > 202016-08-23%2010.33.16.png?dl=0>
>> > shows
>> > you where processing started to lag (red is the number of events
>> received
>> > and green is the number of event 

Re: Debug Samza consumer lag issue

2016-08-23 Thread David Yu
Hi, Jake,

Thanks for your suggestions. Some of my answers inline:

1.
On Tue, Aug 23, 2016 at 11:53 AM Jacob Maes <jacob.m...@gmail.com> wrote:

> Hey David,
>
> A few initial thoughts/questions:
>
>
>1. Is this job using RocksDB to store the aggregations? If so, is it
>running on a machine with SSDs? We've seen a few performance issues
> related
>to RocksDB.
>   1. Not running on SSD causes slowness on disk

 - [David] This definitely pointed me to the right direction in my
investigation. We do use RocksDB and just realized that our YARN cluster is
using c3.xlarge EC2 instances and is using a mixture of EBS and local SSD
storage. After digging around, we noticed that some containers were
persisting their KV stores in SSD while others were using EBS. We just
updated our YARN config to use SSD only before redeploying our jobs. So far
everything looks good. Will report back on the performance update.

>   2. Prior to Samza 10.1, samza would excessively flush the store to
>   disk, causing RocksDB compaction issues (stalls) - SAMZA-957
>
- [David] We did notice that the log cleaner thread died on one of our
brokers. Not sure if this was the same problem you pointed out. Following
errors are logged:

2016-08-15 10:00:56,475 ERROR kafka.log.LogCleaner:
[kafka-log-cleaner-thread-0], Error due to

java.lang.IllegalArgumentException: requirement failed: 5865800 messages in
segment session-store-2.0-tickets-changelog-9/09548937.log but
offset map can fit only 5033164. You can increase
log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads

at scala.Predef$.require(Predef.scala:219)

We had to cleanup the changelog topic and restart the broker to bring back
the cleaner thread.

>   3. When the RocksDB store is used as a queue, the iterator can suffer
>   performance issues due to RocksDBs tombstoning. (
>
> https://github.com/facebook/rocksdb/wiki/Implement-Queue-Service-Using-RocksDB
>   )
>
- [David] We use RocksDB to keep track of open sessions and use
sessionId (a random hash) as the key. In that sense, this does not sound
like a queue. But we do iterate and delete closed sessions during windowing
on a minute by minute basis.

   2. Is the "messages-behind-high-watermark" metric non-zero?
>
-[David] Yes.

>3. The SamzaContainerMetrics might be useful too. Particularly
>"choose-ns" and "commit-ns"
>
-[David] We are seeing the following from one of the containers (after the
SSD fix mentioned above):
choose-ns=61353
commit-ns=306328 (what does this metric indicate? Is this in ms?)
process-ns=248260
window-ns=150717
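(A rough read of that snapshot: these timers are in nanoseconds, hence the -ns
suffix, so they are all sub-millisecond. Assuming event-loop-ns is roughly
choose-ns + process-ns + window-ns + commit-ns, the event-loop-utilization
discussed above works out to about
(248260 + 150717 + 306328) / (61353 + 248260 + 150717 + 306328) ≈ 0.92,
i.e. the loop was spending most of its time working rather than waiting, which
is consistent with the job still catching up on the backlog at that point.)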

>4. The only time I've personally seen slowness on the producer is if
>it's configured for acks="all". What is the producer config from the
> log?
>
- [David] We did not override this. So should be the default value (1?).

   5. The window time is high, but since it's only called once per minute,
>it looks like it only represents 1% of the event loop utilization. So I
>don't think that's a smoking gun.
>
> -Jake
>
> On Tue, Aug 23, 2016 at 9:18 AM, David Yu <david...@optimizely.com> wrote:
>
> > Dear Samza guys,
> >
> > We are here for some debugging suggestions on our Samza job (0.10.0),
> which
> > lags behind on consumption after running for a couple of hours,
> regardless
> > of the number of containers allocated (currently 5).
> >
> > Briefly, the job aggregates events into sessions (in Avro) during
> process()
> > and emits snapshots of the open sessions using window() every minute.
> This
> > graph
> > <https://www.dropbox.com/s/utywr1j5eku0ec0/Screenshot%
> > 202016-08-23%2010.33.16.png?dl=0>
> > shows
> > you where processing started to lag (red is the number of events received
> > and green is the number of events processed). The end result is a steady
> > increase of the consumer lag
> > <https://www.dropbox.com/s/fppsv91c339xmdb/Screenshot%
> > 202016-08-23%2010.19.27.png?dl=0>.
> > What we are trying to track down is where the performance bottleneck is.
> > But it's unclear at the moment if that's in Samza or in Kafka.
> >
> > What we know so far:
> >
> >- Kafka producer seems to take a while writing to the downstream topic
> >(changelog and session snapshots) shown by various timers. Not sure
> > which
> >numbers are critical but here are the producer metrics
> ><https://www.dropbox.com/s/pzi9304gw5vmae2/Screenshot%
> > 202016-08-23%2010.57.33.png?dl=0>
> > from
> >one container.
> >- avg windowing duration peaks at one point during the day (due to the
> >number of open sessions) but everything is 

Debug Samza consumer lag issue

2016-08-23 Thread David Yu
Dear Samza guys,

We are here for some debugging suggestions on our Samza job (0.10.0), which
lags behind on consumption after running for a couple of hours, regardless
of the number of containers allocated (currently 5).

Briefly, the job aggregates events into sessions (in Avro) during process()
and emits snapshots of the open sessions using window() every minute. This
graph shows you where processing started to lag (red is the number of events
received and green is the number of events processed). The end result is a
steady increase of the consumer lag.
What we are trying to track down is where the performance bottleneck is.
But it's unclear at the moment if that's in Samza or in Kafka.

What we know so far:

   - Kafka producer seems to take a while writing to the downstream topics
   (changelog and session snapshots) shown by various timers. Not sure which
   numbers are critical but here are the producer metrics from one container.
   - avg windowing duration peaks at one point during the day (due to the
   number of open sessions) but everything is still sub-second.
   - our Kafka cluster doesn't seem to be overloaded, with writes < 60MB/s
   across all three brokers.

From all we know, we suspected that the bottleneck happens at producing to
Kafka. But we need some help confirming that.

Any suggestion is appreciated.

David


Re: State store changelog format

2016-08-08 Thread David Yu
That's great news. Will be keeping an eye on the release.

On Mon, Aug 8, 2016 at 10:12 AM Jacob Maes <jm...@linkedin.com.invalid>
wrote:

> Hey David,
>
> I think that behavior is meant to prevent an issue on the Kafka 0.8
> Brokers. Samza 10.1 allows compression on log compacted topics, but you'll
> need to make sure you're using Kafka 0.9 or higher on the Brokers.
>
> -Jake
>
> On Fri, Aug 5, 2016 at 10:57 PM, David Yu <david...@optimizely.com> wrote:
>
> > I guess this might be the problem:
> >
> > 2016-08-06 05:23:23,622 [main ] WARN  o.a.s.s.kafka.KafkaSystemFactory$ -
> > System name 'kafka' is being used as a changelog. Disabling compression
> > since Kafka does not support compression for log compacted topics.
> >
> >
> > Is this a 0.8.x.x Kafka producer limitation?
> >
> > On Fri, Aug 5, 2016 at 4:25 PM Jacob Maes <jacob.m...@gmail.com> wrote:
> >
> > > Hey David,
> > >
> > > If you check your container logs Kafka should print the producer config
> > >
> > > > 2016-08-05 19:18:30.134 [main] ProducerConfig [INFO] ProducerConfig
> > > values:
> > > > compression.type = gzip
> > >
> > > ...
> > >
> > > If you see the correct compression type in that config, then the Kafka
> > > producer definitely has compression enabled.
> > >
> > >
> > > If it's still not compressing after confirming that setting then it
> > > means snappy wasn't able to compress your messages. I talked to our
> > > Kafka team and they said that's not uncommon. You might want to try
> > > gzip or some other compression.
> > >
> > >
> > > On Fri, Aug 5, 2016 at 12:10 PM, David Yu <david...@optimizely.com>
> > wrote:
> > >
> > > > I'm reporting back my observations after enabling compression.
> > > >
> > > > Looks like compression is not doing anything. I'm still seeing
> > > > "compression-rate-avg=1.0" and the same "record-size-avg" from JMX
> > > > "kafka.producer" metrics.
> > > >
> > > > I did set the following:
> > > > systems.kafka.producer.compression.type=snappy
> > > >
> > > > Am I missing anything?
> > > >
> > > > Thanks,
> > > > David
> > > >
> > > > On Wed, Aug 3, 2016 at 1:48 PM David Yu <david...@optimizely.com>
> > wrote:
> > > >
> > > > > Great. Thx.
> > > > >
> > > > > On Wed, Aug 3, 2016 at 1:42 PM Jacob Maes <jacob.m...@gmail.com>
> > > wrote:
> > > > >
> > > > >> Hey David,
> > > > >>
> > > > >> what gets written to the changelog topic
> > > > >>
> > > > >> The changelog gets the same value as the store, which is the
> > > serialized
> > > > >> form of the key and value. The serdes for the store are configured
> > > with
> > > > >> the
> > > > >> properties:
> > > > >> stores.store-name.key.serde
> > > > >> stores.store-name.msg.serde
> > > > >>
> > > > >> If I want to compress the changelog topic, do I enable that from
> the
> > > > >> > producer?
> > > > >>
> > > > >> Yes. When you specify the changelog for your store, you specify it
> > in
> > > > >> terms
> > > > >> of a SystemStream (typically a Kafka topic). In the part of the
> > config
> > > > >> where you define the Kafka system, you can pass any Kafka producer
> > > > config
> > > > >> <http://kafka.apache.org/documentation.html#newproducerconfigs>.
> So
> > > to
> > > > >> configure compression you should configure the following property.
> > > > >> systems.system-name.producer.compression.type
> > > > >>
> > > > >> Hope this helps.
> > > > >> -Jake
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Wed, Aug 3, 2016 at 11:16 AM, David Yu <
> david...@optimizely.com>
> > > > >> wrote:
> > > > >>
> > > > >> > I'm trying to understand what gets written to the changelog
> topic.
> > > Is
> > > > it
> > > > >> > just the serialized value of the particular state store entry?
> If
> > I
> > > > >> want to
> > > > >> > compress the changelog topic, do I enable that from the
> producer?
> > > > >> >
> > > > >> > The reason I'm asking is that, we are seeing producer throughput
> > > > issues
> > > > >> and
> > > > >> > suspected that writing to changelog takes up most of the network
> > > > >> bandwidth.
> > > > >> >
> > > > >> > Thanks,
> > > > >> > David
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
>


Re: State store changelog format

2016-08-05 Thread David Yu
I'm reporting back my observations after enabling compression.

Looks like compression is not doing anything. I'm still seeing
"compression-rate-avg=1.0" and the same "record-size-avg" from JMX
"kafka.producer" metrics.

I did set the following:
systems.kafka.producer.compression.type=snappy

Am I missing anything?

Thanks,
David

On Wed, Aug 3, 2016 at 1:48 PM David Yu <david...@optimizely.com> wrote:

> Great. Thx.
>
> On Wed, Aug 3, 2016 at 1:42 PM Jacob Maes <jacob.m...@gmail.com> wrote:
>
>> Hey David,
>>
>> what gets written to the changelog topic
>>
>> The changelog gets the same value as the store, which is the serialized
>> form of the key and value. The serdes for the store are configured with
>> the
>> properties:
>> stores.store-name.key.serde
>> stores.store-name.msg.serde
>>
>> If I want to compress the changelog topic, do I enable that from the
>> > producer?
>>
>> Yes. When you specify the changelog for your store, you specify it in
>> terms
>> of a SystemStream (typically a Kafka topic). In the part of the config
>> where you define the Kafka system, you can pass any Kafka producer config
>> <http://kafka.apache.org/documentation.html#newproducerconfigs>. So to
>> configure compression you should configure the following property.
>> systems.system-name.producer.compression.type
>>
>> Hope this helps.
>> -Jake
>>
>>
>>
>> On Wed, Aug 3, 2016 at 11:16 AM, David Yu <david...@optimizely.com>
>> wrote:
>>
>> > I'm trying to understand what gets written to the changelog topic. Is it
>> > just the serialized value of the particular state store entry? If I
>> want to
>> > compress the changelog topic, do I enable that from the producer?
>> >
>> > The reason I'm asking is that, we are seeing producer throughput issues
>> and
>> > suspected that writing to changelog takes up most of the network
>> bandwidth.
>> >
>> > Thanks,
>> > David
>> >
>>
>


Re: Understand KV store restoring

2016-08-05 Thread David Yu
Sorry, you are right. It is going through the changelog topic partition by
partition, which happens sequentially.

Thanks,
David

On Fri, Aug 5, 2016 at 1:53 PM Navina Ramesh <nram...@linkedin.com.invalid>
wrote:

> Hi David,
>
> For a given container, it should go through the entire changelog only for
> the partitions owned by the tasks in the container. Restoration should
> happen only once and not multiple times.
>
> What log statements do you see that indicate that it is going through the
> changelog multiple times? Can you please share that ?
>
> Thanks!
> Navina
>
> On Fri, Aug 5, 2016 at 11:39 AM, David Yu <david...@optimizely.com> wrote:
>
> > Within a given container, does the restoration process go through the
> > changelog topic once to restore ALL stores in that container? From the
> > logs, I have a feeling that it is going through the changelog multiple
> > times.
> >
> > Can anyone confirm?
> >
> > Thanks,
> > David
> >
>
>
>
> --
> Navina R.
>


Understand KV store restoring

2016-08-05 Thread David Yu
Within a given container, does the restoration process go through the
changelog topic once to restore ALL stores in that container? From the
logs, I have a feeling that it is going through the changelog multiple
times.

Can anyone confirm?

Thanks,
David


Re: State store changelog format

2016-08-03 Thread David Yu
Great. Thx.

On Wed, Aug 3, 2016 at 1:42 PM Jacob Maes <jacob.m...@gmail.com> wrote:

> Hey David,
>
> what gets written to the changelog topic
>
> The changelog gets the same value as the store, which is the serialized
> form of the key and value. The serdes for the store are configured with the
> properties:
> stores.store-name.key.serde
> stores.store-name.msg.serde
>
> If I want to compress the changelog topic, do I enable that from the
> > producer?
>
> Yes. When you specify the changelog for your store, you specify it in terms
> of a SystemStream (typically a Kafka topic). In the part of the config
> where you define the Kafka system, you can pass any Kafka producer config
> <http://kafka.apache.org/documentation.html#newproducerconfigs>. So to
> configure compression you should configure the following property.
> systems.system-name.producer.compression.type
>
> Hope this helps.
> -Jake
>
>
>
> On Wed, Aug 3, 2016 at 11:16 AM, David Yu <david...@optimizely.com> wrote:
>
> > I'm trying to understand what gets written to the changelog topic. Is it
> > just the serialized value of the particular state store entry? If I want
> to
> > compress the changelog topic, do I enable that from the producer?
> >
> > The reason I'm asking is that, we are seeing producer throughput issues
> and
> > suspected that writing to changelog takes up most of the network
> bandwidth.
> >
> > Thanks,
> > David
> >
>


State store changelog format

2016-08-03 Thread David Yu
I'm trying to understand what gets written to the changelog topic. Is it
just the serialized value of the particular state store entry? If I want to
compress the changelog topic, do I enable that from the producer?

The reason I'm asking is that, we are seeing producer throughput issues and
suspected that writing to changelog takes up most of the network bandwidth.

Thanks,
David
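Pulling together the pointers from the replies above, the relevant pieces of
job config look roughly like this (store, serde, and system names are
illustrative assumptions):

# The changelog receives the same serialized key/value that goes into the store.
stores.session-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.session-store.key.serde=string
stores.session-store.msg.serde=session
stores.session-store.changelog=kafka.session-store-changelog

# Any Kafka producer property can be passed through on the system, e.g.:
systems.kafka.producer.compression.type=snappy

# Caveat from this thread: with Kafka 0.8 brokers Samza disables compression on
# log-compacted (changelog) topics; Samza 0.10.1 with Kafka 0.9+ brokers allows it.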


Number of Kafka producers

2016-07-27 Thread David Yu
Is there a way to control the number of producers? Our Samza job writes a
lot of data to the downstream Kafka topic. I was wondering if there is a
way to optimize concurrency by creating more async producers.

Thanks,
David


Re: Sync Kafka producer by default?

2016-07-26 Thread David Yu
I'm using 0.10.

Thanks for confirming.

On Tue, Jul 26, 2016 at 1:12 PM Navina Ramesh <nram...@linkedin.com.invalid>
wrote:

> Hi David,
> Which version of Samza are you using? Starting with Samza 0.8, I think we
> always used async producer.
>
> Thanks!
> Navina
>
> On Tue, Jul 26, 2016 at 10:38 AM, David Yu <david...@optimizely.com>
> wrote:
>
> > Our session-roll-up Samza job is experiencing throughput issues. We would
> > like to fine-tune our Kafka producer. Does Samza use "producer.type=sync"
> > by default? From the JMX console, I'm seeing some producer batching
> > metrics. I thought those are for async producers.
> >
> > Thanks,
> > David
> >
>
>
>
> --
> Navina R.
>


Sync Kafka producer by default?

2016-07-26 Thread David Yu
Our session-roll-up Samza job is experiencing throughput issues. We would
like to fine-tune our Kafka producer. Does Samza use "producer.type=sync"
by default? From the JMX console, I'm seeing some producer batching
metrics. I thought those are for async producers.

Thanks,
David


Re: No updates to some of the store changelog partitions

2016-06-14 Thread David Yu
Some updates below.

After changing the number of containers *from 6 to 4*, we are able to see
changelog offsets moving across the board. Yay!

Now the question is: why?

With 20 input partitions/task instances, the previous setting allocated more
task instances to some containers (4) than to others (3), whereas now things
are perfectly balanced at five per container. But I'm befuddled as to why that
imbalance might cause this particular issue.

Any insight is appreciated.

Thanks,
David

On Mon, Jun 13, 2016 at 10:57 PM, David Yu <david...@optimizely.com> wrote:

> Hi, Yi,
>
> I couldn't find any errors in the log indicating any issue writing to
> those particular changelog partitions. I event went ahead and removed all
> checkpoint, coordinator and changelog topics and started fresh. This issue
> is still manifested itself with offsets of 0:
>
> partition offset
> --
> 0 0
> 1 54251
> 2 54315
> 3 54196
> 4 53548
> 5 53581
> 6 55175
> 7 54599
> 8 53694
> 9 0
> 10 53456
> 11 53450
> 12 0
> 13 54442
> 14 54759
> 15 54958
> 16 54909
> 17 53396
> 18 55442
> 19 54121
>
> In this case, we have partition 0, 2, 8 and 14 all running in the same
> YARN container, which means that it's not a container specific issue (since
> partition 2, 8 and 14 all get proper changelogs written).
>
> As I mentioned earlier, the changelog topic was auto-created by the samza
> job. So no manual overrides such as "auto-commit" was given.
>
> Thanks,
> David
>
> On Mon, Jun 13, 2016 at 9:40 AM, Yi Pan <nickpa...@gmail.com> wrote:
>
>> Hi, David,
>>
>> Did you check the log to see whether there is any log lines indicating the
>> producer issues on the three partitions that you suspect? And could you
>> also check whether you have auto-commit turned on? If your auto-commit is
>> on and producer does not report any issue writing to the changelog topic,
>> you may want to do a comparison between the local RocksDB and the one
>> persisted in changelog to verify that there indeed are some discrepancies
>> between them. Samza provides a command line tool state-storage-tool.sh to
>> recover the RocksDB state store from the changelog. You can use it to
>> recover the state store from changelog and compare w/ the local RocksDB to
>> verify if there is any discrepancies.
>>
>> Best.
>>
>> -Yi
>>
>> On Sun, Jun 12, 2016 at 12:49 PM, David Yu <david...@optimizely.com>
>> wrote:
>>
>> > Jagadish,
>> >
> > All your description matches my understanding.
>> >
>> > Here are our settings:
>> > - Our task aggregates user events into user sessions.
>> > - We have one k-v store for each task, which tracks active user sessions
>> > (with sessionId as the key).
>> > - When a user session expires, the session will be removed from the
>> store.
>> > - The changelog topic was auto created with cleanup.policy=*compact*.
>> >
>> > In terms of log compaction, I'm expecting it to keep the last log entry
>> for
>> > a given key and deletes all previous entries. For example, if we have:
>> >
>> > store.put("session1", Session1_1)  // session created
>> > store.put("session1", Session1_2)  // session updated
>> > store.delete("session1")  // session expired
>> >
>> >
>> > I'm expecting something as following in the changelog (after
>> compaction):
>> >
>> > 1 session1=Session1_1
>> > 2 session1=Session1_2
>> > 3 session1=NULL
>> >
>> >
>> > with only offset 3 retained. The next log entry should take offset 4. In
>> > that sense, the offsets should always increase monotonically, with lots
>> of
>> > gaps in between due to compaction.
>> >
>> > So again, I'm not sure why we have three changelog partitions that stop
>> > seeing movements in their offsets.
>> >
>> > Thanks,
>> > David
>> >
>> > On Sun, Jun 12, 2016 at 11:09 AM, Jagadish Venkatraman <
>> > jagadish1...@gmail.com> wrote:
>> >
>> > > Some context: Each k-v store has a changelog topic. The # of
>> partitions
>> > in
>> > > that changelog topic is equal to the # of tasks. Each task's K-V store
>> > will
>> > > be mapped to a particular partition of that changelog topic. This
>> mapping
>> > > from taskNames-changeLogPartitionNumber is stored in coordinator
>> stream.
>> >
>> >
>> > > Of course, yo

Re: No updates to some of the store changelog partitions

2016-06-13 Thread David Yu
Hi, Yi,

I couldn't find any errors in the log indicating any issue writing to those
particular changelog partitions. I even went ahead and removed all
checkpoint, coordinator and changelog topics and started fresh. This issue
still manifested itself with offsets of 0:

partition offset
--
0 0
1 54251
2 54315
3 54196
4 53548
5 53581
6 55175
7 54599
8 53694
9 0
10 53456
11 53450
12 0
13 54442
14 54759
15 54958
16 54909
17 53396
18 55442
19 54121

In this case, we have partition 0, 2, 8 and 14 all running in the same YARN
container, which means that it's not a container specific issue (since
partition 2, 8 and 14 all get proper changelogs written).

As I mentioned earlier, the changelog topic was auto-created by the samza
job. So no manual overrides such as "auto-commit" was given.

Thanks,
David

On Mon, Jun 13, 2016 at 9:40 AM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, David,
>
> Did you check the log to see whether there is any log lines indicating the
> producer issues on the three partitions that you suspect? And could you
> also check whether you have auto-commit turned on? If your auto-commit is
> on and producer does not report any issue writing to the changelog topic,
> you may want to do a comparison between the local RocksDB and the one
> persisted in changelog to verify that there indeed are some discrepancies
> between them. Samza provides a command line tool state-storage-tool.sh to
> recover the RocksDB state store from the changelog. You can use it to
> recover the state store from changelog and compare w/ the local RocksDB to
> verify if there is any discrepancies.
>
> Best.
>
> -Yi
>
> On Sun, Jun 12, 2016 at 12:49 PM, David Yu <david...@optimizely.com>
> wrote:
>
> > Jagadish,
> >
> > All your description matches my understanding.
> >
> > Here are our settings:
> > - Our task aggregates user events into user sessions.
> > - We have one k-v store for each task, which tracks active user sessions
> > (with sessionId as the key).
> > - When a user session expires, the session will be removed from the
> store.
> > - The changelog topic was auto created with cleanup.policy=*compact*.
> >
> > In terms of log compaction, I'm expecting it to keep the last log entry
> for
> > a given key and deletes all previous entries. For example, if we have:
> >
> > store.put("session1", Session1_1)  // session created
> > store.put("session1", Session1_2)  // session updated
> > store.delete("session1")  // session expired
> >
> >
> > I'm expecting something as following in the changelog (after compaction):
> >
> > 1 session1=Session1_1
> > 2 session1=Session1_2
> > 3 session1=NULL
> >
> >
> > with only offset 3 retained. The next log entry should take offset 4. In
> > that sense, the offsets should always increase monotonically, with lots
> of
> > gaps in between due to compaction.
> >
> > So again, I'm not sure why we have three changelog partitions that stop
> > seeing movements in their offsets.
> >
> > Thanks,
> > David
> >
> > On Sun, Jun 12, 2016 at 11:09 AM, Jagadish Venkatraman <
> > jagadish1...@gmail.com> wrote:
> >
> > > Some context: Each k-v store has a changelog topic. The # of partitions
> > in
> > > that changelog topic is equal to the # of tasks. Each task's K-V store
> > will
> > > be mapped to a particular partition of that changelog topic. This
> mapping
> > > from taskNames-changeLogPartitionNumber is stored in coordinator
> stream.
> >
> >
> > > Of course, you don't want this k-v changelog topic to keep growing. So,
> > > people configure it with some expiration. The expiration can either be:
> > > 1. Time retention: Records older than the retention are purged.
> > > 2. Compaction: Newer key-values will over-write older keys and only the
> > > most recent value is retained.
> > >
> > > I'm not sure if offsets are always monotonically increasing in Kafka or
> > > could change after a compaction/ a time based retention kicks in for
> the
> > > topic partition.
> > >
> > >
> > >
> > >
> > >
> > > On Sat, Jun 11, 2016 at 11:53 PM, David Yu <david...@optimizely.com>
> > > wrote:
> > >
> > > > My understanding of store changelog is that, each task writes store
> > > changes
> > > > to a particular changelog partition for that task. (Does that mean
> the
> > > > changelog keys are task names?)
> > > >
> > >

Re: Update all values in RocksDB

2016-06-13 Thread David Yu
Hi, Pan,

I was reading the 10.0 documentation on Samza state management. One
particular section that explains counting the number of page views for each
user stands out to me, as it also uses a full table scan to output
aggregation results:

Note that this job effectively pauses at the hour mark to output its
> results. This is totally fine for Samza, as scanning over the contents of
> the key-value store is *quite fast*. The input stream is buffered while
> the job is doing this hourly work.


This doesn't quite match the particular concern you have described. Maybe
I'm missing something, or perhaps there is another way to do a full table
scan besides using iterators?

Also, after some investigation, I'm starting to believe that
deserialization is actually the bottleneck instead of the table traversal
itself.

Thanks
David

On Tue, Jun 7, 2016 at 10:15 AM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, David,
>
> Generally speaking, iterators take a snapshot of the RocksDB key space.
> Hence, they carry some memory overhead. A more severe performance
> issue we saw before is that if you insert and delete tons of sessions in a
> short time period, the iterator seek function can be extremely slow, due to
> the need to traverse through tons of tombstone (i.e. deleted records)
> before hitting the next live record. The suggested "queue" mechanism helps
> to avoid the issue, since all deletions happen sequentially in a continuous
> block and all insertions also happen sequentially in a continuous block.
> This gives greater opportunity for the compaction thread to come in and
> cleanup all the tombstone records and make iterator faster, again.
>
> In your use case, if you can make sure that the newly inserted session's
> sessionId is *alway* at the tail of your session table, and your session
> expiration order is the same as the order determined by the sessionId, that
> should work as well.
>
> -Yi
>
> On Mon, Jun 6, 2016 at 3:17 PM, David Yu <david...@optimizely.com> wrote:
>
> > Hi, Yi,
> >
> > Yes, the sessions are keyed by the sessionId.
> >
> > In our case, iterating through all OPEN sessions is inevitable, since
> that
> > is precisely where we evaluate (based on timestamp) and close sessions. In
> > other words, the closed session queue you suggested cannot be constructed
> > without going through all the sessions periodically.
> >
> > Can you explain (on a higher level) why iteration through the entries can
> > be a slow process?
> >
> > Thanks,
> > David
> >
> > On Mon, Jun 6, 2016 at 2:34 PM, Yi Pan <nickpa...@gmail.com> wrote:
> >
> > > Hi, David,
> > >
> > > I would recommend to keep a separate table of closed sessions as a
> > "queue",
> > > ordered by the time the session is closed. And in your window method,
> > just
> > > create an iterator in the "queue" and only make progress toward the end
> > of
> > > the "queue", and do a point deletion in the sessionStore, which I
> assume
> > > that would be keyed by the sessionId.
> > >
> > > The reason for that is:
> > > 1) RocksDB is a KV-store and it is super efficient in read/write by
> key,
> > > not by iterator
> > > 2) If you have to use iterator, making sure that the iterator only goes
> > > toward the "tail" where all meaningful work items will be is important
> to
> > > achieve fast and efficient operation. Please refer to this blog from
> > > RocksDB team:
> > >
> > >
> >
> https://github.com/facebook/rocksdb/wiki/Implement-Queue-Service-Using-RocksDB
> > >
> > > -Yi
> > >
> > > On Mon, Jun 6, 2016 at 2:25 PM, David Yu <david...@optimizely.com>
> > wrote:
> > >
> > > > We use Samza RocksDB to keep track of our user event sessions. The
> task
> > > > periodically calls window() to update all sessions in the store and
> > purge
> > > > all closed sessions.
> > > >
> > > > We do all of this in the same iterator loop.
> > > >
> > > > Here's how we are doing it:
> > > >
> > > >
> > > > public void window(MessageCollector collector, TaskCoordinator
> > > coordinator)
> > > > throws Exception {
> > > >
> > > > KeyValueIterator<String, Session> it = sessionStore.all();
> > > >
> > > > while (it.hasNext()) {
> > > >
> > > > Entry<String, Session> entry = it.next();
> > > > Session session = entry.getValue();
> > > >
> > > > update(session);
> > > >
> > > > if (session.getStatus() == Status.CLOSED) {
> > > > sessionStore.delete(entry.getKey());
> > > > } else {
> > > > sessionStore.put(entry.getKey(), session);
> > > > }
> > > > }
> > > > }
> > > >
> > > >
> > > > The question is: is this the correct/efficient way to do a
> read+update
> > > for
> > > > RocksDB?
> > > >
> > > > Thanks,
> > > > David
> > > >
> > >
> >
>


Re: No updates to some of the store changelog partitions

2016-06-12 Thread David Yu
Jagadish,

All of your description matches my understanding.

Here are our settings:
- Our task aggregates user events into user sessions.
- We have one k-v store for each task, which tracks active user sessions
(with sessionId as the key).
- When a user session expires, the session will be removed from the store.
- The changelog topic was auto created with cleanup.policy=*compact*.

In terms of log compaction, I'm expecting it to keep the last log entry for
a given key and delete all previous entries. For example, if we have:

store.put("session1", Session1_1)  // session created
store.put("session1", Session1_2)  // session updated
store.delete("session1")  // session expired


I'm expecting something as following in the changelog (after compaction):

1 session1=Session1_1
2 session1=Session1_2
3 session1=NULL


with only offset 3 retained. The next log entry should take offset 4. In
that sense, the offsets should always increase monotonically, with lots of
gaps in between due to compaction.
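
For what it's worth, the topic-level settings can be double-checked with something along these lines (zookeeper address and topic name are placeholders):

$ bin/kafka-topics.sh --zookeeper zk1:2181 --describe --topic my-job-session-store-changelog

The Configs column should list cleanup.policy=compact if the policy really took effect on the auto-created topic.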

So again, I'm not sure why we have three changelog partitions that stop
seeing movements in their offsets.

Thanks,
David

On Sun, Jun 12, 2016 at 11:09 AM, Jagadish Venkatraman <
jagadish1...@gmail.com> wrote:

> Some context: Each k-v store has a changelog topic. The # of partitions in
> that changelog topic is equal to the # of tasks. Each task's K-V store will
> be mapped to a particular partition of that changelog topic. This mapping
> from taskNames-changeLogPartitionNumber is stored in coordinator stream.


> Of course, you don't want this k-v changelog topic to keep growing. So,
> people configure it with some expiration. The expiration can either be:
> 1. Time retention: Records older than the retention are purged.
> 2. Compaction: Newer key-values will over-write older keys and only the
> most recent value is retained.
>
> I'm not sure if offsets are always monotonically increasing in Kafka or
> could change after a compaction/ a time based retention kicks in for the
> topic partition.
>
>
>
>
>
> On Sat, Jun 11, 2016 at 11:53 PM, David Yu <david...@optimizely.com>
> wrote:
>
> > My understanding of store changelog is that, each task writes store
> changes
> > to a particular changelog partition for that task. (Does that mean the
> > changelog keys are task names?)
> >
> > One thing that confuses me is that, the last offsets of some changelog
> > partitions do not move. I'm using the kafka GetOffsetShell tool to get
> the
> > last offsets for each partition. The result looks like this:
> >
> > partition   offset
> > 0 7090
> > 1 3737937
> > 2 3733222
> > 3 3719065
> > 4 3730208
> > 5 3731128
> > 6 3734669
> > 7 3691461
> > 8 3759133
> > 9 7286
> > 10 3690347
> > 11 3722450
> > 12 7376
> > 13 3738454
> > 14 3742316
> > 15 3710512
> > 16 3777267
> > 17 3750596
> > 18 3728185
> > 19 3694470
> >
> > As you can see, three of the partitions barely got any updates. In fact,
> > the offsets stopped moving for a while. The traffic for each task should
> be
> > fairly balanced. I checked the task log and made sure that the stores for
> > these partitions are actively updated.
> >
> > Any idea why this is happening? Or am I missing something?
> >
> > Thanks,
> > David
> >
>
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
>


No updates to some of the store changelog partitions

2016-06-12 Thread David Yu
My understanding of store changelog is that, each task writes store changes
to a particular changelog partition for that task. (Does that mean the
changelog keys are task names?)

One thing that confuses me is that, the last offsets of some changelog
partitions do not move. I'm using the kafka GetOffsetShell tool to get the
last offsets for each partition. The result looks like this:

partition   offset
0 7090
1 3737937
2 3733222
3 3719065
4 3730208
5 3731128
6 3734669
7 3691461
8 3759133
9 7286
10 3690347
11 3722450
12 7376
13 3738454
14 3742316
15 3710512
16 3777267
17 3750596
18 3728185
19 3694470

As you can see, three of the partitions barely got any updates. In fact,
the offsets stopped moving for a while. The traffic for each task should be
fairly balanced. I checked the task log and made sure that the stores for
these partitions are actively updated.
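
For reference, the offsets above came from a command along these lines (broker address and changelog topic name are placeholders; --time -1 asks for the latest offset of each partition):

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list broker1:9092 \
    --topic my-job-session-store-changelog \
    --time -1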

Any idea why this is happening? Or am I missing something?

Thanks,
David


Re: Update all values in RocksDB

2016-06-06 Thread David Yu
Hi, Yi,

Yes, the sessions are keyed by the sessionId.

In our case, iterating through all OPEN sessions is inevitable, since that
is precisely where we evaluate (based on timestamp) and close sessions. In
other words, the closed session queue you suggested cannot be constructed
without going through all the sessions periodically.

Can you explain (on a higher level) why iteration through the entries can
be a slow process?
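
In the meantime, for anyone following along, here is roughly what I understand the two-store "queue" pattern from the quoted reply below to look like. Everything in this sketch is illustrative (store names, the key format, the generic session type); it is not code from our job:

import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;

/**
 * Keeps a second "queue" store of closed sessions so that window() only iterates
 * over work that needs doing, and deletes from the main store are point deletes.
 */
public class ClosedSessionPurger<S> {

    private final KeyValueStore<String, S> sessionStore;      // sessionId -> session
    private final KeyValueStore<String, String> closedQueue;  // "<closeTime>:<sessionId>" -> sessionId

    public ClosedSessionPurger(KeyValueStore<String, S> sessionStore,
                               KeyValueStore<String, String> closedQueue) {
        this.sessionStore = sessionStore;
        this.closedQueue = closedQueue;
    }

    /** Call when a session is determined to be closed. */
    public void markClosed(long closeTimeMillis, String sessionId) {
        // Zero-pad the timestamp so lexicographic key order matches close-time order.
        closedQueue.put(String.format("%019d:%s", closeTimeMillis, sessionId), sessionId);
    }

    /** Call from window(): drain the queue head-first and point-delete from the main store. */
    public void purge() {
        KeyValueIterator<String, String> it = closedQueue.all();
        try {
            while (it.hasNext()) {
                Entry<String, String> entry = it.next();
                sessionStore.delete(entry.getValue()); // point delete by sessionId
                closedQueue.delete(entry.getKey());    // deletions stay contiguous at the queue head
            }
        } finally {
            it.close();
        }
    }
}

The catch, as described above, is that in our job markClosed() could only be called from the very scan we are trying to avoid.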

Thanks,
David

On Mon, Jun 6, 2016 at 2:34 PM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, David,
>
> I would recommend to keep a separate table of closed sessions as a "queue",
> ordered by the time the session is closed. And in your window method, just
> create an iterator in the "queue" and only make progress toward the end of
> the "queue", and do a point deletion in the sessionStore, which I assume
> that would be keyed by the sessionId.
>
> The reason for that is:
> 1) RocksDB is a KV-store and it is super efficient in read/write by key,
> not by iterator
> 2) If you have to use iterator, making sure that the iterator only goes
> toward the "tail" where all meaningful work items will be is important to
> achieve fast and efficient operation. Please refer to this blog from
> RocksDB team:
>
> https://github.com/facebook/rocksdb/wiki/Implement-Queue-Service-Using-RocksDB
>
> -Yi
>
> On Mon, Jun 6, 2016 at 2:25 PM, David Yu <david...@optimizely.com> wrote:
>
> > We use Samza RocksDB to keep track of our user event sessions. The task
> > periodically calls window() to update all sessions in the store and purge
> > all closed sessions.
> >
> > We do all of this in the same iterator loop.
> >
> > Here's how we are doing it:
> >
> >
> > public void window(MessageCollector collector, TaskCoordinator
> coordinator)
> > throws Exception {
> >
> > KeyValueIterator<String, Session> it = sessionStore.all();
> >
> > while (it.hasNext()) {
> >
> > Entry<String, Session> entry = it.next();
> > Session session = entry.getValue();
> >
> > update(session);
> >
> > if (session.getStatus() == Status.CLOSED) {
> > sessionStore.delete(entry.getKey());
> > } else {
> > sessionStore.put(entry.getKey(), session);
> > }
> > }
> > }
> >
> >
> > The question is: is this the correct/efficient way to do a read+update
> for
> > RocksDB?
> >
> > Thanks,
> > David
> >
>


Re: Samza job killed but left orphaned on YARN

2016-05-19 Thread David Yu
Just stumbled upon this post, and it seems to be the same issue:

https://issues.apache.org/jira/browse/SAMZA-498


We followed the fix to create a wrapper kill script and everything works.
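
Roughly, the wrapper does two things: ask the RM to kill the application, then reap any leftover container JVMs on the NodeManager hosts. The sketch below is illustrative rather than the exact script from SAMZA-498; the host list, the ssh access, and the assumption that the container id digits show up in the process command line all need to be adapted:

#!/bin/bash
# Usage: kill-samza-job.sh application_1463512986427_0007 nm-hosts.txt
APP_ID="$1"
NM_HOSTS="$2"

# 1. Normal kill through the ResourceManager.
yarn application -kill "$APP_ID"

# 2. Reap any container JVMs the NodeManagers failed to stop. Container ids embed
#    the application id digits (e.g. container_e01_1463512986427_0007_...), which
#    typically appear in the container's command line and log paths.
SUFFIX="${APP_ID#application_}"
while read -r host; do
  ssh "$host" "ps aux | grep \"[_]${SUFFIX}_\" | awk '{print \$2}' | xargs -r kill -9"
done < "$NM_HOSTS"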

Do we have a plan to fix this in the next version of Samza?

Thanks,
David
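
P.S. The NM recovery option Jake suggests further down boils down to settings along these lines in yarn-site.xml (the recovery directory and port are placeholders; per the Hadoop docs the NM needs a fixed port rather than the default ephemeral one for recovery to work):

yarn.nodemanager.recovery.enabled=true
yarn.nodemanager.recovery.dir=/var/lib/hadoop-yarn/nm-recovery
yarn.nodemanager.address=0.0.0.0:45454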

On Wed, May 18, 2016 at 11:53 AM, Jacob Maes <jacob.m...@gmail.com> wrote:

> Hmm, could there be something in your job holding up the container shutdown
> process? Perhaps something ignoring SIGTERM/Thread.interrupt, by chance?
>
> Also, I think there's a YARN property specifying the amount of time the NM
> waits between sending a SIGTERM and a SIGKILL, though I can't find it at
> the moment.
>
> -Jake
>
> On Wed, May 18, 2016 at 10:32 AM, David Yu <david...@optimizely.com>
> wrote:
>
> > From the NM log, I'm seeing:
> >
> > 2016-05-18 06:29:06,248 INFO
> >
> >
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
> > Cleaning up container
> > container_e01_1463512986427_0007_01_02
> > 2016-05-18 06:29:06,265 INFO
> >
> >
> org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application:
> > Application *application_1463512986427_0007* transitioned from RUNNING to
> > FINISHING_CONTAINERS_WAIT
> >
> > (*Highlighted* is the particular samza application.)
> >
> > The status never transitioned from FINISHING_CONTAINERS_WAIT :(
> >
> >
> >
> > On Wed, May 18, 2016 at 10:21 AM, David Yu <david...@optimizely.com>
> > wrote:
> >
> > > Jacob,
> > >
> > > I have checked and made sure that NM is running on the node:
> > >
> > > $ ps aux | grep java
> > > ...
> > > yarn 25623  0.5  0.8 2366536 275488 ?  Sl   May17   7:04
> > > /usr/java/jdk1.8.0_51/bin/java -Dproc_nodemanager
> > >  ... org.apache.hadoop.yarn.server.nodemanager.NodeManager
> > >
> > >
> > >
> > > Thanks,
> > > David
> > >
> > > On Wed, May 18, 2016 at 7:08 AM, Jacob Maes <jacob.m...@gmail.com>
> > wrote:
> > >
> > >> Hey David,
> > >>
> > >> The only time I've seen orphaned containers is when the NM dies. If
> the
> > NM
> > >> isn't running, the RM has no means to kill the containers on a node.
> Can
> > >> you verify that the NM was healthy at the time of the shut down?
> > >>
> > >> If it wasn't healthy and/or it was restarted, one option that may help
> > is
> > >> NM Recovery:
> > >>
> > >>
> >
> https://hadoop.apache.org/docs/r2.7.2/hadoop-yarn/hadoop-yarn-site/NodeManagerRestart.html
> > >>
> > >> With NM Recovery, the NM will resume control over containers that were
> > >> running when the NM shut down. This option has virtually eliminated
> > >> orphaned containers in our clusters.
> > >>
> > >> -Jake
> > >>
> > >> On Tue, May 17, 2016 at 11:54 PM, David Yu <david...@optimizely.com>
> > >> wrote:
> > >>
> > >> > Samza version = 0.10.0
> > >> > YARN version = Hadoop 2.6.0-cdh5.4.9
> > >> >
> > >> > We are experiencing issues when killing a Samza job:
> > >> >
> > >> > $ yarn application -kill application_1463512986427_0007
> > >> >
> > >> > Killing application application_1463512986427_0007
> > >> >
> > >> > 16/05/18 06:29:05 INFO impl.YarnClientImpl: Killed application
> > >> > application_1463512986427_0007
> > >> >
> > >> > RM shows that the job is killed. However, the samza containers are
> > still
> > >> > left running.
> > >> >
> > >> > Any idea why this is happening?
> > >> >
> > >> > Thanks,
> > >> > David
> > >> >
> > >>
> > >
> > >
> >
>


Re: Samza job killed but left orphaned on YARN

2016-05-18 Thread David Yu
From the NM log, I'm seeing:

2016-05-18 06:29:06,248 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
Cleaning up container container_e01_1463512986427_0007_01_02
2016-05-18 06:29:06,265 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application:
Application *application_1463512986427_0007* transitioned from RUNNING to
FINISHING_CONTAINERS_WAIT

(*Highlighted* is the particular samza application.)

The status never transitioned from FINISHING_CONTAINERS_WAIT :(



On Wed, May 18, 2016 at 10:21 AM, David Yu <david...@optimizely.com> wrote:

> Jacob,
>
> I have checked and made sure that NM is running on the node:
>
> $ ps aux | grep java
> ...
> yarn 25623  0.5  0.8 2366536 275488 ?  Sl   May17   7:04
> /usr/java/jdk1.8.0_51/bin/java -Dproc_nodemanager
>  ... org.apache.hadoop.yarn.server.nodemanager.NodeManager
>
>
>
> Thanks,
> David
>
> On Wed, May 18, 2016 at 7:08 AM, Jacob Maes <jacob.m...@gmail.com> wrote:
>
>> Hey David,
>>
>> The only time I've seen orphaned containers is when the NM dies. If the NM
>> isn't running, the RM has no means to kill the containers on a node. Can
>> you verify that the NM was healthy at the time of the shut down?
>>
>> If it wasn't healthy and/or it was restarted, one option that may help is
>> NM Recovery:
>>
>> https://hadoop.apache.org/docs/r2.7.2/hadoop-yarn/hadoop-yarn-site/NodeManagerRestart.html
>>
>> With NM Recovery, the NM will resume control over containers that were
>> running when the NM shut down. This option has virtually eliminated
>> orphaned containers in our clusters.
>>
>> -Jake
>>
>> On Tue, May 17, 2016 at 11:54 PM, David Yu <david...@optimizely.com>
>> wrote:
>>
>> > Samza version = 0.10.0
>> > YARN version = Hadoop 2.6.0-cdh5.4.9
>> >
>> > We are experiencing issues when killing a Samza job:
>> >
>> > $ yarn application -kill application_1463512986427_0007
>> >
>> > Killing application application_1463512986427_0007
>> >
>> > 16/05/18 06:29:05 INFO impl.YarnClientImpl: Killed application
>> > application_1463512986427_0007
>> >
>> > RM shows that the job is killed. However, the samza containers are still
>> > left running.
>> >
>> > Any idea why this is happening?
>> >
>> > Thanks,
>> > David
>> >
>>
>
>


Samza not consuming

2016-03-19 Thread David Yu
I'm trying to debug our samza job, which seems to be stuck consuming
from our Kafka stream.

Every time I redeploy the job, only the same handful of events get
consumed, and then no more events get processed. I manually checked to make
sure the input stream is live and flowing. I also tried both the following:

systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.consumer.auto.offset.reset=smallest

I'm also seeing the following from the log:

... partitionMetadata={Partition
[partition=0]=SystemStreamPartitionMetadata [oldestOffset=144907,
newestOffset=202708, upcomingOffset=202709], Partition
[partition=5]=SystemStreamPartitionMetadata [oldestOffset=140618,
newestOffset=200521, upcomingOffset=200522], ...


Not sure what other ways I could diagnose this problem. Any suggestion is
appreciated.


Re: Samza not consuming

2016-03-19 Thread David Yu
Finally seeing events flowing again.

Yes, the "systems.kafka.consumer.auto.offset.reset" option is probably not
a factor here. And yes, I am using checkpointing (kafka). Not sure if the
offsets are messed up. But I was able to use
"systems.kafka.streams.nogoalids.samza.reset.offset=true" to reset the
offsets to the newest ones. After that, events started coming. Still, it is
unclear to me how things got stuck in the first place.
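
In case it helps anyone else, the debug line Navina suggests in the quoted reply below looks roughly like this (class name is illustrative, and the line should be removed after troubleshooting):

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OffsetLoggingTask implements StreamTask {

    private static final Logger LOG = LoggerFactory.getLogger(OffsetLoggingTask.class);

    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                        TaskCoordinator coordinator) {
        // Temporary troubleshooting output: which partition and offset are we actually consuming?
        LOG.debug("consumed {} at offset {}",
                envelope.getSystemStreamPartition(), envelope.getOffset());

        // ... normal processing goes here ...
    }
}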

On Wed, Mar 16, 2016 at 2:31 PM, Navina Ramesh <nram...@linkedin.com.invalid
> wrote:

> HI David,
> This configuration you have tweaked
> (systems.kafka.consumer.auto.offset.reset) is honored only when one of the
> following condition holds:
> * topic doesn't exist
> * checkpoint is older than the maximum message history retained by the
> brokers
>
> So, my questions are :
> Are you using checkpointing? If you do, you can read the checkpoint topic
> to see the offset that is being used to fetch data.
>
> If you are not using checkpoints, then samza uses
> systems.kafka.samza.offset.default to decide whether to start reading from
> the earliest (oldest data) or upcoming (newest data) offset in the stream.
>
> This could explain from where your job is trying to consume and you can
> cross-check with the broker.
> For the purpose of debugging, you can print a debug line in process()
> method to print the offset of the message you are processing
> (message.getOffset). Please remember to remove the debug line after
> troubleshooting. Else you risk filling up your logs.
>
> Let me know if you have more questions.
>
> Thanks!
> Navina
>
> On Wed, Mar 16, 2016 at 2:12 PM, David Yu <david...@optimizely.com> wrote:
>
> > I'm trying to debug our samza job, which seems to be stuck consuming
> > from our Kafka stream.
> >
> > Every time I redeploy the job, only the same handful of events get
> > consumed, and then no more events get processed. I manually checked to
> make
> > sure the input stream is live and flowing. I also tried both the
> following:
> >
> > systems.kafka.consumer.auto.offset.reset=largest
> > systems.kafka.consumer.auto.offset.reset=smallest
> >
> > I'm also seeing the following from the log:
> >
> > ... partitionMetadata={Partition
> > [partition=0]=SystemStreamPartitionMetadata [oldestOffset=144907,
> > newestOffset=202708, upcomingOffset=202709], Partition
> > [partition=5]=SystemStreamPartitionMetadata [oldestOffset=140618,
> > newestOffset=200521, upcomingOffset=200522], ...
> >
> >
> > Not sure what other ways I could diagnose this problem. Any suggestion is
> > appreciated.
> >
>
>
>
> --
> Navina R.
>


Re: Samza not consuming

2016-03-19 Thread David Yu
No, instead, I updated the checkpoint topic with the "upcoming" offsets. (I
should have done a check before that though).

So a related question: if I delete the checkpoint topic from Kafka, that
would essentially clear out all the offset info, and Samza would be able to
recreate the topic and fall back to its default offsets (e.g. smallest). Is
that correct? I just want an easy way to do a "reprocess all" kind of
operation.
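
For the record, the combination that amounts to a "reprocess all" without touching the checkpoint topic, as I read the config reference, is the pair below (stream name taken from my earlier message):

systems.kafka.streams.nogoalids.samza.reset.offset=true
systems.kafka.streams.nogoalids.samza.offset.default=oldest

reset.offset tells the job to ignore the checkpointed offset for that stream, and offset.default says where to start instead; reset.offset is not flipped back automatically, so it should be removed once the reprocess has kicked off.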

Thanks.

On Wed, Mar 16, 2016 at 3:25 PM, Navina Ramesh <nram...@linkedin.com.invalid
> wrote:

> Strange. I am unable to comment on the behavior because I don't know what
> your checkpoints looked like in the checkpoint topic.
>
> Did you try reading the checkpoint topic log ?
>
> If you're setting systems.kafka.streams.nogoalids.samza.reset.offset = true,
> you are essentially ignoring checkpoints for that stream. Do verify that
> you are reading from the correct offset in the stream :)
>
> Thanks!
> Navina
>
> On Wed, Mar 16, 2016 at 3:16 PM, David Yu <david...@optimizely.com> wrote:
>
> > Finally seeing events flowing again.
> >
> > Yes, the "systems.kafka.consumer.auto.offset.reset" option is probably
> not
> > a factor here. And yes, I am using checkpointing (kafka). Not sure if the
> > offsets are messed up. But I was able to use
> > "systems.kafka.streams.nogoalids.samza.reset.offset=true" to reset the
> > offsets to the newest ones. After that, events started coming. Still, it
> is
> > unclear to me how things got stuck in the first place.
> >
> > On Wed, Mar 16, 2016 at 2:31 PM, Navina Ramesh
> > <nram...@linkedin.com.invalid
> > > wrote:
> >
> > > HI David,
> > > This configuration you have tweaked
> > > (systems.kafka.consumer.auto.offset.reset) is honored only when one of
> > the
> > > following condition holds:
> > > * topic doesn't exist
> > > * checkpoint is older than the maximum message history retained by the
> > > brokers
> > >
> > > So, my questions are :
> > > Are you using checkpointing? If you do, you can read the checkpoint
> topic
> > > to see the offset that is being used to fetch data.
> > >
> > > If you are not using checkpoints, then samza uses
> > > systems.kafka.samza.offset.default to decide whether to start reading
> > from
> > > the earliest (oldest data) or upcoming (newest data) offset in the
> > stream.
> > >
> > > This could explain from where your job is trying to consume and you can
> > > cross-check with the broker.
> > > For the purpose of debugging, you can print a debug line in process()
> > > method to print the offset of the message you are processing
> > > (message.getOffset). Please remember to remove the debug line after
> > > troubleshooting. Else you risk filling up your logs.
> > >
> > > Let me know if you have more questions.
> > >
> > > Thanks!
> > > Navina
> > >
> > > On Wed, Mar 16, 2016 at 2:12 PM, David Yu <david...@optimizely.com>
> > wrote:
> > >
> > > > I'm trying to debug our samza job, which seems to be stuck consuming
> > > > from our Kafka stream.
> > > >
> > > > Every time I redeploy the job, only the same handful of events get
> > > > consumed, and then no more events get processed. I manually checked
> to
> > > make
> > > > sure the input stream is live and flowing. I also tried both the
> > > following:
> > > >
> > > > systems.kafka.consumer.auto.offset.reset=largest
> > > > systems.kafka.consumer.auto.offset.reset=smallest
> > > >
> > > > I'm also seeing the following from the log:
> > > >
> > > > ... partitionMetadata={Partition
> > > > [partition=0]=SystemStreamPartitionMetadata [oldestOffset=144907,
> > > > newestOffset=202708, upcomingOffset=202709], Partition
> > > > [partition=5]=SystemStreamPartitionMetadata [oldestOffset=140618,
> > > > newestOffset=200521, upcomingOffset=200522], ...
> > > >
> > > >
> > > > Not sure what other ways I could diagnose this problem. Any
> suggestion
> > is
> > > > appreciated.
> > > >
> > >
> > >
> > >
> > > --
> > > Navina R.
> > >
> >
>
>
>
> --
> Navina R.
>


Re: No samza consumer group found

2016-03-16 Thread David Yu
Good to know. Thanks for the help, folks.

On Tue, Mar 15, 2016 at 11:10 PM, Liu Bo <diabl...@gmail.com> wrote:

> Hi David
>
> we also have this monitoring requirement
>
> one way I can think of is periodically reading the latest checkpoint
> information from the corresponding checkpoint topic, and then comparing it
> with the topic offsets.
>
> it's not as straightforward as existing tools such as the one you mentioned,
> and requires additional effort to integrate with dashboard tools such as
> Grafana.
>
> I think this group has better ways to do this ;-)
>
>
> On 16 March 2016 at 13:51, David Yu <david...@optimizely.com> wrote:
>
> > I asked simply because I would like to monitor the topic consumed by our
> > samza job using tools like this:
> > https://github.com/quantifind/KafkaOffsetMonitor
> >
> >
> >
> > On Tue, Mar 15, 2016 at 9:13 PM, Jagadish Venkatraman <
> > jagadish1...@gmail.com> wrote:
> >
> > > Hi David,
> > >
> > > Samza is using the simple consumer - I'm not aware of a notion of group
> > id.
> > > Others will comment on that.
> > >
> > > As far as I know, the client id for a particular consumer instance is
> > > defined by a combination of the job name, the current timestamp and a
> > > sequence number.
> > >
> > > val clientId = KafkaUtil.getClientId("samza-consumer", config)
> > >
> > > KafkaUtil.getClientId has this logic.
> > >
> > > I'm curious about your usecase as to why you are interested in
> inspecting
> > > this information?
> > >
> > > Thanks,
> > > Jagadish
> > >
> > > On Tuesday, March 15, 2016, David Yu <david...@optimizely.com> wrote:
> > >
> > > > Our samza job is consuming from a Kafka topic. AFAIU, samza will auto
> > > > assign the job a consumer group id and client id. However, I'm not
> able
> > > to
> > > > see that showing up under zookeeper. Am I missing something?
> > > >
> > >
> >
>
>
>
> --
> All the best
>
> Liu Bo
>


Re: No samza consumer group found

2016-03-15 Thread David Yu
I asked simply because I would like to monitor the topic consumed by our
samza job using tools like this:
https://github.com/quantifind/KafkaOffsetMonitor



On Tue, Mar 15, 2016 at 9:13 PM, Jagadish Venkatraman <
jagadish1...@gmail.com> wrote:

> Hi David,
>
> Samza is using the simple consumer - I'm not aware of a notion of group id.
> Others will comment on that.
>
> As far as I know, the client id for a particular consumer instance is
> defined by a combination of the job name, the current timestamp and a
> sequence number.
>
> val clientId = KafkaUtil.getClientId("samza-consumer", config)
>
> KafkaUtil.getClientId has this logic.
>
> I'm curious about your usecase as to why you are interested in inspecting
> this information?
>
> Thanks,
> Jagadish
>
> On Tuesday, March 15, 2016, David Yu <david...@optimizely.com> wrote:
>
> > Our samza job is consuming from a Kafka topic. AFAIU, samza will auto
> > assign the job a consumer group id and client id. However, I'm not able
> to
> > see that showing up under zookeeper. Am I missing something?
> >
>


Re: Understand Samza default metrics

2016-02-24 Thread David Yu
Thanks guys. This is really helpful! I assume we have a plan to publish
these docs on the samza wiki.

-David
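
For anyone else who lands on this thread: the MetricsSnapshot topic Xinyu mentions below is enabled with configuration along these lines (taken from my reading of the Samza docs; the reporter name and the "metrics" stream are just examples):

metrics.reporters=snapshot
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
systems.kafka.streams.metrics.samza.msg.serde=metrics

Each container then publishes periodic snapshots of all its metrics (including process-envelopes) to that topic, which can be consumed to compute rates.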

On Wed, Feb 24, 2016 at 10:34 AM, Xinyu Liu <xi...@linkedin.com.invalid>
wrote:

> Thanks, Shadi. The doc is really useful!
>
> @Milinda: thanks for pointing it out. Process-calls includes both
> process-envelopes and process-null-envelopes, so it should be
> process-envelopes in David's example.
>
> Thanks,
> Xinyu
>
> On Wed, Feb 24, 2016 at 9:52 AM, Abdollahian Noghabi, Shadi <
> abdol...@illinois.edu> wrote:
>
> > I have attached the document to SAMZA-702 (https://issues.apache.org/jira/browse/SAMZA-702).
> >
> >
> > On Feb 24, 2016, at 9:33 AM, Milinda Pathirage <mpath...@umail.iu.edu> wrote:
> >
> > Hi Shadi,
> >
> > The attachment is not there in your mail. I think the mailing list dropped the
> > attachment. IMHO, we should create a JIRA issue and attach the doc to the
> > issue so that we can move it to Samza docs.
> >
> > On Wed, Feb 24, 2016 at 12:27 PM, Abdollahian Noghabi, Shadi <
> > abdol...@illinois.edu> wrote:
> >
> > I have a document with some of the metrics. I had gathered these around
> > last summer, so they may be out-of-date. I have attached the document to
> > this email. Hope it can help.
> >
> >
> >
> >
> >
> >
> > On Feb 24, 2016, at 7:10 AM, Milinda Pathirage <mpath...@umail.iu.edu> wrote:
> >
> > Hi David and Xinyu,
> >
> > If you want to get the number of messages processed, "process-envelopes"
> > is
> > the correct metrics. "process-calls" gives measure the number of times
> > RunLoop#process method is called. So "process-calls" get updated even
> > without processing any messages (This happens when no new messages in
> > input
> > stream). "process-ns" can be used as the average time taken to process a
> > message. But this average also includes time taken to process null
> > messages. So I don't trust the accuracy of that metric.
> >
> > Each metric emitted by Samza contains a header which includes job name,
> > job
> > id, container name and metric timestamp. You can use it to calculate
> > messages per second values.
> >
> > If you are using a KV store, KeyValueStoreMetrics contains metrics such as
> > bytes read, bytes written, puts, and gets for each store.
> >
> > Thanks
> > Milinda
> >
> > On Tue, Feb 23, 2016 at 8:26 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
> >
> > Hi, David,
> >
> > I didn't find a wiki page that contains the descriptions of all Samza
> > metrics. You can find the basic metrics by googling the following
> > classes:
> > SamzaContainerMetrics, TaskInstanceMetrics, SystemConsumersMetrics and
> > SystemProducersMetrics. For your example, you can use the
> > "process-calls"
> > in SamzaContainerMetrics to get the processed message count, and divide
> > the
> > delta by time to get the messages processed per sec. In practice, you
> > can
> > either use JConsole to connect to the running Samza container or consume
> > the MetricsSnapshot topic to get the detailed metrics.
> >
> > Thanks,
> > Xinyu
> >
> > On Tue, Feb 23, 2016 at 4:51 PM, David Yu <david...@optimizely.com> wrote:
> >
> > Hi,
> >
> > Where can I find the detailed descriptions of the out of the box
> > metrics
> > provided by MetricsSnapshotReporterFactory and JmxReporterFactory?
> >
> > I'm interested in seeing the basic metrics of my samza job (e.g.
> > messages_processed_per_sec). But it's hard to pinpoint the specific
> > metric that shows me that.
> >
> > Thanks,
> > David
> >
> >
> >
> >
> >
> > --
> > Milinda Pathirage
> >
> > PhD Student | Research Assistant
> > School of Informatics and Computing | Data to Insight Center
> > Indiana University
> >
> > twitter: milindalakmal
> > skype: milinda.pathirage
> > blog: http://milinda.pathirage.org
> >
> >
> >
> >
> > --
> > Milinda Pathirage
> >
> > PhD Student | Research Assistant
> > School of Informatics and Computing | Data to Insight Center
> > Indiana University
> >
> > twitter: milindalakmal
> > skype: milinda.pathirage
> > blog: http://milinda.pathirage.org
> >
> >
>


Understand Samza default metrics

2016-02-23 Thread David Yu
Hi,

Where can I find the detailed descriptions of the out of the box metrics
provided by MetricsSnapshotReporterFactory and JmxReporterFactory?

I'm interested in seeing the basic metrics of my samza job (e.g.
messages_processed_per_sec). But it's hard to pinpoint the specific
metric that shows me that.

Thanks,
David