[GitHub] kafka pull request #2894: [KAFKA-5092] [WIP] changed ProducerRecord interfac...

2017-12-13 Thread simplesteph
Github user simplesteph closed the pull request at:

https://github.com/apache/kafka/pull/2894


---


[GitHub] kafka pull request #4323: KAFKA-5849: Add process stop, round trip workload,...

2017-12-13 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/4323

KAFKA-5849: Add process stop, round trip workload, partitioned test

* Implement process stop faults via SIGSTOP / SIGCONT

* Implement RoundTripWorkload, which both sends messages, and confirms that 
they are received at least once.

* Allow Trogdor tasks to block until other Trogdor tasks are complete.

* Add CreateTopicsWorker, which can be a building block for a lot of tests.

* Simplify how TaskSpec subclasses in ducktape serialize themselves to JSON.

* Implement some fault injection tests in round_trip_workload_test.py

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-5849

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4323.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4323


commit bd7c5ae285bcc62e6ee526de55343679e1e1
Author: Colin P. Mccabe 
Date:   2017-12-14T01:31:00Z

KAFKA-5849: Add process stop faults, round trip workload, partitioned 
produce-consume test

* Implement process stop faults via SIGSTOP / SIGCONT

* Implement RoundTripWorkload, which both sends messages, and confirms that 
they are received at least once.

* Allow Trogdor tasks to block until other Trogdor tasks are complete.

* Add CreateTopicsWorker, which can be a building block for a lot of tests.

* Simplify how TaskSpec subclasses in ducktape serialize themselves to JSON.

* Implement some fault injection tests in round_trip_workload_test.py




---


[GitHub] kafka pull request #4106: KAFKA-5849: Add partitioned produce consume test

2017-12-13 Thread cmccabe
Github user cmccabe closed the pull request at:

https://github.com/apache/kafka/pull/4106


---


[jira] [Created] (KAFKA-6361) Fast leader fail over can lead to log divergence between leader and follower

2017-12-13 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-6361:
--

 Summary: Fast leader fail over can lead to log divergence between 
leader and follower
 Key: KAFKA-6361
 URL: https://issues.apache.org/jira/browse/KAFKA-6361
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


We have observed an edge case in the replication failover logic which can cause 
a replica to permanently fall out of sync with the leader or, in the worst 
case, actually have localized divergence between logs. This occurs in spite of 
the improved truncation logic from KIP-101. 

Suppose we have brokers A and B. Initially A is the leader in epoch 1. It 
appends two batches: one in the range (0, 10) and the other in the range (11, 
20). The first one successfully replicates to B, but the second one does not. 
In other words, the logs on the brokers look like this:

{code}
Broker A:
0: offsets [0, 10], leader epoch: 1
1: offsets [11, 20], leader epoch: 1

Broker B:
0: offsets [0, 10], leader epoch: 1
{code}

Broker A then has a zk session expiration and broker B is elected with epoch 2. 
It appends a new batch with offsets (11, n) to its local log. So we now have 
this:

{code}
Broker A:
0: offsets [0, 10], leader epoch: 1
1: offsets [11, 20], leader epoch: 1

Broker B:
0: offsets [0, 10], leader epoch: 1
1: offsets: [11, n], leader epoch: 2
{code}

Normally we expect broker A to truncate to offset 11 on becoming the follower, 
but before it is able to do so, broker B has its own zk session expiration and 
broker A again becomes leader, now with epoch 3. It then appends a new entry in 
the range (21, 30). The updated logs look like this:

{code}
Broker A:
0: offsets [0, 10], leader epoch: 1
1: offsets [11, 20], leader epoch: 1
2: offsets: [21, 30], leader epoch: 3

Broker B:
0: offsets [0, 10], leader epoch: 1
1: offsets: [11, n], leader epoch: 2
{code}

Now what happens next depends on the last offset of the batch appended in epoch 
2. On becoming follower, broker B will send an OffsetForLeaderEpoch request to 
broker A with epoch 2. Broker A will respond that epoch 2 ends at offset 21. 
There are three cases:

1) n < 20: In this case, broker B will not do any truncation. It will begin 
fetching from offset n, which will ultimately cause an out of order offset 
error because broker A will return the full batch beginning from offset 11 
which broker B will be unable to append.

2) n == 20: Again broker B does not truncate. It will fetch from offset 21 and 
everything will appear fine though the logs have actually diverged.

3) n > 20: Broker B will attempt to truncate to offset 21. Since this is in the 
middle of the batch, it will truncate all the way to offset 10. It can begin 
fetching from offset 11 and everything is fine.

The case we have actually seen is the first one. The second one would likely go 
unnoticed in practice and everything is fine in the third case. To workaround 
the issue, we deleted the active segment on the replica which allowed it to 
re-replicate consistently from the leader.
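
To make the three cases concrete, here is a small self-contained sketch (not 
Kafka's actual replication code) of the follower-side truncation decision, given 
the epoch end offset returned by the leader:

{code}
import java.util.ArrayList;
import java.util.List;

// Sketch only (not Kafka code): model the follower's log as [baseOffset, lastOffset]
// batches and apply the truncation rule described above.  A batch cannot be split,
// so a batch containing the truncation point is dropped entirely.
public class EpochTruncationSketch {

    // returns the offset the follower would fetch from after truncating to the
    // leader's reported end offset for the follower's last epoch
    static long fetchOffsetAfterTruncation(List<long[]> batches, long epochEndOffset) {
        List<long[]> kept = new ArrayList<>();
        for (long[] batch : batches) {
            if (batch[1] < epochEndOffset) {
                kept.add(batch);   // batch ends before the truncation point: keep it
            }
        }
        return kept.isEmpty() ? 0 : kept.get(kept.size() - 1)[1] + 1;
    }

    static List<long[]> brokerBLog(long n) {
        List<long[]> log = new ArrayList<>();
        log.add(new long[] { 0, 10 });  // epoch 1, replicated from broker A
        log.add(new long[] { 11, n });  // epoch 2, appended while B was leader
        return log;
    }

    public static void main(String[] args) {
        long epochEnd = 21;  // broker A: epoch 2 "ends" at 21 (start of epoch 3)
        // Case 1 (n < 20): nothing truncated; fetching resumes inside A's batch [11, 20],
        // which later triggers the out-of-order offset error described above.
        System.out.println(fetchOffsetAfterTruncation(brokerBLog(15), epochEnd)); // 16
        // Case 2 (n == 20): nothing truncated; fetch from 21 and the logs silently diverge.
        System.out.println(fetchOffsetAfterTruncation(brokerBLog(20), epochEnd)); // 21
        // Case 3 (n > 20): offset 21 falls inside batch [11, n], so the whole batch is
        // dropped and fetching restarts from 11; the logs converge again.
        System.out.println(fetchOffsetAfterTruncation(brokerBLog(25), epochEnd)); // 11
    }
}
{code}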

I'm not sure what the best solution for this scenario is. Maybe if the leader isn't 
aware of an epoch, it should always respond with {{UNDEFINED_EPOCH_OFFSET}} 
instead of using the offset of the next highest epoch. That would cause the 
follower to truncate using its high watermark. Or, instead of doing so, the 
follower could send another OffsetForLeaderEpoch request for the next-lowest 
cached epoch and then truncate using that. 




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-210: Provide for custom error handling when Kafka Streams fails to produce

2017-12-13 Thread Guozhang Wang
Metrics: this is a good point.

Note that currently we have two metrics for `skipped-records` on different
levels:

1) on the highest level, the thread-level, we have a `skipped-records`,
that records all the skipped records due to deserialization errors.
2) on the lower processor-node level, we have a
`skippedDueToDeserializationError`, that records the skipped records on
that specific source node due to deserialization errors.


So you can see that 1) does not cover any other scenarios and can just be
thought of as an aggregate of 2) across all the tasks' source nodes.
However, there are other places that can cause a record to be dropped, for
example:

1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
dropped due to window elapsed.
2) KIP-210: records could be dropped on the producer side.
3) records could be dropped during user-customized processing on errors.


I think improving the skipped-records metrics for all these scenarios is itself
worth another KIP; so I'd suggest we do not drag KIP-210 into this.


Guozhang
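
For reference, the handler signature quoted further down in this thread is given
only in prose on the wiki. A minimal sketch of the interface, plus a log-and-continue
implementation, could look roughly like the following (the enum constants and the
implementation class name are illustrative assumptions, not copied from the KIP):

{code}
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: the single-method handler described in the KIP text quoted below.
// The response enum values (CONTINUE / FAIL) are assumptions for illustration.
public interface ProductionExceptionHandler {

    ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                              Exception exception);

    enum ProductionExceptionHandlerResponse {
        CONTINUE,   // drop the record and keep processing
        FAIL        // stop processing
    }
}

// A possible "log and continue" implementation; note the logger is an instance field,
// created per handler instance (see the logger discussion below).
class LogAndContinueProductionExceptionHandler implements ProductionExceptionHandler {

    private final Logger logger =
        LoggerFactory.getLogger(LogAndContinueProductionExceptionHandler.class);

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        logger.warn("Failed to produce record to topic {}", record.topic(), exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }
}
{code}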


On Wed, Dec 13, 2017 at 3:45 PM, Matthias J. Sax 
wrote:

> One more after thought: should we add a metric for this? We also have a
> metric for `skippedDueToDeserializationError-rate` ?
>
>
> -Matthias
>
>
>
> On 12/6/17 7:54 AM, Bill Bejeck wrote:
> > Thanks for the clearly written KIP, no further comments from my end.
> >
> > -Bill
> >
> > On Wed, Dec 6, 2017 at 9:52 AM, Matt Farmer  wrote:
> >
> >> There is already a vote thread for this KIP. I can bump it so that it’s
> >> towards the top of your inbox.
> >>
> >> With regard to your concerns:
> >>
> >> 1) We do not have the "ProductionExceptionHandler" interface defined in
> the
> >> wiki page, though it is sort of clear that it is a one-function
> interface
> >> with record and exception. Could you add it?
> >>
> >>
> >> It is defined, it’s just not defined using a code snippet. The KIP
> reads as
> >> follows:
> >>
> >> ===
> >>
> >> A public interface named ProductionExceptionHandler with a single
> method,
> >> handle, that has the following signature:
> >>
> >>    - ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception)
> >>
> >>
> >> ===
> >>
> >> If you’d like me to add a code snippet illustrating this that’s simple
> for
> >> me to do, but it seemed superfluous.
> >>
> >> 2) A quick question about your example code: where would the "logger"
> >> object be created?
> >>
> >>
> >> SLF4J loggers are typically created as a class member in the class. Such
> >> as:
> >>
> >> private Logger logger = LoggerFactory.getLogger(HelloWorld.class);
> >>
> >> I omit that in my implementation examples for brevity.
> >>
> >> On December 6, 2017 at 2:14:58 AM, Guozhang Wang (wangg...@gmail.com)
> >> wrote:
> >>
> >> Hello Matt,
> >>
> >> Thanks for writing up the KIP. I made a pass over it and here are a few
> >> minor comments. I think you can consider starting a voting thread for
> this
> >> KIP while addressing them.
> >>
> >> 1) We do not have the "ProductionExceptionHandler" interface defined in
> the
> >> wiki page, though it is sort of clear that it is a one-function
> interface
> >> with record and exception. Could you add it?
> >>
> >> 2) A quick question about your example code: where would the "logger"
> >> object be created? Note that implementations of this interface have to
> >> provide a no-arg constructor; alternatively the logger could be a static
> >> field of the class, but in that case you would not be able to log which
> >> instance is throwing this error (we may have multiple producers within a
> >> single instance, even within a thread). Just a reminder to consider in
> >> your implementation.
> >>
> >>
> >> Guozhang
> >>
> >> On Tue, Dec 5, 2017 at 3:15 PM, Matthias J. Sax 
> >> wrote:
> >>
> >>> Thanks a lot for the update! Great write-up! Very clearly explained
> what
> >>> the change will look like!
> >>>
> >>> Looks good to me. No further comments from my side.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 12/5/17 9:14 AM, Matt Farmer wrote:
>  I have updated this KIP accordingly.
> 
>  Can you please take a look and let me know if what I wrote looks
> >> correct
> >>> to
>  you?
> 
>  https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>> 210+-+Provide+for+custom+error+handling++when+Kafka+
> >>> Streams+fails+to+produce
> 
>  Thanks!
> 
>  Matt
> 
> 
>  On December 4, 2017 at 9:39:13 PM, Matt Farmer (m...@frmr.me) wrote:
> 
>  Hey Matthias, thanks for getting back to me.
> 
>  That's fine. But if we add it to `test` package, we don't need to talk
>  about it in the KIP. `test` is not public API.
> 
>  Yes, that makes sense. It was in the KIP originally because I was, at
> >> one
>  point, planning on including it. We can remove it now that we’ve
> >> decided
> >>> we
>  won’t include it in the public 

Re: [DISCUSS] KIP-210: Provide for custom error handling when Kafka Streams fails to produce

2017-12-13 Thread Matthias J. Sax
One more after thought: should we add a metric for this? We also have a
metric for `skippedDueToDeserializationError-rate` ?


-Matthias



On 12/6/17 7:54 AM, Bill Bejeck wrote:
> Thanks for the clearly written KIP, no further comments from my end.
> 
> -Bill
> 
> On Wed, Dec 6, 2017 at 9:52 AM, Matt Farmer  wrote:
> 
>> There is already a vote thread for this KIP. I can bump it so that it’s
>> towards the top of your inbox.
>>
>> With regard to your concerns:
>>
>> 1) We do not have the "ProductionExceptionHandler" interface defined in the
>> wiki page, though it is sort of clear that it is a one-function interface
>> with record and exception. Could you add it?
>>
>>
>> It is defined, it’s just not defined using a code snippet. The KIP reads as
>> follows:
>>
>> ===
>>
>> A public interface named ProductionExceptionHandler with a single method,
>> handle, that has the following signature:
>>
>>    - ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception)
>>
>>
>> ===
>>
>> If you’d like me to add a code snippet illustrating this that’s simple for
>> me to do, but it seemed superfluous.
>>
>> 2) A quick question about your example code: where would the "logger"
>> object be created?
>>
>>
>> SLF4J loggers are typically created as a class member in the class. Such
>> as:
>>
>> private Logger logger = LoggerFactory.getLogger(HelloWorld.class);
>>
>> I omit that in my implementation examples for brevity.
>>
>> On December 6, 2017 at 2:14:58 AM, Guozhang Wang (wangg...@gmail.com)
>> wrote:
>>
>> Hello Matt,
>>
>> Thanks for writing up the KIP. I made a pass over it and here are a few
>> minor comments. I think you can consider starting a voting thread for this
>> KIP while addressing them.
>>
>> 1) We do not have the "ProductionExceptionHandler" interface defined in the
>> wiki page, though it is sort of clear that it is a one-function interface
>> with record and exception. Could you add it?
>>
>> 2) A quick question about your example code: where would the "logger"
>> object be created? Note that implementations of this interface have to
>> provide a no-arg constructor; alternatively the logger could be a static
>> field of the class, but in that case you would not be able to log which
>> instance is throwing this error (we may have multiple producers within a
>> single instance, even within a thread). Just a reminder to consider in
>> your implementation.
>>
>>
>> Guozhang
>>
>> On Tue, Dec 5, 2017 at 3:15 PM, Matthias J. Sax 
>> wrote:
>>
>>> Thanks a lot for the update! Great write-up! Very clearly explained what
>>> the change will look like!
>>>
>>> Looks good to me. No further comments from my side.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 12/5/17 9:14 AM, Matt Farmer wrote:
 I have updated this KIP accordingly.

 Can you please take a look and let me know if what I wrote looks
>> correct
>>> to
 you?

 https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> 210+-+Provide+for+custom+error+handling++when+Kafka+
>>> Streams+fails+to+produce

 Thanks!

 Matt


 On December 4, 2017 at 9:39:13 PM, Matt Farmer (m...@frmr.me) wrote:

 Hey Matthias, thanks for getting back to me.

 That's fine. But if we add it to `test` package, we don't need to talk
 about it in the KIP. `test` is not public API.

 Yes, that makes sense. It was in the KIP originally because I was, at
>> one
 point, planning on including it. We can remove it now that we’ve
>> decided
>>> we
 won’t include it in the public API.

 Understood. That makes sense. We should explain this clearly in the KIP
 and maybe log all other following exceptions at DEBUG level?


 I thought it was clear in the KIP, but I can go back and double check
>> my
 wording and revise it to try and make it clearer.

 I’ll take a look at doing more work on the KIP and the Pull Request
 tomorrow.

 Thanks again!

 On December 4, 2017 at 5:50:33 PM, Matthias J. Sax (
>>> matth...@confluent.io)
 wrote:

 Hey,

 About your questions:

>>> Acknowledged, so is ProducerFencedException the only kind of
>>> exception I
>>> need to change my behavior on? Or are there other types I need to
 check? Is
>>> there a comprehensive list somewhere?

 I cannot think of any other atm. We should list all fatal exceptions
>> for
 which we don't call the handler and explain why (exception is "global"
 and will affect all other records, too | ProducerFenced is
>> self-healing).

 We started to collect and categorize exceptions here (not completed
>> yet):
 https://cwiki.apache.org/confluence/display/KAFKA/
>>> Kafka+Streams+Architecture#KafkaStreamsArchitecture-TypesofExceptions
 :

 This list should be a good starting point though.

> I include it in the test package because I have tests that assert 

[GitHub] kafka pull request #4322: KAFKA-6126: Remove unnecessary topics created chec...

2017-12-13 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/4322

KAFKA-6126: Remove unnecessary topics created check


### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka 
kafka-6126-remove-topic-check-on-rebalance-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4322.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4322


commit c0a8b4868cf979e47a25d5e837b26e62e2aac212
Author: Matthias J. Sax 
Date:   2017-12-13T21:06:18Z

KAFKA-6126: Remove unnecessary topics created check




---


[GitHub] kafka pull request #4308: catch and log exceptions thrown in waiters added t...

2017-12-13 Thread xvrl
Github user xvrl closed the pull request at:

https://github.com/apache/kafka/pull/4308


---


Re: [DISCUSS] KIP-236 Interruptible Partition Reassignment

2017-12-13 Thread Ted Yu
Tom:
bq. create a znode /admin/reassignments/$topic-$partition

Looks like the tree structure above should be:

/admin/reassignments/$topic/$partition

bq. The controller removes /admin/reassignment/$topic/$partition

Note the lack of 's' for reassignment. It would be good to make zookeeper
paths consistent.

Thanks
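
As a rough illustration of the change-notification scheme Jun proposes below (one
sequential znode per change, with the controller watching only the parent path),
here is a sketch using the plain ZooKeeper client; all paths and the handling
logic are illustrative only:

{code}
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// Sketch only: the controller watches a single change path instead of one watcher
// per reassigned partition.  Error handling and watch re-registration corner cases
// are omitted.
public class ReassignmentChangeWatcherSketch {

    private static final String CHANGES_PATH = "/admin/reassignment_changes";

    public static void watchChanges(final ZooKeeper zk) throws Exception {
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    // child watches are one-shot: re-register, then handle pending changes
                    List<String> changes = zk.getChildren(CHANGES_PATH, this);
                    System.out.println("Pending reassignment changes: " + changes);
                    // ... re-read everything under /admin/reassignments/ here ...
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        zk.getChildren(CHANGES_PATH, watcher);  // register the initial watch
    }
}
{code}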

On Wed, Dec 13, 2017 at 9:49 AM, Tom Bentley  wrote:

> Hi Jun and Ted,
>
> Jun, you're right that needing one watcher per reassigned partition
> presents a scalability problem, and using a separate notification path
> solves that. I also agree that it makes sense to prevent users from using
> both methods on the same reassignment.
>
> Ted, naming the reassignments like mytopic-42 was simpler while I was
> proposing a watcher-per-reassignment (I'd have needed a child watcher on
> /admin/reassignments and also on /admin/reassignments/mytopic). Using the
> separate notification path means I don't need any watchers in the
> /admin/reassignments subtree, so switching to /admin/reassignments/mytopic/
> 42
> would work, and avoid /admin/reassignments having a very large number of
> child nodes. On the other hand it also means I have to create and delete
> the topic nodes (e.g. /admin/reassignments/mytopic), which incurs the cost
> of extra round trips to zookeeper. I suppose that since reassignment is
> generally a slow process it makes little difference if we increase the
> latency of the interactions with zookeeper.
>
> I have updated the KIP with these improvements, and a more detailed
> description of exactly how we would manage these znodes.
>
> Reading the algorithm in KafkaController.onPartitionReassignment(), it
> seems that it would be suboptimal for changing reassignments in-flight.
> Consider an initial assignment of [1,2], reassigned to [2,3] and then
> changed to [2,4]. Broker 3 will remain in the assigned replicas until
> broker 4 is in sync, even though 3 wasn't actually one of the original
> assigned replicas and is no longer a new assigned replica. I think this
> also affects the case where the reassignment is cancelled
> ([1,2]->[2,3]->[1,2]): We again have to wait for 3 to catch up, even though
> its replica will then be deleted.
>
> Should we seek to improve this algorithm in this KIP, or leave that as a
> later optimisation?
>
> Cheers,
>
> Tom
>
> On 11 December 2017 at 21:31, Jun Rao  wrote:
>
> > Another question is on the compatibility. Since now there are 2 ways of
> > specifying a partition reassignment, one under /admin/reassign_partitions
> > and the other under /admin/reassignments, we probably want to prevent the
> > same topic being reassigned under both paths at the same time?
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Fri, Dec 8, 2017 at 5:41 PM, Jun Rao  wrote:
> >
> > > Hi, Tom,
> > >
> > > Thanks for the KIP. It definitely addresses one of the pain points in
> > > partition reassignment. Another issue that it also addresses is the ZK
> > node
> > > size limit when writing the reassignment JSON.
> > >
> > > My only concern is that the KIP needs to create one watcher per
> > reassigned
> > > partition. This could add overhead in ZK and complexity for debugging
> > when
> > > lots of partitions are being reassigned simultaneously. We could
> > > potentially improve this by introducing a separate ZK path for change
> > > notification as we do for configs. For example, every time we change
> the
> > > assignment for a set of partitions, we could further write a sequential
> > > node /admin/reassignment_changes/[change_x]. That way, the controller
> > > only needs to watch the change path. Once a change is triggered, the
> > > controller can read everything under /admin/reassignments/.
> > >
> > > Jun
> > >
> > >
> > > On Wed, Dec 6, 2017 at 1:19 PM, Tom Bentley 
> > wrote:
> > >
> > >> Hi,
> > >>
> > >> This is still very new, but I wanted some quick feedback on a
> > preliminary
> > >> KIP which could, I think, help with providing an AdminClient API for
> > >> partition reassignment.
> > >>
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%
> > >> 3A+Interruptible+Partition+Reassignment
> > >>
> > >> I wasn't sure whether to start fleshing out a whole AdminClient API in
> > >> this
> > >> KIP (which would make it very big, and difficult to read), or whether
> to
> > >> break it down into smaller KIPs (which makes it easier to read and
> > >> implement in pieces, but harder to get a high-level picture of the
> > >> ultimate
> > >> destination). For now I've gone for a very small initial KIP, but I'm
> > >> happy
> > >> to sketch the bigger picture here if people are interested.
> > >>
> > >> Cheers,
> > >>
> > >> Tom
> > >>
> > >
> > >
> >
>
>

Re: [DISCUSS] KIP-234: add support for getting topic defaults from AdminClient

2017-12-13 Thread Colin McCabe
On Wed, Dec 13, 2017, at 10:00, dan wrote:
> > Why not just return
> > org.apache.kafka.clients.admin.Config like describeConfigs does?
> 
> brokers have a `num.partitions` config that does not map to a valid
> `Config` entry for a topic.

Hi Dan,

Sorry if I'm misunderstanding something, but why not map it to
num.partitions?

> 
> another added benefit to using `NewTopic` may be (future kip) having the
> cluster return the actual replica mappings it would create (i have no
> idea if this is actually possible)

A better way of doing that would probably be extending
CreateTopicsRequest so that it returns partition assignment information
to the caller, and then using validateOnly = true to get this information.

Actually, come to think of it, maybe we should be doing that for this
KIP too.  Why not have CreateTopicsRequest return the config that was
used, plus the partition assignment that was made?  We don't create
topics that often, so the extra space on the wire should not be a
concern.
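
For comparison, this is roughly what reading a broker-level default such as
num.partitions through the existing describeConfigs API looks like today (sketch
only; the broker id and bootstrap address are placeholders):

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Sketch only: fetch a broker's configs and pull out num.partitions, which has no
// per-topic Config entry.
public class BrokerDefaultsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Config config = admin.describeConfigs(Collections.singleton(broker))
                                 .all().get().get(broker);
            for (ConfigEntry entry : config.entries()) {
                if (entry.name().equals("num.partitions")) {
                    System.out.println("broker default partitions: " + entry.value());
                }
            }
        }
    }
}
{code}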

best,
Colin

> 
> dan
> 
> On Wed, Dec 13, 2017 at 9:55 AM, Colin McCabe  wrote:
> 
> > On Tue, Dec 12, 2017, at 19:02, Ewen Cheslack-Postava wrote:
> > > re: API versions, I actually wasn't sure if we needed it or not. I'm fine
> > > if people would prefer just bumping it, but I was actually curious if we
> > > could get away without bumping it. I don't know the behavior of the
> > > broker code paths for this well enough to know what types of errors those
> > > non-null assertions get converted into.
> >
> > There's no advantage to trying to keep the API version number the same,
> > though.  Since we have bidirectional client compatibility now, the
> > clients and the server will just negotiate whatever version they need.
> > New clients can still talk to older brokers that don't support this
> > feature.
> >
> > If you don't bump the API version, the best case scenario is that you
> > get a disconnect exception and the end-user is left confused about why.
> > The worst-case scenario is that you crash the broker (but probably not,
> > since you'd just get an NPE in serde, I think).  If you bump the version
> > number, you can provide a proper UnsupportedVersionException when the
> > feature is not supported.
> >
> > > For the return type, NewTopic seems reasonable and kind of intuitive --
> > > basically a description of the NewTopic you would get. The only reason I
> > > would be wary of reusing it is that what we don't want people doing is
> > > taking that and passing it directly into AdminClient.createTopics since
> > > we don't want them explicitly overriding all the defaults.
> >
> > Yeah.  Another thing is that NewTopic has a lot of stuff related to
> > replication that doesn't seem relevant here.  For example, when creating
> > NewTopic, you have the option of either setting replicationFactor, or
> > setting up a specific replica assignment.  Why not just return
> > org.apache.kafka.clients.admin.Config like describeConfigs does?
> >
> > best,
> > Colin
> >
> > >
> > > -Ewen
> > >
> > > On Tue, Dec 12, 2017 at 2:32 PM, dan  wrote:
> > >
> > > > Colin/Ewen,
> > > >
> > > > i will add changes to bump the API version.
> > > >
> > > > any preferences on the return type for the new method? tbh it seems
> > like
> > > > returning a NewTopic could make sense because the ConfigResource for a
> > > > TOPIC type does not let me encode `numPartitions`
> > > >
> > > > thanks
> > > > dan
> > > >
> > > > On Mon, Dec 11, 2017 at 7:22 PM, Colin McCabe 
> > wrote:
> > > >
> > > > > Hi Dan,
> > > > >
> > > > > The KIP looks good overall.
> > > > >
> > > > > On Mon, Dec 11, 2017, at 18:28, Ewen Cheslack-Postava wrote:
> > > > > > I think the key point is when the kafka admin and user creating
> > topics
> > > > > > differ. I think a more realistic example of Dan's point (2) is for
> > > > > > retention. I know that realistically, admins aren't just going to
> > > > > > randomly
> > > > > > drop the broker defaults from 1w to 1d without warning anyone
> > (they'd
> > > > > > likely be fired...). But as a user, I may not know the broker
> > configs,
> > > > if
> > > > > > admins have overridden them, etc. I may want a *minimum* of, e.g.,
> > 2d.
> > > > > > But if the broker defaults are higher such that the admins are
> > > > confident
> > > > > the
> > > > > > cluster can handle 1w, I'd rather just fall back on the default
> > value.
> > > > >
> > > > > Right.  I think this API addresses a similar set of use-cases as
> > adding
> > > > > the "validateOnly" boolean for createTopics.  You shouldn't have to
> > > > > create a topic to know whether it was possible to create it, or what
> > the
> > > > > retention will end up being, etc. etc.
> > > > >
> > > > > > Now, there's arguably a better solution for that case -- allow
> > topic
> > > > > > configs to express a *minimum* value (or maximum depending on the
> > > > > > particular config), with the broker config taking 

Re: [DISCUSS] KIP-222 - Add "describe consumer group" to KafkaAdminClient

2017-12-13 Thread Colin McCabe
On Tue, Dec 12, 2017, at 09:39, Jason Gustafson wrote:
> Hi Colin,
> 
> They do share the same namespace. We have a "protocol type" field in the
> JoinGroup request to make sure that all members are of the same kind.

Hi Jason,

Thanks.  That makes sense.

> Very roughly what I was thinking is something like this. First we introduce an
> interface for deserialization:
> 
> interface GroupMetadataDeserializer<Metadata, Assignment> {
>   String protocolType();
>   Metadata deserializeMetadata(ByteBuffer buffer);
>   Assignment deserializeAssignment(ByteBuffer buffer);
> }
> 
> Then we add some kind of generic container:
> 
> class MemberMetadata<Metadata, Assignment> {
>   Metadata metadata;
>   Assignment assignment;
> }
> 
> Then we have two APIs: one generic and one specific to consumer groups:
> 
>  Map<String, MemberMetadata<Metadata, Assignment>> describeGroup(String groupId,
> GroupMetadataDeserializer<Metadata, Assignment> deserializer);
> 
> Map describeConsumerGroup(String groupId);
> 
> (This is just a sketch, so obviously we can change them to use futures or
> to batch or whatever.)
> 
> I think it would be fine to not provide a connect-specific API since this
> usage will probably be limited to Connect itself.

Yeah, it probably makes sense to have a separation between describeGroup
and describeConsumerGroup.

We will have to be pretty careful with cross-version compatibility in
describeConsumerGroup.  It should be possible for an old client to talk
to a new broker, and a new client to talk to an old broker.  So we
should be prepared to read data in multiple formats.

I'm not sure if we need to have a 'deserializer' argument to
describeGroup.  We can just let them access a byte array, right? 
Theoretically they might also just want to check for the presence or
absence of a group, but not deserialize anything.
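
Purely to illustrate that raw-bytes alternative (all names below are hypothetical,
not part of any existing or proposed AdminClient API):

{code}
import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical sketch: the generic describeGroup call hands back the opaque
// metadata/assignment buffers and lets the caller decide whether to deserialize.
class RawMemberMetadata {
    final String protocolType;
    final ByteBuffer metadata;
    final ByteBuffer assignment;

    RawMemberMetadata(String protocolType, ByteBuffer metadata, ByteBuffer assignment) {
        this.protocolType = protocolType;
        this.metadata = metadata;
        this.assignment = assignment;
    }
}

interface RawGroupDescriber {
    // memberId -> raw member metadata; an empty map signals that the group does not exist
    Map<String, RawMemberMetadata> describeGroup(String groupId);
}
{code}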

best,
Colin

> 
> Thanks,
> Jason
> 
> 
> On Mon, Dec 11, 2017 at 9:15 PM, Colin McCabe  wrote:
> 
> > Sorry... this is probably a silly question, but do Kafka Connect groups
> > share a namespace with consumer groups?  If we had a separate API for
> > Kafka Connect groups vs. Consumer groups, would that make sense?  Or
> > should we unify them?
> >
> > best,
> > Colin
> >
> >
> > On Mon, Dec 11, 2017, at 16:11, Jason Gustafson wrote:
> > > Hi Jorge,
> > >
> > > Kafka group management is actually more general than consumer groups
> > > (e.g.
> > > there are kafka connect groups). If we are adding these APIs, I would
> > > suggest we consider the more general protocol and how to expose
> > > group-protocol-specific metadata. For example, it might be reasonable to
> > > have both an API to access to the low-level bytes as well as some
> > > higher-level convenience APIs for accessing consumer groups.
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Mon, Dec 4, 2017 at 4:07 PM, Matthias J. Sax 
> > > wrote:
> > >
> > > > Jorge,
> > > >
> > > > is there any update regarding this KIP?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 11/17/17 9:14 AM, Guozhang Wang wrote:
> > > > > Hello Jorge,
> > > > >
> > > > > I made a pass over the wiki, and here are a few comments:
> > > > >
> > > > > 1. First, regarding Tom's comment #2 above, I think if we are only
> > > > > going to include the String groupId, then it is okay to keep it as a
> > > > > String rather than using a new wrapper class. However, I think we could
> > > > > include the protocol_type returned from the ListGroupsResponse along
> > > > > with the groupId. This is very useful information to tell which consumer
> > > > > groups are from Connect, which ones are from Streams, which ones are
> > > > > user-customized, etc. With this, it is reasonable to keep a wrapper class.
> > > > >
> > > > > 2. In ConsumerDescription, could we also add the state, protocol_type
> > > > > (these two are from DescribeGroupResponse), and the Node coordinator
> > > > (this
> > > > > may be returned from the AdminClient itself) as well? This is also
> > for
> > > > > information consistency with the old client (note that protocol_type
> > was
> > > > > called assignment_strategy there).
> > > > >
> > > > > 3. With 1) / 2) above, maybe we can rename "ConsumerGroupListing" to
> > > > > "ConsumerGroupSummary" and make "ConsumerGroupDescription" an
> > extended
> > > > > class of the former with the additional fields?
> > > > >
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Tue, Nov 7, 2017 at 2:13 AM, Jorge Esteban Quilcate Otoya <
> > > > > quilcate.jo...@gmail.com> wrote:
> > > > >
> > > > >> Hi Tom,
> > > > >>
> > > > >> 1. You're right. I've updated the KIP accordingly.
> > > > >> 2. Yes, I have added it to keep consistency, but I'd like to know what
> > > > others
> > > > >> think about this too.
> > > > >>
> > > > >> Cheers,
> > > > >> Jorge.
> > > > >>
> > > > >> El mar., 7 nov. 2017 a las 9:29, Tom Bentley (<
> > t.j.bent...@gmail.com>)
> > > > >> escribió:
> > > > >>
> > 

Re: [DISCUSS] KIP-234: add support for getting topic defaults from AdminClient

2017-12-13 Thread dan
> Why not just return
> org.apache.kafka.clients.admin.Config like describeConfigs does?

brokers have a `num.partitions` config that does not map to a valid
`Config` entry for a topic.

another added benefit to using `NewTopic` may be (future kip) having the
cluster return the actual replica mappings it would create (i have no idea
if this is actually possible)

dan

On Wed, Dec 13, 2017 at 9:55 AM, Colin McCabe  wrote:

> On Tue, Dec 12, 2017, at 19:02, Ewen Cheslack-Postava wrote:
> > re: API versions, I actually wasn't sure if we needed it or not. I'm fine
> > if people would prefer just bumping it, but I was actually curious if we
> > could get away without bumping it. I don't know the behavior of the
> > broker code paths for this well enough to know what types of errors those
> > non-null assertions get converted into.
>
> There's no advantage to trying to keep the API version number the same,
> though.  Since we have bidirectional client compatibility now, the
> clients and the server will just negotiate whatever version they need.
> New clients can still talk to older brokers that don't support this
> feature.
>
> If you don't bump the API version, the best case scenario is that you
> get a disconnect exception and the end-user is left confused about why.
> The worst-case scenario is that you crash the broker (but probably not,
> since you'd just get an NPE in serde, I think).  If you bump the version
> number, you can provide a proper UnsupportedVersionException when the
> feature is not supported.
>
> > For the return type, NewTopic seems reasonable and kind of intuitive --
> > basically a description of the NewTopic you would get. The only reason I
> > would be wary of reusing it is that what we don't want people doing is
> > taking that and passing it directly into AdminClient.createTopics since
> > we don't want them explicitly overriding all the defaults.
>
> Yeah.  Another thing is that NewTopic has a lot of stuff related to
> replication that doesn't seem relevant here.  For example, when creating
> NewTopic, you have the option of either setting replicationFactor, or
> setting up a specific replica assignment.  Why not just return
> org.apache.kafka.clients.admin.Config like describeConfigs does?
>
> best,
> Colin
>
> >
> > -Ewen
> >
> > On Tue, Dec 12, 2017 at 2:32 PM, dan  wrote:
> >
> > > Colin/Ewen,
> > >
> > > i will add changes to bump the API version.
> > >
> > > any preferences on the return type for the new method? tbh it seems
> like
> > > returning a NewTopic could make sense because the ConfigResource for a
> > > TOPIC type does not let me encode `numPartitions`
> > >
> > > thanks
> > > dan
> > >
> > > On Mon, Dec 11, 2017 at 7:22 PM, Colin McCabe 
> wrote:
> > >
> > > > Hi Dan,
> > > >
> > > > The KIP looks good overall.
> > > >
> > > > On Mon, Dec 11, 2017, at 18:28, Ewen Cheslack-Postava wrote:
> > > > > I think the key point is when the kafka admin and user creating
> topics
> > > > > differ. I think a more realistic example of Dan's point (2) is for
> > > > > retention. I know that realistically, admins aren't just going to
> > > > > randomly
> > > > > drop the broker defaults from 1w to 1d without warning anyone
> (they'd
> > > > > likely be fired...). But as a user, I may not know the broker
> configs,
> > > if
> > > > > admins have overridden them, etc. I may want a *minimum* of, e.g.,
> 2d.
> > > > > But if the broker defaults are higher such that the admins are
> > > confident
> > > > the
> > > > > cluster can handle 1w, I'd rather just fall back on the default
> value.
> > > >
> > > > Right.  I think this API addresses a similar set of use-cases as
> adding
> > > > the "validateOnly" boolean for createTopics.  You shouldn't have to
> > > > create a topic to know whether it was possible to create it, or what
> the
> > > > retention will end up being, etc. etc.
> > > >
> > > > > Now, there's arguably a better solution for that case -- allow
> topic
> > > > > configs to express a *minimum* value (or maximum depending on the
> > > > > particular config), with the broker config taking precedence if it
> has
> > > a
> > > > > smaller value (or larger in the case of maximums). This lets you
> > > express
> > > > > your minimum requirements but allows the cluster to do more if
> that's
> > > the
> > > > > default. However, that would represent a much more significant and
> > > > > invasive change, and honestly I think it is more likely to confuse
> > > users.
> > > >
> > > > There always need to be topic defaults, though.  If we add a foobar
> > > > configuration for topics, existing topics will need to get
> grandfathered
> > > > in with a default foobar.  And they won't be able to set min and max
> > > > ranges, because foobars didn't exist back when the old topics were
> > > > created.
> > > >
> > > > >
> > > > > @Dan, regarding compatibility, this changes behavior without
> revving
> > > the
> > > > > 

Re: [DISCUSS] KIP-234: add support for getting topic defaults from AdminClient

2017-12-13 Thread Colin McCabe
On Tue, Dec 12, 2017, at 19:02, Ewen Cheslack-Postava wrote:
> re: API versions, I actually wasn't sure if we needed it or not. I'm fine
> if people would prefer just bumping it, but I was actually curious if we
> could get away without bumping it. I don't know the behavior of the
> broker code paths for this well enough to know what types of errors those
> non-null assertions get converted into.

There's no advantage to trying to keep the API version number the same,
though.  Since we have bidirectional client compatibility now, the
clients and the server will just negotiate whatever version they need. 
New clients can still talk to older brokers that don't support this
feature.

If you don't bump the API version, the best case scenario is that you
get a disconnect exception and the end-user is left confused about why. 
The worst-case scenario is that you crash the broker (but probably not,
since you'd just get an NPE in serde, I think).  If you bump the version
number, you can provide a proper UnsupportedVersionException when the
feature is not supported.

> For the return type, NewTopic seems reasonable and kind of intuitive --
> basically a description of the NewTopic you would get. The only reason I
> would be wary of reusing it is that what we don't want people doing is
> taking that and passing it directly into AdminClient.createTopics since
> we don't want them explicitly overriding all the defaults.

Yeah.  Another thing is that NewTopic has a lot of stuff related to
replication that doesn't seem relevant here.  For example, when creating
NewTopic, you have the option of either setting replicationFactor, or
setting up a specific replica assignment.  Why not just return
org.apache.kafka.clients.admin.Config like describeConfigs does?

best,
Colin

> 
> -Ewen
> 
> On Tue, Dec 12, 2017 at 2:32 PM, dan  wrote:
> 
> > Colin/Ewen,
> >
> > i will add changes to bump the API version.
> >
> > any preferences on the return type for the new method? tbh it seems like
> > returning a NewTopic could make sense because the ConfigResource for a
> > TOPIC type does not let me encode `numPartitions`
> >
> > thanks
> > dan
> >
> > On Mon, Dec 11, 2017 at 7:22 PM, Colin McCabe  wrote:
> >
> > > Hi Dan,
> > >
> > > The KIP looks good overall.
> > >
> > > On Mon, Dec 11, 2017, at 18:28, Ewen Cheslack-Postava wrote:
> > > > I think the key point is when the kafka admin and user creating topics
> > > > differ. I think a more realistic example of Dan's point (2) is for
> > > > retention. I know that realistically, admins aren't just going to
> > > > randomly
> > > > drop the broker defaults from 1w to 1d without warning anyone (they'd
> > > > likely be fired...). But as a user, I may not know the broker configs,
> > if
> > > > admins have overridden them, etc. I may want a *minimum* of, e.g., 2d.
> > > > But if the broker defaults are higher such that the admins are
> > confident
> > > the
> > > > cluster can handle 1w, I'd rather just fall back on the default value.
> > >
> > > Right.  I think this API addresses a similar set of use-cases as adding
> > > the "validateOnly" boolean for createTopics.  You shouldn't have to
> > > create a topic to know whether it was possible to create it, or what the
> > > retention will end up being, etc. etc.
> > >
> > > > Now, there's arguably a better solution for that case -- allow topic
> > > > configs to express a *minimum* value (or maximum depending on the
> > > > particular config), with the broker config taking precedence if it has
> > a
> > > > smaller value (or larger in the case of maximums). This lets you
> > express
> > > > your minimum requirements but allows the cluster to do more if that's
> > the
> > > > default. However, that would represent a much more significant and
> > > > invasive change, and honestly I think it is more likely to confuse
> > users.
> > >
> > > There always need to be topic defaults, though.  If we add a foobar
> > > configuration for topics, existing topics will need to get grandfathered
> > > in with a default foobar.  And they won't be able to set min and max
> > > ranges, because foobars didn't exist back when the old topics were
> > > created.
> > >
> > > >
> > > > @Dan, regarding compatibility, this changes behavior without revving
> > the
> > > > request version number, which normally we only do for things that are
> > > > reasonably considered bugfixes or where it has no compatibility
> > > > implications. In this case, older brokers talking to newer AdminClients
> > > > will presumably return some error. Do we know what the non-null
> > assertion
> > > > gets converted to and if we're happy with the behavior (i.e. will
> > > > applications be able to do something reasonable, distinguish it from
> > some
> > > > completely unrelated error, etc)? Similarly, it's obviously only one
> > > > implementation using the KIP-4 APIs, but do we know what client-side
> > > > validation AdminClient is already 

Re: [DISCUSS] KIP-236 Interruptible Partition Reassignment

2017-12-13 Thread Tom Bentley
Hi Jun and Ted,

Jun, you're right that needing one watcher per reassigned partition
presents a scalability problem, and using a separate notification path
solves that. I also agree that it makes sense to prevent users from using
both methods on the same reassignment.

Ted, naming the reassignments like mytopic-42 was simpler while I was
proposing a watcher-per-reassignment (I'd have needed a child watcher on
/admin/reassignments and also on /admin/reassignments/mytopic). Using the
separate notification path means I don't need any watchers in the
/admin/reassignments subtree, so switching to /admin/reassignments/mytopic/42
would work, and avoid /admin/reassignments having a very large number of
child nodes. On the other hand it also means I have to create and delete
the topic nodes (e.g. /admin/reassignments/mytopic), which incurs the cost
of extra round trips to zookeeper. I suppose that since reassignment is
generally a slow process it makes little difference if we increase the
latency of the interactions with zookeeper.

I have updated the KIP with these improvements, and a more detailed
description of exactly how we would manage these znodes.
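
To make that concrete, here is a rough sketch of what creating one of these znodes
plus a change-notification node might look like with the plain ZooKeeper client
(the paths, parent-node handling and JSON payload are illustrative, not the final
format in the KIP):

{code}
import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Sketch only: write a per-partition reassignment znode, then a sequential
// change-notification node so the controller only needs a single watch.
// Assumes /admin/reassignments and /admin/reassignment_changes already exist.
public class ReassignmentWriterSketch {

    public static void requestReassignment(ZooKeeper zk, String topic, int partition,
                                           String newReplicasJson) throws Exception {
        String topicPath = "/admin/reassignments/" + topic;
        if (zk.exists(topicPath, false) == null) {
            zk.create(topicPath, new byte[0],
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        zk.create(topicPath + "/" + partition,
                  newReplicasJson.getBytes(StandardCharsets.UTF_8),
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // ZooKeeper appends a monotonically increasing suffix to sequential nodes
        zk.create("/admin/reassignment_changes/change_", new byte[0],
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
    }
}
{code}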

Reading the algorithm in KafkaController.onPartitionReassignment(), it
seems that it would be suboptimal for changing reassignments in-flight.
Consider an initial assignment of [1,2], reassigned to [2,3] and then
changed to [2,4]. Broker 3 will remain in the assigned replicas until
broker 4 is in sync, even though 3 wasn't actually one of the original
assigned replicas and is no longer a new assigned replica. I think this
also affects the case where the reassignment is cancelled
([1,2]->[2,3]->[1,2]): We again have to wait for 3 to catch up, even though
its replica will then be deleted.

Should we seek to improve this algorithm in this KIP, or leave that as a
later optimisation?

Cheers,

Tom

On 11 December 2017 at 21:31, Jun Rao  wrote:

> Another question is on the compatibility. Since now there are 2 ways of
> specifying a partition reassignment, one under /admin/reassign_partitions
> and the other under /admin/reassignments, we probably want to prevent the
> same topic being reassigned under both paths at the same time?
> Thanks,
>
> Jun
>
>
>
> On Fri, Dec 8, 2017 at 5:41 PM, Jun Rao  wrote:
>
> > Hi, Tom,
> >
> > Thanks for the KIP. It definitely addresses one of the pain points in
> > partition reassignment. Another issue that it also addresses is the ZK
> node
> > size limit when writing the reassignment JSON.
> >
> > My only concern is that the KIP needs to create one watcher per
> reassigned
> > partition. This could add overhead in ZK and complexity for debugging
> when
> > lots of partitions are being reassigned simultaneously. We could
> > potentially improve this by introducing a separate ZK path for change
> > notification as we do for configs. For example, every time we change the
> > assignment for a set of partitions, we could further write a sequential
> > node /admin/reassignment_changes/[change_x]. That way, the controller
> > only needs to watch the change path. Once a change is triggered, the
> > controller can read everything under /admin/reassignments/.
> >
> > Jun
> >
> >
> > On Wed, Dec 6, 2017 at 1:19 PM, Tom Bentley 
> wrote:
> >
> >> Hi,
> >>
> >> This is still very new, but I wanted some quick feedback on a
> preliminary
> >> KIP which could, I think, help with providing an AdminClient API for
> >> partition reassignment.
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%
> >> 3A+Interruptible+Partition+Reassignment
> >>
> >> I wasn't sure whether to start fleshing out a whole AdminClient API in
> >> this
> >> KIP (which would make it very big, and difficult to read), or whether to
> >> break it down into smaller KIPs (which makes it easier to read and
> >> implement in pieces, but harder to get a high-level picture of the
> >> ultimate
> >> destination). For now I've gone for a very small initial KIP, but I'm
> >> happy
> >> to sketch the bigger picture here if people are interested.
> >>
> >> Cheers,
> >>
> >> Tom
> >>
> >
> >
>



[jira] [Created] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail

2017-12-13 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-6360:
-

 Summary: RocksDB segments not removed when store is closed causes 
re-initialization to fail
 Key: KAFKA-6360
 URL: https://issues.apache.org/jira/browse/KAFKA-6360
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.0
Reporter: Damian Guy
Assignee: Damian Guy
Priority: Blocker
 Fix For: 1.1.0


When a store is re-initialized it is first closed, before it is opened again. 
When this happens the segments in the {{Segments}} class are closed, but they 
are not removed from the list of segments. So when the store is re-initialized 
the old closed segments are used. This results in:
{code}
[2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] task 
[1_3] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-24:  
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)
org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
KSTREAM-AGGREGATE-STATE-STORE-24.151308000 is currently closed
at 
org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241)
at 
org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289)
at 
org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102)
at 
org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122)
at 
org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78)
at 
org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33)
at 
org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179)
at 
org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
at 
org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
at 
org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
at 
org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
{code}
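
The failure mode boils down to a cache of segment handles that is closed but never
cleared; a minimal, Streams-independent illustration of the pattern (these are not
the actual Kafka classes):

{code}
import java.util.ArrayList;
import java.util.List;

// Illustration only: close() closes the segments but leaves them in the list, so a
// re-initialized store hands out already-closed segments.
class SegmentsSketch {

    static class Segment {
        private boolean open = true;
        void close() { open = false; }
        void put(String key, String value) {
            if (!open) {
                throw new IllegalStateException("Segment is currently closed");
            }
            // ... write to the underlying store ...
        }
    }

    private final List<Segment> segments = new ArrayList<>();

    Segment getOrCreateSegment(int id) {
        while (segments.size() <= id) {
            segments.add(new Segment());
        }
        return segments.get(id);
    }

    void close() {
        for (Segment segment : segments) {
            segment.close();
        }
        // BUG: segments.clear() is missing, so re-initialization reuses closed segments
    }

    public static void main(String[] args) {
        SegmentsSketch store = new SegmentsSketch();
        store.getOrCreateSegment(0).put("k1", "v1");
        store.close();                               // store closed during re-initialization
        store.getOrCreateSegment(0).put("k2", "v2"); // fails: the old segment is closed
    }
}
{code}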




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6359) Work for KIP-236

2017-12-13 Thread Tom Bentley (JIRA)
Tom Bentley created KAFKA-6359:
--

 Summary: Work for KIP-236
 Key: KAFKA-6359
 URL: https://issues.apache.org/jira/browse/KAFKA-6359
 Project: Kafka
  Issue Type: Improvement
Reporter: Tom Bentley
Assignee: Tom Bentley
Priority: Minor


This issue is for the work described in KIP-236.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] kafka pull request #4321: KAFKA-6342 : Move workaround for JSON parsing of n...

2017-12-13 Thread umesh9794
GitHub user umesh9794 opened a pull request:

https://github.com/apache/kafka/pull/4321

KAFKA-6342 : Move workaround for JSON parsing of non-escaped strings

This PR moves the JSON parsing workaround of [this 
PR](https://github.com/apache/kafka/pull/4303) to a new method and uses that 
method in `ZkClient` and related classes.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/umesh9794/kafka KAFKA-6342

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4321.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4321


commit ec8abab0629ecdea4a3a5970653e4c88025b8dfd
Author: umesh chaudhary 
Date:   2017-12-13T10:51:23Z

Initial Commit




---


[GitHub] kafka pull request #4320: [WIP] Add logs to debug testHighConcurrencyModific...

2017-12-13 Thread omkreddy
GitHub user omkreddy opened a pull request:

https://github.com/apache/kafka/pull/4320

[WIP] Add logs to debug testHighConcurrencyModificationOfResourceAcls test 
case

Not able to reproduce locally. Added a few logs to check on Jenkins;
looks like some synchronization issue.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/omkreddy/kafka KAFKA-6335

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4320.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4320


commit 32ca396f676136f6964ecd65ba1ba93badda6955
Author: Manikumar Reddy 
Date:   2017-12-13T10:26:35Z

[WIP] Add logs to debug




---


[jira] [Created] (KAFKA-6358) Per topic producer/fetch_consumer/fetch_follower metrics

2017-12-13 Thread Ricardo Bartolome (JIRA)
Ricardo Bartolome created KAFKA-6358:


 Summary: Per topic producer/fetch_consumer/fetch_follower metrics
 Key: KAFKA-6358
 URL: https://issues.apache.org/jira/browse/KAFKA-6358
 Project: Kafka
  Issue Type: Wish
  Components: metrics
Affects Versions: 1.0.0
Reporter: Ricardo Bartolome
Priority: Minor


We are using the following JMX beans to monitor Kafka 1.0.0:

{code}
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
Mean
50thPercentile
...
99thPercentile

kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer
Count

kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower
Count
{code}

There are more, but this provides an idea of what we are using in order to get 
produce/fetch operation metrics on a per-broker basis. Nevertheless, in order to 
identify abusive consumers/clients in our Kafka cluster, we would appreciate 
having these metrics on a per-topic basis.

As example of per-topic metrics we have:
{code}
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=.*
{code}

Here we have a per-topic bean with a "Count" attribute that we can query. That 
way we can know which topics are ingesting more data and which ones less. 
We can't do that with the metrics explained above.
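
For reference, this is roughly how we read the existing per-topic bean over JMX 
today (sketch only; the broker's JMX host/port are placeholders). A comparable 
per-topic bean for the produce/fetch request metrics is what we are asking for:

{code}
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Sketch only: query the per-topic BytesOutPerSec beans over a JMX connection.
public class PerTopicMetricsSketch {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            ObjectName pattern = new ObjectName(
                "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*");
            Set<ObjectName> beans = connection.queryNames(pattern, null);
            for (ObjectName bean : beans) {
                Object count = connection.getAttribute(bean, "Count");
                System.out.println(bean.getKeyProperty("topic") + " -> " + count);
            }
        } finally {
            connector.close();
        }
    }
}
{code}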

Would you consider this a feature request for an upcoming Kafka version?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6357) Return nonzero code in kafka-consumer-groups.sh tool in case of error

2017-12-13 Thread Rinat Shigapov (JIRA)
Rinat Shigapov created KAFKA-6357:
-

 Summary: Return nonzero code in kafka-consumer-groups.sh tool in 
case of error
 Key: KAFKA-6357
 URL: https://issues.apache.org/jira/browse/KAFKA-6357
 Project: Kafka
  Issue Type: Improvement
  Components: tools
 Environment: kafka_2.12-0.11.0.0
Reporter: Rinat Shigapov


Use case that triggered that issue:

kafka-consumer-groups.sh can reset offsets if there is no active consumer in the 
group. Otherwise it just prints an error message about this situation and returns 
a zero exit code.

Expected behaviour: a nonzero code should be returned on error. Then proper 
scripting around kafka-consumer-groups.sh would be possible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6356) UnknownTopicOrPartitionException & NotLeaderForPartitionException and log deletion happening with retention bytes kept at -1.

2017-12-13 Thread kaushik srinivas (JIRA)
kaushik srinivas created KAFKA-6356:
---

 Summary: UnknownTopicOrPartitionException & 
NotLeaderForPartitionException and log deletion happening with retention bytes 
kept at -1.
 Key: KAFKA-6356
 URL: https://issues.apache.org/jira/browse/KAFKA-6356
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.0
 Environment: Cent OS 7.2,
HDD : 2Tb,
CPUs: 56 cores,
RAM : 256GB
Reporter: kaushik srinivas
 Attachments: configs.txt, stderr_b0, stderr_b1, stderr_b2, stdout_b0, 
stdout_b1, stdout_b2, topic_description, topic_offsets

Facing issues in a Kafka topic with 20 partitions and a replication factor of 3.

Config used :
No of partitions : 20
replication factor : 3
No of brokers : 3
Memory for broker : 32GB
Heap for broker : 12GB

A producer is run to produce data to the 20 partitions of a single topic.
We observed that for partitions whose leader is one particular broker (broker-1), 
the offsets are never incremented, and we also see 0 MB log files on that 
broker's disk.

Seeing the errors below in the brokers:

error 1:
[2017-12-13 07:11:11,191] ERROR [ReplicaFetcherThread-0-2], Error for partition 
[test2,5] to broker 
2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition. (kafka.server.ReplicaFetcherThread)

error 2:
[2017-12-11 12:19:41,599] ERROR [ReplicaFetcherThread-0-2], Error for partition 
[test1,13] to broker 
2:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)

Attaching,
1. error and std out files of all the brokers.
2. kafka config used.
3. offsets and topic description.

Retention bytes was set to -1 and the retention period to 96 hours,
but we are still observing some of the log files being deleted at the broker.

From the logs:
[2017-12-11 12:20:20,586] INFO Deleting index 
/var/lib/mesos/slave/slaves/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-S7/frameworks/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-0006/executors/ckafka__5f085d0c-e296-40f0-a686-8953dd14e4c6/runs/506a1ce7-23d1-45ea-bb7c-84e015405285/kafka-broker-data/broker-1/test1-12/.timeindex
 (kafka.log.TimeIndex)
[2017-12-11 12:20:20,587] INFO Deleted log for partition [test1,12] in 
/var/lib/mesos/slave/slaves/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-S7/frameworks/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-0006/executors/ckafka__5f085d0c-e296-40f0-a686-8953dd14e4c6/runs/506a1ce7-23d1-45ea-bb7c-84e015405285/kafka-broker-data/broker-1/test1-12.
 (kafka.log.LogManager)

We expect the logs to never be deleted if retention bytes is set to -1.






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)