Re: Debugging Kafka Streams Windowing

2017-06-08 Thread Mahendra Kariya
Yes, to some extent. But rebalancing is now taking a lot of time. There
are situations where we have to manually restart the Streams app because
rebalancing gets "stuck" for several minutes.

On 7 June 2017 at 06:28, Garrett Barton  wrote:

> Mahendra,
>
>  Did increasing those two properties do the trick?  I am running into this
> exact issue testing Streams out on a single Kafka instance.  Yet I can
> manually start a consumer and read the topics fine while it's busy doing
> this coordinator-dead loop.
>
> On Tue, May 23, 2017 at 12:30 AM, Mahendra Kariya <
> mahendra.kar...@go-jek.com> wrote:
>
> > On 22 May 2017 at 16:09, Guozhang Wang  wrote:
> >
> > > For that issue I'd suspect that there is a network issue, or maybe the
> > > network is just saturated already and the heartbeat request / response were
> > > not exchanged in time between the consumer and the broker, or the sockets
> > > were being dropped because of the socket limit. In such cases not all
> > > consumers may be affected, but since the associated issue is from the
> > > "AbstractCoordinator" class, which is part of the consumer client, I'd still
> > > be surprised if it were actually due to Streams itself with the same
> > > consumer config settings, but not to the plain consumers.
> > >
> >
> > Yes. This is the conclusion that we have also been coming to after further
> > investigation. But we didn't want to post it here until we were sure.
> >
> > We are experimenting with increasing the default timeouts, particularly
> > heartbeat.interval.ms and session.timeout.ms. So far, things have been
> > running fine. But we will let it run for a few more days before closing
> > this issue.
> >
>


Re: Debugging Kafka Streams Windowing

2017-05-22 Thread Mahendra Kariya
On 22 May 2017 at 16:09, Guozhang Wang  wrote:

> For that issue I'd suspect that there is a network issue, or maybe the network
> is just saturated already and the heartbeat request / response were not
> exchanged in time between the consumer and the broker, or the sockets were
> being dropped because of the socket limit. In such cases not all consumers may
> be affected, but since the associated issue is from the "AbstractCoordinator"
> class, which is part of the consumer client, I'd still be surprised if it were
> actually due to Streams itself with the same consumer config settings, but not
> to the plain consumers.
>

Yes. This is the conclusion that we have also been coming to after further
investigation. But we didn't want to post it here until we were sure.

We are experimenting with increasing the default timeouts, particularly
heartbeat.interval.ms and session.timeout.ms. So far, things have been
running fine. But we will let it run for a few more days before closing
this issue.
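
For reference, a minimal sketch of how these two consumer timeouts can be
raised through StreamsConfig on 0.10.x (the application id, broker address,
and the values themselves are placeholders, not the settings used here):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TimeoutOverrides {

    // Builds Streams properties with the consumer heartbeat/session timeouts
    // raised above their 0.10.x defaults (3s and 10s respectively).
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "grp_id");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-01:6667");
        // consumerPrefix() scopes the overrides to the consumers created by Streams
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 10000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 60000);
        return props;
    }
}

Note that session.timeout.ms has to stay within the broker's
group.min.session.timeout.ms / group.max.session.timeout.ms bounds.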


Re: Debugging Kafka Streams Windowing

2017-05-16 Thread Mahendra Kariya
I am confused. If what you have mentioned is the case, then:

   - Why would restarting the stream processes resolve the issue?
   - Why do we get this infinite stream of exceptions only on some boxes
   in the cluster and not all?
   - We have tens of other consumers running just fine. We see this issue
   only in the Streams app.




On Tue, May 16, 2017 at 3:36 PM, Guozhang Wang  wrote:

> Sorry I mis-read your email and confused it with another thread.
>
> As for your observed issue, it seems "broker-05:6667", which is the group
> coordinator for this Streams app with app id (i.e. group id) "grp_id", is in
> an unstable state. Since the Streams app cannot commit offsets anymore while
> the group coordinator is not available, it cannot proceed and instead
> repeatedly re-discovers the coordinator.
>
> This is not generally an issue for Streams, but for consumer group
> membership management. In practice you need to make sure that the offsets
> topic is replicated (I think the default is 3 replicas) so that whenever
> the leader of a certain offsets topic partition, and hence the group
> coordinator, fails, another broker can take over and any consumer group
> corresponding to that offsets topic partition won't be blocked.
>
>
> Guozhang
>
>
>
> On Mon, May 15, 2017 at 7:33 PM, Mahendra Kariya <
> mahendra.kar...@go-jek.com
> > wrote:
>
> > Thanks for the reply Guozhang! But I think we are talking about two different
> > issues here. KAFKA-5167 is for the LockException. We face this issue
> > intermittently, but not a lot.
> >
> > There is also another issue where a particular broker is marked as dead for
> > a group id and the Streams process never recovers from this exception.
> >
> > On Mon, May 15, 2017 at 11:28 PM, Guozhang Wang 
> > wrote:
> >
> > > I'm wondering if it is possibly due to KAFKA-5167? In that case, the
> > > "other thread" will keep retrying on grabbing the lock.
> > >
> > > Guozhang
> > >
> > >
> > > On Sat, May 13, 2017 at 7:30 PM, Mahendra Kariya <
> > > mahendra.kar...@go-jek.com
> > > > wrote:
> > >
> > > > Hi,
> > > >
> > > > There is no missing data. But the INFO level logs are infinite and the
> > > > Streams app practically stops. For the messages that I posted, we got
> > > > these INFO logs for around 20 mins, after which we got an alert about no
> > > > data being produced in the sink topic and we had to restart the Streams
> > > > processes.
> > > >
> > > >
> > > >
> > > > On Sun, May 14, 2017 at 1:01 AM, Matthias J. Sax <
> > matth...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I just dug a little bit. The messages are logged at INFO level and thus
> > > > > should not be a problem if they go away by themselves after some time.
> > > > > Compare:
> > > > > https://groups.google.com/forum/#!topic/confluent-platform/A14dkPlDlv4
> > > > >
> > > > > Do you still see missing data?
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > > On 5/11/17 2:39 AM, Mahendra Kariya wrote:
> > > > > > Hi Matthias,
> > > > > >
> > > > > > We faced the issue again. The logs are below.
> > > > > >
> > > > > > 16:13:16.527 [StreamThread-7] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > > > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
> > > > > > group grp_id
> > > > > > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > > > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
> > > > > > grp_id.
> > > > > > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > > > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
> > > > > > group grp_id
> > > > > > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator

Re: Debugging Kafka Streams Windowing

2017-05-15 Thread Mahendra Kariya
Thanks for the reply Guozhang! But I think we are talking about two different
issues here. KAFKA-5167 is for the LockException. We face this issue
intermittently, but not a lot.

There is also another issue where a particular broker is marked as dead for
a group id and the Streams process never recovers from this exception.

On Mon, May 15, 2017 at 11:28 PM, Guozhang Wang  wrote:

> I'm wondering if it is possibly due to KAFKA-5167? In that case, the "other
> thread" will keep retrying on grabbing the lock.
>
> Guozhang
>
>
> On Sat, May 13, 2017 at 7:30 PM, Mahendra Kariya <
> mahendra.kar...@go-jek.com
> > wrote:
>
> > Hi,
> >
> > There is no missing data. But the INFO level logs are infinite and the
> > Streams app practically stops. For the messages that I posted, we got these
> > INFO logs for around 20 mins, after which we got an alert about no data
> > being produced in the sink topic and we had to restart the Streams
> > processes.
> >
> >
> >
> > On Sun, May 14, 2017 at 1:01 AM, Matthias J. Sax 
> > wrote:
> >
> > > Hi,
> > >
> > > I just dug a little bit. The messages are logged at INFO level and thus
> > > should not be a problem if they go away by themselves after some time.
> > > Compare:
> > > https://groups.google.com/forum/#!topic/confluent-platform/A14dkPlDlv4
> > >
> > > Do you still see missing data?
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 5/11/17 2:39 AM, Mahendra Kariya wrote:
> > > > Hi Matthias,
> > > >
> > > > We faced the issue again. The logs are below.
> > > >
> > > > 16:13:16.527 [StreamThread-7] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
> > > > group grp_id
> > > > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
> > > > grp_id.
> > > > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
> > > > group grp_id
> > > > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
> > > > grp_id.
> > > > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
> > > > group grp_id
> > > > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
> > > > grp_id.
> > > > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
> > > > group grp_id
> > > > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
> > > > grp_id.
> > > > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
> > > > group grp_id
> > > > 16:13:16.573 [StreamThread-2] INFO o.a.k.c.c.i.AbstractCoordinator -
> > > > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
> > > > grp_id.
> > > >
> > > >
> > > >
> > > > On Tue, May 9, 2017 at 3:40 AM, Matthias J. Sax <
> matth...@confluent.io
> > >
> > > > wrote:
> > > >
> > > >> Great! Glad 0.10.2.1 fixes it for you!
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 5/7/17 8:57 PM, Mahendra Kariya wrote:
> > > >>> Upgrading to 0.10.2.1 seems to have fixed the issue.
> > > >

Re: Order of punctuate() and process() in a stream processor

2017-05-14 Thread Mahendra Kariya
We use Kafka Streams for quite a few aggregation tasks. For instance,
counting the number of messages with a particular status in a 1-minute time
window.

We have noticed that whenever we restart the Streams app, we see a sudden
spike in the aggregated numbers. After a few minutes, things are back to
normal. Could the above discussion be the reason for this?

Please note that we use a custom timestamp extractor.



On Fri, May 12, 2017 at 11:24 PM, Matthias J. Sax 
wrote:

> I added the feedback to https://issues.apache.org/jira/browse/KAFKA-3514
>
> -Matthias
>
>
> On 5/12/17 10:38 AM, Thomas Becker wrote:
> > Thanks. I think the system time based punctuation scheme we were
> discussing would not result in repeated punctuations like this, but even
> using stream time it seems a bit odd. If you do anything in a punctuate
> call that is relatively expensive it's especially bad.
> >
> > 
> > From: Matthias J. Sax [matth...@confluent.io]
> > Sent: Friday, May 12, 2017 1:18 PM
> > To: users@kafka.apache.org
> > Subject: Re: Order of punctuate() and process() in a stream processor
> >
> > Thanks for sharing.
> >
> > As punctuate is called with "streams time" you see the same time value
> > multiple times. It's again due to the coarse grained advance of "stream
> > time".
> >
> > @Thomas: I think, the way we handle it just simplifies the
> > implementation of punctuations. I don't see any other "advantage".
> >
> >
> > I will create a JIRA to track this -- we are currently working on some
> > improvements of punctuation and time management already, and it seems to
> > be another valuable improvement.
> >
> >
> > -Matthias
> >
> >
> > On 5/12/17 10:07 AM, Peter Sinoros Szabo wrote:
> >> Well, this is also a good question, because it is triggered with the same
> >> timestamp 3 times, so in order to create my update for all three seconds,
> >> I will have to count the number of punctuations and calculate the missed
> >> stream times myself. It's ok for me to trigger it 3 times, but the
> >> timestamp should not be the same in each, but should be increased by the
> >> schedule time in each punctuate.
> >>
> >> - Sini
> >>
> >>
> >>
> >> From:   Thomas Becker 
> >> To: "users@kafka.apache.org" 
> >> Date:   2017/05/12 18:57
> >> Subject:RE: Order of punctuate() and process() in a stream
> >> processor
> >>
> >>
> >>
> >> I'm a bit troubled by the fact that it fires 3 times despite the stream
> >> time being advanced all at once; is there a scenario when this is
> >> beneficial?
> >>
> >> 
> >> From: Matthias J. Sax [matth...@confluent.io]
> >> Sent: Friday, May 12, 2017 12:38 PM
> >> To: users@kafka.apache.org
> >> Subject: Re: Order of punctuate() and process() in a stream processor
> >>
> >> Hi Peter,
> >>
> >> It's by design. Streams internally tracks time progress (so-called
> >> "streams time"). "streams time" gets advanced *after* processing a record.
> >>
> >> Thus, in your case, "stream time" is still at its old value before it
> >> processed the first message of the "burst" you sent. After that, "streams
> >> time" is advanced by 3 seconds, and thus, punctuate fires 3 times.
> >>
> >> I guess, we could change the design and include scheduled punctuations
> >> when advancing "streams time". But atm, we just don't do this.
> >>
> >> Does this make sense?
> >>
> >> Is this critical for your use case? Or do you just want to understand
> >> what's happening?
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 5/12/17 8:59 AM, Peter Sinoros Szabo wrote:
> >>> Hi,
> >>>
> >>>
> >>> Let's assume the following case.
> >>> - a stream processor that uses the Processor API
> >>> - context.schedule(1000) is called in the init()
> >>> - the processor reads only one topic that has one partition
> >>> - using custom timestamp extractor, but that timestamp is just a wall
> >>> clock time
> >>>
> >>>
> >>> Imagine the following events:
> >>> 1., for 10 seconds I send in 5 messages / second
> >>> 2., I do not send any messages for 3 seconds
> >>> 3., I start sending 5 messages / second again
> >>>
> >>> I see that punctuate() is not called during the 3 seconds when I do not
> >>> send any messages. This is ok according to the documentation, because
> >>> there are no new messages to trigger the punctuate() call. When the
> >>> first few messages arrive after I restart the sending (point 3 above), I
> >>> see the following sequence of method calls:
> >>>
> >>> 1., process() on the 1st message
> >>> 2., punctuate() is called 3 times
> >>> 3., process() on the 2nd message
> >>> 4., process() on each following message
> >>>
> >>> What I would expect instead is that punctuate() is called first and then
> >>> process() is called on the messages, because the first message's timestamp
> >>> is already 3 seconds past the last punctuate() call, so the first message
> >>> belongs after the 3 punctuate() calls.
> >>>
> >>> Please let me kn
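
A minimal Processor API sketch of the setup discussed above (0.10.2-era API;
class and variable names are hypothetical). Because "stream time" only
advances after a record has been processed, a 3-second gap in the input leads
to three back-to-back punctuate() calls, all observing the same stream time,
once the next record arrives:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class OneSecondPunctuateProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private long recordsSinceLastPunctuate = 0;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        context.schedule(1000); // punctuate roughly every 1s of *stream* time, not wall-clock time
    }

    @Override
    public void process(String key, String value) {
        recordsSinceLastPunctuate++;
        // stream time is advanced only after this record has been processed
    }

    @Override
    public void punctuate(long streamTime) {
        // after an input gap this fires once per missed interval, back to back,
        // and (as observed in this thread) each call sees the same streamTime
        System.out.println("punctuate at streamTime=" + streamTime
                + ", records since last call=" + recordsSinceLastPunctuate);
        recordsSinceLastPunctuate = 0;
    }

    @Override
    public void close() {}
}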

Re: Debugging Kafka Streams Windowing

2017-05-13 Thread Mahendra Kariya
Hi,

There is no missing data. But the INFO level logs are infinite and the
Streams app practically stops. For the messages that I posted, we got these
INFO logs for around 20 mins, after which we got an alert about no data
being produced in the sink topic and we had to restart the Streams
processes.



On Sun, May 14, 2017 at 1:01 AM, Matthias J. Sax 
wrote:

> Hi,
>
> I just dug a little bit. The messages are logged at INFO level and thus
> should not be a problem if they go away by themselves after some time.
> Compare:
> https://groups.google.com/forum/#!topic/confluent-platform/A14dkPlDlv4
>
> Do you still see missing data?
>
>
> -Matthias
>
>
> On 5/11/17 2:39 AM, Mahendra Kariya wrote:
> > Hi Matthias,
> >
> > We faced the issue again. The logs are below.
> >
> > 16:13:16.527 [StreamThread-7] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead
> for
> > group grp_id
> > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for
> group
> > grp_id.
> > 16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead
> for
> > group grp_id
> > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for
> group
> > grp_id.
> > 16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead
> for
> > group grp_id
> > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for
> group
> > grp_id.
> > 16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead
> for
> > group grp_id
> > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for
> group
> > grp_id.
> > 16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead
> for
> > group grp_id
> > 16:13:16.573 [StreamThread-2] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for
> group
> > grp_id.
> >
> >
> >
> > On Tue, May 9, 2017 at 3:40 AM, Matthias J. Sax 
> > wrote:
> >
> >> Great! Glad 0.10.2.1 fixes it for you!
> >>
> >> -Matthias
> >>
> >> On 5/7/17 8:57 PM, Mahendra Kariya wrote:
> >>> Upgrading to 0.10.2.1 seems to have fixed the issue.
> >>>
> >>> Until now, we were looking at random 1 hour data to analyse the issue.
> >>> Over the weekend, we have written a simple test that will continuously
> >>> check for inconsistencies in real time and report if there is any issue.
> >>>
> >>> No issues have been reported for the last 24 hours. Will update this
> >> thread
> >>> if we find any issue.
> >>>
> >>> Thanks for all the support!
> >>>
> >>>
> >>>
> >>> On Fri, May 5, 2017 at 3:55 AM, Matthias J. Sax  >
> >>> wrote:
> >>>
> >>>> About
> >>>>
> >>>>> 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> >>>>> Discovered coordinator broker-05:6667 for group group-2.
> >>>>
> >>>> Please upgrade to Streams 0.10.2.1 -- we fixed a couple of bugs and I
> >>>> would assume this issue is fixed, too. If not, please report back.
> >>>>
> >>>>> Another question that I have is, is there a way for us to detect how
> >>>>> many messages have come out of order? And if possible, what is the delay?
> >>>>
> >>>> There is no metric or API for this. What you could do, though, is use
> >>>> #transform() to only forward each record and, as a side task, extract
> >>>> the timestamp via `context#timestamp()` and do some book-keeping to
> >>>> compute whether a record is out-of-order and what the delay was.
> >>>>
> >>>>
> >>>>>>>  - same for .mapValues()
> >>>>>>>
> >>>>>>
> >>>>>> I am not sure how to check this.
> >>>>
> >>>> The same way as you do for filter()?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 5/4/17 10:29 AM, Mahendra Kariya wrote:
> >>>>> Hi Matthias,
> >>>>>
> >>>>> Please find the answers below.
> >>>>>
> >>>>> I would recommend to double check the following:
> >>>>>>
> >>>>>>  - can you confirm that the filter does not remove all data for
> those
> >>>>>> time periods?
> >>>>>>
> >>>>>
> >>>>> Filter does not remove all data. There is a lot of data coming in
> even
> >>>>> after the filter stage.
> >>>>>
> >>>>>
> >>>>>>  - I would also check input for your AggregatorFunction() -- does it
> >>>>>> receive everything?
> >>>>>>
> >>>>>
> >>>>> Yes. Aggregate function seems to be receiving everything.
> >>>>>
> >>>>>
> >>>>>>  - same for .mapValues()
> >>>>>>
> >>>>>
> >>>>> I am not sure how to check this.
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>


Re: Debugging Kafka Streams Windowing

2017-05-11 Thread Mahendra Kariya
Hi Matthias,

We faced the issue again. The logs are below.

16:13:16.527 [StreamThread-7] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
group grp_id
16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
grp_id.
16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
group grp_id
16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
grp_id.
16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
group grp_id
16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
grp_id.
16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
group grp_id
16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
grp_id.
16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for
group grp_id
16:13:16.573 [StreamThread-2] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group
grp_id.



On Tue, May 9, 2017 at 3:40 AM, Matthias J. Sax 
wrote:

> Great! Glad 0.10.2.1 fixes it for you!
>
> -Matthias
>
> On 5/7/17 8:57 PM, Mahendra Kariya wrote:
> > Upgrading to 0.10.2.1 seems to have fixed the issue.
> >
> > Until now, we were looking at random 1 hour data to analyse the issue. Over
> > the weekend, we have written a simple test that will continuously check for
> > inconsistencies in real time and report if there is any issue.
> >
> > No issues have been reported for the last 24 hours. Will update this
> thread
> > if we find any issue.
> >
> > Thanks for all the support!
> >
> >
> >
> > On Fri, May 5, 2017 at 3:55 AM, Matthias J. Sax 
> > wrote:
> >
> >> About
> >>
> >>> 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> >>> Discovered coordinator broker-05:6667 for group group-2.
> >>
> >> Please upgrade to Streams 0.10.2.1 -- we fixed a couple of bugs and I would
> >> assume this issue is fixed, too. If not, please report back.
> >>
> >>> Another question that I have is, is there a way for us to detect how many
> >>> messages have come out of order? And if possible, what is the delay?
> >>
> >> There is no metric or API for this. What you could do, though, is use
> >> #transform() to only forward each record and, as a side task, extract
> >> the timestamp via `context#timestamp()` and do some book-keeping to
> >> compute whether a record is out-of-order and what the delay was.
> >>
> >>
> >>>>>  - same for .mapValues()
> >>>>>
> >>>>
> >>>> I am not sure how to check this.
> >>
> >> The same way as you do for filter()?
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 5/4/17 10:29 AM, Mahendra Kariya wrote:
> >>> Hi Matthias,
> >>>
> >>> Please find the answers below.
> >>>
> >>> I would recommend to double check the following:
> >>>>
> >>>>  - can you confirm that the filter does not remove all data for those
> >>>> time periods?
> >>>>
> >>>
> >>> Filter does not remove all data. There is a lot of data coming in even
> >>> after the filter stage.
> >>>
> >>>
> >>>>  - I would also check input for your AggregatorFunction() -- does it
> >>>> receive everything?
> >>>>
> >>>
> >>> Yes. Aggregate function seems to be receiving everything.
> >>>
> >>>
> >>>>  - same for .mapValues()
> >>>>
> >>>
> >>> I am not sure how to check this.
> >>>
> >>
> >>
> >
>
>


Re: Debugging Kafka Streams Windowing

2017-05-07 Thread Mahendra Kariya
Upgrading to 0.10.2.1 seems to have fixed the issue.

Until now, we were looking at random 1 hour data to analyse the issue. Over
the weekend, we have written a simple test that will continuously check for
inconsistencies in real time and report if there is any issue.

No issues have been reported for the last 24 hours. Will update this thread
if we find any issue.

Thanks for all the support!



On Fri, May 5, 2017 at 3:55 AM, Matthias J. Sax 
wrote:

> About
>
> > 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> > Discovered coordinator broker-05:6667 for group group-2.
>
> Please upgrade to Streams 0.10.2.1 -- we fixed a couple of bugs and I would
> assume this issue is fixed, too. If not, please report back.
>
> > Another question that I have is, is there a way for us to detect how many
> > messages have come out of order? And if possible, what is the delay?
>
> There is no metric or API for this. What you could do, though, is use
> #transform() to only forward each record and, as a side task, extract
> the timestamp via `context#timestamp()` and do some book-keeping to
> compute whether a record is out-of-order and what the delay was.
>
>
> >>>  - same for .mapValues()
> >>>
> >>
> >> I am not sure how to check this.
>
> The same way as you do for filter()?
>
>
> -Matthias
>
>
> On 5/4/17 10:29 AM, Mahendra Kariya wrote:
> > Hi Matthias,
> >
> > Please find the answers below.
> >
> > I would recommend to double check the following:
> >>
> >>  - can you confirm that the filter does not remove all data for those
> >> time periods?
> >>
> >
> > Filter does not remove all data. There is a lot of data coming in even
> > after the filter stage.
> >
> >
> >>  - I would also check input for your AggregatorFunction() -- does it
> >> receive everything?
> >>
> >
> > Yes. Aggregate function seems to be receiving everything.
> >
> >
> >>  - same for .mapValues()
> >>
> >
> > I am not sure how to check this.
> >
>
>
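
A sketch of the pass-through #transform() book-keeping suggested above
(hypothetical class; it assumes the record timestamps come from the custom
extractor and simply logs out-of-order arrivals and their delay):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class OutOfOrderTracker<K, V> implements Transformer<K, V, KeyValue<K, V>> {

    private ProcessorContext context;
    private long maxTimestampSeen = Long.MIN_VALUE;
    private long outOfOrderCount = 0;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<K, V> transform(K key, V value) {
        long ts = context.timestamp();  // timestamp assigned by the timestamp extractor
        if (ts < maxTimestampSeen) {
            outOfOrderCount++;
            long delayMs = maxTimestampSeen - ts;
            // book-keeping only; could also be exposed via a custom metric
            System.out.printf("out-of-order record #%d, delay=%d ms%n", outOfOrderCount, delayMs);
        } else {
            maxTimestampSeen = ts;
        }
        return KeyValue.pair(key, value);  // forward the record unchanged
    }

    @Override
    public KeyValue<K, V> punctuate(long timestamp) {
        return null;  // no scheduled output
    }

    @Override
    public void close() {}
}

Such a transformer could be inserted right after the source, e.g. via
stream.transform(OutOfOrderTracker::new), before the rest of the topology.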


Re: Debugging Kafka Streams Windowing

2017-05-04 Thread Mahendra Kariya
Hi Matthias,

Please find the answers below.

I would recommend to double check the following:
>
>  - can you confirm that the filter does not remove all data for those
> time periods?
>

Filter does not remove all data. There is a lot of data coming in even
after the filter stage.


>  - I would also check input for your AggregatorFunction() -- does it
> receive everything?
>

Yes. Aggregate function seems to be receiving everything.


>  - same for .mapValues()
>

I am not sure how to check this.


Re: Debugging Kafka Streams Windowing

2017-05-03 Thread Mahendra Kariya
Another question that I have is, is there a way for us to detect how many
messages have come out of order? And if possible, what is the delay?

On Thu, May 4, 2017 at 6:17 AM, Mahendra Kariya 
wrote:

> Hi Matthias,
>
> Sure we will look into this. In the meantime, we have run into another
> issue. We have started getting this error rather frequently, and
> the Streams app is unable to recover from this.
>
> 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
> 07:44:08.494 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 dead for group group-2
> 07:44:08.594 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
> 07:44:08.594 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 dead for group group-2
> 07:44:08.594 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
> 07:44:08.594 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> Marking the coordinator broker-05:6667 dead for group group-2
> 07:44:08.694 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
> Discovered coordinator broker-05:6667 for group group-2.
>
>
> On Thu, May 4, 2017 at 4:35 AM, Matthias J. Sax 
> wrote:
>
>> I would recommend to double check the following:
>>
>>  - can you confirm that the filter does not remove all data for those
>> time periods?
>>  - I would also check input for your AggregatorFunction() -- does it
>> receive everything?
>>  - same for .mapValues()
>>
>> This would help to understand in what part of the program the data gets
>> lost.
>>
>>
>> -Matthias
>>
>>
>> On 5/2/17 11:09 PM, Mahendra Kariya wrote:
>> > Hi Garrett,
>> >
>> > Thanks for these insights. But we are not consuming old data. We want
>> the
>> > Streams app to run in near real time. And that is how it is actually
>> > running. The lag never increases beyond a certain limit. So I don't
>> think
>> > that's an issue.
>> >
>> > The values of the configs that you are mentioning are whatever Kafka
>> offers
>> > by default. So I guess that should be fine.
>> >
>> >
>> >
>> > On Tue, May 2, 2017 at 7:52 PM, Garrett Barton <
>> garrett.bar...@gmail.com>
>> > wrote:
>> >
>> >> Mahendra,
>> >>
>> >>  One possible thing I have seen that exhibits the same behavior of missing
>> >> windows of data is the configuration of the topics (internal and your own)
>> >> retention policies.  I was loading data that was fairly old (weeks) and
>> >> using event time semantics as the record timestamp (custom timestamp
>> >> extractor) and the cleanup stuff was deleting segments nearly right after
>> >> they were written.  In my case default cleanup run was every 5 minutes, and
>> >> the default retention was 7 days, so every 5 minutes I lost data.  In my
>> >> logs I saw a ton of warnings about 'offset not found' and kafka skipping
>> >> ahead to whatever the next available offset was.  End result was gaps all
>> >> over my data.  I don't have a good fix yet, I set the retention to
>> >> something massive which I think is getting me other problems.
>> >>
>> >> Maybe that helps?
>> >>
>> >> On Tue, May 2, 2017 at 6:27 AM, Mahendra Kariya <
>> >> mahendra.kar...@go-jek.com>
>> >> wrote:
>> >>
>> >>> Hi Matthias,
>> >>>
>> >>> What we did was read the data from sink topic and print it to console.
>> >> And
>> >>> here's the raw data from that topic (the counts are randomized). As we
>> >> can
>> >>> see, the data is certainly missing for some time windows. For
>> instance,
>> >>> after 1493693760, the next timestamp for which the data is present
>> >>> is 1493694300. That's around 9 minutes of data missing.
>> >>>
>> >>> And this is just one instance. There are a lot of such instances in
>> this
>> >>> file.
>> >>>
>> >>>
>> >>>
>> >>> On Sun, Apr 30, 2017 at 11:23 AM, Mahendra Kariya <
>> >>> mahendra.kar...@go-jek.com> wrote:

Re: Debugging Kafka Streams Windowing

2017-05-03 Thread Mahendra Kariya
Hi Matthias,

Sure we will look into this. In the meantime, we have run into another
issue. We have started getting this error rather frequently, and
the Streams app is unable to recover from this.

07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 for group group-2.
07:44:08.494 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 dead for group group-2
07:44:08.594 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 for group group-2.
07:44:08.594 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 dead for group group-2
07:44:08.594 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 for group group-2.
07:44:08.594 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
Marking the coordinator broker-05:6667 dead for group group-2
07:44:08.694 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator -
Discovered coordinator broker-05:6667 for group group-2.


On Thu, May 4, 2017 at 4:35 AM, Matthias J. Sax 
wrote:

> I would recommend to double check the following:
>
>  - can you confirm that the filter does not remove all data for those
> time periods?
>  - I would also check input for your AggregatorFunction() -- does it
> receive everything?
>  - same for .mapValues()
>
> This would help to understand in what part of the program the data gets
> lost.
>
>
> -Matthias
>
>
> On 5/2/17 11:09 PM, Mahendra Kariya wrote:
> > Hi Garrett,
> >
> > Thanks for these insights. But we are not consuming old data. We want the
> > Streams app to run in near real time. And that is how it is actually
> > running. The lag never increases beyond a certain limit. So I don't think
> > that's an issue.
> >
> > The values of the configs that you are mentioning are whatever Kafka
> offers
> > by default. So I guess that should be fine.
> >
> >
> >
> > On Tue, May 2, 2017 at 7:52 PM, Garrett Barton  >
> > wrote:
> >
> >> Mahendra,
> >>
> >>  One possible thing I have seen that exhibits the same behavior of missing
> >> windows of data is the configuration of the topics (internal and your own)
> >> retention policies.  I was loading data that was fairly old (weeks) and
> >> using event time semantics as the record timestamp (custom timestamp
> >> extractor) and the cleanup stuff was deleting segments nearly right after
> >> they were written.  In my case default cleanup run was every 5 minutes, and
> >> the default retention was 7 days, so every 5 minutes I lost data.  In my
> >> logs I saw a ton of warnings about 'offset not found' and kafka skipping
> >> ahead to whatever the next available offset was.  End result was gaps all
> >> over my data.  I don't have a good fix yet, I set the retention to
> >> something massive which I think is getting me other problems.
> >>
> >> Maybe that helps?
> >>
> >> On Tue, May 2, 2017 at 6:27 AM, Mahendra Kariya <
> >> mahendra.kar...@go-jek.com>
> >> wrote:
> >>
> >>> Hi Matthias,
> >>>
> >>> What we did was read the data from sink topic and print it to console.
> >> And
> >>> here's the raw data from that topic (the counts are randomized). As we
> >> can
> >>> see, the data is certainly missing for some time windows. For instance,
> >>> after 1493693760, the next timestamp for which the data is present
> >>> is 1493694300. That's around 9 minutes of data missing.
> >>>
> >>> And this is just one instance. There are a lot of such instances in
> this
> >>> file.
> >>>
> >>>
> >>>
> >>> On Sun, Apr 30, 2017 at 11:23 AM, Mahendra Kariya <
> >>> mahendra.kar...@go-jek.com> wrote:
> >>>
> >>>> Thanks for the update Matthias! And sorry for the delayed response.
> >>>>
> >>>> The reason we use .aggregate() is because we want to count the number
> of
> >>>> unique values for a particular field in the message. So, we just add
> >> that
> >>>> particular field's value in the HashSet and then take the size of the
> >>>> HashSet.
> >>>>
> >>>> On our side, we are also investigating and it looks like there might
> be
> >> a
> >>>> bug somewhere in our codebase. If that's the case, then it's quit

Re: Debugging Kafka Streams Windowing

2017-05-02 Thread Mahendra Kariya
Hi Garrett,

Thanks for these insights. But we are not consuming old data. We want the
Streams app to run in near real time. And that is how it is actually
running. The lag never increases beyond a certain limit. So I don't think
that's an issue.

The values of the configs that you are mentioning are whatever Kafka offers
by default. So I guess that should be fine.



On Tue, May 2, 2017 at 7:52 PM, Garrett Barton 
wrote:

> Mahendra,
>
>  One possible thing I have seen that exhibits the same behavior of missing
> windows of data is the configuration of the topics (internal and your own)
> retention policies.  I was loading data that was fairly old (weeks) and
> using event time semantics as the record timestamp (custom timestamp
> extractor) and the cleanup stuff was deleting segments nearly right after
> they were written.  In my case default cleanup run was every 5 minutes, and
> the default retention was 7 days, so every 5 minutes I lost data.  In my
> logs I saw a ton of warnings about 'offset not found' and kafka skipping
> ahead to whatever the next available offset was.  End result was gaps all
> over my data.  I don't have a good fix yet, I set the retention to
> something massive which I think is getting me other problems.
>
> Maybe that helps?
>
> On Tue, May 2, 2017 at 6:27 AM, Mahendra Kariya <
> mahendra.kar...@go-jek.com>
> wrote:
>
> > Hi Matthias,
> >
> > What we did was read the data from sink topic and print it to console.
> And
> > here's the raw data from that topic (the counts are randomized). As we
> can
> > see, the data is certainly missing for some time windows. For instance,
> > after 1493693760, the next timestamp for which the data is present
> > is 1493694300. That's around 9 minutes of data missing.
> >
> > And this is just one instance. There are a lot of such instances in this
> > file.
> >
> >
> >
> > On Sun, Apr 30, 2017 at 11:23 AM, Mahendra Kariya <
> > mahendra.kar...@go-jek.com> wrote:
> >
> >> Thanks for the update Matthias! And sorry for the delayed response.
> >>
> >> The reason we use .aggregate() is because we want to count the number of
> >> unique values for a particular field in the message. So, we just add
> that
> >> particular field's value in the HashSet and then take the size of the
> >> HashSet.
> >>
> >> On our side, we are also investigating and it looks like there might be
> a
> >> bug somewhere in our codebase. If that's the case, then it's quite
> possible
> >> that there is no bug in Kafka Streams, except the metric one.
> >>
> >> We will revert after confirming.
> >>
> >>
> >>
> >>
> >> On Sun, Apr 30, 2017 at 10:39 AM, Matthias J. Sax <
> matth...@confluent.io>
> >> wrote:
> >>
> >>> Just a follow up (we identified a bug in the "skipped records" metric).
> >>> The reported value is not correct.
> >>>
> >>>
> >>> On 4/28/17 9:12 PM, Matthias J. Sax wrote:
> >>> > Ok. That makes sense.
> >>> >
> >>> > Question: why do you use .aggregate() instead of .count() ?
> >>> >
> >>> > Also, can you share the code of you AggregatorFunction()? Did you
> >>> change
> >>> > any default setting of StreamsConfig?
> >>> >
> >>> > I have still no idea what could go wrong. Maybe you can run with log
> >>> > level TRACE? Maybe we can get some insight from those.
> >>> >
> >>> >
> >>> > -Matthias
> >>> >
> >>> > On 4/27/17 11:41 PM, Mahendra Kariya wrote:
> >>> >> Oh good point!
> >>> >>
> >>> >> The reason why there is only one row corresponding to each time
> >>> window is
> >>> >> because it only contains the latest value for the time window. So
> >>> what we
> >>> >> did was we just dumped the data present in the sink topic to a db
> >>> using an
> >>> >> upsert query. The primary key of the table was time window. The file
> >>> that I
> >>> >> attached is actually the data present in the DB. And we know that
> >>> there is
> >>> >> no bug in our db dump code because we have been using it for a long
> >>> time in
> >>> >> production without any issues.
> >>> >>
> >>> >> The reason the count is zero for so

Re: Debugging Kafka Streams Windowing

2017-05-02 Thread Mahendra Kariya
Hi Matthias,

What we did was read the data from sink topic and print it to console. And
here's the raw data from that topic (the counts are randomized). As we can
see, the data is certainly missing for some time windows. For instance,
after 1493693760, the next timestamp for which the data is present
is 1493694300. That's around 9 minutes of data missing.

And this is just one instance. There are a lot of such instances in this
file.



On Sun, Apr 30, 2017 at 11:23 AM, Mahendra Kariya <
mahendra.kar...@go-jek.com> wrote:

> Thanks for the update Matthias! And sorry for the delayed response.
>
> The reason we use .aggregate() is because we want to count the number of
> unique values for a particular field in the message. So, we just add that
> particular field's value in the HashSet and then take the size of the
> HashSet.
>
> On our side, we are also investigating and it looks like there might be a
> bug somewhere in our codebase. If that's the case, then it's quite possible
> that there is no bug in Kafka Streams, except the metric one.
>
> We will revert after confirming.
>
>
>
>
> On Sun, Apr 30, 2017 at 10:39 AM, Matthias J. Sax 
> wrote:
>
>> Just a follow up (we identified a bug in the "skipped records" metric).
>> The reported value is not correct.
>>
>>
>> On 4/28/17 9:12 PM, Matthias J. Sax wrote:
>> > Ok. That makes sense.
>> >
>> > Question: why do you use .aggregate() instead of .count() ?
>> >
>> > Also, can you share the code of you AggregatorFunction()? Did you change
>> > any default setting of StreamsConfig?
>> >
>> > I have still no idea what could go wrong. Maybe you can run with log
>> > level TRACE? Maybe we can get some insight from those.
>> >
>> >
>> > -Matthias
>> >
>> > On 4/27/17 11:41 PM, Mahendra Kariya wrote:
>> >> Oh good point!
>> >>
>> >> The reason why there is only one row corresponding to each time window is
>> >> because it only contains the latest value for the time window. So what we
>> >> did was we just dumped the data present in the sink topic to a db using an
>> >> upsert query. The primary key of the table was time window. The file that I
>> >> attached is actually the data present in the DB. And we know that there is
>> >> no bug in our db dump code because we have been using it for a long time in
>> >> production without any issues.
>> >>
>> >> The reason the count is zero for some time windows is that I subtracted
>> >> a random number from the actual values and rounded it off to zero, for
>> >> privacy reasons. The actual data doesn't have any zero values. I should
>> >> have mentioned this earlier. My bad!
>> >>
>> >> The stream topology code looks something like this.
>> >>
>> >> stream
>> >> .filter()
> >> .map((key, value) -> new KeyValue<>(transform(key), value))
>> >> .groupByKey()
>> >> .aggregate(HashSet::new, AggregatorFunction(),
>> >> TimeWindows.of(6).until(360))
>> >> .mapValues(HashSet::size)
>> >> .toStream()
>> >> .map((key, value) -> convertToProtobufObject(key, value))
>> >> .to()
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax <
>> matth...@confluent.io>
>> >> wrote:
>> >>
>> >>> Thanks for the details (sorry that I forgot that you did share the
>> >>> output already).
>> >>>
>> >>> Might be a dumb question, but what is the count for missing windows in
>> >>> your seconds implementation?
>> >>>
>> >>> If there is no data for a window, it should not emit a window with
>> count
>> >>> zero, but nothing.
>> >>>
>> >>> Thus, looking at your output, I am wondering how it could contain line
>> >>> like:
>> >>>
>> >>>> 2017-04-27T04:53:00 0
>> >>>
>> >>> I am also wondering why your output only contains a single value per
>> >>> window. As Streams outputs multiple updates per window while the count
>> >>> is increasing, you should actually see multiple records per window.
>> >>>
>> >>> Your code is like 

Re: Debugging Kafka Streams Windowing

2017-04-29 Thread Mahendra Kariya
Thanks for the update Matthias! And sorry for the delayed response.

The reason we use .aggregate() is because we want to count the number of
unique values for a particular field in the message. So, we just add that
particular field's value in the HashSet and then take the size of the
HashSet.

On our side, we are also investigating and it looks like there might be a
bug somewhere in our codebase. If that's the case, then it's quite possible
that there is no bug in Kafka Streams, except the metric one.

We will revert after confirming.




On Sun, Apr 30, 2017 at 10:39 AM, Matthias J. Sax 
wrote:

> Just a follow up (we identified a bug in the "skipped records" metric).
> The reported value is not correct.
>
>
> On 4/28/17 9:12 PM, Matthias J. Sax wrote:
> > Ok. That makes sense.
> >
> > Question: why do you use .aggregate() instead of .count() ?
> >
> > Also, can you share the code of you AggregatorFunction()? Did you change
> > any default setting of StreamsConfig?
> >
> > I have still no idea what could go wrong. Maybe you can run with log
> > level TRACE? Maybe we can get some insight from those.
> >
> >
> > -Matthias
> >
> > On 4/27/17 11:41 PM, Mahendra Kariya wrote:
> >> Oh good point!
> >>
> >> The reason why there is only one row corresponding to each time window is
> >> because it only contains the latest value for the time window. So what we
> >> did was we just dumped the data present in the sink topic to a db using an
> >> upsert query. The primary key of the table was time window. The file that I
> >> attached is actually the data present in the DB. And we know that there is
> >> no bug in our db dump code because we have been using it for a long time in
> >> production without any issues.
> >>
> >> The reason the count is zero for some time windows is that I subtracted
> >> a random number from the actual values and rounded it off to zero, for
> >> privacy reasons. The actual data doesn't have any zero values. I should
> >> have mentioned this earlier. My bad!
> >>
> >> The stream topology code looks something like this.
> >>
> >> stream
> >> .filter()
> >> .map((key, value) -> new KeyValue<>(transform(key), value))
> >> .groupByKey()
> >> .aggregate(HashSet::new, AggregatorFunction(),
> >> TimeWindows.of(6).until(360))
> >> .mapValues(HashSet::size)
> >> .toStream()
> >> .map((key, value) -> convertToProtobufObject(key, value))
> >> .to()
> >>
> >>
> >>
> >>
> >>
> >>
> >> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax  >
> >> wrote:
> >>
> >>> Thanks for the details (sorry that I forgot that you did share the
> >>> output already).
> >>>
> >>> Might be a dumb question, but what is the count for missing windows in
> >>> your seconds implementation?
> >>>
> >>> If there is no data for a window, it should not emit a window with
> count
> >>> zero, but nothing.
> >>>
> >>> Thus, looking at your output, I am wondering how it could contain line
> >>> like:
> >>>
> >>>> 2017-04-27T04:53:00 0
> >>>
> >>> I am also wondering why your output only contains a single value per
> >>> window. As Streams outputs multiple updates per window while the count
> >>> is increasing, you should actually see multiple records per window.
> >>>
> >>> Your code is like this:
> >>>
> >>> stream.filter().groupByKey().count(TimeWindows.of(6)).to();
> >>>
> >>> Or do you have something more complex?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
> >>>>> Can you somehow verify your output?
> >>>>
> >>>>
> >>>> Do you mean the Kafka streams output? In the Kafka Streams output, we
> do
> >>>> see some missing values. I have attached the Kafka Streams output
> (for a
> >>>> few hours) in the very first email of this thread for reference.
> >>>>
> >>>> Let me also summarise what we have done so far.
> >>>>
> >>>> We took a dump of the raw data present in the source topic. We wrote a
> >>>> script to read this data and do the exact same aggregations that we do
> >>>> using Kafka Streams. And then we compared the output from Kafka
> Streams
> >>> and
> >>>> our script.
> >>>>
> >>>> The difference that we observed in the two outputs is that there were
> a
> >>> few
> >>>> rows (corresponding to some time windows) missing in the Streams
> output.
> >>>> For the time windows for which the data was present, the aggregated
> >>> numbers
> >>>> matched exactly.
> >>>>
> >>>> This means, either all the records for a particular time window are
> being
> >>>> skipped, or none. Now this is highly unlikely to happen. Maybe there
> is a
> >>>> bug somewhere in the rocksdb state stores? Just a speculation, not
> sure
> >>>> though. And there could even be a bug in the reported metric.
> >>>>
> >>>
> >>>
> >>
> >
>
>


Re: Debugging Kafka Streams Windowing

2017-04-27 Thread Mahendra Kariya
Oh good point!

The reason why there is only one row corresponding to each time window is
because it only contains the latest value for the time window. So what we
did was we just dumped the data present in the sink topic to a db using an
upsert query. The primary key of the table was time window. The file that I
attached is actually the data present in the DB. And we know that there is
no bug in our db dump code because we have been using it for a long time in
production without any issues.

The reason the count is zero for some time windows is that I subtracted
a random number from the actual values and rounded it off to zero, for
privacy reasons. The actual data doesn't have any zero values. I should
have mentioned this earlier. My bad!

The stream topology code looks something like this.

stream
.filter()
.map((key, value) -> new KeyValue<>(transform(key), value))
.groupByKey()
.aggregate(HashSet::new, AggregatorFunction(),
TimeWindows.of(6).until(360))
.mapValues(HashSet::size)
.toStream()
.map((key, value) -> convertToProtobufObject(key, value))
.to()
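
For reference, a hypothetical sketch of what the AggregatorFunction() above
could look like for the unique-count use case described in this thread (the
key type, the Message value type, and the field accessor are assumptions):

import java.util.HashSet;
import org.apache.kafka.streams.kstream.Aggregator;

// Collects the distinct values of one field per key and window; the window's
// unique count is then just the size of the set (the .mapValues(HashSet::size)
// step above).
public class AggregatorFunction implements Aggregator<String, Message, HashSet<String>> {

    @Override
    public HashSet<String> apply(String aggKey, Message value, HashSet<String> aggregate) {
        aggregate.add(value.getParticularField()); // assumed accessor for the field being counted
        return aggregate;
    }
}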






On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax 
wrote:

> Thanks for the details (sorry that I forgot that you did share the
> output already).
>
> Might be a dumb question, but what is the count for missing windows in
> your seconds implementation?
>
> If there is no data for a window, it should not emit a window with count
> zero, but nothing.
>
> Thus, looking at your output, I am wondering how it could contain line
> like:
>
> > 2017-04-27T04:53:00 0
>
> I am also wondering why your output only contains a single value per
> window. As Streams outputs multiple updates per window while the count
> is increasing, you should actually see multiple records per window.
>
> Your code is like this:
>
> stream.filter().groupByKey().count(TimeWindows.of(6)).to();
>
> Or do you have something more complex?
>
>
> -Matthias
>
>
> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
> >> Can you somehow verify your output?
> >
> >
> > Do you mean the Kafka streams output? In the Kafka Streams output, we do
> > see some missing values. I have attached the Kafka Streams output (for a
> > few hours) in the very first email of this thread for reference.
> >
> > Let me also summarise what we have done so far.
> >
> > We took a dump of the raw data present in the source topic. We wrote a
> > script to read this data and do the exact same aggregations that we do
> > using Kafka Streams. And then we compared the output from Kafka Streams
> and
> > our script.
> >
> > The difference that we observed in the two outputs is that there were a
> few
> > rows (corresponding to some time windows) missing in the Streams output.
> > For the time windows for which the data was present, the aggregated
> numbers
> > matched exactly.
> >
> > This means, either all the records for a particular time window are being
> > skipped, or none. Now this is highly unlikely to happen. Maybe there is a
> > bug somewhere in the rocksdb state stores? Just a speculation, not sure
> > though. And there could even be a bug in the reported metric.
> >
>
>


Re: Debugging Kafka Streams Windowing

2017-04-27 Thread Mahendra Kariya
> Can you somehow verify your output?


Do you mean the Kafka streams output? In the Kafka Streams output, we do
see some missing values. I have attached the Kafka Streams output (for a
few hours) in the very first email of this thread for reference.

Let me also summarise what we have done so far.

We took a dump of the raw data present in the source topic. We wrote a
script to read this data and do the exact same aggregations that we do
using Kafka Streams. And then we compared the output from Kafka Streams and
our script.

The difference that we observed in the two outputs is that there were a few
rows (corresponding to some time windows) missing in the Streams output.
For the time windows for which the data was present, the aggregated numbers
matched exactly.

This means, either all the records for a particular time window are being
skipped, or none. Now this is highly unlikely to happen. Maybe there is a
bug somewhere in the rocksdb state stores? Just a speculation, not sure
though. And there could even be a bug in the reported metric.


Re: Debugging Kafka Streams Windowing

2017-04-27 Thread Mahendra Kariya
Hi Matthias,

We changed our timestamp extractor code to this.

public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
    Message message = (Message) record.value();
    long timeInMillis = Timestamps.toMillis(message.getEventTimestamp());

    if (timeInMillis < 0) {
        LOGGER.error("Negative timestamp: {}", timeInMillis);
    }

    return timeInMillis;
}


We don't see any errors in the logs. However, the skipped-records-rate has
not come down.




On Fri, Apr 28, 2017 at 5:17 AM, Matthias J. Sax 
wrote:

> Streams skips records with timestamp -1
>
> The metric you mentioned, reports the number of skipped record.
>
> Are you sure that `getEventTimestamp()` never returns -1 ?
>
>
>
> -Matthias
>
> On 4/27/17 10:33 AM, Mahendra Kariya wrote:
> > Hey Eno,
> >
> > We are using a custom TimestampExtractor class. All messages that we have
> > in Kafka have a timestamp field. That is what we are using.
> > The code looks like this.
> >
> > public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
> > Message message = (Message) record.value();
> > return Timestamps.toMillis(message.getEventTimestamp());
> > }
> >
> >
> >
> >
> >
> > On Fri, Apr 28, 2017 at 12:40 AM, Eno Thereska 
> > wrote:
> >
> >> Hi Mahendra,
> >>
> >> We are currently looking at the skipped-records-rate metric as part of
> >> https://issues.apache.org/jira/browse/KAFKA-5055 <
> >> https://issues.apache.org/jira/browse/KAFKA-5055>. Could you let us
> know
> >> if you use any special TimeStampExtractor class, or if it is the
> default?
> >>
> >> Thanks
> >> Eno
> >>> On 27 Apr 2017, at 13:46, Mahendra Kariya 
> >> wrote:
> >>>
> >>> Hey All,
> >>>
> >>> We have a Kafka Streams application which ingests from a topic to which
> >> more than 15K messages are generated per second. The app filters a few
> of
> >> them, counts the number of unique filtered messages (based on one
> >> particular field) within a 1 min time window, and dumps it back to
> Kafka.
> >>>
> >>> The issue that we are facing is that for certain minutes, there is no
> >> data in the sink topic. I have attached the data from 03:30AM to 10:00
> AM
> >> today morning with this mail. And if you notice closely, the data for
> quite
> >> a few minutes is missing.
> >>>
> >>> One thing that we have noticed is that the skipped-records-rate metric
> >>> emitted by Kafka is around 200 for each thread. By the way, what does this
> >>> metric indicate? Does it represent the filtered-out messages?
> >>>
> >>> We have checked the raw data in the source topic and didn't find any
> >> discrepancy.
> >>>
> >>> We even checked the logs on the stream app boxes and the only errors we
> >> found were GC errors.
> >>>
> >>>
> >>> Other relevant info:
> >>>
> >>> Kafka version: 0.10.2.0
> >>> Number of partitions for source topic: 50
> >>> Stream App cluster: 5 machines with 10 threads each
> >>>
> >>>  How do we debug this? What could be the cause?
> >>>
> >>>
> >>>
> >>> 
> >>
> >>
> >
>
>


Re: Debugging Kafka Streams Windowing

2017-04-27 Thread Mahendra Kariya
Hey Eno,

We are using a custom TimestampExtractor class. All messages that we have
in Kafka have a timestamp field. That is what we are using.
The code looks like this.

public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
    Message message = (Message) record.value();
    return Timestamps.toMillis(message.getEventTimestamp());
}
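
A small sketch of how such a custom extractor is wired in through StreamsConfig
on 0.10.2 (the extractor class name, application id, and broker address are
placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ExtractorRegistration {

    // Points Streams at the custom TimestampExtractor shown above.
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "grp_id");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-01:6667");
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                "com.example.MessageTimestampExtractor");
        return props;
    }
}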





On Fri, Apr 28, 2017 at 12:40 AM, Eno Thereska 
wrote:

> Hi Mahendra,
>
> We are currently looking at the skipped-records-rate metric as part of
> https://issues.apache.org/jira/browse/KAFKA-5055 <
> https://issues.apache.org/jira/browse/KAFKA-5055>. Could you let us know
> if you use any special TimeStampExtractor class, or if it is the default?
>
> Thanks
> Eno
> > On 27 Apr 2017, at 13:46, Mahendra Kariya 
> wrote:
> >
> > Hey All,
> >
> > We have a Kafka Streams application which ingests from a topic to which
> more than 15K messages are generated per second. The app filters a few of
> them, counts the number of unique filtered messages (based on one
> particular field) within a 1 min time window, and dumps it back to Kafka.
> >
> > The issue that we are facing is that for certain minutes, there is no
> data in the sink topic. I have attached the data from 03:30AM to 10:00 AM
> today morning with this mail. And if you notice closely, the data for quite
> a few minutes is missing.
> >
> > One thing that we have noticed is that the skipped-records-rate metrics
> emitted by Kafka is around 200 for each thread. By the way, what does
> metric indicate? Does this represent the filtered out messages?
> >
> > We have checked the raw data in the source topic and didn't find any
> discrepancy.
> >
> > We even checked the logs on the stream app boxes and the only errors we
> found were GC errors.
> >
> >
> > Other relevant info:
> >
> > Kafka version: 0.10.2.0
> > Number of partitions for source topic: 50
> > Stream App cluster: 5 machines with 10 threads each
> >
> >  How do we debug this? What could be the cause?
> >
> >
> >
> > 
>
>


Debugging Kafka Streams Windowing

2017-04-27 Thread Mahendra Kariya
Hey All,

We have a Kafka Streams application which ingests from a topic to which
more than 15K messages are produced per second. The app filters a few of
them, counts the number of unique filtered messages (based on one
particular field) within a 1-minute time window, and writes the counts back to Kafka.
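
For context, the topology has roughly the following shape. This is only a
simplified sketch with made-up topic, store, and helper names (isInteresting,
extractField), not the actual application code, and it counts messages per
field value per window rather than exact uniques:

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowedCountSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-count-sketch"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");        // placeholder

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream(Serdes.String(), Serdes.String(), "input-topic")
               // keep only the records of interest (placeholder predicate)
               .filter((key, value) -> isInteresting(value))
               // re-key by the field that is being counted (placeholder extractor)
               .selectKey((key, value) -> extractField(value))
               .groupByKey(Serdes.String(), Serdes.String())
               // count per key per 1-minute window, backed by a local state store
               .count(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)), "per-minute-counts")
               .toStream()
               // flatten the windowed key into a plain string key for the sink topic
               .map((windowedKey, count) -> new KeyValue<String, Long>(
                       windowedKey.key() + "@" + windowedKey.window().start(), count))
               .to(Serdes.String(), Serdes.Long(), "output-topic");

        new KafkaStreams(builder, props).start();
    }

    private static boolean isInteresting(String value) { return value != null; }

    private static String extractField(String value) { return value; }
}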

The issue that we are facing is that for certain minutes, there is no data
in the sink topic. I have attached the data from 03:30 AM to 10:00 AM this
morning with this mail. And if you look closely, the data for quite a few
minutes is missing.

One thing that we have noticed is that the skipped-records-rate metric
emitted by Kafka is around 200 for each thread. By the way, what does this
metric indicate? Does it represent the filtered-out messages?

We have checked the raw data in the source topic and didn't find any
discrepancy.

We even checked the logs on the stream app boxes and the only errors we
found were GC errors.


Other relevant info:

Kafka version: 0.10.2.0
Number of partitions for source topic: 50
Stream App cluster: 5 machines with 10 threads each

 How do we debug this? What could be the cause?
2017-04-27T03:30:00 1
2017-04-27T03:36:00 2
2017-04-27T03:50:00 16
2017-04-27T04:03:00 9
2017-04-27T04:12:00 3
2017-04-27T04:33:00 2
2017-04-27T04:38:00 3
2017-04-27T04:44:00 2
2017-04-27T04:50:00 3
2017-04-27T04:53:00 0
2017-04-27T05:02:00 3
2017-04-27T05:10:00 0
2017-04-27T05:14:00 4
2017-04-27T05:18:00 1
2017-04-27T05:20:00 3
2017-04-27T05:22:00 13
2017-04-27T05:24:00 7
2017-04-27T05:25:00 6
2017-04-27T05:26:00 2
2017-04-27T05:30:00 10
2017-04-27T05:32:00 5
2017-04-27T05:34:00 8
2017-04-27T05:36:00 9
2017-04-27T05:37:00 1
2017-04-27T05:38:00 17
2017-04-27T05:39:00 0
2017-04-27T05:40:00 6
2017-04-27T05:44:00 1
2017-04-27T05:45:00 8
2017-04-27T05:48:00 9
2017-04-27T05:49:00 24
2017-04-27T05:52:00 1
2017-04-27T05:53:00 22
2017-04-27T05:54:00 24
2017-04-27T05:55:00 20
2017-04-27T05:56:00 11
2017-04-27T05:57:00 1
2017-04-27T05:58:00 2
2017-04-27T05:59:00 10
2017-04-27T06:01:00 6
2017-04-27T06:07:00 4
2017-04-27T06:09:00 6
2017-04-27T06:10:00 23
2017-04-27T06:12:00 38
2017-04-27T06:13:00 9
2017-04-27T06:14:00 4
2017-04-27T06:15:00 34
2017-04-27T06:16:00 25
2017-04-27T06:17:00 11
2017-04-27T06:18:00 19
2017-04-27T06:19:00 32
2017-04-27T06:21:00 68
2017-04-27T06:22:00 55
2017-04-27T06:25:00 57
2017-04-27T06:26:00 38
2017-04-27T06:27:00 65
2017-04-27T06:28:00 18
2017-04-27T06:29:00 38
2017-04-27T06:30:00 66
2017-04-27T06:31:00 64
2017-04-27T06:32:00 30
2017-04-27T06:35:00 46
2017-04-27T06:36:00 6
2017-04-27T06:38:00 43
2017-04-27T06:39:00 19
2017-04-27T06:40:00 30
2017-04-27T06:41:00 23
2017-04-27T06:42:00 82
2017-04-27T06:43:00 27
2017-04-27T06:44:00 20
2017-04-27T06:45:00 19
2017-04-27T06:46:00 45
2017-04-27T06:48:00 27
2017-04-27T06:49:00 4
2017-04-27T06:50:00 67
2017-04-27T06:51:00 111
2017-04-27T06:52:00 90
2017-04-27T06:53:00 95
2017-04-27T06:55:00 83
2017-04-27T06:57:00 51
2017-04-27T06:58:00 110
2017-04-27T06:59:00 45
2017-04-27T07:01:00 10
2017-04-27T07:02:00 119
2017-04-27T07:03:00 4
2017-04-27T07:04:00 63
2017-04-27T07:06:00 75
2017-04-27T07:07:00 63
2017-04-27T07:08:00 30
2017-04-27T07:09:00 36
2017-04-27T07:10:00 94
2017-04-27T07:11:00 95
2017-04-27T07:12:00 62
2017-04-27T07:13:00 57
2017-04-27T07:14:00 59
2017-04-27T07:15:00 20
2017-04-27T07:16:00 56
2017-04-27T07:17:00 85
2017-04-27T07:18:00 106
2017-04-27T07:19:00 71
2017-04-27T07:20:00 111
2017-04-27T07:21:00 23
2017-04-27T07:22:00 2
2017-04-27T07:23:00 103
2017-04-27T07:24:00 26
2017-04-27T07:25:00 74
2017-04-27T07:26:00 126
2017-04-27T07:27:00 57
2017-04-27T07:28:00 61
2017-04-27T07:29:00 69
2017-04-27T07:30:00 59
2017-04-27T07:31:00 95
2017-04-27T07:32:00 26
2017-04-27T07:33:00 43
2017-04-27T07:34:00 61
2017-04-27T07:35:00 62
2017-04-27T07:36:00 106
2017-04-27T07:37:00 80
2017-04-27T07:38:00 90
2017-04-27T07:39:00 118
2017-04-27T07:40:00 123
2017-04-27T07:41:00 59
2017-04-27T07:42:00 73
2017-04-27T07:43:00 66
2017-04-27T07:44:00 120
2017-04-27T07:45:00 50
2017-04-27T07:46:00 23
2017-04-27T07:47:00 64
2017-04-27T07:48:00 40
2017-04-27T07:49:00 28
2017-04-27T07:50:00 36
2017-04-27T07:51:00 90
2017-04-27T07:52:00 21
2017-04-27T07:53:00 48
2017-04-27T07:54:00 64
2017-04-27T07:56:00 186
2017-04-27T07:57:00 154
2017-04-27T07:58:00 152
2017-04-27T07:59:00 79
2017-04-27T08:00:00 186
2017-04-27T08:01:00 192
2017-04-27T08:02:00 11
2017-04-27T08:03:00 74
2017-04-27T08:04:00 200
2017-04-27T08:05:00 100
2017-04-27T08:06:00 48
2017-04-27T08:07:00 179
2017-04-27T08:08:00 28
2017-04-27T08:09:00 25
2017-04-27T08:10:00 203
2017-04-27T08:11:00 216
2017-04-27T08:12:00 122
2017-04-27T08:13:00 122
2017-04-27T08:14:00 59
2017-04-27T08:15:00 89
2017-04-27T08:16:00 123
2017-04-27T08:17:00 19
2017-04-27T08:18:00 17
2017-04-27T08:19:00 10
2017-04-27T08:20:00 117
2017-04-27T08:21:00 193
2017-04-27T08:22:00 76
2017-04-27T08:23:00 93
2017-04-27T08:24:00 52
2017-04-27T08:25:00 51
2017-04-27T08:26:00 188
2017-04-27T08:27:00 143
2017-04-27T08:28:00 0

Re: how can I contribute to this project?

2017-04-19 Thread Mahendra Kariya
Hi James,

This page has all the information you are looking for.
https://kafka.apache.org/contributing

On Thu, Apr 20, 2017 at 9:32 AM, James Chain 
wrote:

> Hi
> Because I love this project, so I want to take part of it. But I'm brand
> new to opensource project.
>
> How can I get started to make contribution? Can you give me some advise or
> something?
>
> By the way, I already have JIRA account which called "james.c"
>
> Sincerely,
>James.C
>


Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

2017-04-18 Thread Mahendra Kariya
Yeah. Quite possible. Completely missed this possibility. What I simply did
was to download and add the kafka-streams jar as a dependency. I didn't
update the downstream dependencies. My bad!

On Tue, Apr 18, 2017 at 7:42 PM, Eno Thereska 
wrote:

> Hi Mahendra,
>
> I see the java.lang.NoSuchMethodError: org.apache.kafka.clients... error.
> Looks like some jars aren't in the classpath?
>
> Eno
>
> > On 18 Apr 2017, at 12:46, Mahendra Kariya 
> wrote:
> >
> > Hey Eno,
> >
> > I just pulled the latest jar from the link you shared and tried to run my
> > code. I am getting the following exception on new KafkaStreams(). The
> same
> > code is working fine with 0.10.2.0 jar.
> >
> >
> > Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
> >     at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.java:38)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:316)
> >     at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:358)
> >     at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:279)
> > Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.Metadata.update(Lorg/apache/kafka/common/Cluster;Ljava/util/Set;J)V
> >     at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.<init>(StreamsKafkaClient.java:98)
> >     at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.<init>(StreamsKafkaClient.java:82)
> >     at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.configure(StreamPartitionAssignor.java:219)
> >     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:254)
> >     at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:220)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:673)
> >     ... 6 more
> >
> >
> >
> > On Tue, Apr 18, 2017 at 5:47 AM, Mahendra Kariya <
> mahendra.kar...@go-jek.com
> >> wrote:
> >
> >> Thanks!
> >>
> >> On Tue, Apr 18, 2017, 12:26 AM Eno Thereska 
> >> wrote:
> >>
> >>> The RC candidate build is here: http://home.apache.org/~
> >>> gwenshap/kafka-0.10.2.1-rc1/ <http://home.apache.org/~
> >>> gwenshap/kafka-0.10.2.1-rc1/>
> >>>
> >>> Eno
> >>>> On 17 Apr 2017, at 17:20, Mahendra Kariya  >
> >>> wrote:
> >>>>
> >>>> Thanks!
> >>>>
> >>>> In the meantime, is the jar published somewhere on github or as a part
> >>> of
> >>>> build pipeline?
> >>>>
> >>>> On Mon, Apr 17, 2017 at 9:18 PM, Eno Thereska  >
> >>>> wrote:
> >>>>
> >>>>> Not yet, but as soon as 0.10.2 is voted it should be. Hopefully this
> >>> week.
> >>>>>
> >>>>> Eno
> >>>>>> On 17 Apr 2017, at 13:25, Mahendra Kariya <
> mahendra.kar...@go-jek.com
> >>>>
> >>>>> wrote:
> >>>>>>
> >>>>>> Are the bug fix releases published to Maven central repo?
> >>>>>>
> >>>>>> On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska <
> eno.there...@gmail.com
> >>>>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Sachin,
> >>>>>>>
> >>>>>>> In the bug fix release for 0.10.2 (and in trunk) we have now set
> >>>>>>> max.poll.interval to infinite since from our experience with
> streams
> >>>>> this
> >>>>>>> should not be something that users set: https://github.com/apache/
> >>>>>>> kafka/pull/2770/files <https://github.com/apache/
> >>> kafka/pull/2770/files
> >>>>>> .
> >>>>>>>
> >>>>>>> We're in the process of documenting that change. For now you can
> >>>>> increase
> >>>>>>> the request time

Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

2017-04-18 Thread Mahendra Kariya
Hey Eno,

I just pulled the latest jar from the link you shared and tried to run my
code. I am getting the following exception on new KafkaStreams(). The same
code is working fine with 0.10.2.0 jar.


Exception in thread "main" org.apache.kafka.common.KafkaException: Failed
to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.(
KafkaConsumer.java:717)
at org.apache.kafka.clients.consumer.KafkaConsumer.(
KafkaConsumer.java:566)
at org.apache.kafka.streams.processor.internals.
DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.java:38)
at org.apache.kafka.streams.processor.internals.StreamThread.(
StreamThread.java:316)
at org.apache.kafka.streams.KafkaStreams.(
KafkaStreams.java:358)
at org.apache.kafka.streams.KafkaStreams.(
KafkaStreams.java:279)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.
Metadata.update(Lorg/apache/kafka/common/Cluster;Ljava/util/Set;J)V
at org.apache.kafka.streams.processor.internals.
StreamsKafkaClient.(StreamsKafkaClient.java:98)
at org.apache.kafka.streams.processor.internals.
StreamsKafkaClient.(StreamsKafkaClient.java:82)
at org.apache.kafka.streams.processor.internals.
StreamPartitionAssignor.configure(StreamPartitionAssignor.java:219)
at org.apache.kafka.common.config.AbstractConfig.
getConfiguredInstances(AbstractConfig.java:254)
at org.apache.kafka.common.config.AbstractConfig.
getConfiguredInstances(AbstractConfig.java:220)
at org.apache.kafka.clients.consumer.KafkaConsumer.(
KafkaConsumer.java:673)
... 6 more



On Tue, Apr 18, 2017 at 5:47 AM, Mahendra Kariya  wrote:

> Thanks!
>
> On Tue, Apr 18, 2017, 12:26 AM Eno Thereska 
> wrote:
>
>> The RC candidate build is here: http://home.apache.org/~
>> gwenshap/kafka-0.10.2.1-rc1/ <http://home.apache.org/~
>> gwenshap/kafka-0.10.2.1-rc1/>
>>
>> Eno
>> > On 17 Apr 2017, at 17:20, Mahendra Kariya 
>> wrote:
>> >
>> > Thanks!
>> >
>> > In the meantime, is the jar published somewhere on github or as a part
>> of
>> > build pipeline?
>> >
>> > On Mon, Apr 17, 2017 at 9:18 PM, Eno Thereska 
>> > wrote:
>> >
>> >> Not yet, but as soon as 0.10.2 is voted it should be. Hopefully this
>> week.
>> >>
>> >> Eno
>> >>> On 17 Apr 2017, at 13:25, Mahendra Kariya > >
>> >> wrote:
>> >>>
>> >>> Are the bug fix releases published to Maven central repo?
>> >>>
>> >>> On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska > >
>> >>> wrote:
>> >>>
>> >>>> Hi Sachin,
>> >>>>
>> >>>> In the bug fix release for 0.10.2 (and in trunk) we have now set
>> >>>> max.poll.interval to infinite since from our experience with streams
>> >> this
>> >>>> should not be something that users set: https://github.com/apache/
>> >>>> kafka/pull/2770/files <https://github.com/apache/
>> kafka/pull/2770/files
>> >>> .
>> >>>>
>> >>>> We're in the process of documenting that change. For now you can
>> >> increase
>> >>>> the request timeout without worrying about max.poll.interval
>> anymore. In
>> >>>> fact I'd suggest you also increase max.poll.interval as we've done it
>> >> above.
>> >>>>
>> >>>> Thanks
>> >>>> Eno
>> >>>>
>> >>>>> On 1 Apr 2017, at 03:28, Sachin Mittal  wrote:
>> >>>>>
>> >>>>> Should this timeout be less than max poll interval value? if yes
>> than
>> >>>>> generally speaking what should be the ratio between two or range for
>> >> this
>> >>>>> timeout value .
>> >>>>>
>> >>>>> Thanks
>> >>>>> Sachin
>> >>>>>
>> >>>>> On 1 Apr 2017 04:57, "Matthias J. Sax" 
>> wrote:
>> >>>>>
>> >>>>> Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
>> >>>>>
>> >>>>>
>> >>>>> -Matthias
>> >>>>>
>> >>>>>
>> >>>>> On 3/31/17 11:32 AM, Sachin Mittal wrote:
>> >>>>>> Hi,
>> >>>>>> So I have added the config ProducerConfig.RETRIES_CONFIG,
>> >>>>> Integer.MA

Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

2017-04-17 Thread Mahendra Kariya
Thanks!

On Tue, Apr 18, 2017, 12:26 AM Eno Thereska  wrote:

> The RC candidate build is here:
> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/ <
> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/>
>
> Eno
> > On 17 Apr 2017, at 17:20, Mahendra Kariya 
> wrote:
> >
> > Thanks!
> >
> > In the meantime, is the jar published somewhere on github or as a part of
> > build pipeline?
> >
> > On Mon, Apr 17, 2017 at 9:18 PM, Eno Thereska 
> > wrote:
> >
> >> Not yet, but as soon as 0.10.2 is voted it should be. Hopefully this
> week.
> >>
> >> Eno
> >>> On 17 Apr 2017, at 13:25, Mahendra Kariya 
> >> wrote:
> >>>
> >>> Are the bug fix releases published to Maven central repo?
> >>>
> >>> On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska 
> >>> wrote:
> >>>
> >>>> Hi Sachin,
> >>>>
> >>>> In the bug fix release for 0.10.2 (and in trunk) we have now set
> >>>> max.poll.interval to infinite since from our experience with streams
> >> this
> >>>> should not be something that users set: https://github.com/apache/
> >>>> kafka/pull/2770/files <
> https://github.com/apache/kafka/pull/2770/files
> >>> .
> >>>>
> >>>> We're in the process of documenting that change. For now you can
> >> increase
> >>>> the request timeout without worrying about max.poll.interval anymore.
> In
> >>>> fact I'd suggest you also increase max.poll.interval as we've done it
> >> above.
> >>>>
> >>>> Thanks
> >>>> Eno
> >>>>
> >>>>> On 1 Apr 2017, at 03:28, Sachin Mittal  wrote:
> >>>>>
> >>>>> Should this timeout be less than max poll interval value? if yes than
> >>>>> generally speaking what should be the ratio between two or range for
> >> this
> >>>>> timeout value .
> >>>>>
> >>>>> Thanks
> >>>>> Sachin
> >>>>>
> >>>>> On 1 Apr 2017 04:57, "Matthias J. Sax" 
> wrote:
> >>>>>
> >>>>> Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 3/31/17 11:32 AM, Sachin Mittal wrote:
> >>>>>> Hi,
> >>>>>> So I have added the config ProducerConfig.RETRIES_CONFIG,
> >>>>> Integer.MAX_VALUE
> >>>>>> and the NotLeaderForPartitionException is gone.
> >>>>>>
> >>>>>> However we see a new exception especially under heavy load:
> >>>>>> org.apache.kafka.streams.errors.StreamsException: task [0_1]
> >> exception
> >>>>>> caught when producing
> >>>>>> at
> >>>>>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> >>>>> checkForException(RecordCollectorImpl.java:119)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>> at
> >>>>>> org.apache.kafka.streams.processor.internals.
> >> RecordCollectorImpl.flush(
> >>>>> RecordCollectorImpl.java:127)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]at
> >>>>>> org.apache.kafka.streams.processor.internals.
> >>>> StreamTask$1.run(StreamTask.
> >>>>> java:76)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>> at
> >>>>>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> >>>>> measureLatencyNs(StreamsMetricsImpl.java:188)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>> at
> >>>>>> org.apache.kafka.streams.processor.internals.
> >>>> StreamTask.commit(StreamTask.
> >>>>> java:280)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]at
> >>>>>> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> >>>>> StreamThread.java:787)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>> at
> >>>>>> org.apache.kafka.streams.processor.internals.StreamThread

Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

2017-04-17 Thread Mahendra Kariya
Thanks!

In the meantime, is the jar published somewhere on github or as a part of
build pipeline?

On Mon, Apr 17, 2017 at 9:18 PM, Eno Thereska 
wrote:

> Not yet, but as soon as 0.10.2 is voted it should be. Hopefully this week.
>
> Eno
> > On 17 Apr 2017, at 13:25, Mahendra Kariya 
> wrote:
> >
> > Are the bug fix releases published to Maven central repo?
> >
> > On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska 
> > wrote:
> >
> >> Hi Sachin,
> >>
> >> In the bug fix release for 0.10.2 (and in trunk) we have now set
> >> max.poll.interval to infinite since from our experience with streams
> this
> >> should not be something that users set: https://github.com/apache/
> >> kafka/pull/2770/files <https://github.com/apache/kafka/pull/2770/files
> >.
> >>
> >> We're in the process of documenting that change. For now you can
> increase
> >> the request timeout without worrying about max.poll.interval anymore. In
> >> fact I'd suggest you also increase max.poll.interval as we've done it
> above.
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 1 Apr 2017, at 03:28, Sachin Mittal  wrote:
> >>>
> >>> Should this timeout be less than max poll interval value? if yes than
> >>> generally speaking what should be the ratio between two or range for
> this
> >>> timeout value .
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>> On 1 Apr 2017 04:57, "Matthias J. Sax"  wrote:
> >>>
> >>> Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 3/31/17 11:32 AM, Sachin Mittal wrote:
> >>>> Hi,
> >>>> So I have added the config ProducerConfig.RETRIES_CONFIG,
> >>> Integer.MAX_VALUE
> >>>> and the NotLeaderForPartitionException is gone.
> >>>>
> >>>> However we see a new exception especially under heavy load:
> >>>> org.apache.kafka.streams.errors.StreamsException: task [0_1]
> exception
> >>>> caught when producing
> >>>> at
> >>>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> >>> checkForException(RecordCollectorImpl.java:119)
> >>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>> at
> >>>> org.apache.kafka.streams.processor.internals.
> RecordCollectorImpl.flush(
> >>> RecordCollectorImpl.java:127)
> >>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]at
> >>>> org.apache.kafka.streams.processor.internals.
> >> StreamTask$1.run(StreamTask.
> >>> java:76)
> >>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>> at
> >>>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> >>> measureLatencyNs(StreamsMetricsImpl.java:188)
> >>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>> at
> >>>> org.apache.kafka.streams.processor.internals.
> >> StreamTask.commit(StreamTask.
> >>> java:280)
> >>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]at
> >>>> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> >>> StreamThread.java:787)
> >>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>> at
> >>>> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> >>> StreamThread.java:774)
> >>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]at
> >>>> org.apache.kafka.streams.processor.internals.
> StreamThread.maybeCommit(
> >>> StreamThread.java:749)
> >>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>> at
> >>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> >>> StreamThread.java:671)
> >>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]at
> >>>> org.apache.kafka.streams.processor.internals.
> >>> StreamThread.run(StreamThread.java:378)
> >>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
> >> for
> >>>> new-part-advice-key-table-changelog-1: 30001 ms has passed since last
> >>> append
> >>>>
> >>>> So any idea as why TimeoutException is happening.
> >>>> 

Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

2017-04-17 Thread Mahendra Kariya
Are the bug fix releases published to Maven central repo?

On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska 
wrote:

> Hi Sachin,
>
> In the bug fix release for 0.10.2 (and in trunk) we have now set
> max.poll.interval to infinite since from our experience with streams this
> should not be something that users set: https://github.com/apache/
> kafka/pull/2770/files .
>
> We're in the process of documenting that change. For now you can increase
> the request timeout without worrying about max.poll.interval anymore. In
> fact I'd suggest you also increase max.poll.interval as we've done it above.
>
> Thanks
> Eno
>
> > On 1 Apr 2017, at 03:28, Sachin Mittal  wrote:
> >
> > Should this timeout be less than max poll interval value? if yes than
> > generally speaking what should be the ratio between two or range for this
> > timeout value .
> >
> > Thanks
> > Sachin
> >
> > On 1 Apr 2017 04:57, "Matthias J. Sax"  wrote:
> >
> > Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
> >
> >
> > -Matthias
> >
> >
> > On 3/31/17 11:32 AM, Sachin Mittal wrote:
> >> Hi,
> >> So I have added the config ProducerConfig.RETRIES_CONFIG,
> > Integer.MAX_VALUE
> >> and the NotLeaderForPartitionException is gone.
> >>
> >> However we see a new exception especially under heavy load:
> >> org.apache.kafka.streams.errors.StreamsException: task [0_1] exception
> >> caught when producing
> >>  at
> >> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> > checkForException(RecordCollectorImpl.java:119)
> >> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>  at
> >> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(
> > RecordCollectorImpl.java:127)
> >> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]at
> >> org.apache.kafka.streams.processor.internals.
> StreamTask$1.run(StreamTask.
> > java:76)
> >> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>  at
> >> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > measureLatencyNs(StreamsMetricsImpl.java:188)
> >> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>  at
> >> org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.
> > java:280)
> >> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]at
> >> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> > StreamThread.java:787)
> >> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>  at
> >> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> > StreamThread.java:774)
> >> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]at
> >> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
> > StreamThread.java:749)
> >> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>  at
> >> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:671)
> >> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]at
> >> org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:378)
> >> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
> for
> >> new-part-advice-key-table-changelog-1: 30001 ms has passed since last
> > append
> >>
> >> So any idea as why TimeoutException is happening.
> >> Is this controlled by
> >> ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
> >>
> >> If yes
> >> What should the value be set in this given that out consumer
> >> max.poll.interval.ms is defaul 5 minutes.
> >>
> >> Is there any other setting that we should try to avoid such errors which
> >> causes stream thread to die.
> >>
> >> Thanks
> >> Sachin
> >>
> >>
> >> On Sun, Mar 26, 2017 at 1:39 AM, Eno Thereska 
> >> wrote:
> >>
> >>> Hi Sachin,
> >>>
> >>> Not in this case.
> >>>
> >>> Thanks
> >>> Eno
> >>>
>  On Mar 25, 2017, at 6:19 PM, Sachin Mittal 
> wrote:
> 
>  OK.
>  I will try this out.
> 
>  Do I need to change anything for
>  max.in.flight.requests.per.connection
> 
>  Thanks
>  Sachin
> 
> 
>  On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <
> eno.there...@gmail.com>
>  wrote:
> 
> > Hi Sachin,
> >
> > For this particular error, “org.apache.kafka.common.errors.
> > NotLeaderForPartitionException: This server is not the leader for
> that
> > topic-partition.”, could you try setting the number of retries to
> >>> something
> > large like this:
> >
> > Properties props = new Properties();
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> > ...
> > props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
> >
> > This will retry the produce requests and should hopefully solve your
> > immediate problem.
> >
> > Thanks
> > Eno
> >
> >
> > On 25/03/2017, 08:35, "Sachin Mittal"  wrote:
> >
> >   Hi,
> >   We have encountered another case of series of errors which I would
> >>> need
> >   more help in understanding.

Re: auto.offset.reset for Kafka streams 0.10.2.0

2017-04-11 Thread Mahendra Kariya
Thanks for the clarification Matthias / Michael!

+1 to clearer documentation around this. As far as I remember, the default
for normal consumers is "latest", and since Streams internally uses normal
consumers, the first intuition is that it will be "latest" for Streams as
well.


Best,
Mahendra
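
For anyone else who hits this: the override can be passed straight through the
Streams config, since consumer properties set there are forwarded to the
internal consumers. A rough sketch (application id and bootstrap servers are
placeholders):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");  // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
// Streams 0.10.2 defaults this to "earliest"; set it explicitly to get "latest"
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");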



On Tue, Apr 11, 2017 at 12:31 PM, Michael Noll  wrote:

> It's also documented at
> http://docs.confluent.io/current/streams/developer-guide.html#non-streams-
> configuration-parameters
> .
>
> FYI: We have already begun syncing the Confluent docs for Streams into the
> Apache Kafka docs for Streams, but there's still quite some work left
> (volunteers are welcome :-P).
>
> -Michael
>
>
> On Tue, Apr 11, 2017 at 8:37 AM, Matthias J. Sax 
> wrote:
>
> > Default for Streams is "earliest"
> >
> > cf.
> > https://github.com/apache/kafka/blob/0.10.2.0/streams/
> > src/main/java/org/apache/kafka/streams/StreamsConfig.java#L405
> >
> >
> > -Matthias
> >
> > On 4/10/17 9:41 PM, Mahendra Kariya wrote:
> > > This was even my assumption. But I had to explicitly specify
> > > auto.offset.reset=latest. Without this config, it started from
> > "earliest"!
> > >
> > > On Tue, Apr 11, 2017 at 10:07 AM, Sachin Mittal 
> > wrote:
> > >
> > >> As far as I know default is latest, if no offsets are found. Otherwise
> > it
> > >> starts from the offset.
> > >>
> > >>
> > >> On Tue, Apr 11, 2017 at 8:51 AM, Mahendra Kariya <
> > >> mahendra.kar...@go-jek.com
> > >>> wrote:
> > >>
> > >>> Hey All,
> > >>>
> > >>> Is the auto offset reset set to "earliest" by default in Kafka
> streams
> > >>> 0.10.2.0? I thought default was "latest".
> > >>>
> > >>> I started a new Kafka streams application with a fresh application id
> > and
> > >>> it started consuming messages from the beginning.
> > >>>
> > >>
> > >
> >
> >
>


Re: auto.offset.reset for Kafka streams 0.10.2.0

2017-04-10 Thread Mahendra Kariya
This was my assumption as well. But I had to explicitly specify
auto.offset.reset=latest. Without this config, it started from "earliest"!

On Tue, Apr 11, 2017 at 10:07 AM, Sachin Mittal  wrote:

> As far as I know default is latest, if no offsets are found. Otherwise it
> starts from the offset.
>
>
> On Tue, Apr 11, 2017 at 8:51 AM, Mahendra Kariya <
> mahendra.kar...@go-jek.com
> > wrote:
>
> > Hey All,
> >
> > Is the auto offset reset set to "earliest" by default in Kafka streams
> > 0.10.2.0? I thought default was "latest".
> >
> > I started a new Kafka streams application with a fresh application id and
> > it started consuming messages from the beginning.
> >
>


auto.offset.reset for Kafka streams 0.10.2.0

2017-04-10 Thread Mahendra Kariya
Hey All,

Is the auto offset reset set to "earliest" by default in Kafka streams
0.10.2.0? I thought default was "latest".

I started a new Kafka streams application with a fresh application id and
it started consuming messages from the beginning.


Re: Capacity planning for Kafka Streams

2017-03-22 Thread Mahendra Kariya
Hi Damian,

The rest of the logs were INFO messages about offsets being committed.

Anyway, the problem is resolved for now, after we increased
max.poll.interval.ms.

For anyone else who is facing a similar problem, please refer to this thread.
https://groups.google.com/forum/#!topic/confluent-platform/wgCSuwIJo5g
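
For reference, such an override goes directly on the Streams Properties. A
rough sketch with illustrative values (not necessarily the ones we used):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");      // placeholder
// give slow processing / long rebalances more headroom before the consumer is kicked out
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10 * 60 * 1000); // illustrative value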



On Wed, Mar 22, 2017 at 7:11 PM, Damian Guy  wrote:

> Hi Mahendra,
>
> Are you able to share the complete logs? It is pretty hard to tell what is
> happening just from a few snippets of information.
>
> Thanks,
> Damian
>
> On Wed, 22 Mar 2017 at 12:16 Mahendra Kariya 
> wrote:
>
> > To test Kafka streams on 0.10.2.0, we setup a new Kafka cluster with the
> > latest version and used mirror maker to replicate the data from the
> > 0.10.0.0 Kafka cluster. We pointed our streaming app to the newly created
> > Kafka cluster.
> >
> > We have 5 nodes, each running the streaming app with 10 threads. In less
> > than 10 minutes, the process on all the 5 nodes died with different
> > exceptions. Below are the different stack traces we got.
> >
> > Any help would be really appreciated.
> >
> > *Stacktrace # 1 (got on 3 of 5 nodes):*
> >
> > 18:58:00.349 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread -
> stream-thread
> > [StreamThread-2] Stream thread shutdown complete
> > 18:58:00.349 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected
> > state transition from RUNNING to NOT_RUNNING
> > Exception in thread "StreamThread-2"
> > org.apache.kafka.streams.errors.StreamsException: Exception caught in
> > process. taskId=1_396, processor=KSTREAM-SOURCE-04,
> > topic=topicname, partition=396, offset=66839
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.process(StreamTask.java:216)
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:641)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:368)
> > Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
> > store %s has closed
> > at
> >
> > org.apache.kafka.streams.state.internals.RocksDBStore$
> RocksDbIterator.hasNext(RocksDBStore.java:398)
> > at
> >
> > org.apache.kafka.streams.state.internals.RocksDBStore$
> RocksDBRangeIterator.hasNext(RocksDBStore.java:457)
> > at
> >
> > org.apache.kafka.streams.state.internals.WindowStoreKeySchema$1.
> hasNext(WindowStoreKeySchema.java:30)
> > at
> >
> > org.apache.kafka.streams.state.internals.SegmentIterator.hasNext(
> SegmentIterator.java:69)
> > at
> >
> > org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore$
> MeteredSegmentedBytesStoreIterator.hasNext(MeteredSegmentedBytesStore.
> java:131)
> > at
> >
> > org.apache.kafka.streams.state.internals.RocksDBWindowStore$
> TheWindowStoreIterator.hasNext(RocksDBWindowStore.java:131)
> > at
> >
> > org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStore
> Iterator.hasNext(AbstractMergedSortedCacheStoreIterator.java:74)
> > at
> >
> > org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$
> KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:97)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> ProcessorNode.java:48)
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:134)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> SourceNode.process(SourceNode.java:70)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.process(StreamTask.java:197)
> > ... 2 more
> >
> >
> > *Stacktrace # 2 (got on 1 node):*
> >
> > 18:57:44.692 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread -
> stream-thread
> > [StreamThread-2] Stream thread shutdown complete
> > 18:57:44.692 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected
> > state transition from ASSIGNING_PARTITIONS to NOT_RUNNING
> > Exception in thread "StreamThread-2"
> > org.apache.kafka.streams.err

Re: Capacity planning for Kafka Streams

2017-03-22 Thread Mahendra Kariya
als.StreamTask.<init>(StreamTask.java:141)
at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
at
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
... 1 more

*Stacktrace # 3 (got on 1 node):*

19:07:34.827 [StreamThread-1] WARN o.a.k.s.p.i.StreamThread - Could not
create task 0_192. Will retry.
org.apache.kafka.streams.errors.LockException: task [0_192] Failed to lock
the state directory: /tmp/kafka-streams/streams_test_2/0_192
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
~[app-name-1.2.1.jar:na]
at
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
~[app-name-1.2.1.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
~[app-name-1.2.1.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
[app-name-1.2.1.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
~[app-name-1.2.1.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
~[app-name-1.2.1.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
[app-name-1.2.1.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
[app-name-1.2.1.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
[app-name-1.2.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
[app-name-1.2.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
[app-name-1.2.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
[app-name-1.2.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
[app-name-1.2.1.jar:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
[app-name-1.2.1.jar:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
[app-name-1.2.1.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
[app-name-1.2.1.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
[app-name-1.2.1.jar:na]



On Sat, Mar 18, 2017 at 5:58 AM, Mahendra Kariya  wrote:

> Thanks for the heads up Guozhang!
>
> The problem is our brokers are on 0.10.0.x. So we will have to upgrade
> them.
>
> On Sat, Mar 18, 2017 at 12:30 AM, Guozhang Wang 
> wrote:
>
>> Hi Mahendra,
>>
>> Just a kind reminder that upgrading Streams to 0.10.2 does not necessarily
>> require you to upgrade brokers to 0.10.2 as well. Since we have added a
>> new
>> feature since 0.10.2 to allow newer versioned clients (producer, consumer,
>> streams) to talk to older versioned brokers, and for Streams specifically
>> it only requires brokers to be no older than 0.10.1.
>>
>>
>> Guozhang
>>
>>
>> On Mon, Mar 13, 2017 at 5:12 AM, Mahendra Kariya <
>> mahendra.kar...@go-jek.com
>> > wrote:
>>
>> > We are planning to migrate to the newer version of Kafka. But that's a
>> few

Re: Consumers not rebalancing after replication

2017-03-21 Thread Mahendra Kariya
I would like to add one more thing. The logs on the consumers look fine.
We see no errors whatsoever. The log level is set to INFO. What are we missing
here? Is there some other config that we need to set so that we at least
see some errors related to this in the logs?

On Wed, Mar 22, 2017 at 9:14 AM, Mahendra Kariya  wrote:

> Hey All,
>
> We have six consumers in a consumer group. At times, some of the
> partitions are under replicated for a while (maybe, 2 mis). During this
> time, the consumers subscribed to such partitions stops getting data from
> Kafka and they become inactive for a while. But when the partitions are
> fully replicated again, only the active consumers gets rebalanced. The
> inactive ones continue to remain inactive.
>
> What could be causing this? How to fix this issue?
>
>
>
>


Consumers not rebalancing after replication

2017-03-21 Thread Mahendra Kariya
Hey All,

We have six consumers in a consumer group. At times, some of the partitions
are under-replicated for a while (maybe 2 mins). During this time, the
consumers subscribed to such partitions stop getting data from Kafka and
become inactive for a while. But when the partitions are fully
replicated again, only the active consumers get rebalanced. The inactive
ones continue to remain inactive.

What could be causing this? How to fix this issue?


Re: Kafka Streams: lockException

2017-03-20 Thread Mahendra Kariya
We did some more analysis on why the disk utilisation is continuously
increasing. Turns out it's the RocksDB WAL that's utilising most of the
disk space. The LOG.old WAL files are not getting deleted. Ideally they
should have been. RocksDB provides certain configuration for purging WAL
files
<https://github.com/facebook/rocksdb/wiki/basic-operations#purging-wal-files>.
But I am not sure how to set these configs. Any help would be really
appreciated. Just for reference, our Kafka brokers are on v0.10.0.1 and
RocksDB version is 4.8.0.
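
(For reference, on versions where it is available, i.e. Streams 0.10.1 and
later, the rocksdb.config.setter hook looks roughly like the sketch below. The
values are illustrative only, and this does not apply to 0.10.0.x.)

import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

// Sketch of a config setter that bounds how long archived WAL files are kept
// (illustrative values, not a recommendation).
public class WalPurgingRocksDBConfigSetter implements RocksDBConfigSetter {

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        options.setWalTtlSeconds(60 * 60); // delete archived WAL files older than ~1 hour
        options.setWalSizeLimitMB(512);    // cap the total size of archived WAL files
    }
}

// Registered via the Streams config, e.g.:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, WalPurgingRocksDBConfigSetter.class);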




On Mon, Mar 20, 2017 at 12:29 PM, Mahendra Kariya <
mahendra.kar...@go-jek.com> wrote:

> Hey Guozhang,
>
> Thanks a lot for these insights. We are facing the exact same problem as
> Tianji. Our commit frequency is also quite high. We flush almost around 16K
> messages per minute to Kafka at the end of the topology.
>
> Another issue that we are facing is that rocksdb is not deleting old data.
> We have set the time window retention duration to 1 hour, but the disk size
> is constantly increasing. Ideally, the disk utilisation graph should
> plateau after some time.
>
> We would like to apply the config change suggestions that you have given.
> But we are on Kafka 0.10.0.1. And from the docs, it seems
> rocksdb.config.setter is not available for this version. Is there any
> other way for us to configure rocksdb?
>
> Does Kafka 0.10.0.1 emit any rocksdb related metrics that we can monitor
> and set up alerts on?
>
>
> Thanks!
>
>
>


Re: Kafka Streams: lockException

2017-03-20 Thread Mahendra Kariya
Hey Guozhang,

Thanks a lot for these insights. We are facing the exact same problem as
Tianji. Our commit frequency is also quite high. We flush around 16K
messages per minute to Kafka at the end of the topology.

Another issue that we are facing is that rocksdb is not deleting old data.
We have set the time window retention duration to 1 hour, but the disk size
is constantly increasing. Ideally, the disk utilisation graph should
plateau after some time.

We would like to apply the config change suggestions that you have given.
But we are on Kafka 0.10.0.1. And from the docs, it seems
rocksdb.config.setter is not available for this version. Is there any other
way for us to configure rocksdb?

Does Kafka 0.10.0.1 emit any rocksdb related metrics that we can monitor
and set up alerts on?


Thanks!


Re: Capacity planning for Kafka Streams

2017-03-17 Thread Mahendra Kariya
Thanks for the heads up Guozhang!

The problem is our brokers are on 0.10.0.x. So we will have to upgrade them.

On Sat, Mar 18, 2017 at 12:30 AM, Guozhang Wang  wrote:

> Hi Mahendra,
>
> Just a kind reminder that upgrading Streams to 0.10.2 does not necessarily
> require you to upgrade brokers to 0.10.2 as well. Since we have added a new
> feature since 0.10.2 to allow newer versioned clients (producer, consumer,
> streams) to talk to older versioned brokers, and for Streams specifically
> it only requires brokers to be no older than 0.10.1.
>
>
> Guozhang
>
>
> On Mon, Mar 13, 2017 at 5:12 AM, Mahendra Kariya <
> mahendra.kar...@go-jek.com
> > wrote:
>
> > We are planning to migrate to the newer version of Kafka. But that's a
> few
> > weeks away.
> >
> > We will try setting the socket config and see how it turns out.
> >
> > Thanks a lot for your response!
> >
> >
> >
> > On Mon, Mar 13, 2017 at 3:21 PM, Eno Thereska 
> > wrote:
> >
> > > Thanks,
> > >
> > > A couple of things:
> > > - I’d recommend moving to 0.10.2 (latest release) if you can since
> > several
> > > improvements were made in the last two releases that make rebalancing
> and
> > > performance better.
> > >
> > > - When running on environments with large latency on AWS at least
> > (haven’t
> > > tried Google cloud), one parameter we have found useful to increase
> > > performance is the receive and send socket size for the consumer and
> > > producer in streams. We’d recommend setting them to 1MB like this
> (where
> > > “props” is your own properties object when you start streams):
> > >
> > > // the socket buffer needs to be large, especially when running in AWS
> > with
> > > // high latency. if running locally the default is fine.
> > > props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
> > > props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
> > >
> > > Make sure the OS allows the larger socket size too.
> > >
> > > Thanks
> > > Eno
> > >
> > > > On Mar 13, 2017, at 9:21 AM, Mahendra Kariya <
> > mahendra.kar...@go-jek.com>
> > > wrote:
> > > >
> > > > Hi Eno,
> > > >
> > > > Please find my answers inline.
> > > >
> > > >
> > > > We are in the process of documenting capacity planning for streams,
> > stay
> > > tuned.
> > > >
> > > > This would be great! Looking forward to it.
> > > >
> > > > Could you send some more info on your problem? What Kafka version are
> > > you using?
> > > >
> > > > We are using Kafka 0.10.0.0.
> > > >
> > > > Are the VMs on the same or different hosts?
> > > >
> > > > The VMs are on Google Cloud. Two of them are in asia-east1-a and one
> is
> > > in asia-east1-c. All three are n1-standard-4 Ubuntu instances.
> > > >
> > > > Also what exactly do you mean by “the lag keeps fluctuating”, what
> > > metric are you looking at?
> > > >
> > > > We are looking at Kafka Manager for the time being. By fluctuating, I
> > > mean the lag is few thousands at one time, we refresh it the next
> second,
> > > it is in few lakhs, and again refresh it and it is few thousands. I
> > > understand this may not be very accurate. We will soon have more
> accurate
> > > data once we start pushing the consumer lag metric to Datadog.
> > > >
> > > > But on a separate note, the difference between lags on different
> > > partitions is way too high. I have attached a tab separated file
> herewith
> > > which shows the consumer lag (from Kafka Manager) for the first the 50
> > > partitions. As is clear, the lag on partition 2 is 530 while the lag on
> > > partition 18 is 23K. Note that the same VM is pulling data from both
> the
> > > partitions.
> > > >
> > > >
> > > >
> > > >
> > > > 
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Capacity planning for Kafka Streams

2017-03-13 Thread Mahendra Kariya
We are planning to migrate to the newer version of Kafka. But that's a few
weeks away.

We will try setting the socket config and see how it turns out.

Thanks a lot for your response!



On Mon, Mar 13, 2017 at 3:21 PM, Eno Thereska 
wrote:

> Thanks,
>
> A couple of things:
> - I’d recommend moving to 0.10.2 (latest release) if you can since several
> improvements were made in the last two releases that make rebalancing and
> performance better.
>
> - When running on environments with large latency on AWS at least (haven’t
> tried Google cloud), one parameter we have found useful to increase
> performance is the receive and send socket size for the consumer and
> producer in streams. We’d recommend setting them to 1MB like this (where
> “props” is your own properties object when you start streams):
>
> // the socket buffer needs to be large, especially when running in AWS with
> // high latency. if running locally the default is fine.
> props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
> props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
>
> Make sure the OS allows the larger socket size too.
>
> Thanks
> Eno
>
> > On Mar 13, 2017, at 9:21 AM, Mahendra Kariya 
> wrote:
> >
> > Hi Eno,
> >
> > Please find my answers inline.
> >
> >
> > We are in the process of documenting capacity planning for streams, stay
> tuned.
> >
> > This would be great! Looking forward to it.
> >
> > Could you send some more info on your problem? What Kafka version are
> you using?
> >
> > We are using Kafka 0.10.0.0.
> >
> > Are the VMs on the same or different hosts?
> >
> > The VMs are on Google Cloud. Two of them are in asia-east1-a and one is
> in asia-east1-c. All three are n1-standard-4 Ubuntu instances.
> >
> > Also what exactly do you mean by “the lag keeps fluctuating”, what
> metric are you looking at?
> >
> > We are looking at Kafka Manager for the time being. By fluctuating, I
> mean the lag is few thousands at one time, we refresh it the next second,
> it is in few lakhs, and again refresh it and it is few thousands. I
> understand this may not be very accurate. We will soon have more accurate
> data once we start pushing the consumer lag metric to Datadog.
> >
> > But on a separate note, the difference between lags on different
> partitions is way too high. I have attached a tab separated file herewith
> which shows the consumer lag (from Kafka Manager) for the first the 50
> partitions. As is clear, the lag on partition 2 is 530 while the lag on
> partition 18 is 23K. Note that the same VM is pulling data from both the
> partitions.
> >
> >
> >
> >
> > 
>
>


Re: Capacity planning for Kafka Streams

2017-03-13 Thread Mahendra Kariya
Hi Eno,

Please find my answers inline.


We are in the process of documenting capacity planning for streams, stay
> tuned.
>

This would be great! Looking forward to it.

Could you send some more info on your problem? What Kafka version are you
> using?
>

We are using Kafka 0.10.0.0.


> Are the VMs on the same or different hosts?
>

The VMs are on Google Cloud. Two of them are in asia-east1-a and one is in
asia-east1-c. All three are n1-standard-4 Ubuntu instances.


> Also what exactly do you mean by “the lag keeps fluctuating”, what metric
> are you looking at?
>

We are looking at Kafka Manager for the time being. By fluctuating, I mean
the lag is a few thousand at one point; we refresh it the next second and it
is in a few lakhs (hundreds of thousands); we refresh again and it is back to
a few thousand. I understand this may not be very accurate. We will soon have
more accurate data once we start pushing the consumer lag metric to Datadog.

But on a separate note, the difference between lags on different partitions
is way too high. I have attached a tab-separated file herewith which shows
the consumer lag (from Kafka Manager) for the first 50 partitions. As
is clear, the lag on partition 2 is 530 while the lag on partition 18 is
23K. Note that the same VM is pulling data from both partitions.

Partition	LogSize	Consumer Offset	Lag	Consumer Instance Owner
0	129679486	129678929	557	aggregator_1-1-StreamThread-2-consumer-aaa:/10.x.y.125
1	129729186	129725163	4023	aggregator_1-1-StreamThread-5-consumer-bbb:/10.x.y.123
2	129602164	129601634	530	aggregator_1-1-StreamThread-1-consumer-ccc:/10.x.y.124
3	130425020	130424459	561	aggregator_1-1-StreamThread-8-consumer-ddd:/10.x.y.125
4	131598849	131598265	584	aggregator_1-1-StreamThread-4-consumer-eee:/10.x.y.124
5	130755630	130755081	549	aggregator_1-1-StreamThread-5-consumer-fff:/10.x.y.125
6	128054693	128054104	589	aggregator_1-1-StreamThread-1-consumer-ggg:/10.x.y.125
7	131354755	131354171	584	aggregator_1-1-StreamThread-6-consumer-hhh:/10.x.y.125
8	129797007	129796461	546	aggregator_1-1-StreamThread-10-consumer-iii:/10.x.y.125
9	129087167	129086580	587	aggregator_1-1-StreamThread-5-consumer-jjj:/10.x.y.124
10	128365714	128365102	612	aggregator_1-1-StreamThread-3-consumer-kkk:/10.x.y.125
11	132069213	132068751	462	aggregator_1-1-StreamThread-4-consumer-lll:/10.x.y.123
12	130412684	130412095	589	aggregator_1-1-StreamThread-3-consumer-mmm:/10.x.y.124
13	132199972	132199443	529	aggregator_1-1-StreamThread-2-consumer-nnn:/10.x.y.124
14	129481438	129480844	594	aggregator_1-1-StreamThread-7-consumer-ooo:/10.x.y.124
15	130105939	130105341	598	aggregator_1-1-StreamThread-2-consumer-aaa:/10.x.y.125
16	131973373	131972814	559	aggregator_1-1-StreamThread-8-consumer-ddd:/10.x.y.125
17	130084032	130082875	1157	aggregator_1-1-StreamThread-8-consumer-ppp:/10.x.y.123
18	130220737	130197576	23161	aggregator_1-1-StreamThread-6-consumer-qqq:/10.x.y.124
19	129962307	129961682	625	aggregator_1-1-StreamThread-8-consumer-rrr:/10.x.y.124
20	131692478	131691927	551	aggregator_1-1-StreamThread-1-consumer-ggg:/10.x.y.125
21	129638851	129638323	528	aggregator_1-1-StreamThread-6-consumer-hhh:/10.x.y.125
22	130918801	130918241	560	aggregator_1-1-StreamThread-4-consumer-eee:/10.x.y.124
23	131338127	131337572	555	aggregator_1-1-StreamThread-10-consumer-iii:/10.x.y.125
24	130390257	130389704	553	aggregator_1-1-StreamThread-3-consumer-kkk:/10.x.y.125
25	131383888	131383468	420	aggregator_1-1-StreamThread-9-consumer-sss:/10.x.y.123
26	127540041	127539487	554	aggregator_1-1-StreamThread-3-consumer-mmm:/10.x.y.124
27	130159933	130159385	548	aggregator_1-1-StreamThread-2-consumer-aaa:/10.x.y.125
28	129010992	129010397	595	aggregator_1-1-StreamThread-8-consumer-ddd:/10.x.y.125
29	128511277	128510723	554	aggregator_1-1-StreamThread-7-consumer-ooo:/10.x.y.124
30	131262395	131261954	441	aggregator_1-1-StreamThread-9-consumer-sss:/10.x.y.125
31	129162405	129161881	524	aggregator_1-1-StreamThread-9-consumer-ttt:/10.x.y.124
32	129693104	129692591	513	aggregator_1-1-StreamThread-1-consumer-ggg:/10.x.y.125
33	131763991	131763433	558	aggregator_1-1-StreamThread-10-consumer-iii:/10.x.y.125
34	130954567	130931326	23241	aggregator_1-1-StreamThread-6-consumer-qqq:/10.x.y.124
35	132248088	132247595	493	aggregator_1-1-StreamThread-3-consumer-kkk:/10.x.y.125
36	129893133	129892019	1114	aggregator_1-1-StreamThread-8-consumer-ppp:/10.x.y.123
37	130264671	130264251	420	aggregator_1-1-StreamThread-7-consumer-uuu:/10.x.y.125
38	131077963	131077526	437	aggregator_1-1-StreamThread-3-consumer-vvv:/10.x.y.123
39	132299025	132298434	591	aggregator_1-1-StreamThread-5-consumer-jjj:/10.x.y.124
40	130839786	130839203	583	aggregator_1-1-StreamThread-4-consumer-www:/10.x.y.125
41	130605454	130605019	435	aggregator_1-1-StreamThread-2-consumer-xxx:/10.x.y.123
42	128768310	128767757	553	aggregator_1-1-StreamThread-2-consumer-aaa:/10.x.y.125
43	130863269	130862681	588	aggregator_1-1-StreamThread-8-consumer-ddd:/10.x.y.125
44	128897764	128897205	559	aggregator_1-1-StreamThread-2-consumer-nnn:/10.x.y.124
45	129363920	129363346	574	aggregator_1-1-StreamThread-7-consumer-ooo:/10.x.y.124
46	130108718	130108198	520	aggregator_1-1-StreamThread-5-consum

Capacity planning for Kafka Streams

2017-03-12 Thread Mahendra Kariya
Hey All,

Are there some guidelines / documentation around capacity planning for
Kafka streams?

We have a Streams application which consumes messages from a topic with 400
partitions. At peak time, there are around 20K messages coming into that
topic per second. The Streams app consumes these messages to do some
aggregations.

Currently we have 3 VMs, each of them running 10 threads. The lag keeps
fluctuating between a few tens and a few lakhs (hundreds of thousands). We have
also noticed that the lag on the partitions being consumed on one particular
machine is way higher than on the other two machines.

Has anybody faced similar issues? How did you guys resolve it?
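
For context on our side, the per-instance parallelism is configured roughly as
below (names and values are placeholders, not the exact setup). Since the
source topic has 400 partitions, Streams creates up to 400 tasks for it, and
the 3 VMs x 10 threads give 30 threads sharing those tasks.

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregator");      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);            // 10 threads per VM, as described above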


Re: Stream topology with multiple Kafka clusters

2017-02-27 Thread Mahendra Kariya
Thanks a lot Eno!

On Mon, Feb 27, 2017 at 7:26 PM, Eno Thereska 
wrote:

> Hi Mahendra,
>
> The short answer is "not yet", but see this link for more:
> https://groups.google.com/forum/?pli=1#!msg/confluent-
> platform/LC88ijQaEMM/sa96OfK9AgAJ;context-place=forum/confluent-platform
>
> Thanks
> Eno
> > On 27 Feb 2017, at 13:37, Mahendra Kariya 
> wrote:
> >
> > Hi,
> >
> > I have a couple of questions regarding Kafka streams.
> >
> > 1. Can we merge two streams from two different Kafka clusters?
> > 2. Can my sink topic be in Kafka cluster different from source topic?
> >
> > Thanks!
>
>


Stream topology with multiple Kafka clusters

2017-02-27 Thread Mahendra Kariya
Hi,

I have a couple of questions regarding Kafka streams.

1. Can we merge two streams from two different Kafka clusters?
2. Can my sink topic be in Kafka cluster different from source topic?

Thanks!


Re: Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-23 Thread Mahendra Kariya
+1 for such a tool. It would be of great help in a lot of use cases.

On Thu, Feb 23, 2017 at 11:44 PM, Matthias J. Sax 
wrote:

> \cc from dev
>
>
>  Forwarded Message 
> Subject: Re: KIP-122: Add a tool to Reset Consumer Group Offsets
> Date: Thu, 23 Feb 2017 10:13:39 -0800
> From: Matthias J. Sax 
> Organization: Confluent Inc
> To: d...@kafka.apache.org
>
> So you suggest to merge "scope options" --topics, --topic, and
> --partitions into a single option? Sound good to me.
>
> I like the compact way to express it, ie, topicname:list-of-partitions
> with "all partitions" if not partitions are specified. It's quite
> intuitive to use.
>
> Just wondering, if we could get rid of the repeated --topic option; it's
> somewhat verbose. Have no good idea though who to improve it.
>
> If you concatenate multiple topic, we need one more character that is
> not allowed in topic names to separate the topics:
>
> > invalidChars = {'/', '\\', ',', '\u', ':', '"', '\'', ';', '*',
> '?', ' ', '\t', '\r', '\n', '='};
>
> maybe
>
> --topics t1=1,2,3:t2:t3=3
>
> use '=' to specify partitions (instead of ':' as you proposed) and ':'
> to separate topics? All other characters seem to be worse to use to me.
> But maybe you have a better idea.
>
>
>
> -Matthias
>
>
> On 2/23/17 3:15 AM, Jorge Esteban Quilcate Otoya wrote:
> > @Matthias about the point 9:
> >
> > What about keeping only the --topic option, and support this format:
> >
> > `--topic t1:0,1,2 --topic t2 --topic t3:2`
> >
> > In this case topics t1, t2, and t3 will be selected: topic t1 with
> > partitions 0,1 and 2; topic t2 with all its partitions; and topic t3,
> with
> > only partition 2.
> >
> > Jorge.
> >
> > On Tue, 21 Feb 2017 at 11:11, Jorge Esteban Quilcate Otoya (<
> > quilcate.jo...@gmail.com>) wrote:
> >
> >> Thanks for the feedback Matthias.
> >>
> >> * 1. You're right. I'll reorder the scenarios.
> >>
> >> * 2. Agree. I'll update the KIP.
> >>
> >> * 3. I like it, updating to `reset-offsets`
> >>
> >> * 4. Agree, removing the `reset-` part
> >>
> >> * 5. Yes, option 1.e without --execute or --export will print out the
> >> current offset and the new offset, which will be the same. The use case
> >> of this option is mostly to use it in combination with --export and keep
> >> a current 'checkpoint' to reset to later. I will add to the KIP what the
> >> output should look like.
> >>
> >> * 6. Considering 4., I will update it to `--to-offset`
> >>
> >> * 7. I like the idea to unify these options (plus, minus).
> >> `shift-offsets-by` is a good option, but I will like some more feedback
> >> here about the name. I will update the KIP in the meantime.
> >>
> >> * 8. Yes, discussed in 9.
> >>
> >> * 9. Agree. I'd love some feedback here. `topic` is already used by
> >> `delete`, and we can add `--all-topics` to consider all topics/partitions
> >> assigned to a group. How could we define specific topics/partitions?
> >>
> >> * 10. Haven't thought about it, but it makes sense.
> >> ,, would be enough.
> >>
> >> * 11. Agree. Solved with 10.
> >>
> >> Also, I have a couple of changes to mention:
> >>
> >> 1. I have added a reference to the branch where I'm working on this KIP.
> >>
> >> 2. About the period scenario `--to-period`. I will change it to
> >> `--to-duration` given that duration (
> >> https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html)
> >> follows this format: 'PnDTnHnMnS' and does not consider daylight saving
> >> effects.
> >>
> >>
> >>
> >> On Tue, 21 Feb 2017 at 2:47, Matthias J. Sax (<matth...@confluent.io>)
> >> wrote:
> >>
> >> Hi,
> >>
> >> thanks for updating the KIP. Couple of follow up comments:
> >>
> >> * Nit: Why is "Reset to Earliest" and "Reset to Latest" a "reset by
> >> time" option -- IMHO it belongs to "reset by position"?
> >>
> >>
> >> * Nit: Description of "Reset to Earliest"
> >>
> >>> using Kafka Consumer's `auto.offset.reset` to `earliest`
> >>
> >> I think this is strictly speaking not correct (as auto.offset.reset is only
> >> triggered if no valid offset is found, but this tool explicitly modifies
> >> committed offsets), and should be phrased as
> >>
> >>> using Kafka Consumer's #seekToBeginning()
> >>
> >> -> similar issue for description of "Reset to Latest"
> >>
> >>
> >> * Main option: rename to --reset-offsets (plural instead of singular)
> >>
> >>
> >> * Scenario Options: I would remove "reset" from all options, because the
> >> main argument "--reset-offset" says already what to do:
> >>
> >>> bin/kafka-consumer-groups.sh --reset-offset --reset-to-datetime XXX
> >>
> >> better (IMHO):
> >>
> >>> bin/kafka-consumer-groups.sh --reset-offsets --to-datetime XXX
> >>
> >>
> >>
> >> * Option 1.e ("print and export current offset") is not intuitive to use
> >> IMHO. The main option is "--reset-offset" but nothing happens if no
> >> scenario is specified. It is also not specified what the output should
> >> look like.
> >>
> >> Furthermore, --describe should actually show cu

Re: Consumer / Streams causes deletes in __consumer_offsets?

2017-02-22 Thread Mahendra Kariya
Thanks!

On Thu, Feb 23, 2017 at 10:49 AM, Guozhang Wang  wrote:

> Mahendra,
>
> That is right, what I meant is that at the end of each loop in the thread,
> it will check against the commit interval and see if it should do so. That
> means a commit will only happen after any records have been completely
> processed in the topology, and that also means that the actual commit
> interval may be a bit longer than the configured value in practice.
>
>
> Guozhang
>
>
>
> On Wed, Feb 22, 2017 at 8:15 PM, Mahendra Kariya <
> mahendra.kar...@go-jek.com
> > wrote:
>
> > Hi Guozhang,
> >
> > On Thu, Feb 23, 2017 at 2:48 AM, Guozhang Wang 
> wrote:
> >
> > > With that even if you do
> > > not have any data processed the commit operation will be triggered
> after
> > > that configured period of time.
> > >
> >
> > The above statement is confusing. As per this thread
> > <http://goo.gl/yczQpy>,
> > offsets are only committed once they have been fully processed by the
> > topology and not when the timeout expires!
> >
>
>
>
> --
> -- Guozhang
>


Re: Consumer / Streams causes deletes in __consumer_offsets?

2017-02-22 Thread Mahendra Kariya
Hi Guozhang,

On Thu, Feb 23, 2017 at 2:48 AM, Guozhang Wang  wrote:

> With that even if you do
> not have any data processed the commit operation will be triggered after
> that configured period of time.
>

The above statement is confusing. As per this thread ,
offsets are only committed once they have been fully processed by the
topology and not when the timeout expires!


Re: JMX metrics for replica lag time

2017-02-22 Thread Mahendra Kariya
Just wondering, for what particular Kafka version is this applicable?

On Thu, Feb 23, 2017 at 2:38 AM, Guozhang Wang  wrote:

> Hmm that is a very good question. It seems to me that we did not add the
> corresponding metrics for it when we changed the mechanism. And your
> observation is expected: the lag-in-messages metric will not be useful
> enough to predict / explain why a follower has been kicked out of the ISR.
>
> Could you file a JIRA for this? I think we can create a new metric
> recording (time.milliseconds - r.lastCaughtUpTimeMs) and deprecate the old
> metrics.
>
> Guozhang
>
>
> On Tue, Feb 21, 2017 at 5:47 PM, Jun MA  wrote:
>
> > Hi Guozhang,
> >
> > Thanks for pointing this out. I was actually looking at this before and
> > that’s why I’m asking the question. This metric is 'lag in messages', and
> > since now the ISR logic relies on lag in seconds, not lag in messages, I'm
> > not sure how useful this metric is. In fact, we saw the value of this
> > metric being 0 all the time, even when there were ISR shrinks/expands. I'd
> > expect to see an increase in lag when a shrink/expand happens. Is there a
> > metric that can correctly represent the lag between followers and the
> > leader?
> >
> > Thanks,
> > Jun
> >
> > > On Feb 21, 2017, at 10:19 AM, Guozhang Wang 
> wrote:
> > >
> > > You can find them in https://kafka.apache.org/documentation/#monitoring
> > >
> > > I think this is the one you are looking for:
> > >
> > > Lag in messages per follower replica
> > > kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)
> > > lag should be proportional to the maximum batch size of a produce request.
> > >
> > > On Mon, Feb 20, 2017 at 5:43 PM, Jun Ma 
> wrote:
> > >
> > >> Hi Guozhang,
> > >>
> > >> Thanks for your replay. Could you tell me which one indicates the lag
> > >> between follower and leader for a specific partition?
> > >>
> > >> Thanks,
> > >> Jun
> > >>
> > >> On Mon, Feb 20, 2017 at 4:57 PM, Guozhang Wang 
> > wrote:
> > >>
> > >>> I don't think the metrics have been changed in 0.9.0.1, in fact even
> in
> > >>> 0.10.x they are still the same as stated in:
> > >>>
> > >>> https://kafka.apache.org/documentation/#monitoring
> > >>>
> > >>> The mechanism for determine which followers have been dropped out of
> > ISR
> > >>> has changed, but the metrics are not.
> > >>>
> > >>>
> > >>> Guozhang
> > >>>
> > >>>
> > >>> On Sun, Feb 19, 2017 at 7:56 PM, Jun MA 
> > wrote:
> > >>>
> >  Hi,
> > 
> >  I'm looking for the JMX metrics to represent replica lag time for 0.9.0.1.
> >  Based on the documentation, I can only find
> >  kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica,
> >  which is the max lag in messages between follower and leader replicas.
> >  But since in 0.9.0.1 lag in messages is deprecated and replaced with lag
> >  time, I'm wondering what the corresponding metric for this is?
> > 
> >  Thanks,
> >  Jun
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> --
> > >>> -- Guozhang
> > >>>
> > >>
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> >
> >
>
>
> --
> -- Guozhang
>


Re: Kafka consumer offset location

2017-02-09 Thread Mahendra Kariya
You can use the seekToBeginning method of KafkaConsumer.

https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToBeginning(java.util.Collection)
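
A minimal sketch of that approach (the topic name, partition, and client
configs below are placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ReReadFromBeginning {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");              // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // Assign the partition(s) explicitly, then rewind to the beginning
            // instead of deleting the stored offsets.
            consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0))); // placeholder topic
            consumer.seekToBeginning(consumer.assignment());

            ConsumerRecords<String, String> records = consumer.poll(1000);
            System.out.println("Fetched " + records.count() + " records from the beginning");
            consumer.close();
        }
    }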

On Thu, Feb 9, 2017 at 7:56 PM, Igor Kuzmenko  wrote:

> Hello, I'm using the new consumer to read a Kafka topic. For testing, I want
> to read the same topic from the beginning multiple times, with the same
> consumer. Before restarting the test, I want to delete the consumer offsets,
> so the consumer starts reading from the beginning. Where can I find the
> offsets?
>


Re: At Least Once semantics for Kafka Streams

2017-02-06 Thread Mahendra Kariya
Ah OK! Thanks!

On Mon, Feb 6, 2017, 3:09 PM Eno Thereska  wrote:

> Oh, by "other" I meant the original one you started discussing:
> COMMIT_INTERVAL_MS_CONFIG.
>
> Eno
> > On 6 Feb 2017, at 09:28, Mahendra Kariya 
> wrote:
> >
> > Thanks Eno!
> >
> > I am just wondering what is this other commit parameter?
> >
> > On Mon, Feb 6, 2017, 12:52 PM Eno Thereska 
> wrote:
> >
> >> Hi Mahendra,
> >>
> >> That is a good question. Streams uses consumers and that config applies
> to
> >> consumers. However, in streams we always set enable.auto.commit to
> false,
> >> and manage commits using the other commit parameter. That way streams
> has
> >> more control on when offsets are committed.
> >>
> >> Eno
> >>> On 6 Feb 2017, at 05:39, Mahendra Kariya 
> >> wrote:
> >>>
> >>> I have another follow up question regarding configuration.
> >>>
> >>> There is a config for enable.auto.commit for consumers. Does this apply
> >> to
> >>> Kafka streams? If yes, how is the behavior different when the value of
> >> this
> >>> config is true vs false?
> >>>
> >>> More generally, which of the consumer configs
> >>> <https://kafka.apache.org/documentation/#consumerconfigs> apply to
> Kafka
> >>> streams as well?
> >>>
> >>>
> >>>
> >>>
> >>> On Fri, Feb 3, 2017 at 9:43 PM, Mahendra Kariya <
> >> mahendra.kar...@go-jek.com>
> >>> wrote:
> >>>
> >>>> Ah OK! Thanks a lot for this clarification.
> >>>>
> >>>> it will only commit the offsets if the value of
> >> COMMIT_INTERVAL_MS_CONFIG
> >>>>> has
> >>>>> passed.
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>
> >>
>
>


Re: At Least Once semantics for Kafka Streams

2017-02-06 Thread Mahendra Kariya
Thanks Eno!

I am just wondering what is this other commit parameter?

On Mon, Feb 6, 2017, 12:52 PM Eno Thereska  wrote:

> Hi Mahendra,
>
> That is a good question. Streams uses consumers and that config applies to
> consumers. However, in streams we always set enable.auto.commit to false,
> and manage commits using the other commit parameter. That way streams has
> more control on when offsets are committed.
>
> Eno
> > On 6 Feb 2017, at 05:39, Mahendra Kariya 
> wrote:
> >
> > I have another follow up question regarding configuration.
> >
> > There is a config for enable.auto.commit for consumers. Does this apply
> to
> > Kafka streams? If yes, how is the behavior different when the value of
> this
> > config is true vs false?
> >
> > More generally, which of the consumer configs
> > <https://kafka.apache.org/documentation/#consumerconfigs> apply to Kafka
> > streams as well?
> >
> >
> >
> >
> > On Fri, Feb 3, 2017 at 9:43 PM, Mahendra Kariya <
> mahendra.kar...@go-jek.com>
> > wrote:
> >
> >> Ah OK! Thanks a lot for this clarification.
> >>
> >> it will only commit the offsets if the value of
> COMMIT_INTERVAL_MS_CONFIG
> >>> has
> >>> passed.
> >>>
> >>
> >>
> >>
> >>
> >>
>
>


Re: At Least Once semantics for Kafka Streams

2017-02-05 Thread Mahendra Kariya
I have another follow up question regarding configuration.

There is a config for enable.auto.commit for consumers. Does this apply to
Kafka streams? If yes, how is the behavior different when the value of this
config is true vs false?

More generally, which of the consumer configs
<https://kafka.apache.org/documentation/#consumerconfigs> apply to Kafka
streams as well?




On Fri, Feb 3, 2017 at 9:43 PM, Mahendra Kariya 
wrote:

> Ah OK! Thanks a lot for this clarification.
>
> it will only commit the offsets if the value of COMMIT_INTERVAL_MS_CONFIG
>> has
>> passed.
>>
>
>
>
>
>


Re: At Least Once semantics for Kafka Streams

2017-02-03 Thread Mahendra Kariya
Ah OK! Thanks a lot for this clarification.

it will only commit the offsets if the value of COMMIT_INTERVAL_MS_CONFIG
> has
> passed.
>


Re: At Least Once semantics for Kafka Streams

2017-02-03 Thread Mahendra Kariya
Thanks Damian for this info.

On Fri, Feb 3, 2017 at 3:29 PM, Damian Guy  wrote:

> The commit is done on the same thread as the processing, so only offsets
> that have been fully processed by the topology will be committed.
>


I am still not clear about why we need the COMMIT_INTERVAL_MS_CONFIG config
for streams. If the offsets are only going to be committed after the
topology processing is over, what exactly does this config do?


Re: At Least Once semantics for Kafka Streams

2017-02-03 Thread Mahendra Kariya
Thanks a lot for this Matthias.

I have a follow-up question. There is a COMMIT_INTERVAL_MS_CONFIG config
for streams. This confuses things a little bit. If the value of this config
is set to, say, 100 ms, does it mean that the offset will be committed after
100 ms? If yes, then how does the at-least-once guarantee work?
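
For concreteness, here is how the config in question would be set, with the
100 ms value from the example above (a minimal sketch; the application id and
bootstrap servers are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class CommitIntervalExample {
        public static void main(String[] args) {
            Properties config = new Properties();
            config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
            config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // Offsets are committed from the processing thread no more often than
            // this interval, and only for records that have been fully processed
            // by the topology.
            config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
            // config would then be passed to the KafkaStreams constructor along with the topology
        }
    }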




On Mon, Jan 30, 2017 at 1:43 PM, Matthias J. Sax 
wrote:

> Hi,
>
> yes, all examples have at-least-once semantics because this is the only
> "mode" Kafka Streams supports -- you cannot "disable" it. (btw: we are
> currently working on exactly-once for Streams that you will be able to
> turn off/on).
>
> There is not much documentation about how it works internally, because it
> uses the same mechanism the Kafka consumer uses to provide at-least-once
> semantics and there is good documentation for this:
> http://docs.confluent.io/current/clients/consumer.html
>
> The basic idea is that input topic offsets only get committed after the
> processing is completely finished and all intermediate state/data is
> flushed to reliable storage -- to ensure nothing can get lost. If a
> failure happens, just retry from the last committed offset (and because some
> output data might already be written, which cannot be undone, you might
> get duplicates).
>
> -Matthias
>
> On 1/29/17 8:13 PM, Mahendra Kariya wrote:
> > Hey All,
> >
> > I am new to Kafka streams. From the documentation
> > <http://docs.confluent.io/3.1.0/streams/architecture.html#processing-guarantees>,
> > it is pretty much clear that streams support at least once semantics.
> But I
> > couldn't find details about how this is supported. I am interested in
> > knowing the finer details / design of this.
> >
> > Is there some documentation around this?
> > Is there some documentation around what semantics are followed by the
> > various Kafka streams examples
> > <https://github.com/confluentinc/examples/tree/3.1.x/kafka-streams>
> > available on Github? Do all of them follow at least once?
> >
> >
> > Thanks,
> > Mahendra
> >
>
>


At Least Once semantics for Kafka Streams

2017-01-29 Thread Mahendra Kariya
Hey All,

I am new to Kafka streams. From the documentation
,
it is pretty much clear that streams support at least once semantics. But I
couldn't find details about how this is supported. I am interested in
knowing the finer details / design of this.

Is there some documentation around this?
Is there some documentation around what semantics are followed by the
various Kafka streams examples

available on Github? Do all of them follow at least once?


Thanks,
Mahendra


Re: Kafka consumer offset info lost

2017-01-12 Thread Mahendra Kariya
Producers were publishing data to the topic. And consumers were also
connected, sending heartbeat pings every 100 ms.



On Thu, 12 Jan 2017 at 17:15 Michael Freeman  wrote:

> If the topic has not seen traffic for a while then Kafka will remove the
> stored offset. When your consumer reconnects Kafka no longer has the offset
> so it will reprocess from earliest.
>
> Michael
>
> > On 12 Jan 2017, at 11:13, Mahendra Kariya 
> wrote:
> >
> > Hey All,
> >
> > We have a Kafka cluster hosted on Google Cloud. There was some network
> > issue on the cloud and suddenly the offset for a particular consumer group
> > got reset to earliest, and all of a sudden the lag was in the millions. We
> > aren't able to figure out what went wrong. Has anybody faced the
> > same/similar issue? Does anybody have any debugging tips?
> >
> > Some relevant info:
> >
> >   - The auto.offset.reset config for the consumer is set to earliest
> >   - The offsets are stored on Kafka
> >   - Total nodes on cluster: 4
> >   - Replication factor: 3
> >   - Partitions: 50
>


Kafka consumer offset info lost

2017-01-12 Thread Mahendra Kariya
Hey All,

We have a Kafka cluster hosted on Google Cloud. There was some network
issue on the cloud and suddenly the offset for a particular consumer group
got reset to earliest, and all of a sudden the lag was in the millions. We
aren't able to figure out what went wrong. Has anybody faced the
same/similar issue? Does anybody have any debugging tips?

Some relevant info:

   - The auto.offset.reset config for the consumer is set to earliest
   - The offsets are stored on Kafka
   - Total nodes on cluster: 4
   - Replication factor: 3
   - Partitions: 50
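
For context, the first two points above correspond roughly to this consumer
setup (a hedged sketch; the group id, servers, and topic are placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class OffsetResetConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-01:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");       // placeholder
            // If no valid committed offset is found for the group, the consumer
            // falls back to the earliest available offset, which is what produced
            // the sudden multi-million message lag described above.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            // Using the new consumer, so offsets are stored in Kafka (__consumer_offsets).
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("my-topic"));            // placeholder
        }
    }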