[GitHub] kafka pull request #2894: [KAFKA-5092] [WIP] changed ProducerRecord interfac...
Github user simplesteph closed the pull request at: https://github.com/apache/kafka/pull/2894 ---
[GitHub] kafka pull request #4323: KAFKA-5849: Add process stop, round trip workload,...
GitHub user cmccabe opened a pull request: https://github.com/apache/kafka/pull/4323

KAFKA-5849: Add process stop, round trip workload, partitioned test

* Implement process stop faults via SIGSTOP / SIGCONT
* Implement RoundTripWorkload, which both sends messages, and confirms that they are received at least once.
* Allow Trogdor tasks to block until other Trogdor tasks are complete.
* Add CreateTopicsWorker, which can be a building block for a lot of tests.
* Simplify how TaskSpec subclasses in ducktape serialize themselves to JSON.
* Implement some fault injection tests in round_trip_workload_test.py

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cmccabe/kafka KAFKA-5849

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/4323.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4323

commit bd7c5ae285bcc62e6ee526de55343679e1e1
Author: Colin P. McCabe
Date: 2017-12-14T01:31:00Z

KAFKA-5849: Add process stop faults, round trip workload, partitioned produce-consume test

* Implement process stop faults via SIGSTOP / SIGCONT
* Implement RoundTripWorkload, which both sends messages, and confirms that they are received at least once.
* Allow Trogdor tasks to block until other Trogdor tasks are complete.
* Add CreateTopicsWorker, which can be a building block for a lot of tests.
* Simplify how TaskSpec subclasses in ducktape serialize themselves to JSON.
* Implement some fault injection tests in round_trip_workload_test.py

---
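The SIGSTOP / SIGCONT process-stop fault described in this PR can be sketched in a few lines of Python (the helper name `inject_stop_fault` is made up for illustration; the actual Trogdor implementation is in Java):

```python
import os
import signal
import subprocess
import time

def inject_stop_fault(pid, duration_seconds):
    # SIGSTOP freezes the process without terminating it, approximating a
    # hard pause (e.g. a stalled machine or long GC); SIGCONT resumes it.
    os.kill(pid, signal.SIGSTOP)
    time.sleep(duration_seconds)
    os.kill(pid, signal.SIGCONT)

# Spawn a long-running victim process and pause it briefly.
victim = subprocess.Popen(["sleep", "30"])
inject_stop_fault(victim.pid, 0.2)
assert victim.poll() is None  # the fault pauses the process, it does not kill it
victim.terminate()
victim.wait()
```

The point of this fault type is that, unlike a kill, the process later resumes exactly where it left off, which exercises timeout and session-expiration paths in the system under test.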
[GitHub] kafka pull request #4106: KAFKA-5849: Add partitioned produce consume test
Github user cmccabe closed the pull request at: https://github.com/apache/kafka/pull/4106 ---
[jira] [Created] (KAFKA-6361) Fast leader fail over can lead to log divergence between replica and follower
Jason Gustafson created KAFKA-6361:
--
Summary: Fast leader fail over can lead to log divergence between replica and follower
Key: KAFKA-6361
URL: https://issues.apache.org/jira/browse/KAFKA-6361
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson

We have observed an edge case in the replication failover logic which can cause a replica to permanently fall out of sync with the leader or, in the worst case, actually have localized divergence between logs. This occurs in spite of the improved truncation logic from KIP-101.

Suppose we have brokers A and B. Initially A is the leader in epoch 1. It appends two batches: one in the range (0, 10) and the other in the range (11, 20). The first one successfully replicates to B, but the second one does not. In other words, the logs on the brokers look like this:

{code}
Broker A:
0: offsets [0, 10], leader epoch: 1
1: offsets [11, 20], leader epoch: 1

Broker B:
0: offsets [0, 10], leader epoch: 1
{code}

Broker A then has a zk session expiration and broker B is elected with epoch 2. It appends a new batch with offsets (11, n) to its local log. So we now have this:

{code}
Broker A:
0: offsets [0, 10], leader epoch: 1
1: offsets [11, 20], leader epoch: 1

Broker B:
0: offsets [0, 10], leader epoch: 1
1: offsets [11, n], leader epoch: 2
{code}

Normally we expect broker A to truncate to offset 11 on becoming the follower, but before it is able to do so, broker B has its own zk session expiration and broker A again becomes leader, now with epoch 3. It then appends a new entry in the range (21, 30). The updated logs look like this:

{code}
Broker A:
0: offsets [0, 10], leader epoch: 1
1: offsets [11, 20], leader epoch: 1
2: offsets [21, 30], leader epoch: 3

Broker B:
0: offsets [0, 10], leader epoch: 1
1: offsets [11, n], leader epoch: 2
{code}

Now what happens next depends on the last offset of the batch appended in epoch 2.
On becoming follower, broker B will send an OffsetForLeaderEpoch request to broker A with epoch 2. Broker A will respond that epoch 2 ends at offset 21. There are three cases:

1) n < 20: In this case, broker B will not do any truncation. It will begin fetching from offset n, which will ultimately cause an out of order offset error because broker A will return the full batch beginning from offset 11, which broker B will be unable to append.

2) n == 20: Again broker B does not truncate. It will fetch from offset 21 and everything will appear fine, though the logs have actually diverged.

3) n > 20: Broker B will attempt to truncate to offset 21. Since this is in the middle of the batch, it will truncate all the way to offset 10. It can then begin fetching from offset 11 and everything is fine.

The case we have actually seen is the first one. The second one would likely go unnoticed in practice, and everything is fine in the third case.

To work around the issue, we deleted the active segment on the replica, which allowed it to re-replicate consistently from the leader.

I'm not sure of the best solution for this scenario. Maybe if the leader isn't aware of an epoch, it should always respond with {{UNDEFINED_EPOCH_OFFSET}} instead of using the offset of the next highest epoch. That would cause the follower to truncate using its high watermark. Or perhaps, instead of doing so, it could send another OffsetForLeaderEpoch request at the next previous cached epoch and then truncate using that.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
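The three cases can be made concrete with a small Python model of the follower's truncation decision (a deliberate simplification with made-up names, not the actual broker logic):

```python
def follower_action(leader_epoch_end_offset, follower_log_end_offset):
    """Model of how the follower reacts to the OffsetForLeaderEpoch response.

    leader_epoch_end_offset: where the leader says the follower's last epoch
        ends (21 in the scenario above).
    follower_log_end_offset: n + 1, one past the last offset the follower wrote.
    """
    if leader_epoch_end_offset >= follower_log_end_offset:
        # Leader's epoch extends at least as far as our log: no truncation;
        # resume fetching from our own log end offset.
        return ("no-truncation", follower_log_end_offset)
    # Otherwise truncate back to the leader's reported epoch end offset.
    return ("truncate", leader_epoch_end_offset)

# Case 1 (n = 15 < 20): no truncation; fetching from 16 later hits the
# out-of-order offset error, since the leader returns the batch starting at 11.
assert follower_action(21, 16) == ("no-truncation", 16)
# Case 2 (n = 20): no truncation; the logs silently diverge over offsets 11..20.
assert follower_action(21, 21) == ("no-truncation", 21)
# Case 3 (n = 25 > 20): truncate to 21; in the real log 21 is mid-batch, so the
# broker truncates to the batch boundary (offset 10) and refetches from 11.
assert follower_action(21, 26) == ("truncate", 21)
```

The model shows why only case 3 self-corrects: it is the only case where the leader's epoch end offset falls strictly inside the follower's log.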
Re: [DISCUSS] KIP-210: Provide for custom error handling when Kafka Streams fails to produce
Metrics: this is a good point. Note that currently we have two metrics for `skipped-records` on different levels:

1) On the highest level, the thread level, we have a `skipped-records` metric that records all the records skipped due to deserialization errors.
2) On the lower processor-node level, we have a `skippedDueToDeserializationError` metric that records the records skipped on that specific source node due to deserialization errors.

So you can see that 1) does not cover any other scenarios and can just be thought of as an aggregate of 2) across all the tasks' source nodes. However, there are other places that can cause a record to be dropped, for example:

1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be dropped because their window has elapsed.
2) KIP-210: records could be dropped on the producer side.
3) Records could be dropped during user-customized processing on errors.

I think improving the skipped-records metrics for all these scenarios is itself worth another KIP, so I'd suggest we do not drag KIP-210 into this.

Guozhang

On Wed, Dec 13, 2017 at 3:45 PM, Matthias J. Sax wrote: > One more afterthought: should we add a metric for this? We also have a > metric for `skippedDueToDeserializationError-rate`? > > > -Matthias > > > > On 12/6/17 7:54 AM, Bill Bejeck wrote: > > Thanks for the clearly written KIP, no further comments from my end. > > > > -Bill > > > > On Wed, Dec 6, 2017 at 9:52 AM, Matt Farmer wrote: > > > >> There is already a vote thread for this KIP. I can bump it so that it’s > >> towards the top of your inbox. > >> > >> With regard to your concerns: > >> > >> 1) We do not have the "ProductionExceptionHandler" interface defined in > the > >> wiki page, though it is sort of clear that it is a one-function > interface > >> with record and exception. Could you add it? > >> > >> > >> It is defined, it’s just not defined using a code snippet.
The KIP > reads as > >> follows: > >> > >> === > >> > >> A public interface named ProductionExceptionHandler with a single > method, > >> handle, that has the following signature: > >> > >> - ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) > >> > >> > >> === > >> > >> If you’d like me to add a code snippet illustrating this that’s simple > for > >> me to do, but it seemed superfluous. > >> > >> 2) A quick question about your example code: where would the "logger" > >> object be created? > >> > >> > >> SLF4J loggers are typically created as a class member in the class. Such > >> as: > >> > >> private Logger logger = LoggerFactory.getLogger(HelloWorld.class); > >> > >> I omit that in my implementation examples for brevity. > >> > >> On December 6, 2017 at 2:14:58 AM, Guozhang Wang (wangg...@gmail.com) > >> wrote: > >> > >> Hello Matt, > >> > >> Thanks for writing up the KIP. I made a pass over it and here are a few > >> minor comments. I think you can consider starting a voting thread for > this > >> KIP while addressing them. > >> > >> 1) We do not have the "ProductionExceptionHandler" interface defined in > the > >> wiki page, though it is sort of clear that it is a one-function > interface > >> with record and exception. Could you add it? > >> > >> 2) A quick question about your example code: where would the "logger" > >> object be created? Note that the implementation of this interface has > to > >> give a non-param constructor, or as a static field of the class but in > that > >> case you would not be able to log which instance is throwing this error > (we > >> may have multiple producers within a single instance, even within a > >> thread). Just a reminder to consider in your implementation. > >> > >> > >> Guozhang > >> > >> On Tue, Dec 5, 2017 at 3:15 PM, Matthias J. Sax > >> wrote: > >> > >>> Thanks a lot for the update! Great write-up! Very clearly explained > what > >>> the change will look like!
> >>> > >>> Looks good to me. No further comments from my side. > >>> > >>> > >>> -Matthias > >>> > >>> > >>> On 12/5/17 9:14 AM, Matt Farmer wrote: > I have updated this KIP accordingly. > > Can you please take a look and let me know if what I wrote looks > >> correct > >>> to > you? > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > >>> 210+-+Provide+for+custom+error+handling++when+Kafka+ > >>> Streams+fails+to+produce > > Thanks! > > Matt > > > On December 4, 2017 at 9:39:13 PM, Matt Farmer (m...@frmr.me) wrote: > > Hey Matthias, thanks for getting back to me. > > That's fine. But if we add it to `test` package, we don't need to talk > about it in the KIP. `test` is not public API. > > Yes, that makes sense. It was in the KIP originally because I was, at > >> one > point, planning on including it. We can remove it now that we’ve > >> decided > >>> we > won’t include it in the public
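For readers following along, the handler contract discussed in this thread can be sketched in Python (the real KIP-210 interface is Java; the names below merely mirror the KIP, and the record shape is invented for illustration):

```python
import logging
from enum import Enum

# As in the examples discussed on the thread, the logger is an ordinary
# class/module member, omitted from the KIP snippets for brevity.
logger = logging.getLogger("streams-example")

class ProductionExceptionHandlerResponse(Enum):
    CONTINUE = "continue"   # log the failed record and keep processing
    FAIL = "fail"           # stop the streams application

def always_continue_handler(record, exception):
    # One possible handle(record, exception) implementation: log-and-continue.
    logger.warning("Failed to produce record with key %r: %s",
                   record.get("key"), exception)
    return ProductionExceptionHandlerResponse.CONTINUE

response = always_continue_handler({"key": b"k", "value": b"v"},
                                   RuntimeError("broker unavailable"))
assert response is ProductionExceptionHandlerResponse.CONTINUE
```

The enum-valued return is the crux of the design: the handler decides per record whether a produce failure is fatal, rather than the framework hard-coding one policy.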
Re: [DISCUSS] KIP-210: Provide for custom error handling when Kafka Streams fails to produce
One more afterthought: should we add a metric for this? We also have a metric for `skippedDueToDeserializationError-rate`? -Matthias On 12/6/17 7:54 AM, Bill Bejeck wrote: > Thanks for the clearly written KIP, no further comments from my end. > > -Bill > > On Wed, Dec 6, 2017 at 9:52 AM, Matt Farmer wrote: > >> There is already a vote thread for this KIP. I can bump it so that it’s >> towards the top of your inbox. >> >> With regard to your concerns: >> >> 1) We do not have the "ProductionExceptionHandler" interface defined in the >> wiki page, though it is sort of clear that it is a one-function interface >> with record and exception. Could you add it? >> >> >> It is defined, it’s just not defined using a code snippet. The KIP reads as >> follows: >> >> === >> >> A public interface named ProductionExceptionHandler with a single method, >> handle, that has the following signature: >> >> - ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) >> >> >> === >> >> If you’d like me to add a code snippet illustrating this that’s simple for >> me to do, but it seemed superfluous. >> >> 2) A quick question about your example code: where would the "logger" >> object be created? >> >> >> SLF4J loggers are typically created as a class member in the class. Such >> as: >> >> private Logger logger = LoggerFactory.getLogger(HelloWorld.class); >> >> I omit that in my implementation examples for brevity. >> >> On December 6, 2017 at 2:14:58 AM, Guozhang Wang (wangg...@gmail.com) >> wrote: >> >> Hello Matt, >> >> Thanks for writing up the KIP. I made a pass over it and here are a few >> minor comments. I think you can consider starting a voting thread for this >> KIP while addressing them. >> >> 1) We do not have the "ProductionExceptionHandler" interface defined in the >> wiki page, though it is sort of clear that it is a one-function interface >> with record and exception. Could you add it?
>> >> 2) A quick question about your example code: where would be the "logger" >> object be created? Note that the implementation of this interface have to >> give a non-param constructor, or as a static field of the class but in that >> case you would not be able to log which instance is throwing this error (we >> may have multiple producers within a single instance, even within a >> thread). Just a reminder to consider in your implementation. >> >> >> Guozhang >> >> On Tue, Dec 5, 2017 at 3:15 PM, Matthias J. Sax >> wrote: >> >>> Thanks a lot for the update! Great write-up! Very clearly explained what >>> the change will look like! >>> >>> Looks good to me. No further comments from my side. >>> >>> >>> -Matthias >>> >>> >>> On 12/5/17 9:14 AM, Matt Farmer wrote: I have updated this KIP accordingly. Can you please take a look and let me know if what I wrote looks >> correct >>> to you? https://cwiki.apache.org/confluence/display/KAFKA/KIP- >>> 210+-+Provide+for+custom+error+handling++when+Kafka+ >>> Streams+fails+to+produce Thanks! Matt On December 4, 2017 at 9:39:13 PM, Matt Farmer (m...@frmr.me) wrote: Hey Matthias, thanks for getting back to me. That's fine. But if we add it to `test` package, we don't need to talk about it in the KIP. `test` is not public API. Yes, that makes sense. It was in the KIP originally because I was, at >> one point, planning on including it. We can remove it now that we’ve >> decided >>> we won’t include it in the public API. Understood. That makes sense. We should explain this clearly in the KIP and maybe log all other following exceptions at DEBUG level? I thought it was clear in the KIP, but I can go back and double check >> my wording and revise it to try and make it clearer. I’ll take a look at doing more work on the KIP and the Pull Request tomorrow. Thanks again! On December 4, 2017 at 5:50:33 PM, Matthias J. 
Sax ( >>> matth...@confluent.io) wrote: Hey, About your questions: >>> Acknowledged, so is ProducerFencedException the only kind of >>> exception I >>> need to change my behavior on? Or are there other types I need to check? Is >>> there a comprehensive list somewhere? I cannot think of any other atm. We should list all fatal exceptions >> for which we don't call the handler and explain why (exception is "global" and will affect all other records, too | ProducerFenced is >> self-healing). We started to collect and categorize exceptions here (not completed >> yet): https://cwiki.apache.org/confluence/display/KAFKA/ >>> Kafka+Streams+Architecture#KafkaStreamsArchitecture-TypesofExceptions : This list should be a good starting point though. > I include it in the test package because I have tests that assert
[GitHub] kafka pull request #4322: KAFKA-6126: Remove unnecessary topics created chec...
GitHub user mjsax opened a pull request: https://github.com/apache/kafka/pull/4322 KAFKA-6126: Remove unnecessary topics created check ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) You can merge this pull request into a Git repository by running: $ git pull https://github.com/mjsax/kafka kafka-6126-remove-topic-check-on-rebalance-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/4322.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4322 commit c0a8b4868cf979e47a25d5e837b26e62e2aac212 Author: Matthias J. SaxDate: 2017-12-13T21:06:18Z KAFKA-6126: Remove unnecessary topics created check ---
[GitHub] kafka pull request #4308: catch and log exceptions thrown in waiters added t...
Github user xvrl closed the pull request at: https://github.com/apache/kafka/pull/4308 ---
Re: [DISCUSS] KIP-236 Interruptible Partition Reassignment
Tom:
bq. create a znode /admin/reassignments/$topic-$partition
Looks like the tree structure above should be: /admin/reassignments/$topic/$partition
bq. The controller removes /admin/reassignment/$topic/$partition
Note the lack of 's' for reassignment. It would be good to make the zookeeper paths consistent.
Thanks
On Wed, Dec 13, 2017 at 9:49 AM, Tom Bentley wrote: > Hi Jun and Ted, > > Jun, you're right that needing one watcher per reassigned partition > presents a scalability problem, and using a separate notification path > solves that. I also agree that it makes sense to prevent users from using > both methods on the same reassignment. > > Ted, naming the reassignments like mytopic-42 was simpler while I was > proposing a watcher-per-reassignment (I'd have needed a child watcher on > /admin/reassignments and also on /admin/reassignments/mytopic). Using the > separate notification path means I don't need any watchers in the > /admin/reassignments subtree, so switching to /admin/reassignments/mytopic/42 > would work, and avoid /admin/reassignments having a very large number of > child nodes. On the other hand it also means I have to create and delete > the topic nodes (e.g. /admin/reassignments/mytopic), which incurs the cost > of extra round trips to zookeeper. I suppose that since reassignment is > generally a slow process it makes little difference if we increase the > latency of the interactions with zookeeper. > > I have updated the KIP with these improvements, and a more detailed > description of exactly how we would manage these znodes. > > Reading the algorithm in KafkaController.onPartitionReassignment(), it > seems that it would be suboptimal for changing reassignments in-flight. > Consider an initial assignment of [1,2], reassigned to [2,3] and then > changed to [2,4]. Broker 3 will remain in the assigned replicas until > broker 4 is in sync, even though 3 wasn't actually one of the original > assigned replicas and is no longer a new assigned replica.
I think this > also affects the case where the reassignment is cancelled > ([1,2]->[2,3]->[1,2]): We again have to wait for 3 to catch up, even though > its replica will then be deleted. > > Should we seek to improve this algorithm in this KIP, or leave that as a > later optimisation? > > Cheers, > > Tom > > On 11 December 2017 at 21:31, Jun Rao wrote: > > > Another question is on the compatibility. Since now there are 2 ways of > > specifying a partition reassignment, one under /admin/reassign_partitions > > and the other under /admin/reassignments, we probably want to prevent the > > same topic being reassigned under both paths at the same time? > > Thanks, > > > > Jun > > > > > > > > On Fri, Dec 8, 2017 at 5:41 PM, Jun Rao wrote: > > > > > Hi, Tom, > > > > > > Thanks for the KIP. It definitely addresses one of the pain points in > > > partition reassignment. Another issue that it also addresses is the ZK > > node > > > size limit when writing the reassignment JSON. > > > > > > My only concern is that the KIP needs to create one watcher per > > reassigned > > > partition. This could add overhead in ZK and complexity for debugging > > when > > > lots of partitions are being reassigned simultaneously. We could > > > potentially improve this by introducing a separate ZK path for change > > > notification as we do for configs. For example, every time we change > the > > > assignment for a set of partitions, we could further write a sequential > > > node /admin/reassignment_changes/[change_x]. That way, the controller > > > only needs to watch the change path. Once a change is triggered, the > > > controller can read everything under /admin/reassignments/. > > > > > > Jun > > > > > > > > > On Wed, Dec 6, 2017 at 1:19 PM, Tom Bentley > > wrote: > > > > > >> Hi, > > >> > > >> This is still very new, but I wanted some quick feedback on a > > preliminary > > >> KIP which could, I think, help with providing an AdminClient API for > > >> partition reassignment. 
> > >> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-236% > > >> 3A+Interruptible+Partition+Reassignment > > >> > > >> I wasn't sure whether to start fleshing out a whole AdminClient API in > > >> this > > >> KIP (which would make it very big, and difficult to read), or whether > to > > >> break it down into smaller KIPs (which makes it easier to read and > > >> implement in pieces, but harder to get a high-level picture of the > > >> ultimate > > >> destination). For now I've gone for a very small initial KIP, but I'm > > >> happy > > >> to sketch the bigger picture here if people are interested. > > >> > > >> Cheers, > > >> > > >> Tom > > >> > > > > > > > > > > > On 11 December 2017 at 21:31, Jun Rao wrote: > > > Another question is on the compatibility. Since now there are 2 ways of > > specifying a partition reassignment, one under /admin/reassign_partitions > > and
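The znode layout under discussion, with Ted's 's' consistency fix and Jun's sequential change-notification path, can be sketched with small path helpers (helper names are made up; the paths follow the thread):

```python
def reassignment_znode(topic, partition):
    # Per-partition reassignment node: /admin/reassignments/$topic/$partition.
    # Nesting under a per-topic node avoids one flat directory with a very
    # large number of children.
    return "/admin/reassignments/%s/%d" % (topic, partition)

def change_notification_znode(change_seq):
    # Sequential change-notification node in the style Jun proposed, so the
    # controller watches a single path instead of one watcher per reassigned
    # partition; ZooKeeper sequential nodes get a zero-padded counter suffix.
    return "/admin/reassignment_changes/change_%010d" % change_seq

assert reassignment_znode("mytopic", 42) == "/admin/reassignments/mytopic/42"
assert change_notification_znode(7) == "/admin/reassignment_changes/change_0000000007"
```

On a change, the controller would read everything under /admin/reassignments/ after seeing a new notification child, mirroring how config change notifications already work.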
Re: [DISCUSS] KIP-234: add support for getting topic defaults from AdminClient
On Wed, Dec 13, 2017, at 10:00, dan wrote: > > Why not just return > > org.apache.kafka.clients.admin.Config like describeConfigs does? > > brokers have a `num.partitions` config that does not map to a valid > `Config` entry for a topic. Hi Dan, Sorry if I'm misunderstanding something, but why not map it to num.partitions? > > another added benefit to using `NewTopic` may be (future kip) having the > cluster return the actual replica mappings it would create (i have no > idea if this is actually possible) A better way of doing that would probably be extending CreateTopicsRequest so that it returns partition assignment information to the caller. Then using validOnly = true to get this information. Actually, come to think of it, maybe we should be doing that for this KIP too. Why not have CreateTopicsRequest return the config that was used, plus the partition assignment that was made? We don't create topics that often, so the extra space on the wire should not be a concern. best, Colin > > dan > > On Wed, Dec 13, 2017 at 9:55 AM, Colin McCabe wrote: > > > On Tue, Dec 12, 2017, at 19:02, Ewen Cheslack-Postava wrote: > > > re: API versions, I actually wasn't sure if we needed it or not. I'm fine > > > if people would prefer just bumping it, but I was actually curious if we > > > could get away without bumping it. I don't know the behavior of the > > > broker code paths for this well enough to know what types of errors those > > > non-null assertions get converted into. > > > > There's no advantage to trying to keep the API version number the same, > > though. Since we have bidirectional client compatibility now, the > > clients and the server will just negotiate whatever version they need. > > New clients can still talk to older brokers that don't support this > > feature. > > > > If you don't bump the API version, the best case scenario is that you > > get a disconnect exception and the end-user is left confused about why.
> > The worse-case scenario is that you crash the broker (but probably not, > > since you'd just get an NPE in serde, I think). If you bump the version > > number, you can provide a proper UnsupportedVersionException when the > > feature is not supported. > > > > > For the return type, NewTopic seems reasonable and kind of intuitive -- > > > basically a description of the NewTopic you would get. The only reason I > > > would be wary of reusing it is that what we don't want people doing is > > > taking that and passing it directly into AdminClient.createTopics since > > > we don't want them explicitly overriding all the defaults. > > > > Yeah. Another thing is that NewTopic has a lot of stuff related to > > replication that doesn't seem relevant here. For example, when creating > > NewTopic, you have the option of either setting replicationFactor, or > > setting up a specific replica assignment. Why not just return > > org.apache.kafka.clients.admin.Config like describeConfigs does? > > > > best, > > Colin > > > > > > > > -Ewen > > > > > > On Tue, Dec 12, 2017 at 2:32 PM, dan wrote: > > > > > > > Colin/Ewen, > > > > > > > > i will add changes to bump the API version. > > > > > > > > any preferences on the return type for the new method? tbh it seems > > like > > > > returning a NewTopic could make sense because the ConfigResource for a > > > > TOPIC type does not let me encode `numPartitions` > > > > > > > > thanks > > > > dan > > > > > > > > On Mon, Dec 11, 2017 at 7:22 PM, Colin McCabe > > wrote: > > > > > > > > > Hi Dan, > > > > > > > > > > The KIP looks good overall. > > > > > > > > > > On Mon, Dec 11, 2017, at 18:28, Ewen Cheslack-Postava wrote: > > > > > > I think the key point is when the kafka admin and user creating > > topics > > > > > > differ. I think a more realistic example of Dan's point (2) is for > > > > > > retention. 
I know that realistically, admins aren't just going to > > > > > > randomly > > > > > > drop the broker defaults from 1w to 1d without warning anyone > > (they'd > > > > > > likely be fired...). But as a user, I may not know the broker > > configs, > > > > if > > > > > > admins have overridden them, etc. I may want a *minimum* of, e.g., > > 2d. > > > > > > But if the broker defaults are higher such that the admins are > > > > confident > > > > > the > > > > > > cluster can handle 1w, I'd rather just fall back on the default > > value. > > > > > > > > > > Right. I think this API addresses a similar set of use-cases as > > adding > > > > > the "validateOnly" boolean for createTopics. You shouldn't have to > > > > > create a topic to know whether it was possible to create it, or what > > the > > > > > retention will end up being, etc. etc. > > > > > > > > > > > Now, there's arguably a better solution for that case -- allow > > topic > > > > > > configs to express a *minimum* value (or maximum depending on the > > > > > > particular config), with the broker config taking
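The fallback behavior being discussed, where a topic uses its own override if present and otherwise inherits the broker default (e.g. num.partitions or retention.ms), can be sketched as follows (function name and config values are illustrative, not Kafka's implementation):

```python
def effective_topic_config(topic_overrides, broker_defaults):
    # Start from broker-level defaults and layer per-topic overrides on top,
    # matching the "fall back on the default value" semantics in the thread.
    resolved = dict(broker_defaults)
    resolved.update(topic_overrides)
    return resolved

broker_defaults = {"retention.ms": 7 * 24 * 3600 * 1000,  # 1w cluster default
                   "num.partitions": 1}
overrides = {"retention.ms": 2 * 24 * 3600 * 1000}        # topic wants 2d

resolved = effective_topic_config(overrides, broker_defaults)
assert resolved["retention.ms"] == 172800000   # the topic override wins
assert resolved["num.partitions"] == 1         # falls back to the broker default
```

The "topic defaults" API in KIP-234 is essentially asking the broker to run this resolution with an empty override map and report the result before any topic is created.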
Re: [DISCUSS] KIP-222 - Add "describe consumer group" to KafkaAdminClient
On Tue, Dec 12, 2017, at 09:39, Jason Gustafson wrote: > Hi Colin, > > They do share the same namespace. We have a "protocol type" field in the > JoinGroup request to make sure that all members are of the same kind. Hi Jason, Thanks. That makes sense. > Very roughly what I was thinking is something like this. First we introduce an > interface for deserialization:
>
> interface GroupMetadataDeserializer<Metadata, Assignment> {
>   String protocolType();
>   Metadata deserializeMetadata(ByteBuffer);
>   Assignment deserializeAssignment(ByteBuffer);
> }
>
> Then we add some kind of generic container:
>
> class MemberMetadata<Metadata, Assignment> {
>   Metadata metadata;
>   Assignment assignment;
> }
>
> Then we have two APIs: one generic and one specific to consumer groups:
>
> Map<String, MemberMetadata> describeGroup(String groupId, GroupMetadataDeserializer deserializer);
>
> Map<String, MemberMetadata> describeConsumerGroup(String groupId);
>
> (This is just a sketch, so obviously we can change them to use futures or > to batch or whatever.) > > I think it would be fine to not provide a connect-specific API since this > usage will probably be limited to Connect itself. Yeah, it probably makes sense to have a separation between describeGroup and describeConsumerGroup. We will have to be pretty careful with cross-version compatibility in describeConsumerGroup. It should be possible for an old client to talk to a new broker, and a new client to talk to an old broker. So we should be prepared to read data in multiple formats. I'm not sure if we need to have a 'deserializer' argument to describeGroup. We can just let them access a byte array, right? Theoretically they might also just want to check for the presence or absence of a group, but not deserialize anything. best, Colin > > Thanks, > Jason > > > On Mon, Dec 11, 2017 at 9:15 PM, Colin McCabe wrote: > > > Sorry... this is probably a silly question, but do Kafka Connect groups > > share a namespace with consumer groups? If we had a separate API for > > Kafka Connect groups vs.
Consumer groups, would that make sense? Or > > should we unify them? > > > > best, > > Colin > > > > > > On Mon, Dec 11, 2017, at 16:11, Jason Gustafson wrote: > > > Hi Jorge, > > > > > > Kafka group management is actually more general than consumer groups > > > (e.g. > > > there are kafka connect groups). If we are adding these APIs, I would > > > suggest we consider the more general protocol and how to expose > > > group-protocol-specific metadata. For example, it might be reasonable to > > > have both an API to access to the low-level bytes as well as some > > > higher-level convenience APIs for accessing consumer groups. > > > > > > Thanks, > > > Jason > > > > > > On Mon, Dec 4, 2017 at 4:07 PM, Matthias J. Sax > > > wrote: > > > > > > > Jorge, > > > > > > > > is there any update regarding this KIP? > > > > > > > > > > > > -Matthias > > > > > > > > On 11/17/17 9:14 AM, Guozhang Wang wrote: > > > > > Hello Jorge, > > > > > > > > > > I made a pass over the wiki, and here are a few comments: > > > > > > > > > > 1. First, regarding to Tom's comment #2 above, I think if we are only > > > > going > > > > > to include the String groupId. Then it is Okay to keep as a String > > than > > > > > using a new wrapper class. However, I think we could include the > > > > > protocol_type returned from the ListGroupsResponse along with the > > > > groupId. > > > > > This is a very useful information to tell which consumer groups are > > from > > > > > Connect, which ones are from Streams, which ones are user-customized > > etc. > > > > > With this, it is reasonable to keep a wrapper class. > > > > > > > > > > 2. In ConsumerDescription, could we also add the state, protocol_type > > > > > (these two are form DescribeGroupResponse), and the Node coordinator > > > > (this > > > > > may be returned from the AdminClient itself) as well? 
This is also > > for > > > > > information consistency with the old client (note that protocol_type > > was > > > > > called assignment_strategy there). > > > > > > > > > > 3. With 1) / 2) above, maybe we can rename "ConsumerGroupListing" to > > > > > "ConsumerGroupSummary" and make "ConsumerGroupDescription" an > > extended > > > > > class of the former with the additional fields? > > > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > On Tue, Nov 7, 2017 at 2:13 AM, Jorge Esteban Quilcate Otoya < > > > > > quilcate.jo...@gmail.com> wrote: > > > > > > > > > >> Hi Tom, > > > > >> > > > > >> 1. You're right. I've updated the KIP accordingly. > > > > >> 2. Yes, I have add it to keep consistency, but I'd like to know what > > > > others > > > > >> think about this too. > > > > >> > > > > >> Cheers, > > > > >> Jorge. > > > > >> > > > > >> El mar., 7 nov. 2017 a las 9:29, Tom Bentley (< > > t.j.bent...@gmail.com>) > > > > >> escribió: > > > > >> > >
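Guozhang's point 3, making "ConsumerGroupDescription" an extended class of a "ConsumerGroupSummary" that carries the groupId and protocol type, could look roughly like this (a Python sketch with illustrative field names; the real API would be Java classes in the AdminClient):

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class ConsumerGroupSummary:
    # Returned by the listing API: the group id plus the protocol_type from
    # ListGroupsResponse, which distinguishes Connect, Streams, and plain
    # consumer groups.
    group_id: str
    protocol_type: str

@dataclass
class ConsumerGroupDescription(ConsumerGroupSummary):
    # The describe API extends the summary with state, coordinator, and
    # member details (from DescribeGroupsResponse and the AdminClient).
    state: str = "Unknown"
    coordinator: str = ""
    members: List[str] = field(default_factory=list)

desc = ConsumerGroupDescription("my-group", "consumer", state="Stable")
assert isinstance(desc, ConsumerGroupSummary)  # a description is also a summary
assert desc.protocol_type == "consumer"
```

Modeling the description as a subtype keeps the two result types consistent: any code that accepts a summary can also accept a full description.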
Re: [DISCUSS] KIP-234: add support for getting topic defaults from AdminClient
> Why not just return org.apache.kafka.clients.admin.Config like
> describeConfigs does?

brokers have a `num.partitions` config that does not map to a valid `Config` entry for a topic. another added benefit to using `NewTopic` may be (future kip) having the cluster return the actual replica mappings it would create (i have no idea if this is actually possible)

dan

On Wed, Dec 13, 2017 at 9:55 AM, Colin McCabe wrote:
> On Tue, Dec 12, 2017, at 19:02, Ewen Cheslack-Postava wrote:
> > re: API versions, I actually wasn't sure if we needed it or not. I'm fine
> > if people would prefer just bumping it, but I was actually curious if we
> > could get away without bumping it. I don't know the behavior of the
> > broker code paths for this well enough to know what types of errors those
> > non-null assertions get converted into.
>
> There's no advantage to trying to keep the API version number the same,
> though. Since we have bidirectional client compatibility now, the clients
> and the server will just negotiate whatever version they need. New clients
> can still talk to older brokers that don't support this feature.
>
> If you don't bump the API version, the best-case scenario is that you get
> a disconnect exception and the end-user is left confused about why. The
> worst-case scenario is that you crash the broker (but probably not, since
> you'd just get an NPE in serde, I think). If you bump the version number,
> you can provide a proper UnsupportedVersionException when the feature is
> not supported.
>
> > For the return type, NewTopic seems reasonable and kind of intuitive --
> > basically a description of the NewTopic you would get. The only reason I
> > would be wary of reusing it is that what we don't want people doing is
> > taking that and passing it directly into AdminClient.createTopics, since
> > we don't want them explicitly overriding all the defaults.
>
> Yeah. Another thing is that NewTopic has a lot of stuff related to
> replication that doesn't seem relevant here. For example, when creating a
> NewTopic, you have the option of either setting replicationFactor or
> setting up a specific replica assignment. Why not just return
> org.apache.kafka.clients.admin.Config like describeConfigs does?
>
> best,
> Colin
>
> > -Ewen
> >
> > On Tue, Dec 12, 2017 at 2:32 PM, dan wrote:
> > > Colin/Ewen,
> > >
> > > i will add changes to bump the API version.
> > >
> > > any preferences on the return type for the new method? tbh it seems like
> > > returning a NewTopic could make sense because the ConfigResource for a
> > > TOPIC type does not let me encode `numPartitions`
> > >
> > > thanks
> > > dan
> > >
> > > On Mon, Dec 11, 2017 at 7:22 PM, Colin McCabe wrote:
> > > > Hi Dan,
> > > >
> > > > The KIP looks good overall.
> > > >
> > > > On Mon, Dec 11, 2017, at 18:28, Ewen Cheslack-Postava wrote:
> > > > > I think the key point is when the kafka admin and user creating
> > > > > topics differ. I think a more realistic example of Dan's point (2)
> > > > > is for retention. I know that realistically, admins aren't just
> > > > > going to randomly drop the broker defaults from 1w to 1d without
> > > > > warning anyone (they'd likely be fired...). But as a user, I may not
> > > > > know the broker configs, if admins have overridden them, etc. I may
> > > > > want a *minimum* of, e.g., 2d. But if the broker defaults are higher
> > > > > such that the admins are confident the cluster can handle 1w, I'd
> > > > > rather just fall back on the default value.
> > > >
> > > > Right. I think this API addresses a similar set of use-cases as adding
> > > > the "validateOnly" boolean for createTopics. You shouldn't have to
> > > > create a topic to know whether it was possible to create it, or what
> > > > the retention will end up being, etc. etc.
> > > >
> > > > > Now, there's arguably a better solution for that case -- allow topic
> > > > > configs to express a *minimum* value (or maximum depending on the
> > > > > particular config), with the broker config taking precedence if it
> > > > > has a smaller value (or larger in the case of maximums). This lets
> > > > > you express your minimum requirements but allows the cluster to do
> > > > > more if that's the default. However, that would represent a much
> > > > > more significant and invasive change, and honestly I think it is
> > > > > more likely to confuse users.
> > > >
> > > > There always need to be topic defaults, though. If we add a foobar
> > > > configuration for topics, existing topics will need to get
> > > > grandfathered in with a default foobar. And they won't be able to set
> > > > min and max ranges, because foobars didn't exist back when the old
> > > > topics were created.
> > > >
> > > > > @Dan, regarding compatibility, this changes behavior without revving
> > > > > the
Re: [DISCUSS] KIP-234: add support for getting topic defaults from AdminClient
On Tue, Dec 12, 2017, at 19:02, Ewen Cheslack-Postava wrote:
> re: API versions, I actually wasn't sure if we needed it or not. I'm fine
> if people would prefer just bumping it, but I was actually curious if we
> could get away without bumping it. I don't know the behavior of the
> broker code paths for this well enough to know what types of errors those
> non-null assertions get converted into.

There's no advantage to trying to keep the API version number the same, though. Since we have bidirectional client compatibility now, the clients and the server will just negotiate whatever version they need. New clients can still talk to older brokers that don't support this feature.

If you don't bump the API version, the best-case scenario is that you get a disconnect exception and the end-user is left confused about why. The worst-case scenario is that you crash the broker (but probably not, since you'd just get an NPE in serde, I think). If you bump the version number, you can provide a proper UnsupportedVersionException when the feature is not supported.

> For the return type, NewTopic seems reasonable and kind of intuitive --
> basically a description of the NewTopic you would get. The only reason I
> would be wary of reusing it is that what we don't want people doing is
> taking that and passing it directly into AdminClient.createTopics, since
> we don't want them explicitly overriding all the defaults.

Yeah. Another thing is that NewTopic has a lot of stuff related to replication that doesn't seem relevant here. For example, when creating a NewTopic, you have the option of either setting replicationFactor or setting up a specific replica assignment. Why not just return org.apache.kafka.clients.admin.Config like describeConfigs does?

best,
Colin

> -Ewen
>
> On Tue, Dec 12, 2017 at 2:32 PM, dan wrote:
> > Colin/Ewen,
> >
> > i will add changes to bump the API version.
> >
> > any preferences on the return type for the new method? tbh it seems like
> > returning a NewTopic could make sense because the ConfigResource for a
> > TOPIC type does not let me encode `numPartitions`
> >
> > thanks
> > dan
> >
> > On Mon, Dec 11, 2017 at 7:22 PM, Colin McCabe wrote:
> > > Hi Dan,
> > >
> > > The KIP looks good overall.
> > >
> > > On Mon, Dec 11, 2017, at 18:28, Ewen Cheslack-Postava wrote:
> > > > I think the key point is when the kafka admin and user creating topics
> > > > differ. I think a more realistic example of Dan's point (2) is for
> > > > retention. I know that realistically, admins aren't just going to
> > > > randomly drop the broker defaults from 1w to 1d without warning anyone
> > > > (they'd likely be fired...). But as a user, I may not know the broker
> > > > configs, if admins have overridden them, etc. I may want a *minimum*
> > > > of, e.g., 2d. But if the broker defaults are higher such that the
> > > > admins are confident the cluster can handle 1w, I'd rather just fall
> > > > back on the default value.
> > >
> > > Right. I think this API addresses a similar set of use-cases as adding
> > > the "validateOnly" boolean for createTopics. You shouldn't have to
> > > create a topic to know whether it was possible to create it, or what the
> > > retention will end up being, etc. etc.
> > >
> > > > Now, there's arguably a better solution for that case -- allow topic
> > > > configs to express a *minimum* value (or maximum depending on the
> > > > particular config), with the broker config taking precedence if it has
> > > > a smaller value (or larger in the case of maximums). This lets you
> > > > express your minimum requirements but allows the cluster to do more if
> > > > that's the default. However, that would represent a much more
> > > > significant and invasive change, and honestly I think it is more
> > > > likely to confuse users.
> > >
> > > There always need to be topic defaults, though. If we add a foobar
> > > configuration for topics, existing topics will need to get grandfathered
> > > in with a default foobar. And they won't be able to set min and max
> > > ranges, because foobars didn't exist back when the old topics were
> > > created.
> > >
> > > > @Dan, regarding compatibility, this changes behavior without revving
> > > > the request version number, which normally we only do for things that
> > > > are reasonably considered bugfixes or where it has no compatibility
> > > > implications. In this case, older brokers talking to newer
> > > > AdminClients will presumably return some error. Do we know what the
> > > > non-null assertion gets converted to and if we're happy with the
> > > > behavior (i.e. will applications be able to do something reasonable,
> > > > distinguish it from some completely unrelated error, etc)? Similarly,
> > > > it's obviously only one implementation using the KIP-4 APIs, but do we
> > > > know what client-side validation AdminClient is already
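Ewen's *minimum* idea in the thread above can be stated in a few lines. The sketch below is purely illustrative (the method name and semantics are assumptions, not a proposed API): the user's requested value acts as a floor, and the broker default wins whenever it is larger.

```java
// Hypothetical sketch of "minimum" retention semantics: the requested value
// is a floor; the broker default takes precedence when it is larger.
public class RetentionDefaults {
    static long effectiveRetentionMs(long requestedMinMs, long brokerDefaultMs) {
        // max() implements "at least my minimum, but more if the cluster allows"
        return Math.max(requestedMinMs, brokerDefaultMs);
    }

    public static void main(String[] args) {
        long twoDays = 2L * 24 * 60 * 60 * 1000;   // user's requested minimum
        long oneWeek = 7L * 24 * 60 * 60 * 1000;   // broker default
        // Broker default (1w) exceeds the requested minimum (2d), so it wins.
        System.out.println(effectiveRetentionMs(twoDays, oneWeek) == oneWeek);
    }
}
```

As the thread notes, a `max`-for-minimums / `min`-for-maximums rule is simple to state but would be an invasive change to config handling, which is why the discussion leans toward simply exposing the defaults instead.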
Re: [DISCUSS] KIP-236 Interruptible Partition Reassignment
Hi Jun and Ted,

Jun, you're right that needing one watcher per reassigned partition presents a scalability problem, and using a separate notification path solves that. I also agree that it makes sense to prevent users from using both methods on the same reassignment.

Ted, naming the reassignments like mytopic-42 was simpler while I was proposing a watcher-per-reassignment (I'd have needed a child watcher on /admin/reassignments and also on /admin/reassignments/mytopic). Using the separate notification path means I don't need any watchers in the /admin/reassignments subtree, so switching to /admin/reassignments/mytopic/42 would work, and avoid /admin/reassignments having a very large number of child nodes. On the other hand it also means I have to create and delete the topic nodes (e.g. /admin/reassignments/mytopic), which incurs the cost of extra round trips to zookeeper. I suppose that since reassignment is generally a slow process it makes little difference if we increase the latency of the interactions with zookeeper.

I have updated the KIP with these improvements, and a more detailed description of exactly how we would manage these znodes.

Reading the algorithm in KafkaController.onPartitionReassignment(), it seems that it would be suboptimal for changing reassignments in-flight. Consider an initial assignment of [1,2], reassigned to [2,3] and then changed to [2,4]. Broker 3 will remain in the assigned replicas until broker 4 is in sync, even though 3 wasn't actually one of the original assigned replicas and is no longer a new assigned replica. I think this also affects the case where the reassignment is cancelled ([1,2]->[2,3]->[1,2]): we again have to wait for 3 to catch up, even though its replica will then be deleted. Should we seek to improve this algorithm in this KIP, or leave that as a later optimisation?

Cheers,

Tom

On 11 December 2017 at 21:31, Jun Rao wrote:
> Another question is on the compatibility. Since now there are 2 ways of
> specifying a partition reassignment, one under /admin/reassign_partitions
> and the other under /admin/reassignments, we probably want to prevent the
> same topic being reassigned under both paths at the same time?
>
> Thanks,
>
> Jun
>
> On Fri, Dec 8, 2017 at 5:41 PM, Jun Rao wrote:
> > Hi, Tom,
> >
> > Thanks for the KIP. It definitely addresses one of the pain points in
> > partition reassignment. Another issue that it also addresses is the ZK
> > node size limit when writing the reassignment JSON.
> >
> > My only concern is that the KIP needs to create one watcher per
> > reassigned partition. This could add overhead in ZK and complexity for
> > debugging when lots of partitions are being reassigned simultaneously.
> > We could potentially improve this by introducing a separate ZK path for
> > change notification as we do for configs. For example, every time we
> > change the assignment for a set of partitions, we could further write a
> > sequential node /admin/reassignment_changes/[change_x]. That way, the
> > controller only needs to watch the change path. Once a change is
> > triggered, the controller can read everything under /admin/reassignments/.
> >
> > Jun
> >
> > On Wed, Dec 6, 2017 at 1:19 PM, Tom Bentley wrote:
> >> Hi,
> >>
> >> This is still very new, but I wanted some quick feedback on a
> >> preliminary KIP which could, I think, help with providing an
> >> AdminClient API for partition reassignment.
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment
> >>
> >> I wasn't sure whether to start fleshing out a whole AdminClient API in
> >> this KIP (which would make it very big, and difficult to read), or
> >> whether to break it down into smaller KIPs (which makes it easier to
> >> read and implement in pieces, but harder to get a high-level picture
> >> of the ultimate destination).
> >> For now I've gone for a very small initial KIP, but I'm happy
> >> to sketch the bigger picture here if people are interested.
> >>
> >> Cheers,
> >>
> >> Tom
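For readers following along, the znode layout under discussion can be sketched as below. The exact paths are assumptions drawn from this thread (per-partition nodes under /admin/reassignments/&lt;topic&gt;/&lt;partition&gt;, plus a sequential change-notification node so the controller needs only a single watcher), not the final KIP-236 design:

```java
// Illustrative sketch of the proposed znode paths (assumed from the thread,
// not the final KIP design). Only path construction is shown; no ZooKeeper
// client is involved.
public class ReassignmentPaths {
    // Per-partition reassignment node, e.g. /admin/reassignments/mytopic/42
    static String partitionPath(String topic, int partition) {
        return "/admin/reassignments/" + topic + "/" + partition;
    }

    // Prefix for the sequential change-notification nodes; ZooKeeper appends
    // a monotonically increasing sequence number on creation, so the
    // controller can watch just this one parent path.
    static String changeNotificationPrefix() {
        return "/admin/reassignment_changes/change_";
    }

    public static void main(String[] args) {
        System.out.println(partitionPath("mytopic", 42));
    }
}
```

The design trade-off Tom describes is visible here: the per-topic parent nodes (e.g. /admin/reassignments/mytopic) must be created and deleted explicitly, costing extra ZooKeeper round trips, while the single notification path avoids one watcher per partition.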
[jira] [Created] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail
Damian Guy created KAFKA-6360: - Summary: RocksDB segments not removed when store is closed causes re-initialization to fail Key: KAFKA-6360 URL: https://issues.apache.org/jira/browse/KAFKA-6360 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.1.0 Reporter: Damian Guy Assignee: Damian Guy Priority: Blocker Fix For: 1.1.0 When a store is re-initialized it is first closed, before it is opened again. When this happens the segments in the {{Segments}} class are closed, but they are not removed from the list of segments. So when the store is re-initialized the old closed segments are used. This results in: {code} [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] task [1_3] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-24: (org.apache.kafka.streams.processor.internals.ProcessorStateManager) org.apache.kafka.streams.errors.InvalidStateStoreException: Store KSTREAM-AGGREGATE-STATE-STORE-24.151308000 is currently closed at org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241) at org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289) at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102) at org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122) at org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78) at org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33) at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179) at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38) at org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88) at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142) at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100) at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127) at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196) {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
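The failure mode is easy to reproduce in miniature. The following is a deliberately simplified model, not the actual Streams `Segments` class, showing why closing segments without removing them from the list breaks re-initialization:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the bug: close() closes segments but leaves them in
// the list, so re-initialization hands back already-closed segments.
public class Segments {
    static class Segment {
        boolean open = true;
        void close() { open = false; }
    }

    final List<Segment> segments = new ArrayList<>();

    // Returns the existing first segment, or creates a fresh (open) one.
    Segment getOrCreate() {
        if (segments.isEmpty()) segments.add(new Segment());
        return segments.get(0);
    }

    // Buggy version: segments are closed but stay in the list.
    void closeBuggy() { segments.forEach(Segment::close); }

    // Fixed version: closed segments are also removed, so the next
    // getOrCreate() creates a fresh open segment.
    void closeFixed() { segments.forEach(Segment::close); segments.clear(); }
}
```

With `closeBuggy()`, a subsequent `getOrCreate()` returns the stale closed segment, which is the shape of the `InvalidStateStoreException: Store ... is currently closed` failure in the stack trace above; with `closeFixed()`, it returns a fresh open one.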
[jira] [Created] (KAFKA-6359) Work for KIP-236
Tom Bentley created KAFKA-6359: -- Summary: Work for KIP-236 Key: KAFKA-6359 URL: https://issues.apache.org/jira/browse/KAFKA-6359 Project: Kafka Issue Type: Improvement Reporter: Tom Bentley Assignee: Tom Bentley Priority: Minor This issue is for the work described in KIP-236. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] kafka pull request #4321: KAFKA-6342 : Move workaround for JSON parsing of n...
GitHub user umesh9794 opened a pull request: https://github.com/apache/kafka/pull/4321 KAFKA-6342 : Move workaround for JSON parsing of non-escaped strings This PR moves the JSON parsing workaround of [this PR](https://github.com/apache/kafka/pull/4303) to new method and uses this method in `ZkClient` etc. classes. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) You can merge this pull request into a Git repository by running: $ git pull https://github.com/umesh9794/kafka KAFKA-6342 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/4321.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4321 commit ec8abab0629ecdea4a3a5970653e4c88025b8dfd Author: umesh chaudhary Date: 2017-12-13T10:51:23Z Initial Commit ---
[GitHub] kafka pull request #4320: [WIP] Add logs to debug testHighConcurrencyModific...
GitHub user omkreddy opened a pull request: https://github.com/apache/kafka/pull/4320 [WIP] Add logs to debug testHighConcurrencyModificationOfResourceAcls test case Not able to reproduce locally. Added a few logs to check on Jenkins; looks like some synchronization issue. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) You can merge this pull request into a Git repository by running: $ git pull https://github.com/omkreddy/kafka KAFKA-6335 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/4320.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4320 commit 32ca396f676136f6964ecd65ba1ba93badda6955 Author: Manikumar Reddy Date: 2017-12-13T10:26:35Z [WIP] Add logs to debug ---
[jira] [Created] (KAFKA-6358) Per topic producer/fetch_consumer/fetch_follower metrics
Ricardo Bartolome created KAFKA-6358: Summary: Per topic producer/fetch_consumer/fetch_follower metrics Key: KAFKA-6358 URL: https://issues.apache.org/jira/browse/KAFKA-6358 Project: Kafka Issue Type: Wish Components: metrics Affects Versions: 1.0.0 Reporter: Ricardo Bartolome Priority: Minor

We are using the following JMX beans to monitor Kafka 1.0.0:

{code}
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
  Mean
  50thPercentile
  ...
  99thPercentile
kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer
  Count
kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower
  Count
{code}

There are more, but these provide an idea of what we are using in order to get produce/fetch operations on a per-broker basis. Nevertheless, in order to identify abusing consumers/clients in our kafka cluster, we would appreciate having these metrics on a per-topic basis. As an example of per-topic metrics we have:

{code}
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=.*
{code}

Here we have a per-topic bean with a "Count" attribute that we can query. That way we can know which topics are ingesting more data and which ones less. We can't do that with the metrics explained above. Would you consider this a feature request for an upcoming Kafka version?

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
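As an aside, per-topic beans like the one quoted can be discovered with a JMX wildcard pattern using only the JDK's `javax.management`. A minimal sketch (the bean names are taken from the ticket; the helper is hypothetical):

```java
import javax.management.ObjectName;

// Sketch: match a concrete per-topic bean name against a wildcard pattern,
// the way monitoring code could enumerate per-topic metrics.
public class PerTopicMetrics {
    // Returns true when the concrete `bean` name matches the wildcard
    // `pattern` (ObjectName.apply handles property-value patterns like topic=*).
    static boolean matches(String pattern, String bean) throws Exception {
        return new ObjectName(pattern).apply(new ObjectName(bean));
    }

    public static void main(String[] args) throws Exception {
        String pattern =
            "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*";
        String bean =
            "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=orders";
        System.out.println(matches(pattern, bean));
    }
}
```

In a live JVM the same pattern would be passed to `MBeanServerConnection.queryNames` to enumerate the per-topic beans and read their "Count" attributes.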
[jira] [Created] (KAFKA-6357) Return nonzero code in kafka-consumer-groups.sh tool in case of error
Rinat Shigapov created KAFKA-6357: - Summary: Return nonzero code in kafka-consumer-groups.sh tool in case of error Key: KAFKA-6357 URL: https://issues.apache.org/jira/browse/KAFKA-6357 Project: Kafka Issue Type: Improvement Components: tools Environment: kafka_2.12-0.11.0.0 Reporter: Rinat Shigapov

Use case that triggered this issue: kafka-consumer-groups.sh can reset offsets if there is no active consumer in the group. Otherwise it just prints an error message about this situation and returns a zero exit code.

Expected behaviour: a nonzero code should be returned on error. Then proper scripting around kafka-consumer-groups.sh would be possible.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
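The requested behavior amounts to propagating a nonzero status to the shell. A hypothetical sketch (this is not the actual tool's code; the method and error condition are illustrative only):

```java
// Hypothetical sketch: map the "group still has active consumers" error to
// a nonzero status code instead of printing and returning success.
public class ConsumerGroupsExitCode {
    static int run(boolean activeConsumersPresent) {
        if (activeConsumersPresent) {
            System.err.println(
                "Error: offsets can only be reset if the group has no active consumers");
            return 1; // nonzero so shell scripts can branch on failure
        }
        // ... perform the offset reset ...
        return 0;
    }
}
```

The tool's entry point would then end with `System.exit(run(...))`, so a wrapper script can simply test `$?` after invoking kafka-consumer-groups.sh.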
[jira] [Created] (KAFKA-6356) UnknownTopicOrPartitionException & NotLeaderForPartitionException and log deletion happening with retention bytes kept at -1.
kaushik srinivas created KAFKA-6356: --- Summary: UnknownTopicOrPartitionException & NotLeaderForPartitionException and log deletion happening with retention bytes kept at -1. Key: KAFKA-6356 URL: https://issues.apache.org/jira/browse/KAFKA-6356 Project: Kafka Issue Type: Bug Affects Versions: 0.10.1.0 Environment: Cent OS 7.2, HDD : 2Tb, CPUs: 56 cores, RAM : 256GB Reporter: kaushik srinivas Attachments: configs.txt, stderr_b0, stderr_b1, stderr_b2, stdout_b0, stdout_b1, stdout_b2, topic_description, topic_offsets

Facing issues in a Kafka topic with partitions and a replication factor of 3.

Config used:
No of partitions: 20
Replication factor: 3
No of brokers: 3
Memory for broker: 32GB
Heap for broker: 12GB

A producer is run to produce data for 20 partitions of a single topic. But we observed that for partitions whose leader is one of the brokers (broker-1), the offsets are never incremented, and we also see a log file of 0MB size on that broker's disk.

Seeing the errors below in the brokers:

Error 1:
[2017-12-13 07:11:11,191] ERROR [ReplicaFetcherThread-0-2], Error for partition [test2,5] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)

Error 2:
[2017-12-11 12:19:41,599] ERROR [ReplicaFetcherThread-0-2], Error for partition [test1,13] to broker 2:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)

Attaching:
1. error and std out files of all the brokers.
2. kafka config used.
3. offsets and topic description.

Retention bytes was kept at -1 and the retention period at 96 hours.
But we are still observing some of the log files being deleted at the broker. From the logs:

[2017-12-11 12:20:20,586] INFO Deleting index /var/lib/mesos/slave/slaves/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-S7/frameworks/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-0006/executors/ckafka__5f085d0c-e296-40f0-a686-8953dd14e4c6/runs/506a1ce7-23d1-45ea-bb7c-84e015405285/kafka-broker-data/broker-1/test1-12/.timeindex (kafka.log.TimeIndex)

[2017-12-11 12:20:20,587] INFO Deleted log for partition [test1,12] in /var/lib/mesos/slave/slaves/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-S7/frameworks/7b319cf4-f06e-4a35-a6fe-fd4fcc0548e6-0006/executors/ckafka__5f085d0c-e296-40f0-a686-8953dd14e4c6/runs/506a1ce7-23d1-45ea-bb7c-84e015405285/kafka-broker-data/broker-1/test1-12. (kafka.log.LogManager)

We are expecting the logs never to be deleted if retention bytes is set to -1.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
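For context, `retention.bytes = -1` disables only size-based deletion; the time-based limit (96 hours here) can still delete segments, which may account for deletions like the ones above. A minimal sketch of the size check (illustrative only, not broker code):

```java
// Illustrative sketch of the size-based retention rule: retention.bytes = -1
// means "no size limit", so only a time-based limit could trigger deletion.
public class RetentionCheck {
    static boolean exceedsSizeLimit(long logSizeBytes, long retentionBytes) {
        if (retentionBytes < 0) {
            return false; // -1 (or any negative): size-based retention disabled
        }
        return logSizeBytes > retentionBytes;
    }
}
```

Whether the deletions reported in this ticket were time-based or a genuine bug is exactly what the attached logs and configs would need to show.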