[jira] [Created] (KAFKA-6559) Iterate record sets before calling Log.append
Todd Palino created KAFKA-6559: -- Summary: Iterate record sets before calling Log.append Key: KAFKA-6559 URL: https://issues.apache.org/jira/browse/KAFKA-6559 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 1.0.0 Reporter: Todd Palino Assignee: Todd Palino If a Produce request contains multiple record sets for a single topic-partition, it is better to iterate these before calling Log.append. This is because append will process all the sets together, and therefore will need to reassign offsets even if the offsets for an individual record set are properly formed. By iterating the record sets before calling append, each set can be considered on its own and potentially be appended without reassigning offsets. While the core Java producer client does not currently operate this way, it is permitted by the protocol and may be used by other clients that aggregate multiple batches together to produce them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
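To make the intent concrete, here is a minimal, self-contained sketch of the idea. The {{RecordSet}} class, the validity check, and the printed actions are illustrative stand-ins, not the actual {{Log.append}} internals: it only shows why considering each record set on its own can let a well-formed set keep its offsets while another set in the same request gets reassigned.

{code}
import java.util.Arrays;
import java.util.List;

public class PerSetAppendSketch {
    // Stand-in for one record set within a Produce request for a partition.
    static class RecordSet {
        final long baseOffset;
        final int recordCount;
        RecordSet(long baseOffset, int recordCount) {
            this.baseOffset = baseOffset;
            this.recordCount = recordCount;
        }
    }

    public static void main(String[] args) {
        long logEndOffset = 100L;
        // Two record sets for the same topic-partition; the first already lines up
        // with the log end offset, the second does not.
        List<RecordSet> sets = Arrays.asList(new RecordSet(100L, 3), new RecordSet(50L, 2));

        // Considering each set on its own lets the well-formed one keep its offsets,
        // instead of reassigning offsets for everything because one set is off.
        for (RecordSet set : sets) {
            boolean offsetsValid = set.baseOffset == logEndOffset;
            if (offsetsValid) {
                System.out.println("append as-is at " + set.baseOffset);
            } else {
                System.out.println("append with offsets reassigned starting at " + logEndOffset);
            }
            logEndOffset += set.recordCount;
        }
    }
}
{code}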
[jira] [Updated] (KAFKA-5056) Shuffling of partitions in old consumer fetch requests removed
[ https://issues.apache.org/jira/browse/KAFKA-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Palino updated KAFKA-5056: --- Summary: Shuffling of partitions in old consumer fetch requests removed (was: Shuffling of partitions in old consume fetch requests removed) > Shuffling of partitions in old consumer fetch requests removed > -- > > Key: KAFKA-5056 > URL: https://issues.apache.org/jira/browse/KAFKA-5056 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.0 >Reporter: Todd Palino >Assignee: Todd Palino > > [KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes] > deprecated the constructor to {{FetchRequest}} which shuffles the > {{requestInfo}} parameter, in favor of round robin reordering logic added to > the replica fetcher and the consumer API. However, this was not added to the > old consumer {{ConsumerFetcherThread}}, which has resulted in unfair > partition fetching since 0.10.1. > In order to maintain the old consumer, we need to add the removed shuffle to > {{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} > is composed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5056) Shuffling of partitions in old consume fetch requests removed
[ https://issues.apache.org/jira/browse/KAFKA-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Palino updated KAFKA-5056: --- Reviewer: Joel Koshy Status: Patch Available (was: In Progress) > Shuffling of partitions in old consume fetch requests removed > - > > Key: KAFKA-5056 > URL: https://issues.apache.org/jira/browse/KAFKA-5056 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.0 >Reporter: Todd Palino >Assignee: Todd Palino > > [KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes] > deprecated the constructor to {{FetchRequest}} which shuffles the > {{requestInfo}} parameter, in favor of round robin reordering logic added to > the replica fetcher and the consumer API. However, this was not added to the > old consumer {{ConsumerFetcherThread}}, which has resulted in unfair > partition fetching since 0.10.1. > In order to maintain the old consumer, we need to add the removed shuffle to > {{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} > is composed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Work started] (KAFKA-5056) Shuffling of partitions in old consume fetch requests removed
[ https://issues.apache.org/jira/browse/KAFKA-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-5056 started by Todd Palino. -- > Shuffling of partitions in old consume fetch requests removed > - > > Key: KAFKA-5056 > URL: https://issues.apache.org/jira/browse/KAFKA-5056 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.0 >Reporter: Todd Palino >Assignee: Todd Palino > > [KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes] > deprecated the constructor to {{FetchRequest}} which shuffles the > {{requestInfo}} parameter, in favor of round robin reordering logic added to > the replica fetcher and the consumer API. However, this was not added to the > old consumer {{ConsumerFetcherThread}}, which has resulted in unfair > partition fetching since 0.10.1. > In order to maintain the old consumer, we need to add the removed shuffle to > {{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} > is composed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-5056) Shuffling of partitions in old consume fetch requests removed
Todd Palino created KAFKA-5056: -- Summary: Shuffling of partitions in old consume fetch requests removed Key: KAFKA-5056 URL: https://issues.apache.org/jira/browse/KAFKA-5056 Project: Kafka Issue Type: Bug Affects Versions: 0.10.1.0 Reporter: Todd Palino Assignee: Todd Palino [KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes] deprecated the constructor to {{FetchRequest}} which shuffles the {{requestInfo}} parameter, in favor of round robin reordering logic added to the replica fetcher and the consumer API. However, this was not added to the old consumer {{ConsumerFetcherThread}}, which has resulted in unfair partition fetching since 0.10.1. In order to maintain the old consumer, we need to add the removed shuffle to {{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} is composed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
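A minimal sketch of the shuffle being restored, assuming the fetcher already has its list of owned topic-partitions; the string names are stand-ins, and in the real change the shuffle would happen over the topic-partition list inside {{buildFetchRequest}}:

{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class FetchRequestShuffleSketch {
    public static void main(String[] args) {
        // Stand-ins for the topic-partitions a fetcher thread is responsible for.
        List<String> topicPartitions = new ArrayList<>(Arrays.asList(
                "topicA-0", "topicA-1", "topicB-0", "topicC-0"));

        // Without a shuffle, partitions at the head of the list are always fetched
        // first and can starve the ones at the tail when fetch responses fill up.
        // Shuffling before composing each FetchRequest spreads that bias across
        // partitions over time.
        Collections.shuffle(topicPartitions);

        System.out.println("fetch order for this request: " + topicPartitions);
    }
}
{code}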
[jira] [Commented] (KAFKA-1342) Slow controlled shutdowns can result in stale shutdown requests
[ https://issues.apache.org/jira/browse/KAFKA-1342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15893728#comment-15893728 ] Todd Palino commented on KAFKA-1342: [~wushujames], I'm not sure about increasing the number of requests, but a lot of this should have been resolved recently with some updates that I believe [~becket_qin] made. To make it better, we had bumped controller.socket.timeout.ms significantly (to 30). We didn't see any side effects from doing that. > Slow controlled shutdowns can result in stale shutdown requests > --- > > Key: KAFKA-1342 > URL: https://issues.apache.org/jira/browse/KAFKA-1342 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.1 >Reporter: Joel Koshy >Assignee: Joel Koshy >Priority: Critical > Labels: newbie++, newbiee, reliability > Fix For: 0.11.0.0 > > > I don't think this is a bug introduced in 0.8.1., but triggered by the fact > that controlled shutdown seems to have become slower in 0.8.1 (will file a > separate ticket to investigate that). When doing a rolling bounce, it is > possible for a bounced broker to stop all its replica fetchers since the > previous PID's shutdown requests are still being shutdown. > - 515 is the controller > - Controlled shutdown initiated for 503 > - Controller starts controlled shutdown for 503 > - The controlled shutdown takes a long time in moving leaders and moving > follower replicas on 503 to the offline state. > - So 503's read from the shutdown channel times out and a new channel is > created. It issues another shutdown request. This request (since it is a > new channel) is accepted at the controller's socket server but then waits > on the broker shutdown lock held by the previous controlled shutdown which > is still in progress. > - The above step repeats for the remaining retries (six more requests). > - 503 hits SocketTimeout exception on reading the response of the last > shutdown request and proceeds to do an unclean shutdown. > - The controller's onBrokerFailure call-back fires and moves 503's replicas > to offline (not too important in this sequence). > - 503 is brought back up. > - The controller's onBrokerStartup call-back fires and moves its replicas > (and partitions) to online state. 503 starts its replica fetchers. > - Unfortunately, the (phantom) shutdown requests are still being handled and > the controller sends StopReplica requests to 503. > - The first shutdown request finally finishes (after 76 minutes in my case!). > - The remaining shutdown requests also execute and do the same thing (sends > StopReplica requests for all partitions to > 503). > - The remaining requests complete quickly because they end up not having to > touch zookeeper paths - no leaders left on the broker and no need to > shrink ISR in zookeeper since it has already been done by the first > shutdown request. > - So in the end-state 503 is up, but effectively idle due to the previous > PID's shutdown requests. > There are some obvious fixes that can be made to controlled shutdown to help > address the above issue. E.g., we don't really need to move follower > partitions to Offline. We did that as an "optimization" so the broker falls > out of ISR sooner - which is helpful when producers set required.acks to -1. > However it adds a lot of latency to controlled shutdown. Also, (more > importantly) we should have a mechanism to abort any stale shutdown process. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup
[ https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15700938#comment-15700938 ] Todd Palino commented on KAFKA-3959: +1 as well. I think keeping the config/server.properties file as a quickstart that gets you going with a standalone broker is a good plan. > __consumer_offsets wrong number of replicas at startup > -- > > Key: KAFKA-3959 > URL: https://issues.apache.org/jira/browse/KAFKA-3959 > Project: Kafka > Issue Type: Bug > Components: consumer, offset manager, replication >Affects Versions: 0.9.0.1, 0.10.0.0 > Environment: Brokers of 3 kafka nodes running Red Hat Enterprise > Linux Server release 7.2 (Maipo) >Reporter: Alban Hurtaud > > When creating a stack of 3 kafka brokers, the consumer is starting faster > than kafka nodes and when trying to read a topic, only one kafka node is > available. > So the __consumer_offsets is created with a replication factor set to 1 > (instead of configured 3) : > offsets.topic.replication.factor=3 > default.replication.factor=3 > min.insync.replicas=2 > Then, other kafka nodes go up and we have exceptions because the replicas # > for __consumer_offsets is 1 and min insync is 2. So exceptions are thrown. > What I missed is : Why the __consumer_offsets is created with replication to > 1 (when 1 broker is running) whereas in server.properties it is set to 3 ? > To reproduce : > - Prepare 3 kafka nodes with the 3 lines above added to servers.properties. > - Run one kafka, > - Run one consumer (the __consumer_offsets is created with replicas =1) > - Run 2 more kafka nodes -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup
[ https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15668010#comment-15668010 ] Todd Palino commented on KAFKA-3959: As noted, I just want the config enforced. If RF=3 is configured, that's what we should get. If you need RF=1 for testing, or for specific use cases, set it. Even make the default 1 if that's really what we want. But if I explicitly set RF=3, that's what I should get. And if it causes errors, and I've explicitly set it, that's on me as the user. > __consumer_offsets wrong number of replicas at startup > -- > > Key: KAFKA-3959 > URL: https://issues.apache.org/jira/browse/KAFKA-3959 > Project: Kafka > Issue Type: Bug > Components: consumer, offset manager, replication >Affects Versions: 0.9.0.1, 0.10.0.0 > Environment: Brokers of 3 kafka nodes running Red Hat Enterprise > Linux Server release 7.2 (Maipo) >Reporter: Alban Hurtaud > > When creating a stack of 3 kafka brokers, the consumer is starting faster > than kafka nodes and when trying to read a topic, only one kafka node is > available. > So the __consumer_offsets is created with a replication factor set to 1 > (instead of configured 3) : > offsets.topic.replication.factor=3 > default.replication.factor=3 > min.insync.replicas=2 > Then, other kafka nodes go up and we have exceptions because the replicas # > for __consumer_offsets is 1 and min insync is 2. So exceptions are thrown. > What I missed is : Why the __consumer_offsets is created with replication to > 1 (when 1 broker is running) whereas in server.properties it is set to 3 ? > To reproduce : > - Prepare 3 kafka nodes with the 3 lines above added to servers.properties. > - Run one kafka, > - Run one consumer (the __consumer_offsets is created with replicas =1) > - Run 2 more kafka nodes -- This message was sent by Atlassian JIRA (v6.3.4#6332)
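A sketch of the enforcement being asked for here, under the assumption that topic creation can see the number of live brokers; the method name and exception type are illustrative, not the actual broker code:

{code}
public class ReplicationFactorCheckSketch {
    // Sketch of the requested behavior: if the configured replication factor
    // cannot be satisfied, fail (and let the caller retry later) rather than
    // silently creating __consumer_offsets with fewer replicas.
    static int resolveReplicationFactor(int configuredRf, int liveBrokers) {
        if (liveBrokers < configuredRf) {
            throw new IllegalStateException("Configured replication factor " + configuredRf
                    + " cannot be satisfied with only " + liveBrokers + " live broker(s)");
        }
        return configuredRf;
    }

    public static void main(String[] args) {
        System.out.println(resolveReplicationFactor(3, 3)); // ok: prints 3
        System.out.println(resolveReplicationFactor(3, 1)); // throws instead of falling back to RF=1
    }
}
{code}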
[jira] [Commented] (KAFKA-4050) Allow configuration of the PRNG used for SSL
[ https://issues.apache.org/jira/browse/KAFKA-4050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425756#comment-15425756 ] Todd Palino commented on KAFKA-4050: So first off, yes, the thread dump (which [~jjkoshy] posted) shows that the offending line of code is "NativePRNG.java:481". I checked, and that's very clearly in the non-blocking NativePRNG variant that explicitly uses /dev/urandom. I had considered changing the default, [~ijuma], and I actually thought about adding a note to this ticket about it earlier today. Despite the fact that the default clearly has performance issues, I don't think we should change the default behavior, which is to let the JRE pick the PRNG implementation. The reason is that we can't be sure that on any given system, in any given JRE, the new one we set explicitly will exist, and that would cause the default behavior to break. The SHA1PRNG implementation should exist everywhere, but I'd rather not take the risk. I think it's better to leave the default as is, and call out the issue very clearly in the documentation. > Allow configuration of the PRNG used for SSL > > > Key: KAFKA-4050 > URL: https://issues.apache.org/jira/browse/KAFKA-4050 > Project: Kafka > Issue Type: Improvement > Components: security >Affects Versions: 0.10.0.1 >Reporter: Todd Palino >Assignee: Todd Palino > Labels: security, ssl > > This change will make the pseudo-random number generator (PRNG) > implementation used by the SSLContext configurable. The configuration is not > required, and the default is to use whatever the default PRNG for the JDK/JRE > is. Providing a string, such as "SHA1PRNG", will cause that specific > SecureRandom implementation to get passed to the SSLContext. > When enabling inter-broker SSL in our certification cluster, we observed > severe performance issues. For reference, this cluster can take up to 600 > MB/sec of inbound produce traffic over SSL, with RF=2, before it gets close > to saturation, and the mirror maker normally produces about 400 MB/sec > (unless it is lagging). When we enabled inter-broker SSL, we saw persistent > replication problems in the cluster at any inbound rate of more than about 6 > or 7 MB/sec per-broker. This was narrowed down to all the network threads > blocking on a single lock in the SecureRandom code. > It turns out that the default PRNG implementation on Linux is NativePRNG. > This uses randomness from /dev/urandom (which, by itself, is a non-blocking > read) and mixes it with randomness from SHA1. The problem is that the entire > application shares a single SecureRandom instance, and NativePRNG has a > global lock within the implNextBytes method. Switching to another > implementation (SHA1PRNG, which has better performance characteristics and is > still considered secure) completely eliminated the bottleneck and allowed the > cluster to work properly at saturation. > The SSLContext initialization has an optional argument to provide a > SecureRandom instance, which the code currently sets to null. This change > creates a new config to specify an implementation, and instantiates that and > passes it to SSLContext if provided. This will also let someone select a > stronger source of randomness (obviously at a performance cost) if desired. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4050) Allow configuration of the PRNG used for SSL
[ https://issues.apache.org/jira/browse/KAFKA-4050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423400#comment-15423400 ] Todd Palino commented on KAFKA-4050: It appears to be called every time something needs to be encrypted (have to get randomness to run the crypto routines), so yeah, every single packet being sent would need a call to get random bytes. > Allow configuration of the PRNG used for SSL > > > Key: KAFKA-4050 > URL: https://issues.apache.org/jira/browse/KAFKA-4050 > Project: Kafka > Issue Type: Improvement > Components: security >Affects Versions: 0.10.0.1 >Reporter: Todd Palino >Assignee: Todd Palino > Labels: security, ssl > > This change will make the pseudo-random number generator (PRNG) > implementation used by the SSLContext configurable. The configuration is not > required, and the default is to use whatever the default PRNG for the JDK/JRE > is. Providing a string, such as "SHA1PRNG", will cause that specific > SecureRandom implementation to get passed to the SSLContext. > When enabling inter-broker SSL in our certification cluster, we observed > severe performance issues. For reference, this cluster can take up to 600 > MB/sec of inbound produce traffic over SSL, with RF=2, before it gets close > to saturation, and the mirror maker normally produces about 400 MB/sec > (unless it is lagging). When we enabled inter-broker SSL, we saw persistent > replication problems in the cluster at any inbound rate of more than about 6 > or 7 MB/sec per-broker. This was narrowed down to all the network threads > blocking on a single lock in the SecureRandom code. > It turns out that the default PRNG implementation on Linux is NativePRNG. > This uses randomness from /dev/urandom (which, by itself, is a non-blocking > read) and mixes it with randomness from SHA1. The problem is that the entire > application shares a single SecureRandom instance, and NativePRNG has a > global lock within the implNextBytes method. Switching to another > implementation (SHA1PRNG, which has better performance characteristics and is > still considered secure) completely eliminated the bottleneck and allowed the > cluster to work properly at saturation. > The SSLContext initialization has an optional argument to provide a > SecureRandom instance, which the code currently sets to null. This change > creates a new config to specify an implementation, and instantiates that and > passes it to SSLContext if provided. This will also let someone select a > stronger source of randomness (obviously at a performance cost) if desired. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4050) Allow configuration of the PRNG used for SSL
Todd Palino created KAFKA-4050: -- Summary: Allow configuration of the PRNG used for SSL Key: KAFKA-4050 URL: https://issues.apache.org/jira/browse/KAFKA-4050 Project: Kafka Issue Type: Improvement Components: security Affects Versions: 0.10.0.1 Reporter: Todd Palino Assignee: Todd Palino This change will make the pseudo-random number generator (PRNG) implementation used by the SSLContext configurable. The configuration is not required, and the default is to use whatever the default PRNG for the JDK/JRE is. Providing a string, such as "SHA1PRNG", will cause that specific SecureRandom implementation to get passed to the SSLContext. When enabling inter-broker SSL in our certification cluster, we observed severe performance issues. For reference, this cluster can take up to 600 MB/sec of inbound produce traffic over SSL, with RF=2, before it gets close to saturation, and the mirror maker normally produces about 400 MB/sec (unless it is lagging). When we enabled inter-broker SSL, we saw persistent replication problems in the cluster at any inbound rate of more than about 6 or 7 MB/sec per-broker. This was narrowed down to all the network threads blocking on a single lock in the SecureRandom code. It turns out that the default PRNG implementation on Linux is NativePRNG. This uses randomness from /dev/urandom (which, by itself, is a non-blocking read) and mixes it with randomness from SHA1. The problem is that the entire application shares a single SecureRandom instance, and NativePRNG has a global lock within the implNextBytes method. Switching to another implementation (SHA1PRNG, which has better performance characteristics and is still considered secure) completely eliminated the bottleneck and allowed the cluster to work properly at saturation. The SSLContext initialization has an optional argument to provide a SecureRandom instance, which the code currently sets to null. This change creates a new config to specify an implementation, and instantiates that and passes it to SSLContext if provided. This will also let someone select a stronger source of randomness (obviously at a performance cost) if desired. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
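The ticket describes passing an explicit {{SecureRandom}} to {{SSLContext.init}} instead of null. A minimal sketch of that wiring, assuming the implementation name comes from a config value (the config name and default handling are illustrative; key and trust managers are left as the JRE defaults here):

{code}
import java.security.SecureRandom;
import javax.net.ssl.SSLContext;

public class PrngConfigSketch {
    public static void main(String[] args) throws Exception {
        // Example value for the proposed config; if nothing is configured, keep the
        // current behavior of passing null so the JRE picks its default PRNG.
        String configured = "SHA1PRNG";

        SecureRandom secureRandom = (configured == null)
                ? null
                : SecureRandom.getInstance(configured);

        SSLContext sslContext = SSLContext.getInstance("TLS");
        // Null key/trust managers fall back to the JRE defaults; only the PRNG is overridden.
        sslContext.init(null, null, secureRandom);

        System.out.println("SSLContext initialized with PRNG: "
                + (secureRandom == null ? "JRE default" : secureRandom.getAlgorithm()));
    }
}
{code}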
[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup
[ https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420091#comment-15420091 ] Todd Palino commented on KAFKA-3959: If that's the case, then the default should be set to 1. At least having that as the default, and returning an error if the RF cannot be satisfied, is a reasonable and expected outcome. But having it set up to not enforce the configured RF leads to unintended behavior, even if it's documented here. > __consumer_offsets wrong number of replicas at startup > -- > > Key: KAFKA-3959 > URL: https://issues.apache.org/jira/browse/KAFKA-3959 > Project: Kafka > Issue Type: Bug > Components: consumer, offset manager, replication >Affects Versions: 0.9.0.1, 0.10.0.0 > Environment: Brokers of 3 kafka nodes running Red Hat Enterprise > Linux Server release 7.2 (Maipo) >Reporter: Alban Hurtaud > > When creating a stack of 3 kafka brokers, the consumer is starting faster > than kafka nodes and when trying to read a topic, only one kafka node is > available. > So the __consumer_offsets is created with a replication factor set to 1 > (instead of configured 3) : > offsets.topic.replication.factor=3 > default.replication.factor=3 > min.insync.replicas=2 > Then, other kafka nodes go up and we have exceptions because the replicas # > for __consumer_offsets is 1 and min insync is 2. So exceptions are thrown. > What I missed is : Why the __consumer_offsets is created with replication to > 1 (when 1 broker is running) whereas in server.properties it is set to 3 ? > To reproduce : > - Prepare 3 kafka nodes with the 3 lines above added to servers.properties. > - Run one kafka, > - Run one consumer (the __consumer_offsets is created with replicas =1) > - Run 2 more kafka nodes -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup
[ https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15419365#comment-15419365 ] Todd Palino commented on KAFKA-3959: Agree with Onur 100% here. We've been running into this a lot lately, and we never know about it until we do a rolling restart of the cluster and consumers break when the topic goes offline. > __consumer_offsets wrong number of replicas at startup > -- > > Key: KAFKA-3959 > URL: https://issues.apache.org/jira/browse/KAFKA-3959 > Project: Kafka > Issue Type: Bug > Components: consumer, offset manager, replication >Affects Versions: 0.9.0.1, 0.10.0.0 > Environment: Brokers of 3 kafka nodes running Red Hat Enterprise > Linux Server release 7.2 (Maipo) >Reporter: Alban Hurtaud > > When creating a stack of 3 kafka brokers, the consumer is starting faster > than kafka nodes and when trying to read a topic, only one kafka node is > available. > So the __consumer_offsets is created with a replication factor set to 1 > (instead of configured 3) : > offsets.topic.replication.factor=3 > default.replication.factor=3 > min.insync.replicas=2 > Then, other kafka nodes go up and we have exceptions because the replicas # > for __consumer_offsets is 1 and min insync is 2. So exceptions are thrown. > What I missed is : Why the __consumer_offsets is created with replication to > 1 (when 1 broker is running) whereas in server.properties it is set to 3 ? > To reproduce : > - Prepare 3 kafka nodes with the 3 lines above added to servers.properties. > - Run one kafka, > - Run one consumer (the __consumer_offsets is created with replicas =1) > - Run 2 more kafka nodes -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3797) Improve security of __consumer_offsets topic
[ https://issues.apache.org/jira/browse/KAFKA-3797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323930#comment-15323930 ] Todd Palino commented on KAFKA-3797: Obviously we can't do something like that with a running consumer. But we do it all the time with inactive consumers. I haven't looked at the most recent changes to the offset API yet, so I'm not sure whether or not there are restrictions in place that would allow a client that is not part of the group to commit offsets for the group. > Improve security of __consumer_offsets topic > > > Key: KAFKA-3797 > URL: https://issues.apache.org/jira/browse/KAFKA-3797 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Vahid Hashemian > > By default, we allow clients to override committed offsets and group metadata > using the Produce API as long as they have Write access to the > __consumer_offsets topic. From one perspective, this is fine: administrators > can restrict access to this topic to trusted users. From another, it seems > less than ideal for Write permission on that topic to subsume Group-level > permissions for the full cluster. With this access, a user can cause all > kinds of mischief including making the group "lose" data by setting offsets > ahead of the actual position. This is probably not obvious to administrators > who grant access to topics using a wildcard and it increases the risk from > incorrectly applying topic patterns (if we ever add support for them). It > seems reasonable to consider safer default behavior: > 1. A simple option to fix this would be to prevent wildcard topic rules from > applying to internal topics. To write to an internal topic, you need a > separate rule which explicitly grants authorization to that topic. > 2. A more extreme and perhaps safer option might be to prevent all writes to > this topic (and potentially other internal topics) through the Produce API. > Do we have any use cases which actually require writing to > __consumer_offsets? The only potential case that comes to mind is replication. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3797) Improve security of __consumer_offsets topic
[ https://issues.apache.org/jira/browse/KAFKA-3797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15318854#comment-15318854 ] Todd Palino commented on KAFKA-3797: I think the first option is more reasonable, and provides sufficient security. I can see cases where you might want to have an external tool for changing a group's committed offsets. > Improve security of __consumer_offsets topic > > > Key: KAFKA-3797 > URL: https://issues.apache.org/jira/browse/KAFKA-3797 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson > > By default, we allow clients to override committed offsets and group metadata > using the Produce API as long as they have Write access to the > __consumer_offsets topic. From one perspective, this is fine: administrators > can restrict access to this topic to trusted users. From another, it seems > less than ideal for Write permission on that topic to subsume Group-level > permissions for the full cluster. With this access, a user can cause all > kinds of mischief including making the group "lose" data by setting offsets > ahead of the actual position. This is probably not obvious to administrators > who grant access to topics using a wildcard and it increases the risk from > incorrectly applying topic patterns (if we ever add support for them). It > seems reasonable to consider safer default behavior: > 1. A simple option to fix this would be to prevent wildcard topic rules from > applying to internal topics. To write to an internal topic, you need a > separate rule which explicitly grants authorization to that topic. > 2. A more extreme and perhaps safer option might be to prevent all writes to > this topic (and potentially other internal topics) through the Produce API. > Do we have any use cases which actually require writing to > __consumer_offsets? The only potential case that comes to mind is replication. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3725) Update documentation with regards to XFS
[ https://issues.apache.org/jira/browse/KAFKA-3725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15312288#comment-15312288 ] Todd Palino commented on KAFKA-3725: I'll take a look at this and put together a PR with some updates. > Update documentation with regards to XFS > > > Key: KAFKA-3725 > URL: https://issues.apache.org/jira/browse/KAFKA-3725 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Todd Palino > > Our documentation currently states that only Ext4 has been tried (by > LinkedIn): > "Ext4 may or may not be the best filesystem for Kafka. Filesystems like XFS > supposedly handle locking during fsync better. We have only tried Ext4, > though." > http://kafka.apache.org/documentation.html#ext4 > I think this is no longer true, so we should update the documentation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3725) Update documentation with regards to XFS
[ https://issues.apache.org/jira/browse/KAFKA-3725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Palino reassigned KAFKA-3725: -- Assignee: Todd Palino > Update documentation with regards to XFS > > > Key: KAFKA-3725 > URL: https://issues.apache.org/jira/browse/KAFKA-3725 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Todd Palino > > Our documentation currently states that only Ext4 has been tried (by > LinkedIn): > "Ext4 may or may not be the best filesystem for Kafka. Filesystems like XFS > supposedly handle locking during fsync better. We have only tried Ext4, > though." > http://kafka.apache.org/documentation.html#ext4 > I think this is no longer true, so we should update the documentation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3174) Re-evaluate the CRC32 class performance.
[ https://issues.apache.org/jira/browse/KAFKA-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126292#comment-15126292 ] Todd Palino commented on KAFKA-3174: Yeah, definitely no problems with Java 1.8. We've been running 1.8 u5 for quite some time, and we're in the process of updating to u40. It's worth noting that we have been running into a number of SEGVs with mirror maker (but not the broker) under u5, but the problem is supposedly fixed in u40. > Re-evaluate the CRC32 class performance. > > > Key: KAFKA-3174 > URL: https://issues.apache.org/jira/browse/KAFKA-3174 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.9.0.0 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin > Fix For: 0.9.0.1 > > > We used org.apache.kafka.common.utils.CRC32 in clients because it has better > performance than java.util.zip.CRC32 in Java 1.6. > In a recent test I ran it looks in Java 1.8 the CRC32 class is 2x as fast as > the Crc32 class we are using now. We may want to re-evaluate the performance > of Crc32 class and see it makes sense to simply use java CRC32 instead. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
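For anyone re-running the comparison, a rough, self-contained harness along these lines is enough to see large differences between {{Checksum}} implementations (a proper benchmark would use JMH; Kafka's own {{Crc32}} class implements {{java.util.zip.Checksum}} as well, so it should drop into the same method if the clients jar is on the classpath):

{code}
import java.util.Random;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

public class CrcTimingSketch {
    // Very rough timing loop; a proper comparison would use JMH, but this is
    // enough to expose large differences between Checksum implementations.
    static long time(Checksum checksum, byte[] data, int iterations) {
        long start = System.nanoTime();
        long dummy = 0;
        for (int i = 0; i < iterations; i++) {
            checksum.reset();
            checksum.update(data, 0, data.length);
            dummy += checksum.getValue();  // keep the result live so the loop isn't optimized away
        }
        long elapsed = System.nanoTime() - start;
        if (dummy == 42) System.out.println("unlikely");  // prevent dead-code elimination
        return elapsed;
    }

    public static void main(String[] args) {
        byte[] data = new byte[1024];
        new Random(0).nextBytes(data);

        // java.util.zip.CRC32 here; swap in another Checksum implementation to compare.
        long nanos = time(new CRC32(), data, 1_000_000);
        System.out.printf("java.util.zip.CRC32: %.1f ms for 1M x 1KB%n", nanos / 1e6);
    }
}
{code}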
[jira] [Commented] (KAFKA-3015) Improve JBOD data balancing
[ https://issues.apache.org/jira/browse/KAFKA-3015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15065636#comment-15065636 ] Todd Palino commented on KAFKA-3015: [~jkreps] So yes, I'm essentially saying that I would prefer to see optimizations to the current partitioning scheme, and the addition of being able to handle single disk failures without terminating the entire broker. I would argue that this would have a higher payoff for people who do not have the ability to easily swap in new machines (as we do, or those in AWS would), because it will allow for more granular failures. Disks tend to fail more than any other component, so the ability to survive a disk failure should be attractive to anyone. As noted, there are a lot of benefits of using JBOD, and I would not argue against having good support for it. Especially as we've been moving to new hardware, we now can't take advantage of all the network we have with 10gig interfaces primarily due to limitations on disk capacity and throughput. Additionally, we'd like to move to RF=3, but can't stomach it because of the cost. If we drop the RAID 10 we will significantly increase throughput and double our storage capacity. Then we can easily move to RF=3 (using 50% more disk) with little impact. > Improve JBOD data balancing > --- > > Key: KAFKA-3015 > URL: https://issues.apache.org/jira/browse/KAFKA-3015 > Project: Kafka > Issue Type: Improvement >Reporter: Jay Kreps > > When running with multiple data directories (i.e. JBOD) we currently place > partitions entirely within one data directory. This tends to lead to poor > balancing across disks as some topics have more throughput/retention and not > all disks get data from all topics. You can't fix this problem with smarter > partition placement strategies because ultimately you don't know when a > partition is created when or how heavily it will be used (this is a subtle > point, and the tendency is to try to think of some more sophisticated way to > place partitions based on current data size but this is actually > exceptionally dangerous and can lead to much worse imbalance when creating > many partitions at once as they would all go to the disk with the least > data). We don't support online rebalancing across directories/disks so this > imbalance is a big problem and limits the usefulness of this configuration. > Implementing online rebalancing of data across disks without downtime is > actually quite hard and requires lots of I/O since you have to actually > rewrite full partitions of data. > An alternative would be to place each partition in *all* directories/drives > and round-robin *segments* within the partition across the directories. So > the layout would be something like: > drive-a/mytopic-0/ > 000.data > 000.index > 0024680.data > 0024680.index > drive-a/mytopic-0/ > 0012345.data > 0012345.index > 0036912.data > 0036912.index > This is a little harder to implement than the current approach but not very > hard, and it is a lot easier than implementing online data balancing across > disks while retaining the current approach. I think this could easily be done > in a backwards compatible way. > I think the balancing you would get from this in most cases would be good > enough to make JBOD the default configuration. Thoughts? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
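A sketch of the layout logic proposed in the issue, rotating which data directory receives each new segment of a partition. The class, the per-log counter, and the paths are illustrative assumptions, not actual broker code:

{code}
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class SegmentRoundRobinSketch {
    private final List<Path> dataDirs;
    private final AtomicLong segmentCounter = new AtomicLong();

    SegmentRoundRobinSketch(List<Path> dataDirs) {
        this.dataDirs = dataDirs;
    }

    // Each new segment of a partition lands on the next drive in turn, so a
    // partition's data (and I/O) is spread across all drives instead of pinning
    // the whole partition to whichever drive it was created on.
    Path nextSegmentDir(String topicPartition) {
        int idx = (int) (segmentCounter.getAndIncrement() % dataDirs.size());
        return dataDirs.get(idx).resolve(topicPartition);
    }

    public static void main(String[] args) {
        SegmentRoundRobinSketch layout = new SegmentRoundRobinSketch(
                Arrays.asList(Paths.get("/data/drive-a"), Paths.get("/data/drive-b")));
        for (int i = 0; i < 4; i++) {
            System.out.println("segment " + i + " -> " + layout.nextSegmentDir("mytopic-0"));
        }
    }
}
{code}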
[jira] [Commented] (KAFKA-3015) Improve JBOD data balancing
[ https://issues.apache.org/jira/browse/KAFKA-3015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15065239#comment-15065239 ] Todd Palino commented on KAFKA-3015: While this seems good on the surface, it makes it impossible to continue running the broker on a single disk failure. This is one of our primary complaints about JBOD, and one of the main reasons we cannot use it (as much as we would like to). > Improve JBOD data balancing > --- > > Key: KAFKA-3015 > URL: https://issues.apache.org/jira/browse/KAFKA-3015 > Project: Kafka > Issue Type: Improvement >Reporter: Jay Kreps > > When running with multiple data directories (i.e. JBOD) we currently place > partitions entirely within one data directory. This tends to lead to poor > balancing across disks as some topics have more throughput/retention and not > all disks get data from all topics. You can't fix this problem with smarter > partition placement strategies because ultimately you don't know when a > partition is created when or how heavily it will be used (this is a subtle > point, and the tendency is to try to think of some more sophisticated way to > place partitions based on current data size but this is actually > exceptionally dangerous and can lead to much worse imbalance when creating > many partitions at once as they would all go to the disk with the least > data). We don't support online rebalancing across directories/disks so this > imbalance is a big problem and limits the usefulness of this configuration. > Implementing online rebalancing of data across disks without downtime is > actually quite hard and requires lots of I/O since you have to actually > rewrite full partitions of data. > An alternative would be to place each partition in *all* directories/drives > and round-robin *segments* within the partition across the directories. So > the layout would be something like: > drive-a/mytopic-0/ > 000.data > 000.index > 0024680.data > 0024680.index > drive-a/mytopic-0/ > 0012345.data > 0012345.index > 0036912.data > 0036912.index > This is a little harder to implement than the current approach but not very > hard, and it is a lot easier than implementing online data balancing across > disks while retaining the current approach. I think this could easily be done > in a backwards compatible way. > I think the balancing you would get from this in most cases would be good > enough to make JBOD the default configuration. Thoughts? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2759) Mirror maker can leave gaps of missing messages if the process dies after a partition is reset and before the first offset commit
[ https://issues.apache.org/jira/browse/KAFKA-2759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14994590#comment-14994590 ] Todd Palino commented on KAFKA-2759: The thought we had internally on the situation in KAFKA-1006 was that we should differentiate the behavior between what happens if a consumer has no offsets at all for a partition and what happens if the consumer thinks it has offsets, but is out of range. These are two different situations. In the case of mirror maker, we would want "earliest" behavior for the first situation (whether default or configured, doesn't matter), as we never want to lose data from new topics or partitions. For the second situation we want "closest". That is, if your out of range offset is closer to the earliest offset, start there. Otherwise, if your out of range offset is closer to the latest offset, start there. There's a little more detail involved on this, as there are intricacies with the fetch response. [~jjkoshy] has more information on that discussion. > Mirror maker can leave gaps of missing messages if the process dies after a > partition is reset and before the first offset commit > - > > Key: KAFKA-2759 > URL: https://issues.apache.org/jira/browse/KAFKA-2759 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2.2 >Reporter: Ewen Cheslack-Postava >Priority: Minor > > Based on investigation of KAFKA-2747. When mirror maker first starts or if it > picks up new topics/partitions, it will use the reset policy to choose where > to start. By default this uses 'latest'. If it starts reading messages and > then dies before committing offsets for the first time, then the mirror maker > that takes over that partition will also reset. This can result in some > messages making it to the consumer, then a gap that were skipped, and then > messages that get processed by the new MM process. > One solution to this problem would be to make sure that offsets are committed > after they are reset but before the first message is passed to the producer. > In other words, in the case of a reset, MM should record where it's going to > start reading data from before processing any messages. This guarantees all > messages after the first one delivered by MM will appear at least once. > This is minor since it should be very rare, but it does break an assumption > that people probably make about the output -- once you start receiving data, > you aren't guaranteed all subsequent messages will appear at least once. > This same issue could affect Copycat as well. In fact, it may be generally > useful to allow consumers to know when the offset was reset so they can > handle cases like this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
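The "closest" behavior described in the comment fits in a few lines. A sketch, assuming the partition's earliest and latest offsets are already known (it ignores the fetch-response intricacies mentioned above):

{code}
public class ClosestResetSketch {
    // "Closest" reset: when a committed offset is out of range, restart from
    // whichever end of the log it is nearer to, rather than always applying a
    // single earliest/latest policy.
    static long resetTo(long outOfRangeOffset, long earliest, long latest) {
        long distanceToEarliest = Math.abs(outOfRangeOffset - earliest);
        long distanceToLatest = Math.abs(latest - outOfRangeOffset);
        return distanceToEarliest <= distanceToLatest ? earliest : latest;
    }

    public static void main(String[] args) {
        // Offset fell off the tail of the log (older than earliest): resume from earliest.
        System.out.println(resetTo(90, 100, 10_000));      // 100
        // Offset is beyond the log end offset: resume from latest.
        System.out.println(resetTo(10_500, 100, 10_000));  // 10000
    }
}
{code}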
[jira] [Commented] (KAFKA-2235) LogCleaner offset map overflow
[ https://issues.apache.org/jira/browse/KAFKA-2235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14971806#comment-14971806 ] Todd Palino commented on KAFKA-2235: I don't think we can. I have already increased it from 512MB to 1GB, and we still hit the same problems. That only provides a 2x increase in the size of the map, and I would need almost a 10x increase to solve the problem. > LogCleaner offset map overflow > -- > > Key: KAFKA-2235 > URL: https://issues.apache.org/jira/browse/KAFKA-2235 > Project: Kafka > Issue Type: Bug > Components: core, log >Affects Versions: 0.8.1, 0.8.2.0 >Reporter: Ivan Simoneko >Assignee: Ivan Simoneko > Fix For: 0.9.0.0 > > Attachments: KAFKA-2235_v1.patch, KAFKA-2235_v2.patch > > > We've seen log cleaning generating an error for a topic with lots of small > messages. It seems that cleanup map overflow is possible if a log segment > contains more unique keys than empty slots in offsetMap. Check for baseOffset > and map utilization before processing segment seems to be not enough because > it doesn't take into account segment size (number of unique messages in the > segment). > I suggest to estimate upper bound of keys in a segment as a number of > messages in the segment and compare it with the number of available slots in > the map (keeping in mind desired load factor). It should work in cases where > an empty map is capable to hold all the keys for a single segment. If even a > single segment no able to fit into an empty map cleanup process will still > fail. Probably there should be a limit on the log segment entries count? > Here is the stack trace for this error: > 2015-05-19 16:52:48,758 ERROR [kafka-log-cleaner-thread-0] > kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to > java.lang.IllegalArgumentException: requirement failed: Attempt to add a new > entry to a full offset map. >at scala.Predef$.require(Predef.scala:233) >at kafka.log.SkimpyOffsetMap.put(OffsetMap.scala:79) >at > kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:543) >at > kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:538) >at scala.collection.Iterator$class.foreach(Iterator.scala:727) >at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32) >at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >at kafka.message.MessageSet.foreach(MessageSet.scala:67) >at > kafka.log.Cleaner.kafka$log$Cleaner$$buildOffsetMapForSegment(LogCleaner.scala:538) >at > kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:515) >at > kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:512) >at scala.collection.immutable.Stream.foreach(Stream.scala:547) >at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:512) >at kafka.log.Cleaner.clean(LogCleaner.scala:307) >at > kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221) >at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199) >at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2235) LogCleaner offset map overflow
[ https://issues.apache.org/jira/browse/KAFKA-2235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14971740#comment-14971740 ] Todd Palino commented on KAFKA-2235: I'm sure [~jjkoshy] will follow along with more detail on this, but we've run into a serious problem with this check. Basically, it's impossible to perform this kind of check accurately before the offset map is built. We now have partitions that should be able to be compacted as the total number of unique keys is far below the size of the offset map (currently at ~39 million for our configuration) but the messages are very frequent and very small. Even at a segment size of 64 MB, we have over 300 million messages in those segments. So this check creates a situation where log compaction should succeed, but fails because of a speculative check. While I can play the game of trying to walk back segment sizes, there's no way to size segments by number of messages, so it's a guessing game. In addition, the check is clearly wrong in that case, so I shouldn't have to config around it. Lastly, the check causes the log cleaner thread to exit, which means log compaction on the broker fails entirely, rather than just skipping that partition. A better way to handle this would be to cleanly catch the original error you are seeing, generate a clear error message in the logs as to what the failure is, and allow the log cleaner to continue and handle other partitions. You could also maintain a blacklist of partitions in memory in the log cleaner to make sure you don't come back around and try and compact the partition again. > LogCleaner offset map overflow > -- > > Key: KAFKA-2235 > URL: https://issues.apache.org/jira/browse/KAFKA-2235 > Project: Kafka > Issue Type: Bug > Components: core, log >Affects Versions: 0.8.1, 0.8.2.0 >Reporter: Ivan Simoneko >Assignee: Ivan Simoneko > Fix For: 0.9.0.0 > > Attachments: KAFKA-2235_v1.patch, KAFKA-2235_v2.patch > > > We've seen log cleaning generating an error for a topic with lots of small > messages. It seems that cleanup map overflow is possible if a log segment > contains more unique keys than empty slots in offsetMap. Check for baseOffset > and map utilization before processing segment seems to be not enough because > it doesn't take into account segment size (number of unique messages in the > segment). > I suggest to estimate upper bound of keys in a segment as a number of > messages in the segment and compare it with the number of available slots in > the map (keeping in mind desired load factor). It should work in cases where > an empty map is capable to hold all the keys for a single segment. If even a > single segment no able to fit into an empty map cleanup process will still > fail. Probably there should be a limit on the log segment entries count? > Here is the stack trace for this error: > 2015-05-19 16:52:48,758 ERROR [kafka-log-cleaner-thread-0] > kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to > java.lang.IllegalArgumentException: requirement failed: Attempt to add a new > entry to a full offset map.
>at scala.Predef$.require(Predef.scala:233) >at kafka.log.SkimpyOffsetMap.put(OffsetMap.scala:79) >at > kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:543) >at > kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:538) >at scala.collection.Iterator$class.foreach(Iterator.scala:727) >at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32) >at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >at kafka.message.MessageSet.foreach(MessageSet.scala:67) >at > kafka.log.Cleaner.kafka$log$Cleaner$$buildOffsetMapForSegment(LogCleaner.scala:538) >at > kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:515) >at > kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:512) >at scala.collection.immutable.Stream.foreach(Stream.scala:547) >at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:512) >at kafka.log.Cleaner.clean(LogCleaner.scala:307) >at > kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221) >at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199) >at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
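A sketch of the handling suggested in the comment above: catch the per-partition failure, log it clearly, blacklist the partition in memory, and keep the cleaner thread running. The {{cleanPartition}} stub and class names are stand-ins for the real cleaner code:

{code}
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CleanerSkipSketch {
    private final Set<String> uncleanablePartitions = new HashSet<>();

    // Stand-in for building the offset map / cleaning one partition; the real
    // cleaner throws IllegalArgumentException when the offset map fills up.
    void cleanPartition(String topicPartition) {
        if (topicPartition.equals("dense-topic-0")) {
            throw new IllegalArgumentException("Attempt to add a new entry to a full offset map.");
        }
        System.out.println("cleaned " + topicPartition);
    }

    void cleanAll(List<String> partitions) {
        for (String tp : partitions) {
            if (uncleanablePartitions.contains(tp)) {
                continue;  // previously failed; don't retry it on every pass
            }
            try {
                cleanPartition(tp);
            } catch (IllegalArgumentException e) {
                // Log clearly, blacklist the partition, and keep the cleaner thread
                // alive so compaction continues for everything else.
                System.err.println("Skipping " + tp + ": " + e.getMessage());
                uncleanablePartitions.add(tp);
            }
        }
    }

    public static void main(String[] args) {
        new CleanerSkipSketch().cleanAll(Arrays.asList("dense-topic-0", "other-topic-0"));
    }
}
{code}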
[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover
[ https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14964228#comment-14964228 ] Todd Palino commented on KAFKA-2017: I think we definitely need to maintain the ability to get that type of information, whether it's within the protocol or via an admin endpoint. Being able to tell what consumers exist in a group, as well as what partitions each of them owns, is important information to have. And it should be available not just from the consumers, but from the coordinator itself. That way you can debug issues where they have gone out of sync, and you can provide tools (such as burrow) to provide consumer status information independently. > Persist Coordinator State for Coordinator Failover > -- > > Key: KAFKA-2017 > URL: https://issues.apache.org/jira/browse/KAFKA-2017 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Onur Karaman >Assignee: Guozhang Wang > Fix For: 0.9.0.0 > > Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, > KAFKA-2017_2015-05-21_19:02:47.patch > > > When a coordinator fails, the group membership protocol tries to failover to > a new coordinator without forcing all the consumers rejoin their groups. This > is possible if the coordinator persists its state so that the state can be > transferred during coordinator failover. This state consists of most of the > information in GroupRegistry and ConsumerRegistry. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2580) Kafka Broker keeps file handles open for all log files (even if its not written to/read from)
[ https://issues.apache.org/jira/browse/KAFKA-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14964003#comment-14964003 ] Todd Palino commented on KAFKA-2580: It's about as graceful as an OOM, which is to say "not very". Essentially, it hits the limit and falls over and dies with an exception. We've run into it a bit with both leaking FDs from an implementation issue, and with runaway clients that don't do the right thing. In both situations, you are correct that you will generally end up seeing it as a cascading failure through the cluster. > Kafka Broker keeps file handles open for all log files (even if its not > written to/read from) > - > > Key: KAFKA-2580 > URL: https://issues.apache.org/jira/browse/KAFKA-2580 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8.2.1 >Reporter: Vinoth Chandar >Assignee: Grant Henke > > We noticed this in one of our clusters where we stage logs for a longer > amount of time. It appears that the Kafka broker keeps file handles open even > for non active (not written to or read from) files. (in fact, there are some > threads going back to 2013 > http://grokbase.com/t/kafka/users/132p65qwcn/keeping-logs-forever) > Needless to say, this is a problem and forces us to either artificially bump > up ulimit (its already at 100K) or expand the cluster (even if we have > sufficient IO and everything). > Filing this ticket, since I could find anything similar. Very interested to > know if there are plans to address this (given how Samza's changelog topic is > meant to be a persistent large state use case). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2580) Kafka Broker keeps file handles open for all log files (even if its not written to/read from)
[ https://issues.apache.org/jira/browse/KAFKA-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14963986#comment-14963986 ] Todd Palino commented on KAFKA-2580: I agree with [~jkreps] here, that having a high FD limit is not a bad thing. As [~jjkoshy] noted, we're already running at 400k internally (recently increased from 200k). Part of that is to handle growth, and part of that is to have a good bit of headroom if something starts to leak FDs so we have some time to address it before it kills the process (we alert at 50% utilization). The LRU cache option is probably the best. You can set it to an arbitrarily high number (the best option here might be to cap it near, but below, your per-process limit) if you want to effectively disable it, and it would generally avoid the process of having to check and act on expiring the FDs in the timer option. I can see arguments for setting the default either high or low (and I consider 10k to be low). Regardless, as long as it's configurable and documented it will be fine. > Kafka Broker keeps file handles open for all log files (even if its not > written to/read from) > - > > Key: KAFKA-2580 > URL: https://issues.apache.org/jira/browse/KAFKA-2580 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8.2.1 >Reporter: Vinoth Chandar >Assignee: Grant Henke > > We noticed this in one of our clusters where we stage logs for a longer > amount of time. It appears that the Kafka broker keeps file handles open even > for non active (not written to or read from) files. (in fact, there are some > threads going back to 2013 > http://grokbase.com/t/kafka/users/132p65qwcn/keeping-logs-forever) > Needless to say, this is a problem and forces us to either artificially bump > up ulimit (its already at 100K) or expand the cluster (even if we have > sufficient IO and everything). > Filing this ticket, since I could find anything similar. Very interested to > know if there are plans to address this (given how Samza's changelog topic is > meant to be a persistent large state use case). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
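A sketch of the LRU option using an access-ordered {{LinkedHashMap}}, where evicting the least-recently-used entry closes its handle; capacity would be set near, but below, the per-process FD limit. This is illustrative only, not the broker's actual log/file management:

{code}
import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class FileHandleLruSketch<K, V extends Closeable> {
    private final LinkedHashMap<K, V> cache;

    FileHandleLruSketch(int maxOpenHandles) {
        // accessOrder=true makes iteration order least-recently-used first.
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > maxOpenHandles) {
                    try {
                        eldest.getValue().close();  // release the FD of the least-recently-used handle
                    } catch (IOException e) {
                        // log and continue; the entry is evicted either way
                    }
                    return true;
                }
                return false;
            }
        };
    }

    synchronized void put(K key, V handle) {
        cache.put(key, handle);
    }

    synchronized V get(K key) {
        return cache.get(key);  // lookup also refreshes the entry's position in the LRU order
    }
}
{code}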
[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover
[ https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14960809#comment-14960809 ] Todd Palino commented on KAFKA-2017: Just to throw in my 2 cents here, I don't think that persisting this state in a special topic in Kafka is a bad idea. My only concern is that we have seen issues with the offsets already from time to time, and we'll want to make sure we take those lessons learned and handle them from the start. The ones I am aware of are: 1) Creation of the special topic at cluster initialization. If we specify an RF of N for the special topic, then the brokers must make this happen. The first broker that comes up can't create it with an RF of 1 and own all the partitions. Either it must reject all operations that would use the special topic until N brokers are members of the cluster and the it can be created, or it must create the topic in such a way that as soon as there are N brokers available the RF is corrected to the configured number. 2) Load of the special topic into local cache. Whenever a coordinator loads the special topic, there is a period of time while it is loading state where it cannot service requests. We've seen problems with this related to log compaction, where the partitions were excessively large, but I can see as we move an increasing number of (group, partition) tuples over to Kafka-committed offsets it could become a scale issue very easily. This should not be a big deal for group state information, as that should always be smaller than the offset information for the group, but we may want to create a longer term plan for handling auto-scaling of the special topics (the ability to increase the number of partitions and move group information from the partition it used to hash to to the one it hashes to after scaling). > Persist Coordinator State for Coordinator Failover > -- > > Key: KAFKA-2017 > URL: https://issues.apache.org/jira/browse/KAFKA-2017 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: Onur Karaman >Assignee: Guozhang Wang > Fix For: 0.9.0.0 > > Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, > KAFKA-2017_2015-05-21_19:02:47.patch > > > When a coordinator fails, the group membership protocol tries to failover to > a new coordinator without forcing all the consumers rejoin their groups. This > is possible if the coordinator persists its state so that the state can be > transferred during coordinator failover. This state consists of most of the > information in GroupRegistry and ConsumerRegistry. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
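On the scaling concern in point 2: a group's entries in the special topic live in the partition its group id hashes to, so changing the partition count moves existing groups, which is why growing the topic needs a migration plan. A small sketch of that kind of mapping (the exact hash function here is illustrative, not necessarily the one Kafka uses):

{code}
public class GroupPartitionMappingSketch {
    // A group's offsets/state land in the partition its group id hashes to, so
    // changing the partition count of the internal topic changes where existing
    // groups map, which is why scaling it up needs a migration step.
    static int partitionFor(String groupId, int numPartitions) {
        return (groupId.hashCode() & 0x7fffffff) % numPartitions;  // non-negative hash mod partition count
    }

    public static void main(String[] args) {
        String group = "my-consumer-group";
        System.out.println("with 50 partitions:  " + partitionFor(group, 50));
        System.out.println("with 100 partitions: " + partitionFor(group, 100));  // likely a different partition
    }
}
{code}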
[jira] [Commented] (KAFKA-1566) Kafka environment configuration (kafka-env.sh)
[ https://issues.apache.org/jira/browse/KAFKA-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14708282#comment-14708282 ] Todd Palino commented on KAFKA-1566: This makes a lot of sense. We don't use any of the standard start/stop components or scripts, just the admin CLI tools, so I don't see this having an effect on us. > Kafka environment configuration (kafka-env.sh) > -- > > Key: KAFKA-1566 > URL: https://issues.apache.org/jira/browse/KAFKA-1566 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Cosmin Lehene >Assignee: Sriharsha Chintalapani > Labels: newbie > Fix For: 0.8.3 > > Attachments: KAFKA-1566.patch, KAFKA-1566_2015-02-21_21:57:02.patch, > KAFKA-1566_2015-03-17_17:01:38.patch, KAFKA-1566_2015-03-17_17:19:23.patch > > > It would be useful (especially for automated deployments) to have an > environment configuration file that could be sourced from the launcher files > (e.g. kafka-run-server.sh). > This is how this could look like kafka-env.sh > {code} > export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseCompressedOops > -XX:+DisableExplicitGC -Djava.awt.headless=true \ -XX:+UseG1GC > -XX:PermSize=48m -XX:MaxPermSize=48m -XX:MaxGCPauseMillis=20 > -XX:InitiatingHeapOccupancyPercent=35' %>" > export KAFKA_HEAP_OPTS="'-Xmx1G -Xms1G' %>" > export KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=/var/log/kafka" > {code} > kafka-server-start.sh > {code} > ... > source $base_dir/config/kafka-env.sh > ... > {code} > This approach is consistent with Hadoop and HBase. However the idea here is > to be able to set these values in a single place without having to edit > startup scripts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2252) Socket connection closing is logged, but not corresponding opening of socket
[ https://issues.apache.org/jira/browse/KAFKA-2252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14586437#comment-14586437 ] Todd Palino commented on KAFKA-2252: I moved the normal connection closed message to DEBUG level in KAFKA-2175. In general, we should not log either normal connection close or open at INFO level. There is far too much noise due to these messages. > Socket connection closing is logged, but not corresponding opening of socket > > > Key: KAFKA-2252 > URL: https://issues.apache.org/jira/browse/KAFKA-2252 > Project: Kafka > Issue Type: Bug >Reporter: Jason Rosenberg > > (using 0.8.2.1) > We see a large number of "Closing socket connection" logging to the broker > logs, e.g.: > {code} > 2015-06-04 16:49:30,262 INFO [kafka-network-thread-27330-2] > network.Processor - Closing socket connection to /1.2.3.4. > 2015-06-04 16:49:30,262 INFO [kafka-network-thread-27330-0] > network.Processor - Closing socket connection to /5.6.7.8. > 2015-06-04 16:49:30,695 INFO [kafka-network-thread-27330-0] > network.Processor - Closing socket connection to /9.10.11.12. > 2015-06-04 16:49:31,465 INFO [kafka-network-thread-27330-1] > network.Processor - Closing socket connection to /13.14.15.16. > 2015-06-04 16:49:31,806 INFO [kafka-network-thread-27330-0] > network.Processor - Closing socket connection to /17.18.19.20. > 2015-06-04 16:49:31,842 INFO [kafka-network-thread-27330-2] > network.Processor - Closing socket connection to /21.22.23.24. > {code} > However, we have no corresponding logging for when these connections are > established. Consequently, it's not very useful to see a flood of closed > connections, etc. I'd think we'd want to see the corresponding 'connection > established' messages, also logged as INFO. > Occasionally, we see a flood of the above messages, and have no idea as to > whether it indicates a problem, etc. (Sometimes it might be due to an > ongoing rolling restart, or a change in the Zookeeper cluster). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
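Until the defaults change, the per-connection noise can usually be silenced at the logger level. A minimal sketch using the log4j 1.x API bundled with the broker; the fully qualified logger name (kafka.network.Processor) is assumed from the messages quoted above, and in practice the same change is normally made in log4j.properties rather than in code:

{code}
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

// Sketch: raise the threshold on the network processor logger so routine
// connection open/close messages are no longer emitted at INFO.
public class QuietNetworkProcessor {
    public static void main(String[] args) {
        Logger.getLogger("kafka.network.Processor").setLevel(Level.WARN);
    }
}
{code}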
[jira] [Assigned] (KAFKA-2175) Reduce server log verbosity at info level
[ https://issues.apache.org/jira/browse/KAFKA-2175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Palino reassigned KAFKA-2175: -- Assignee: Todd Palino (was: Neha Narkhede) > Reduce server log verbosity at info level > - > > Key: KAFKA-2175 > URL: https://issues.apache.org/jira/browse/KAFKA-2175 > Project: Kafka > Issue Type: Improvement > Components: controller, zkclient >Affects Versions: 0.8.3 >Reporter: Todd Palino >Assignee: Todd Palino >Priority: Minor > Labels: newbie > Attachments: KAFKA-2175.patch > > > Currently, the broker logs two messages at INFO level that should be at a > lower level. This serves only to fill up log files on disk, and can cause > performance issues due to synchronous logging as well. > The first is the "Closing socket connection" message when there is no error. > This should be reduced to debug level. The second is the message that ZkUtil > writes when updating the partition reassignment JSON. This message contains > the entire JSON blob and should never be written at info level. In addition, > there is already a message in the controller log stating that the ZK node has > been updated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2175) Reduce server log verbosity at info level
[ https://issues.apache.org/jira/browse/KAFKA-2175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Palino updated KAFKA-2175: --- Attachment: KAFKA-2175.patch > Reduce server log verbosity at info level > - > > Key: KAFKA-2175 > URL: https://issues.apache.org/jira/browse/KAFKA-2175 > Project: Kafka > Issue Type: Improvement > Components: controller, zkclient >Affects Versions: 0.8.3 >Reporter: Todd Palino >Assignee: Neha Narkhede >Priority: Minor > Labels: newbie > Attachments: KAFKA-2175.patch > > > Currently, the broker logs two messages at INFO level that should be at a > lower level. This serves only to fill up log files on disk, and can cause > performance issues due to synchronous logging as well. > The first is the "Closing socket connection" message when there is no error. > This should be reduced to debug level. The second is the message that ZkUtil > writes when updating the partition reassignment JSON. This message contains > the entire JSON blob and should never be written at info level. In addition, > there is already a message in the controller log stating that the ZK node has > been updated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2175) Reduce server log verbosity at info level
[ https://issues.apache.org/jira/browse/KAFKA-2175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Palino updated KAFKA-2175: --- Status: Patch Available (was: Open) > Reduce server log verbosity at info level > - > > Key: KAFKA-2175 > URL: https://issues.apache.org/jira/browse/KAFKA-2175 > Project: Kafka > Issue Type: Improvement > Components: controller, zkclient >Affects Versions: 0.8.3 >Reporter: Todd Palino >Assignee: Todd Palino >Priority: Minor > Labels: newbie > Attachments: KAFKA-2175.patch > > > Currently, the broker logs two messages at INFO level that should be at a > lower level. This serves only to fill up log files on disk, and can cause > performance issues due to synchronous logging as well. > The first is the "Closing socket connection" message when there is no error. > This should be reduced to debug level. The second is the message that ZkUtil > writes when updating the partition reassignment JSON. This message contains > the entire JSON blob and should never be written at info level. In addition, > there is already a message in the controller log stating that the ZK node has > been updated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2175) Reduce server log verbosity at info level
Todd Palino created KAFKA-2175: -- Summary: Reduce server log verbosity at info level Key: KAFKA-2175 URL: https://issues.apache.org/jira/browse/KAFKA-2175 Project: Kafka Issue Type: Improvement Components: controller, zkclient Affects Versions: 0.8.3 Reporter: Todd Palino Assignee: Neha Narkhede Priority: Minor Currently, the broker logs two messages at INFO level that should be at a lower level. This serves only to fill up log files on disk, and can cause performance issues due to synchronous logging as well. The first is the "Closing socket connection" message when there is no error. This should be reduced to debug level. The second is the message that ZkUtil writes when updating the partition reassignment JSON. This message contains the entire JSON blob and should never be written at info level. In addition, there is already a message in the controller log stating that the ZK node has been updated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1342) Slow controlled shutdowns can result in stale shutdown requests
[ https://issues.apache.org/jira/browse/KAFKA-1342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14484278#comment-14484278 ] Todd Palino commented on KAFKA-1342: Bump I think we need to revive this. We have a "safe shutdown" bit of wrapper code we use which relies on an external resource that we should eliminate. It would be better to provide a safe shutdown option within Kafka itself without that wrapper (i.e. do not shut down unless your under replicated count is 0). However, this is not possible without serialized shutdown at the controller level. We can't allow a second broker to shut down until the first broker has completed its shutdown process. Then the second broker can check the URP count and be allowed to proceed. > Slow controlled shutdowns can result in stale shutdown requests > --- > > Key: KAFKA-1342 > URL: https://issues.apache.org/jira/browse/KAFKA-1342 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.1 >Reporter: Joel Koshy >Priority: Blocker > Fix For: 0.9.0 > > > I don't think this is a bug introduced in 0.8.1., but triggered by the fact > that controlled shutdown seems to have become slower in 0.8.1 (will file a > separate ticket to investigate that). When doing a rolling bounce, it is > possible for a bounced broker to stop all its replica fetchers since the > previous PID's shutdown requests are still being shutdown. > - 515 is the controller > - Controlled shutdown initiated for 503 > - Controller starts controlled shutdown for 503 > - The controlled shutdown takes a long time in moving leaders and moving > follower replicas on 503 to the offline state. > - So 503's read from the shutdown channel times out and a new channel is > created. It issues another shutdown request. This request (since it is a > new channel) is accepted at the controller's socket server but then waits > on the broker shutdown lock held by the previous controlled shutdown which > is still in progress. > - The above step repeats for the remaining retries (six more requests). > - 503 hits SocketTimeout exception on reading the response of the last > shutdown request and proceeds to do an unclean shutdown. > - The controller's onBrokerFailure call-back fires and moves 503's replicas > to offline (not too important in this sequence). > - 503 is brought back up. > - The controller's onBrokerStartup call-back fires and moves its replicas > (and partitions) to online state. 503 starts its replica fetchers. > - Unfortunately, the (phantom) shutdown requests are still being handled and > the controller sends StopReplica requests to 503. > - The first shutdown request finally finishes (after 76 minutes in my case!). > - The remaining shutdown requests also execute and do the same thing (sends > StopReplica requests for all partitions to > 503). > - The remaining requests complete quickly because they end up not having to > touch zookeeper paths - no leaders left on the broker and no need to > shrink ISR in zookeeper since it has already been done by the first > shutdown request. > - So in the end-state 503 is up, but effectively idle due to the previous > PID's shutdown requests. > There are some obvious fixes that can be made to controlled shutdown to help > address the above issue. E.g., we don't really need to move follower > partitions to Offline. We did that as an "optimization" so the broker falls > out of ISR sooner - which is helpful when producers set required.acks to -1. > However it adds a lot of latency to controlled shutdown. 
Also, (more > importantly) we should have a mechanism to abort any stale shutdown process. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
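The "do not shut down unless the under-replicated count is 0" gate mentioned in the comment above can be probed over JMX. A sketch, assuming JMX is exposed on localhost:9999 and the usual UnderReplicatedPartitions gauge (both assumptions; adjust to the actual deployment):

{code}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Sketch of a pre-shutdown gate: exit non-zero if any partitions are under-replicated.
public class SafeShutdownGate {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi"); // assumed JMX port
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            ObjectName gauge = new ObjectName(
                    "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions");
            int urp = ((Number) mbsc.getAttribute(gauge, "Value")).intValue();
            if (urp > 0) {
                System.err.println("Under-replicated partitions: " + urp + "; refusing to shut down");
                System.exit(1);
            }
            System.out.println("No under-replicated partitions; safe to proceed");
        } finally {
            connector.close();
        }
    }
}
{code}

Serializing this check through the controller, as proposed, would remove the need for each broker to run its own external wrapper.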
[jira] [Created] (KAFKA-2012) Broker should automatically handle corrupt index files
Todd Palino created KAFKA-2012: -- Summary: Broker should automatically handle corrupt index files Key: KAFKA-2012 URL: https://issues.apache.org/jira/browse/KAFKA-2012 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Todd Palino We had a bunch of unclean system shutdowns (power failure), which caused corruption on our disks holding log segments in many cases. While the broker is handling the log segment corruption properly (truncation), it is having problems with corruption in the index files. Additionally, this only seems to be happening on some startups (while we are upgrading). The broker should just do what I do when I hit a corrupt index file - remove it and rebuild it. 2015/03/09 17:58:53.873 FATAL [KafkaServerStartable] [main] [kafka-server] [] Fatal error during KafkaServerStartable startup. Prepare to shutdown java.lang.IllegalArgumentException: requirement failed: Corrupt index found, index file (/export/content/kafka/i001_caches/__consumer_offsets-39/.index) has non-zero size but the last offset is -2121629628 and the base offset is 0 at scala.Predef$.require(Predef.scala:233) at kafka.log.OffsetIndex.sanityCheck(OffsetIndex.scala:352) at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:185) at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:184) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at kafka.log.Log.loadSegments(Log.scala:184) at kafka.log.Log.(Log.scala:82) at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:141) at kafka.utils.Utils$$anon$1.run(Utils.scala:54) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
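The manual workaround described above ("remove it and rebuild it") amounts to deleting the .index files and letting the broker regenerate them at startup. A sketch of that cleanup, to be run only while the broker is stopped; the log directory is taken from the error message purely as an illustrative path:

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Sketch: delete offset index files so the broker rebuilds them on next startup.
// Run only while the broker is stopped.
public class DropOffsetIndexes {
    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get("/export/content/kafka/i001_caches"); // illustrative path
        try (Stream<Path> paths = Files.walk(logDir)) {
            paths.filter(p -> p.toString().endsWith(".index"))
                 .forEach(p -> {
                     try {
                         Files.delete(p);
                         System.out.println("Removed " + p);
                     } catch (IOException e) {
                         System.err.println("Could not remove " + p + ": " + e.getMessage());
                     }
                 });
        }
    }
}
{code}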
[jira] [Created] (KAFKA-1987) Potential race condition in partition creation
Todd Palino created KAFKA-1987: -- Summary: Potential race condition in partition creation Key: KAFKA-1987 URL: https://issues.apache.org/jira/browse/KAFKA-1987 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 0.8.1.1 Reporter: Todd Palino Assignee: Neha Narkhede I am finding that there appears to be a race condition when creating partitions, with replication factor 2 or higher, between the creation of the partition on the leader and the follower. What appears to be happening is that the follower is processing the command to create the partition before the leader does, and when the follower starts the replica fetcher, it fails with an UnknownTopicOrPartitionException. The situation is that I am creating a large number of partitions on a cluster, preparing it for data being mirrored from another cluster. So there are a sizeable number of create and alter commands being sent sequentially. Eventually, the replica fetchers start up properly. But it seems like the controller should issue the command to create the partition to the leader, wait for confirmation, and then issue the command to create the partition to the followers. 2015/02/26 21:11:50.413 INFO [LogManager] [kafka-request-handler-12] [kafka-server] [] Created log for partition [topicA,30] in /path_to/i001_caches with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 6, segment.bytes -> 268435456, flush.ms -> 1, delete.retention.ms -> 8640, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> delete, unclean.leader.election.enable -> true, segment.ms -> 4320, max.message.bytes -> 100, flush.messages -> 2, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 8640, segment.jitter.ms -> 0}. 2015/02/26 21:11:50.418 WARN [Partition] [kafka-request-handler-12] [kafka-server] [] Partition [topicA,30] on broker 1551: No checkpointed highwatermark is found for partition [topicA,30] 2015/02/26 21:11:50.418 INFO [ReplicaFetcherManager] [kafka-request-handler-12] [kafka-server] [] [ReplicaFetcherManager on broker 1551] Removed fetcher for partitions [topicA,30] 2015/02/26 21:11:50.418 INFO [Log] [kafka-request-handler-12] [kafka-server] [] Truncating log topicA-30 to offset 0. 
2015/02/26 21:11:50.450 INFO [ReplicaFetcherManager] [kafka-request-handler-12] [kafka-server] [] [ReplicaFetcherManager on broker 1551] Added fetcher for partitions List([[topicA,30], initOffset 0 to broker id:1555,host:host1555.example.com,port:10251] ) 2015/02/26 21:11:50.615 ERROR [ReplicaFetcherThread] [ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], Error for partition [topicA,30] to broker 1555:class kafka.common.UnknownTopicOrPartitionException 2015/02/26 21:11:50.616 ERROR [ReplicaFetcherThread] [ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], Error for partition [topicA,30] to broker 1555:class kafka.common.UnknownTopicOrPartitionException 2015/02/26 21:11:50.618 ERROR [ReplicaFetcherThread] [ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], Error for partition [topicA,30] to broker 1555:class kafka.common.UnknownTopicOrPartitionException 2015/02/26 21:11:50.620 ERROR [ReplicaFetcherThread] [ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], Error for partition [topicA,30] to broker 1555:class kafka.common.UnknownTopicOrPartitionException 2015/02/26 21:11:50.621 ERROR [ReplicaFetcherThread] [ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], Error for partition [topicA,30] to broker 1555:class kafka.common.UnknownTopicOrPartitionException 2 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1648) Round robin consumer balance throws an NPE when there are no topics
Todd Palino created KAFKA-1648: -- Summary: Round robin consumer balance throws an NPE when there are no topics Key: KAFKA-1648 URL: https://issues.apache.org/jira/browse/KAFKA-1648 Project: Kafka Issue Type: Bug Components: consumer Reporter: Todd Palino Assignee: Neha Narkhede If you use the roundrobin rebalance method with a wildcard consumer, and there are no topics in the cluster, rebalance throws a NullPointerException in the consumer and fails. It retries the rebalance, but will continue to throw the NPE. 2014/09/23 17:51:16.147 [ZookeeperConsumerConnector] [kafka-audit_lva1-app0007.corp-1411494404908-4e620544], Cleared all relevant queues for this fetcher 2014/09/23 17:51:16.147 [ZookeeperConsumerConnector] [kafka-audit_lva1-app0007.corp-1411494404908-4e620544], Cleared the data chunks in all the consumer message iterators 2014/09/23 17:51:16.148 [ZookeeperConsumerConnector] [kafka-audit_lva1-app0007.corp-1411494404908-4e620544], Committing all offsets after clearing the fetcher queues 2014/09/23 17:51:46.148 [ZookeeperConsumerConnector] [kafka-audit_lva1-app0007.corp-1411494404908-4e620544], begin rebalancing consumer kafka-audit_lva1-app0007.corp-1411494404908-4e620544 try #0 2014/09/23 17:51:46.148 ERROR [OffspringServletRuntime] [main] [kafka-console-audit] [] Boot listener com.linkedin.kafkaconsoleaudit.KafkaConsoleAuditBootListener failed kafka.common.ConsumerRebalanceFailedException: kafka-audit_lva1-app0007.corp-1411494404908-4e620544 can't rebalance after 10 retries at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:630) at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:897) at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.(ZookeeperConsumerConnector.scala:931) at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:159) at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:101) at com.linkedin.tracker.consumer.TrackingConsumerImpl.initWildcardIterators(TrackingConsumerImpl.java:88) at com.linkedin.tracker.consumer.TrackingConsumerImpl.getWildcardIterators(TrackingConsumerImpl.java:116) at com.linkedin.kafkaconsoleaudit.KafkaConsoleAudit.createAuditThreads(KafkaConsoleAudit.java:59) at com.linkedin.kafkaconsoleaudit.KafkaConsoleAudit.initializeAudit(KafkaConsoleAudit.java:50) at com.linkedin.kafkaconsoleaudit.KafkaConsoleAuditFactory.createInstance(KafkaConsoleAuditFactory.java:125) at com.linkedin.kafkaconsoleaudit.KafkaConsoleAuditFactory.createInstance(KafkaConsoleAuditFactory.java:20) at com.linkedin.util.factory.SimpleSingletonFactory.createInstance(SimpleSingletonFactory.java:20) at com.linkedin.util.factory.SimpleSingletonFactory.createInstance(SimpleSingletonFactory.java:14) at com.linkedin.util.factory.Generator.doGetBean(Generator.java:337) at com.linkedin.util.factory.Generator.getBean(Generator.java:270) at com.linkedin.kafkaconsoleaudit.KafkaConsoleAuditBootListener.onBoot(KafkaConsoleAuditBootListener.java:16) at com.linkedin.offspring.servlet.OffspringServletRuntime.startGenerator(OffspringServletRuntime.java:147) at com.linkedin.offspring.servlet.OffspringServletRuntime.start(OffspringServletRuntime.java:73) at com.linkedin.offspring.servlet.OffspringServletContextListener.contextInitialized(OffspringServletContextListener.java:28) at 
org.eclipse.jetty.server.handler.ContextHandler.callContextInitialized(ContextHandler.java:771) at org.eclipse.jetty.servlet.ServletContextHandler.callContextInitialized(ServletContextHandler.java:424) at org.eclipse.jetty.server.handler.ContextHandler.startContext(ContextHandler.java:763) at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:249) at org.eclipse.jetty.webapp.WebAppContext.startContext(WebAppContext.java:1250) at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:706) at org.eclipse.jetty.webapp.WebAppContext.doStart(WebAppContext.java:492) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at com.linkedin.emweb.ContextBasedHandlerImpl.doStart(ContextBasedHandlerImpl.java:105) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) at com.linkedin.emweb.WebappDeployerImpl.start(WebappDeployerImpl.java:333) at com.linkedin.emweb.WebappDeployerImpl.deploy(WebappDeployerImpl.java:187) at c
[jira] [Resolved] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo
[ https://issues.apache.org/jira/browse/KAFKA-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Palino resolved KAFKA-1588. Resolution: Won't Fix > Offset response does not support two requests for the same topic/partition > combo > > > Key: KAFKA-1588 > URL: https://issues.apache.org/jira/browse/KAFKA-1588 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2 >Reporter: Todd Palino >Assignee: Sriharsha Chintalapani > Labels: newbie > > When performing an OffsetRequest, if you request the same topic and partition > combination in a single request more than once (for example, if you want to > get both the head and tail offsets for a partition in the same request), you > will get a response for both, but they will be the same offset. > We identified that the problem is that when the offset response is assembled, > a map is used to store the offset info before it is converted to the response > format and sent to the client. Therefore, the second request for a > topic/partition combination will overwrite the offset from the first request. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo
[ https://issues.apache.org/jira/browse/KAFKA-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14114814#comment-14114814 ] Todd Palino commented on KAFKA-1588: I believe that since this involves significant changes to the protocol, and we have a reasonable workaround, we can close it. > Offset response does not support two requests for the same topic/partition > combo > > > Key: KAFKA-1588 > URL: https://issues.apache.org/jira/browse/KAFKA-1588 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2 >Reporter: Todd Palino >Assignee: Sriharsha Chintalapani > Labels: newbie > > When performing an OffsetRequest, if you request the same topic and partition > combination in a single request more than once (for example, if you want to > get both the head and tail offsets for a partition in the same request), you > will get a response for both, but they will be the same offset. > We identified that the problem is that when the offset response is assembled, > a map is used to store the offset info before it is converted to the response > format and sent to the client. Therefore, the second request for a > topic/partition combination will overwrite the offset from the first request. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo
[ https://issues.apache.org/jira/browse/KAFKA-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14105601#comment-14105601 ] Todd Palino commented on KAFKA-1588: Thanks for digging into that [~sriharsha]. I had not dug down into the internals of kafka-python on this issue that far, but I know when I reviewed the responses there were multiples. It's possible that when decoding the response, kafka-python is duplicating the returned entries to assure the number of responses matches the number of requests. > Offset response does not support two requests for the same topic/partition > combo > > > Key: KAFKA-1588 > URL: https://issues.apache.org/jira/browse/KAFKA-1588 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2 >Reporter: Todd Palino >Assignee: Sriharsha Chintalapani > Labels: newbie > > When performing an OffsetRequest, if you request the same topic and partition > combination in a single request more than once (for example, if you want to > get both the head and tail offsets for a partition in the same request), you > will get a response for both, but they will be the same offset. > We identified that the problem is that when the offset response is assembled, > a map is used to store the offset info before it is converted to the response > format and sent to the client. Therefore, the second request for a > topic/partition combination will overwrite the offset from the first request. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1605) Producer should not require metadata.broker.list
[ https://issues.apache.org/jira/browse/KAFKA-1605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14104581#comment-14104581 ] Todd Palino commented on KAFKA-1605: There are a lot of concerns with this... 1) We are moving consumers away from talking to Zookeeper (with the new consumer implementation that puts it all in the broker). Given that, we should probably not move another component (producer) towards Zookeeper 2) Building on #1, if we want to restrict access to Zookeeper (to protect the Kafka metadata), we cannot have clients, whether consumers or producers, relying on it. 3) This requires adding a Zookeeper client to the producer. I don't see a benefit in doing this, especially considering #1 above. You can put a DNS round robin or a VIP in the metadata broker list. 4) Building on #3, if at some point in the future there is a desire to use an alternate metadata store, it will be much simpler to do if only one component needs to be changed. All told, I just don't see any benefit in this proposal. > Producer should not require metadata.broker.list > > > Key: KAFKA-1605 > URL: https://issues.apache.org/jira/browse/KAFKA-1605 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.8.1.1 >Reporter: Prashant Deva > > Producer should just query the zookeeper and get the list of brokers from it > directly. > With the current requirement the producer explicitly needs to know the ips of > all the brokers. So if you decide to replace them, you have to change the > Producer's code. > Isnt the whole point of using a zookeeper to avoid these kind of dependencies? -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1605) Producer should not require metadata.broker.list
[ https://issues.apache.org/jira/browse/KAFKA-1605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14104110#comment-14104110 ] Todd Palino commented on KAFKA-1605: Upcoming work for the consumer is going to move clients away from connecting to Zookeeper, and I think this is a very good thing. I do not think we should be moving producers towards Zookeeper. We've run into issues where it would be very nice to be able to restrict access to Zookeeper, and the only effective way to do that is if the brokers themselves are the only ones talking to Zookeeper (no clients). > Producer should not require metadata.broker.list > > > Key: KAFKA-1605 > URL: https://issues.apache.org/jira/browse/KAFKA-1605 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.8.1.1 >Reporter: Prashant Deva > > Producer should just query the zookeeper and get the list of brokers from it > directly. > With the current requirement the producer explicitly needs to know the ips of > all the brokers. So if you decide to replace them, you have to change the > Producer's code. > Isnt the whole point of using a zookeeper to avoid these kind of dependencies? -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (KAFKA-1599) Change preferred replica election admin command to handle large clusters
Todd Palino created KAFKA-1599: -- Summary: Change preferred replica election admin command to handle large clusters Key: KAFKA-1599 URL: https://issues.apache.org/jira/browse/KAFKA-1599 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2 Reporter: Todd Palino We ran into a problem with a cluster that has 70k partitions where we could not trigger a preferred replica election for all topics and partitions using the admin tool. Upon investigation, it was determined that this was because the JSON object that was being written to the admin znode to tell the controller to start the election was 1.8 MB in size. As the default Zookeeper data size limit is 1MB, and it is non-trivial to change, we should come up with a better way to represent the list of topics and partitions for this admin command. I have several thoughts on this so far: 1) Trigger the command for all topics and partitions with a JSON object that does not include an explicit list of them (i.e. a flag that says "all partitions") 2) Use a more compact JSON representation. Currently, the JSON contains a 'partitions' key which holds a list of dictionaries that each have a 'topic' and 'partition' key, and there must be one list item for each partition. This results in a lot of repetition of key names that is unneeded. Changing this to a format like this would be much more compact: {'topics': {'topicName1': [0, 1, 2, 3], 'topicName2': [0,1]}, 'version': 1} 3) Use a representation other than JSON. Strings are inefficient. A binary format would be the most compact. This does put a greater burden on tools and scripts that do not use the inbuilt libraries, but it is not too high. 4) Use a representation that involves multiple znodes. A structured tree in the admin command would probably provide the most complete solution. However, we would need to make sure to not exceed the data size limit with a wide tree (the list of children for any single znode cannot exceed the ZK data size of 1MB) Obviously, there could be a combination of #1 with a change in the representation, which would likely be appropriate as well. -- This message was sent by Atlassian JIRA (v6.2#6252)
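To make option 2 concrete, here is the same single-topic, two-partition election request in both representations (illustrative strings, not the exact znode payloads); the per-entry overhead of the verbose form is what pushes a 70k-partition cluster past the 1 MB znode limit:

{code}
// Side-by-side size of the current verbose form and the proposed compact form
// for one topic with two partitions (illustrative only).
public class ElectionJsonSize {
    public static void main(String[] args) {
        String verbose = "{\"partitions\":[{\"topic\":\"topicName1\",\"partition\":0},"
                       + "{\"topic\":\"topicName1\",\"partition\":1}],\"version\":1}";
        String compact = "{\"topics\":{\"topicName1\":[0,1]},\"version\":1}";
        System.out.println("verbose: " + verbose.length() + " bytes");
        System.out.println("compact: " + compact.length() + " bytes");
    }
}
{code}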
[jira] [Commented] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo
[ https://issues.apache.org/jira/browse/KAFKA-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14096083#comment-14096083 ] Todd Palino commented on KAFKA-1588: When working with this, I am sending my requests from a Python application that uses the kafka-python library. So I am not using the Java library for creating and sending the request across the wire. I was definitely able to put the same topic and partition in a single request more than once; it is just that the response I received was two OffsetResponses with the same information (the same offset) in both. > Offset response does not support two requests for the same topic/partition > combo > > > Key: KAFKA-1588 > URL: https://issues.apache.org/jira/browse/KAFKA-1588 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2 >Reporter: Todd Palino >Assignee: Sriharsha Chintalapani > Labels: newbie > > When performing an OffsetRequest, if you request the same topic and partition > combination in a single request more than once (for example, if you want to > get both the head and tail offsets for a partition in the same request), you > will get a response for both, but they will be the same offset. > We identified that the problem is that when the offset response is assembled, > a map is used to store the offset info before it is converted to the response > format and sent to the client. Therefore, the second request for a > topic/partition combination will overwrite the offset from the first request. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo
[ https://issues.apache.org/jira/browse/KAFKA-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Palino updated KAFKA-1588: --- Description: When performing an OffsetRequest, if you request the same topic and partition combination in a single request more than once (for example, if you want to get both the head and tail offsets for a partition in the same request), you will get a response for both, but they will be the same offset. We identified that the problem is that when the offset response is assembled, a map is used to store the offset info before it is converted to the response format and sent to the client. Therefore, the second request for a topic/partition combination will overwrite the offset from the first request. was: When performing an OffsetFetchRequest, if you request the same topic and partition combination in a single request more than once (for example, if you want to get both the head and tail offsets for a partition in the same request), you will get a response for both, but they will be the same offset. We identified that the problem is that when the offset response is assembled, a map is used to store the offset info before it is converted to the response format and sent to the client. Therefore, the second request for a topic/partition combination will overwrite the offset from the first request. Thanks, Jun, I had written that incorrectly. It is the OffsetRequest that manifests the problem. I've edited the description. > Offset response does not support two requests for the same topic/partition > combo > > > Key: KAFKA-1588 > URL: https://issues.apache.org/jira/browse/KAFKA-1588 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2 >Reporter: Todd Palino > > When performing an OffsetRequest, if you request the same topic and partition > combination in a single request more than once (for example, if you want to > get both the head and tail offsets for a partition in the same request), you > will get a response for both, but they will be the same offset. > We identified that the problem is that when the offset response is assembled, > a map is used to store the offset info before it is converted to the response > format and sent to the client. Therefore, the second request for a > topic/partition combination will overwrite the offset from the first request. -- This message was sent by Atlassian JIRA (v6.2#6252)
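The overwrite described here is easy to picture: if pending results are keyed by topic-partition alone, the duplicate entry in the request clobbers the earlier one. A small illustration of that failure mode (the offset lookup is a hypothetical stand-in, not the broker's code):

{code}
import java.util.HashMap;
import java.util.Map;

// Illustration of the described bug: keying results by topic-partition alone
// collapses duplicate requests for the same partition into a single response.
public class DuplicateOffsetRequest {
    // Hypothetical stand-in for the broker's offset lookup: -2 = earliest, -1 = latest.
    static long resolveOffset(long timestamp) {
        return timestamp == -2L ? 0L : 4242L;
    }

    public static void main(String[] args) {
        long[] requestedTimestamps = {-2L, -1L};      // earliest and latest for the same partition
        Map<String, Long> responses = new HashMap<>();
        for (long ts : requestedTimestamps) {
            responses.put("topicA-0", resolveOffset(ts));  // second put overwrites the first
        }
        System.out.println(responses);                 // {topicA-0=4242} -- only one offset survives
    }
}
{code}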
[jira] [Created] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo
Todd Palino created KAFKA-1588: -- Summary: Offset response does not support two requests for the same topic/partition combo Key: KAFKA-1588 URL: https://issues.apache.org/jira/browse/KAFKA-1588 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Todd Palino When performing an OffsetFetchRequest, if you request the same topic and partition combination in a single request more than once (for example, if you want to get both the head and tail offsets for a partition in the same request), you will get a response for both, but they will be the same offset. We identified that the problem is that when the offset response is assembled, a map is used to store the offset info before it is converted to the response format and sent to the client. Therefore, the second request for a topic/partition combination will overwrite the offset from the first request. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1581) Log cleaner should have an option to ignore messages without keys
[ https://issues.apache.org/jira/browse/KAFKA-1581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14091622#comment-14091622 ] Todd Palino commented on KAFKA-1581: I suggest that you should just skip any invalid message (compressed or without a key) and make sure to throw a warning in the log when it is encountered. > Log cleaner should have an option to ignore messages without keys > - > > Key: KAFKA-1581 > URL: https://issues.apache.org/jira/browse/KAFKA-1581 > Project: Kafka > Issue Type: Bug >Reporter: Joel Koshy > > Right now, there is a strict requirement that compacted topics contain only > messages with keys. This makes sense, but the issue with a hard requirement > is that if it fails the cleaner quits. We should probably allow ignoring > these messages (with a warning). Alternatively, we can catch this scenario > (instead of the hard requirement) and just skip compaction for that partition. > This came up because I saw an invalid message (compressed and without a key) > in the offsets topic which broke both log compaction and the offset load > process. I filed KAFKA-1580 to prevent that from happening in the first place > but KAFKA-1580 is only for internal topics. In the general case (compacted > non-internal topics) we would not want the cleaners to exit permanently due > to an invalid (key-less) message in one of the partitions since that would > prevent compaction for other partitions as well. -- This message was sent by Atlassian JIRA (v6.2#6252)
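The "skip and warn" behaviour suggested in the comment can be sketched as a filter over the records the cleaner would otherwise process (a hypothetical record type, not the LogCleaner's real structures):

{code}
import java.util.Arrays;
import java.util.List;

// Sketch of skipping key-less records during compaction instead of aborting the cleaner.
public class SkipKeylessRecords {
    static final class Record {
        final byte[] key;
        final byte[] value;
        Record(byte[] key, byte[] value) { this.key = key; this.value = value; }
    }

    static void compact(List<Record> records) {
        for (Record r : records) {
            if (r.key == null) {
                System.err.println("WARN: skipping record without a key in compacted topic");
                continue;                      // skip rather than kill the cleaner thread
            }
            // ... normal compaction path would retain the latest value per key ...
        }
    }

    public static void main(String[] args) {
        compact(Arrays.asList(
                new Record("k1".getBytes(), "v1".getBytes()),
                new Record(null, "orphan".getBytes())));
    }
}
{code}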
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066488#comment-14066488 ] Todd Palino commented on KAFKA-1476: I tend to agree, ~jkreps. We currently have kafka-topics.sh that groups a number of topic-centric functions in a single command, and I think that's a good model to continue with. On the consumer side, I can think of the following things we should be able to do: List groups Describe group (should include the hosts in the group) List topics by group (for a given group, what topics does it consume) Describe topic by group (for a given group and topic, list partitions, host owning the partition, and the committed offset) Set offsets (for a given group and topic, explicitly set the offsets. Should allow setting to smallest, largest, and custom, which is explicitly setting the value for each partition) List groups by topic (given a topic name, what groups consume it) All of these functions have to work with both Zookeeper and group management within the brokers (once implemented). I want to know which one the data comes from as part of the results, but I don't want to have to specify it in advance. As far as the offset checker goes, I could go either way on that. Ultimately, it combines information from two different areas (consumer and broker), which means it doesn't fit cleanly in either one even if we all agree that it's really a consumer function. I think it's fine if it stays where it is for now. > Get a list of consumer groups > - > > Key: KAFKA-1476 > URL: https://issues.apache.org/jira/browse/KAFKA-1476 > Project: Kafka > Issue Type: Wish > Components: tools >Affects Versions: 0.8.1.1 >Reporter: Ryan Williams > Labels: newbie > Fix For: 0.9.0 > > Attachments: KAFKA-1476.patch > > > It would be useful to have a way to get a list of consumer groups currently > active via some tool/script that ships with kafka. This would be helpful so > that the system tools can be explored more easily. > For example, when running the ConsumerOffsetChecker, it requires a group > option > bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group > ? > But, when just getting started with kafka, using the console producer and > consumer, it is not clear what value to use for the group option. If a list > of consumer groups could be listed, then it would be clear what value to use. > Background: > http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (KAFKA-1440) Per-request tracing
Todd Palino created KAFKA-1440: -- Summary: Per-request tracing Key: KAFKA-1440 URL: https://issues.apache.org/jira/browse/KAFKA-1440 Project: Kafka Issue Type: Improvement Components: clients Reporter: Todd Palino Could we have a flag in requests (potentially in the header for all requests, but at least for produce and fetch) that would enable tracing for that request. Currently, if we want to debug an issue with a request, we need to turn on trace level logging for the entire broker. If the client could ask for the request to be traced, we could then log detailed information for just that request. Ideally, this would be available as a flag that can be enabled in the client via JMX, so it could be done without needing to restart the client application. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (KAFKA-1276) Provide a list of config overrides available when running kafka.topics
Todd Palino created KAFKA-1276: -- Summary: Provide a list of config overrides available when running kafka.topics Key: KAFKA-1276 URL: https://issues.apache.org/jira/browse/KAFKA-1276 Project: Kafka Issue Type: Bug Components: tools Reporter: Todd Palino It would be helpful to have the help for kafka-topics enumerate a list of the per-topic configuration overrides that are available with the --config option. -- This message was sent by Atlassian JIRA (v6.1.5#6160)
[jira] [Commented] (KAFKA-1182) Topic not created if number of live brokers less than # replicas
[ https://issues.apache.org/jira/browse/KAFKA-1182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13903790#comment-13903790 ] Todd Palino commented on KAFKA-1182: I really don't think it needs to be that complicated, Jun. whether the topic is created under replicated because there aren't enough brokers right now or there never have been, it's the same situation. It has an operational solution (add a broker). I'm not sure I see the value in tracking brokers that used to be around, and even if I only ever had 3 brokers in the cluster and someone specified a RF of 4, I still want the topic created. -Todd > Topic not created if number of live brokers less than # replicas > > > Key: KAFKA-1182 > URL: https://issues.apache.org/jira/browse/KAFKA-1182 > Project: Kafka > Issue Type: Improvement > Components: producer >Affects Versions: 0.8.0 > Environment: Centos 6.3 >Reporter: Hanish Bansal >Assignee: Jun Rao > > We are having kafka cluster of 2 nodes. (Using Kafka 0.8.0 version) > Replication Factor: 2 > Number of partitions: 2 > Actual Behaviour: > Out of two nodes, if any one node goes down then topic is not created in > kafka. > Steps to Reproduce: > 1. Create a 2 node kafka cluster with replication factor 2 > 2. Start the Kafka cluster > 3. Kill any one node > 4. Start the producer to write on a new topic > 5. Observe the exception stated below: > 2013-12-12 19:37:19 0 [WARN ] ClientUtils$ - Fetching topic metadata with > correlation id 3 for topics [Set(test-topic)] from broker > [id:0,host:122.98.12.11,port:9092] failed > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect(Native Method) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > at kafka.producer.Producer.send(Producer.scala:76) > at kafka.javaapi.producer.Producer.send(Producer.scala:33) > Expected Behaviour: > In case of live brokers less than # replicas: > There should be topic created so at least live brokers can receive the data. > They can replicate data to other broker once any down broker comes up. 
> Because now in case of live brokers less than # replicas, there is complete > loss of data. -- This message was sent by Atlassian JIRA (v6.1.5#6160)
[jira] [Commented] (KAFKA-1182) Topic not created if number of live brokers less than # replicas
[ https://issues.apache.org/jira/browse/KAFKA-1182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13894781#comment-13894781 ] Todd Palino commented on KAFKA-1182: I'll have to side with Clark on this one. Given a choice between creating the topic in a degraded state (an undesirable, but tolerable problem) and losing data that is getting sent into a topic that does not yet exist (a serious problem, and one that is quite possibly not recoverable depending on the data), I'll take the degraded state. > Topic not created if number of live brokers less than # replicas > > > Key: KAFKA-1182 > URL: https://issues.apache.org/jira/browse/KAFKA-1182 > Project: Kafka > Issue Type: Improvement > Components: producer >Affects Versions: 0.8.0 > Environment: Centos 6.3 >Reporter: Hanish Bansal >Assignee: Jun Rao > > We are having kafka cluster of 2 nodes. (Using Kafka 0.8.0 version) > Replication Factor: 2 > Number of partitions: 2 > Actual Behaviour: > Out of two nodes, if any one node goes down then topic is not created in > kafka. > Steps to Reproduce: > 1. Create a 2 node kafka cluster with replication factor 2 > 2. Start the Kafka cluster > 3. Kill any one node > 4. Start the producer to write on a new topic > 5. Observe the exception stated below: > 2013-12-12 19:37:19 0 [WARN ] ClientUtils$ - Fetching topic metadata with > correlation id 3 for topics [Set(test-topic)] from broker > [id:0,host:122.98.12.11,port:9092] failed > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect(Native Method) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > at kafka.producer.Producer.send(Producer.scala:76) > at kafka.javaapi.producer.Producer.send(Producer.scala:33) > Expected Behaviour: > In case of live brokers less than # replicas: > There should be topic created so at least live brokers can receive the data. > They can replicate data to other broker once any down broker comes up. > Because now in case of live brokers less than # replicas, there is complete > loss of data. 
-- This message was sent by Atlassian JIRA (v6.1.5#6160)
[jira] [Commented] (KAFKA-1125) Add options to let admin tools block until finish
[ https://issues.apache.org/jira/browse/KAFKA-1125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13889898#comment-13889898 ] Todd Palino commented on KAFKA-1125: I believe an option for each of these tools to make them block would be very useful. As noted, in scripts where I want to perform tasks in sequence (such as moving partitions one topic at a time so as not to overload the cluster), I need to check for completion of each command by using the reassignment JSON file with --verify and parsing the response. On the separate issue of creating a queue of admin tasks, this is probably a good idea as, for the most part, we do not want to perform two actions at the same time. However, I would request that if we have that queue of tasks, tools to view and modify the queue (removing tasks or potentially reordering them) would be helpful. > Add options to let admin tools block until finish > - > > Key: KAFKA-1125 > URL: https://issues.apache.org/jira/browse/KAFKA-1125 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.1 >Reporter: Guozhang Wang >Assignee: Guozhang Wang > > Topic config change as well as create-topic, add-partition, > partition-reassignment and preferred leader election are all asynchronous in > the sense that the admin command would return immediately and one has to > check himself if the process has finished. It is better to add an option to > make these commands blocking until the process is done. > Also, it would be good to order admin tasks in order so that they can be > executed sequentially in logic. -- This message was sent by Atlassian JIRA (v6.1.5#6160)