Re: Stream application :: State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED continuously

2019-05-27 Thread Nayanjyoti Deka
Hi Guozhang,

Yes, indeed. We found that it happened whenever the changelog offsets were
very high.

For now, we are trying the RC1 version in our staging environment. We will
update this thread on whether the fix Jonathan mentioned above (for the
downstream of suppress being flooded) works for us as well.

On Fri, May 24, 2019 at 11:18 PM Guozhang Wang  wrote:

> Hello Nayanjyoti,
>
> Regarding KIP-328, the on-disk buffer is indeed being implemented, but it
> has not been completed and unfortunately has to slip to the next release.
>
> Now about the "PARTITIONS_REVOKED to PARTITIONS_ASSIGNED" issue: if you are
> restoring tons of data from the changelog, restoration can take a long time,
> and since Streams does not call consumer.poll() in time while restoring, the
> member gets kicked out of the group again.
>
>
> Guozhang
>
>
> On Tue, May 21, 2019 at 5:50 AM Jonathan Santilli <
> jonathansanti...@gmail.com> wrote:
>
> > Hello Nayanjyoti, about this part you mentioned:
> >
> > "Also, we had noticed that on restarts the downstream of the suppress
> > operator is *flooded* with events which, in the ideal case, wouldn't have
> > come. I came across https://stackoverflow.com/a/54227156 where Matthias
> > had responded that the suppress buffer is in memory (for version 2.1) and
> > that it reads the changelog to *recreate* the buffer, which should
> > actually *prevent* the behaviour (downstream being flooded) mentioned
> > above. Am I missing something?"
> >
> > It was me who asked that question on SO (
> > https://stackoverflow.com/a/54227156).
> > Yes, in version 2.2.0 the bug is still there, but it has been fixed in
> > version 2.2.1 (https://issues.apache.org/jira/browse/KAFKA-7895), which
> > is under vote right now as 2.2.1-RC1:
> > https://home.apache.org/~vahid/kafka-2.2.1-rc1/RELEASE_NOTES.html
> > I have tested the app that was suffering from that problem, and it is
> > now fixed. Of course, you still need to test your own app.
> >
> > I hope that helps.
> >
> > Cheers!
> > --
> > Jonathan
> >
> >
> > On Tue, May 21, 2019 at 11:29 AM Nayanjyoti Deka 
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > I had looked into it more. It seems that on restart the suppress buffer
> > > was being recreated from its changelog topic, and during that time there
> > > were no heartbeats from the application to the broker, hence causing it
> > > to behave this way. I could see the log of reading the suppress
> > > changelog topic from offset 0 on restart.
> > >
> > > I'm trying to understand why it needs to read the entire changelog
> > > topic, since windows that have already passed should have been compacted
> > > (or maybe deleted) from the broker's topic data.
> > >
> > > Also, we had noticed that on restarts the downstream of the suppress
> > > operator is *flooded* with events which, in the ideal case, wouldn't
> > > have come. I came across https://stackoverflow.com/a/54227156 where
> > > Matthias had responded that the suppress buffer is in memory (for
> > > version 2.1) and that it reads the changelog to *recreate* the buffer,
> > > which should actually *prevent* the behaviour (downstream being flooded)
> > > mentioned above. Am I missing something?
> > >
> > > We are using version 2.2, which probably has the same behaviour.
> > >
> > > Please correct me if I'm wrong on my analysis somewhere.
> > >
> > > Also, if possible, could you please help me understand why suppress was
> > > first implemented with an in-memory buffer rather than spilling to disk?
> > > I have read the KIP document,
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables,
> > > but it doesn't mention why the in-memory implementation was chosen,
> > > since on-disk spills would have preserved the exact semantics described
> > > by the operator across restarts.
> > >
> > >
> > >
> > > On Mon, May 20, 2019 at 9:24 PM Guozhang Wang 
> > wrote:
> > >
> > > > Hello Nayanjyoti,
> > > >
> > > > Did you find anything else from the streams log entries (is it
> > > > enabled on DEBUG or TRACE?), and what version of Kafka are you using?

Re: Stream application :: State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED continuously

2019-05-21 Thread Nayanjyoti Deka
Hi Guozhang,

I had looked into it more. It seems that on restart the suppress buffer was
being recreated from its changelog topic, and during that time there were no
heartbeats from the application to the broker, hence causing it to behave
this way. I could see the log of reading the suppress changelog topic from
offset 0 on restart.
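
For reference, a minimal sketch of how the relevant consumer timeouts can be
raised through the Streams config while we investigate (the class name and
the values are illustrative assumptions, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProps {
  public static Properties build() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // illustrative
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // illustrative
    // Give the main consumer more headroom between poll() calls, so a long
    // changelog restoration is less likely to trigger another rebalance.
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 900_000);
    // Heartbeat-based failure detection; raise with care.
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30_000);
    return props;
  }
}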

I'm trying to understand why it needs to read the entire changelog topic,
since windows that have already passed should have been compacted (or maybe
deleted) from the broker's topic data.

Also, we had noticed that on restarts the downstream of the suppress
operator is *flooded* with events which, in the ideal case, wouldn't have
come. I came across https://stackoverflow.com/a/54227156 where Matthias had
responded that the suppress buffer is in memory (for version 2.1) and that it
reads the changelog to *recreate* the buffer, which should actually *prevent*
the behaviour (downstream being flooded) mentioned above. Am I missing
something?

We are using version 2.2, which probably has the same behaviour.

Please correct me if I'm wrong somewhere in my analysis.

Also, if possible, could you please help me understand why suppress was first
implemented with an in-memory buffer rather than spilling to disk? I have
read the KIP document,
https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables,
but it doesn't mention why the in-memory implementation was chosen, since
on-disk spills would have preserved the exact semantics described by the
operator across restarts.
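
To make the question concrete, here is a minimal sketch of the kind of
windowed-count-plus-suppress topology in question (topic names, serdes, and
the window size are illustrative assumptions, not our actual topology):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;

public class SuppressExample {
  public static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
        .count()
        // In 2.1/2.2 this buffer lives in memory; on restart it is rebuilt
        // by replaying the suppress changelog topic.
        .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
        .toStream()
        .foreach((windowedKey, count) -> System.out.println(windowedKey + " -> " + count));
    return builder.build();
  }
}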



On Mon, May 20, 2019 at 9:24 PM Guozhang Wang  wrote:

> Hello Nayanjyoti,
>
> Did you find anything else from the streams log entries (is it enabled on
> DEBUG or TRACE?), and what version of Kafka are you using?
>
>
> Guozhang
>
> On Sun, May 19, 2019 at 1:04 PM Nayanjyoti Deka 
> wrote:
>
> > Forgot to add that there is no transition to the RUNNING state.
> >
> > On Mon, May 20, 2019 at 1:10 AM Nayanjyoti Deka 
> > wrote:
> >
> > > Hey guys,
> > >
> > > We are running a stream application in our production environment. On
> our
> > > latest restart, the application is consistently moving between these
> two
> > > states.
> > >
> > > From our logs:
> > >
> > > grep "State transition from " application.log | jq -r '.message' |
> > > sort | uniq -c | sort -n -r
> > >
> > >  40 stream-thread [-StreamThread-9] State transition from
> > > PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > >
> > >  40 stream-thread [-StreamThread-8] State transition from
> > > PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > >
> > >  40 stream-thread [-StreamThread-7] State transition from
> > > PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > >
> > >  40 stream-thread [-StreamThread-6] State transition from
> > > PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > >
> > >  40 stream-thread [-StreamThread-5] State transition from
> > > PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > >
> > >  40 stream-thread [-StreamThread-4] State transition from
> > > PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > >
> > >  40 stream-thread [-StreamThread-3] State transition from
> > > PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > >
> > >  40 stream-thread [-StreamThread-2] State transition from
> > > PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > >
> > >  40 stream-thread [-StreamThread-1] State transition from
> > > PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > >
> > >  40 stream-thread [-StreamThread-12] State transition from
> > > PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > >
> > >  40 stream-thread [-StreamThread-11] State transition from
> > > PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > >
> > >  40 stream-thread [-StreamThread-10] State transition from
> > > PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > >
> > >  39 stream-thread [-StreamThread-9] State transition from
> > > PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > >
> > >  39 stream-thread [-StreamThread-8] State transition from
> > > PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > >
> > >  39 stream-thread [-StreamThread-7] State transition from
> > > PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > >
> > >  39 stream-thread [-StreamThread-6] State transition from
> > > > PARTITIONS_ASSIGNED to PARTITIONS_REVOKED

Re: Stream application :: State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED continuously

2019-05-19 Thread Nayanjyoti Deka
Forgot to add that there is no transition to the RUNNING state.

On Mon, May 20, 2019 at 1:10 AM Nayanjyoti Deka 
wrote:

> Hey guys,
>
> We are running a stream application in our production environment. On our
> latest restart, the application is consistently moving between these two
> states.
>
> From our logs:
>
> grep "State transition from " application.log | jq -r '.message' | sort |
> uniq -c | sort -n -r
>
>  40 stream-thread [-StreamThread-9] State transition from
> PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
>
>  40 stream-thread [-StreamThread-8] State transition from
> PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
>
>  40 stream-thread [-StreamThread-7] State transition from
> PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
>
>  40 stream-thread [-StreamThread-6] State transition from
> PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
>
>  40 stream-thread [-StreamThread-5] State transition from
> PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
>
>  40 stream-thread [-StreamThread-4] State transition from
> PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
>
>  40 stream-thread [-StreamThread-3] State transition from
> PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
>
>  40 stream-thread [-StreamThread-2] State transition from
> PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
>
>  40 stream-thread [-StreamThread-1] State transition from
> PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
>
>  40 stream-thread [-StreamThread-12] State transition from
> PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
>
>  40 stream-thread [-StreamThread-11] State transition from
> PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
>
>  40 stream-thread [-StreamThread-10] State transition from
> PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
>
>  39 stream-thread [-StreamThread-9] State transition from
> PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
>
>  39 stream-thread [-StreamThread-8] State transition from
> PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
>
>  39 stream-thread [-StreamThread-7] State transition from
> PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
>
>  39 stream-thread [-StreamThread-6] State transition from
> PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
>
>  39 stream-thread [-StreamThread-5] State transition from
> PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
>
>  39 stream-thread [-StreamThread-4] State transition from
> PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
>
>  39 stream-thread [-StreamThread-3] State transition from
> PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
>
>  39 stream-thread [-StreamThread-2] State transition from
> PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
>
>  39 stream-thread [-StreamThread-1] State transition from
> PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
>
>  39 stream-thread [-StreamThread-12] State transition from
> PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
>
>  39 stream-thread [-StreamThread-11] State transition from
> PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
>
>  39 stream-thread [-StreamThread-10] State transition from
> PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
>
>
>
> As we can see, the stream threads are first revoked and then assigned again.
>
> We can also see offsets continuously being reset in the logs, as follows:
>
> Resetting offset for partition -2 to offset 9166288.
>
>
> We had actually deleted the consumer group on the broker before the restart,
> as there was considerable lag on the topic and processing the stale data was
> not intended. We had assumed that after deleting the group, the application
> would start processing from the latest offset, as specified by the
> auto.offset.reset config.
>
> On describing the consumer group on the broker side, we receive output with
> the current offset and lag shown as "-" (example below):
>
> TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
> xxx    10         -               129822997       -    -StreamThread-2-consumer-a-b-c-d
>
>
>
> Please help us understand why this can be happening and how to solve this.
>
>
>


Stream application :: State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED continuously

2019-05-19 Thread Nayanjyoti Deka
Hey guys,

We are running a stream application in our production environment. On our
latest restart, the application is consistently moving between these two
states.

From our logs:

grep "State transition from " application.log | jq -r '.message' | sort |
uniq -c | sort -n -r

 40 stream-thread [-StreamThread-9] State transition from
PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

 40 stream-thread [-StreamThread-8] State transition from
PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

 40 stream-thread [-StreamThread-7] State transition from
PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

 40 stream-thread [-StreamThread-6] State transition from
PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

 40 stream-thread [-StreamThread-5] State transition from
PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

 40 stream-thread [-StreamThread-4] State transition from
PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

 40 stream-thread [-StreamThread-3] State transition from
PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

 40 stream-thread [-StreamThread-2] State transition from
PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

 40 stream-thread [-StreamThread-1] State transition from
PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

 40 stream-thread [-StreamThread-12] State transition from
PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

 40 stream-thread [-StreamThread-11] State transition from
PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

 40 stream-thread [-StreamThread-10] State transition from
PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

 39 stream-thread [-StreamThread-9] State transition from
PARTITIONS_ASSIGNED to PARTITIONS_REVOKED

 39 stream-thread [-StreamThread-8] State transition from
PARTITIONS_ASSIGNED to PARTITIONS_REVOKED

 39 stream-thread [-StreamThread-7] State transition from
PARTITIONS_ASSIGNED to PARTITIONS_REVOKED

 39 stream-thread [-StreamThread-6] State transition from
PARTITIONS_ASSIGNED to PARTITIONS_REVOKED

 39 stream-thread [-StreamThread-5] State transition from
PARTITIONS_ASSIGNED to PARTITIONS_REVOKED

 39 stream-thread [-StreamThread-4] State transition from
PARTITIONS_ASSIGNED to PARTITIONS_REVOKED

 39 stream-thread [-StreamThread-3] State transition from
PARTITIONS_ASSIGNED to PARTITIONS_REVOKED

 39 stream-thread [-StreamThread-2] State transition from
PARTITIONS_ASSIGNED to PARTITIONS_REVOKED

 39 stream-thread [-StreamThread-1] State transition from
PARTITIONS_ASSIGNED to PARTITIONS_REVOKED

 39 stream-thread [-StreamThread-12] State transition from
PARTITIONS_ASSIGNED to PARTITIONS_REVOKED

 39 stream-thread [-StreamThread-11] State transition from
PARTITIONS_ASSIGNED to PARTITIONS_REVOKED

 39 stream-thread [-StreamThread-10] State transition from
PARTITIONS_ASSIGNED to PARTITIONS_REVOKED



As we can see, the stream threads are first revoked and then assigned again.

We can also see offsets continuously being reset in the logs, as follows:

Resetting offset for partition -2 to offset 9166288.


We had actually deleted the consumer group on the broker before the restart,
as there was considerable lag on the topic and processing the stale data was
not intended. We had assumed that after deleting the group, the application
would start processing from the latest offset, as specified by the
auto.offset.reset config.
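
For reference, a minimal sketch of how we understand the reset policy to be
set (the config key is the standard consumer one passed through the Streams
properties; app id and bootstrap servers are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ResetProps {
  public static Properties build() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // illustrative
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // illustrative
    // auto.offset.reset only applies to partitions with NO committed offsets
    // for the group; after deleting the group we expected "latest" to apply.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    return props;
  }
}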

On describing the consumer group on the broker side, we receive output with
the current offset and lag shown as "-" (example below):

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
xxx    10         -               129822997       -    -StreamThread-2-consumer-a-b-c-d



Please help us understand why this can be happening and how to solve this.


Re: Topic Creation two different config(s)

2019-05-08 Thread Nayanjyoti Deka
You can use AdminClient.createTopics with the necessary configuration.
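
For example, a minimal sketch using the Java AdminClient (topic names,
partition counts, and replication factor are illustrative assumptions):

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class TopicCreator {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // illustrative
    try (AdminClient admin = AdminClient.create(props)) {
      // 24-hour retention for free-tier customers
      NewTopic freeTier = new NewTopic("events-free", 6, (short) 3)
          .configs(Collections.singletonMap(
              TopicConfig.RETENTION_MS_CONFIG, String.valueOf(24L * 60 * 60 * 1000)));
      // 72-hour retention for paid customers
      NewTopic paid = new NewTopic("events-paid", 6, (short) 3)
          .configs(Collections.singletonMap(
              TopicConfig.RETENTION_MS_CONFIG, String.valueOf(72L * 60 * 60 * 1000)));
      admin.createTopics(Arrays.asList(freeTier, paid)).all().get(); // block until created
    }
  }
}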

On Wed, May 8, 2019 at 12:08 PM SenthilKumar K 
wrote:

> Hello Experts, we have a requirement to create topics dynamically with two
> different config(s). Is this possible in Kafka?
>
> Kafka version: 2.2.0
>
> Topics with different settings:
> #1 - Set retention to 24 hours for free-tier customers
> #2 - Set retention to 72 hours for paid customers
>
> Note: right now the Java producer is auto-creating topics with the broker
> default config(s).
>
> --Senthil
>


FetchConsumer RemoteTime broker metric

2019-03-24 Thread Nayanjyoti Deka
Hi,

I have been trying to make sense of the various JMX metrics exposed by the
brokers. Currently, I was looking into the FetchConsumer request metrics.
Here are the values I got:
#mbean =
kafka.network:type=RequestMetrics,name=RequestQueueTimeMs,request=FetchConsumer:
999thPercentile = 7.9710004;

#mbean =
kafka.network:type=RequestMetrics,name=LocalTimeMs,request=FetchConsumer:
999thPercentile = 0.97100036;

#mbean = kafka.network:type=RequestMetrics,name=*RemoteTimeMs*,request=
*FetchConsumer*:
999thPercentile = *553.913*;

#mbean =
kafka.network:type=RequestMetrics,name=ResponseQueueTimeMs,request=FetchConsumer:
999thPercentile = 9.0;

#mbean =
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer:
999thPercentile = 545.971;

I was unable to make sense of the mbean
kafka.network:type=RequestMetrics,name=*RemoteTimeMs*,request=FetchConsumer.

Why is this value non-zero for consumers? The docs mention that it is
non-zero for produce requests when acks=-1, so shouldn't the mbean be 0 for
fetch consumers?

Any pointers would be helpful. I have a feeling that my understanding of the
metric is somehow wrong and would love some clarity on it.
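
In case it helps, a minimal sketch of how the percentile can be read over
remote JMX (this assumes the broker exposes JMX, e.g. on port 9999; host and
port are illustrative):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class RemoteTimeProbe {
  public static void main(String[] args) throws Exception {
    JMXServiceURL url = new JMXServiceURL(
        "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi"); // illustrative
    try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
      MBeanServerConnection conn = connector.getMBeanServerConnection();
      ObjectName mbean = new ObjectName(
          "kafka.network:type=RequestMetrics,name=RemoteTimeMs,request=FetchConsumer");
      // The request-metrics histograms expose percentile attributes directly.
      Object p999 = conn.getAttribute(mbean, "999thPercentile");
      System.out.println("RemoteTimeMs 999thPercentile = " + p999);
    }
  }
}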

Thanks.