[jira] [Created] (KAFKA-6559) Iterate record sets before calling Log.append

2018-02-13 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-6559:
--

 Summary: Iterate record sets before calling Log.append
 Key: KAFKA-6559
 URL: https://issues.apache.org/jira/browse/KAFKA-6559
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 1.0.0
Reporter: Todd Palino
Assignee: Todd Palino


If a Produce request contains multiple record sets for a single 
topic-partition, it is better to iterate these before calling Log.append. This 
is because append will process all the sets together, and therefore will need 
to reassign offsets even if the offsets for an individual record set are 
properly formed. By iterating the record sets before calling append, each set 
can be considered on its own and potentially be appended without reassigning 
offsets.

While the core Java producer client does not currently operate this way, it is 
permitted by the protocol and may be used by other clients that aggregate 
multiple batches together to produce them.
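A minimal sketch of the proposed iteration, using simplified, hypothetical types in 
place of the real {{Log}} API (the names {{SimpleLog}}, {{append}}, and the 
well-formed check below are illustrative only):

{code:scala}
// Sketch only: SimpleLog and its append method are hypothetical stand-ins
// for the real Log API; a record here is just (offset, payload).
trait SimpleLog {
  def logEndOffset: Long
  def append(batch: Seq[(Long, Array[Byte])], assignOffsets: Boolean): Unit
}

def appendEachBatchSeparately(log: SimpleLog,
                              batches: Seq[Seq[(Long, Array[Byte])]]): Unit =
  batches.foreach { batch =>
    // A batch is considered well formed if its offsets are consecutive and
    // start at the current log end offset.
    val wellFormed = batch.nonEmpty &&
      batch.zipWithIndex.forall { case ((offset, _), i) =>
        offset == log.logEndOffset + i
      }
    // Append this batch on its own: offsets are reassigned only when this
    // batch needs it, not because some other batch in the request does.
    log.append(batch, assignOffsets = !wellFormed)
  }
{code}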





[jira] [Updated] (KAFKA-5056) Shuffling of partitions in old consumer fetch requests removed

2017-04-11 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino updated KAFKA-5056:
---
Summary: Shuffling of partitions in old consumer fetch requests removed  
(was: Shuffling of partitions in old consume fetch requests removed)

> Shuffling of partitions in old consumer fetch requests removed
> --
>
> Key: KAFKA-5056
> URL: https://issues.apache.org/jira/browse/KAFKA-5056
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Todd Palino
>Assignee: Todd Palino
>
> [KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes]
>  deprecated the constructor to {{FetchRequest}} which shuffles the 
> {{requestInfo}} parameter, in favor of round robin reordering logic added to 
> the replica fetcher and the consumer API. However, this was not added to the 
> old consumer {{ConsumerFetcherThread}}, which has resulted in unfair 
> partition fetching since 0.10.1.
> In order to maintain the old consumer, we need to add the removed shuffle to 
> {{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} 
> is composed.





[jira] [Updated] (KAFKA-5056) Shuffling of partitions in old consume fetch requests removed

2017-04-11 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino updated KAFKA-5056:
---
Reviewer: Joel Koshy
  Status: Patch Available  (was: In Progress)

> Shuffling of partitions in old consume fetch requests removed
> -
>
> Key: KAFKA-5056
> URL: https://issues.apache.org/jira/browse/KAFKA-5056
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Todd Palino
>Assignee: Todd Palino
>
> [KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes]
>  deprecated the constructor to {{FetchRequest}} which shuffles the 
> {{requestInfo}} parameter, in favor of round robin reordering logic added to 
> the replica fetcher and the consumer API. However, this was not added to the 
> old consumer {{ConsumerFetcherThread}}, which has resulted in unfair 
> partition fetching since 0.10.1.
> In order to maintain the old consumer, we need to add the removed shuffle to 
> {{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} 
> is composed.





[jira] [Work started] (KAFKA-5056) Shuffling of partitions in old consume fetch requests removed

2017-04-11 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5056 started by Todd Palino.
--
> Shuffling of partitions in old consume fetch requests removed
> -
>
> Key: KAFKA-5056
> URL: https://issues.apache.org/jira/browse/KAFKA-5056
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Todd Palino
>Assignee: Todd Palino
>
> [KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes]
>  deprecated the constructor to {{FetchRequest}} which shuffles the 
> {{requestInfo}} parameter, in favor of round robin reordering logic added to 
> the replica fetcher and the consumer API. However, this was not added to the 
> old consumer {{ConsumerFetcherThread}}, which has resulted in unfair 
> partition fetching since 0.10.1.
> In order to maintain the old consumer, we need to add the removed shuffle to 
> {{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} 
> is composed.





[jira] [Created] (KAFKA-5056) Shuffling of partitions in old consume fetch requests removed

2017-04-11 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-5056:
--

 Summary: Shuffling of partitions in old consume fetch requests 
removed
 Key: KAFKA-5056
 URL: https://issues.apache.org/jira/browse/KAFKA-5056
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.0
Reporter: Todd Palino
Assignee: Todd Palino


[KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes]
 deprecated the constructor to {{FetchRequest}} which shuffles the 
{{requestInfo}} parameter, in favor of round robin reordering logic added to 
the replica fetcher and the consumer API. However, this was not added to the 
old consumer {{ConsumerFetcherThread}}, which has resulted in unfair partition 
fetching since 0.10.1.

In order to maintain the old consumer, we need to add the removed shuffle to 
{{buildFetchRequest}} as the topic-partition list for each {{FetchRequest}} is 
composed.
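A minimal sketch of the shuffle that would be restored when the partition list for 
each {{FetchRequest}} is composed ({{TopicAndPartition}} here is a simplified 
stand-in; per the description, the real change would go into 
{{ConsumerFetcherThread.buildFetchRequest}}):

{code:scala}
import scala.util.Random

// Sketch only: TopicAndPartition is a simplified stand-in for the real type.
case class TopicAndPartition(topic: String, partition: Int)

// Shuffle the partitions before they are added to the fetch request so that
// no partition is consistently first (and therefore consistently favored
// once the response size limit is reached).
def shuffledFetchOrder(partitions: Seq[TopicAndPartition]): Seq[TopicAndPartition] =
  Random.shuffle(partitions)
{code}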





[jira] [Commented] (KAFKA-1342) Slow controlled shutdowns can result in stale shutdown requests

2017-03-02 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15893728#comment-15893728
 ] 

Todd Palino commented on KAFKA-1342:


[~wushujames], I'm not sure about increasing the number of requests, but a lot 
of this should have been resolved recently with some updates that I believe 
[~becket_qin] made. To make it better, we had bumped 
controller.socket.timeout.ms significantly (to 30). We didn't see any side 
effects from doing that.

> Slow controlled shutdowns can result in stale shutdown requests
> ---
>
> Key: KAFKA-1342
> URL: https://issues.apache.org/jira/browse/KAFKA-1342
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1
>Reporter: Joel Koshy
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: newbie++, newbiee, reliability
> Fix For: 0.11.0.0
>
>
> I don't think this is a bug introduced in 0.8.1, but triggered by the fact
> that controlled shutdown seems to have become slower in 0.8.1 (will file a
> separate ticket to investigate that). When doing a rolling bounce, it is
> possible for a bounced broker to stop all its replica fetchers since the
> previous PID's shutdown requests are still being processed.
> - 515 is the controller
> - Controlled shutdown initiated for 503
> - Controller starts controlled shutdown for 503
> - The controlled shutdown takes a long time in moving leaders and moving
>   follower replicas on 503 to the offline state.
> - So 503's read from the shutdown channel times out and a new channel is
>   created. It issues another shutdown request.  This request (since it is a
>   new channel) is accepted at the controller's socket server but then waits
>   on the broker shutdown lock held by the previous controlled shutdown which
>   is still in progress.
> - The above step repeats for the remaining retries (six more requests).
> - 503 hits SocketTimeout exception on reading the response of the last
>   shutdown request and proceeds to do an unclean shutdown.
> - The controller's onBrokerFailure call-back fires and moves 503's replicas
>   to offline (not too important in this sequence).
> - 503 is brought back up.
> - The controller's onBrokerStartup call-back fires and moves its replicas
>   (and partitions) to online state. 503 starts its replica fetchers.
> - Unfortunately, the (phantom) shutdown requests are still being handled and
>   the controller sends StopReplica requests to 503.
> - The first shutdown request finally finishes (after 76 minutes in my case!).
> - The remaining shutdown requests also execute and do the same thing (sends
>   StopReplica requests for all partitions to
>   503).
> - The remaining requests complete quickly because they end up not having to
>   touch zookeeper paths - no leaders left on the broker and no need to
>   shrink ISR in zookeeper since it has already been done by the first
>   shutdown request.
> - So in the end-state 503 is up, but effectively idle due to the previous
>   PID's shutdown requests.
> There are some obvious fixes that can be made to controlled shutdown to help
> address the above issue. E.g., we don't really need to move follower
> partitions to Offline. We did that as an "optimization" so the broker falls
> out of ISR sooner - which is helpful when producers set required.acks to -1.
> However it adds a lot of latency to controlled shutdown. Also, (more
> importantly) we should have a mechanism to abort any stale shutdown process.





[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup

2016-11-27 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15700938#comment-15700938
 ] 

Todd Palino commented on KAFKA-3959:


+1 as well. I think keeping the config/server.properties file as a quickstart 
that gets you going with a standalone broker is a good plan.

> __consumer_offsets wrong number of replicas at startup
> --
>
> Key: KAFKA-3959
> URL: https://issues.apache.org/jira/browse/KAFKA-3959
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager, replication
>Affects Versions: 0.9.0.1, 0.10.0.0
> Environment: Brokers of 3 kafka nodes running Red Hat Enterprise 
> Linux Server release 7.2 (Maipo)
>Reporter: Alban Hurtaud
>
> When creating a stack of 3 kafka brokers, the consumer is starting faster 
> than kafka nodes and when trying to read a topic, only one kafka node is 
> available.
> So the __consumer_offsets is created with a replication factor set to 1 
> (instead of configured 3) :
> offsets.topic.replication.factor=3
> default.replication.factor=3
> min.insync.replicas=2
> Then, when the other kafka nodes come up, exceptions are thrown because the 
> replica count for __consumer_offsets is 1 and min.insync.replicas is 2.
> What I missed is: why is __consumer_offsets created with a replication factor of 
> 1 (when only 1 broker is running) whereas in server.properties it is set to 3?
> To reproduce : 
> - Prepare 3 kafka nodes with the 3 lines above added to servers.properties.
> - Run one kafka,
> - Run one consumer (the __consumer_offsets is created with replicas =1)
> - Run 2 more kafka nodes





[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup

2016-11-15 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15668010#comment-15668010
 ] 

Todd Palino commented on KAFKA-3959:


As noted, I just want the config enforced. If RF=3 is configured, that's what 
we should get. If you need RF=1 for testing, or for specific use cases, set it. 
Even make the default 1 if that's really what we want. But if I explicitly set 
RF=3, that's what I should get. And if it causes errors, and I've explicitly 
set it, that's on me as the user.
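A sketch of the enforce-or-fail behavior argued for above (the method and exception 
are illustrative, not the broker's actual code path):

{code:scala}
// Sketch only: enforce the configured offsets topic RF instead of silently
// falling back to however many brokers happen to be alive.
def offsetsTopicReplicationFactor(configuredRf: Int, aliveBrokers: Int): Int =
  if (aliveBrokers >= configuredRf) configuredRf
  else throw new IllegalStateException(
    s"offsets.topic.replication.factor=$configuredRf but only $aliveBrokers " +
      "broker(s) are registered; refusing to create __consumer_offsets")
{code}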

> __consumer_offsets wrong number of replicas at startup
> --
>
> Key: KAFKA-3959
> URL: https://issues.apache.org/jira/browse/KAFKA-3959
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager, replication
>Affects Versions: 0.9.0.1, 0.10.0.0
> Environment: Brokers of 3 kafka nodes running Red Hat Enterprise 
> Linux Server release 7.2 (Maipo)
>Reporter: Alban Hurtaud
>
> When creating a stack of 3 kafka brokers, the consumer is starting faster 
> than kafka nodes and when trying to read a topic, only one kafka node is 
> available.
> So the __consumer_offsets is created with a replication factor set to 1 
> (instead of configured 3) :
> offsets.topic.replication.factor=3
> default.replication.factor=3
> min.insync.replicas=2
> Then, when the other kafka nodes come up, exceptions are thrown because the 
> replica count for __consumer_offsets is 1 and min.insync.replicas is 2.
> What I missed is: why is __consumer_offsets created with a replication factor of 
> 1 (when only 1 broker is running) whereas in server.properties it is set to 3?
> To reproduce : 
> - Prepare 3 kafka nodes with the 3 lines above added to servers.properties.
> - Run one kafka,
> - Run one consumer (the __consumer_offsets is created with replicas =1)
> - Run 2 more kafka nodes





[jira] [Commented] (KAFKA-4050) Allow configuration of the PRNG used for SSL

2016-08-17 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425756#comment-15425756
 ] 

Todd Palino commented on KAFKA-4050:


So first off, yes, the thread dump (which [~jjkoshy] posted) shows that the 
offending line of code is "NativePRNG.java:481". I checked, and that's very 
clearly in the non-blocking NativePRNG variant that explicitly uses /dev/urandom.

I had considered changing the default, [~ijuma], and I actually thought about 
adding a note to this ticket about it earlier today. Despite the fact that the 
default clearly has performance issues, I don't think we should change the 
default behavior, which is to let the JRE pick the PRNG implementation. The 
reason is that we can't be sure that on any given system, in any given JRE, 
that the new one we set explicitly will exist, and that would cause the default 
behavior to break. The SHA1PRNG implementation should exist everywhere, but I'd 
rather not take the risk. I think it's better to leave the default as is, and 
call out the issue very clearly in the documentation.

> Allow configuration of the PRNG used for SSL
> 
>
> Key: KAFKA-4050
> URL: https://issues.apache.org/jira/browse/KAFKA-4050
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.10.0.1
>Reporter: Todd Palino
>Assignee: Todd Palino
>  Labels: security, ssl
>
> This change will make the pseudo-random number generator (PRNG) 
> implementation used by the SSLContext configurable. The configuration is not 
> required, and the default is to use whatever the default PRNG for the JDK/JRE 
> is. Providing a string, such as "SHA1PRNG", will cause that specific 
> SecureRandom implementation to get passed to the SSLContext.
> When enabling inter-broker SSL in our certification cluster, we observed 
> severe performance issues. For reference, this cluster can take up to 600 
> MB/sec of inbound produce traffic over SSL, with RF=2, before it gets close 
> to saturation, and the mirror maker normally produces about 400 MB/sec 
> (unless it is lagging). When we enabled inter-broker SSL, we saw persistent 
> replication problems in the cluster at any inbound rate of more than about 6 
> or 7 MB/sec per-broker. This was narrowed down to all the network threads 
> blocking on a single lock in the SecureRandom code.
> It turns out that the default PRNG implementation on Linux is NativePRNG. 
> This uses randomness from /dev/urandom (which, by itself, is a non-blocking 
> read) and mixes it with randomness from SHA1. The problem is that the entire 
> application shares a single SecureRandom instance, and NativePRNG has a 
> global lock within the implNextBytes method. Switching to another 
> implementation (SHA1PRNG, which has better performance characteristics and is 
> still considered secure) completely eliminated the bottleneck and allowed the 
> cluster to work properly at saturation.
> The SSLContext initialization has an optional argument to provide a 
> SecureRandom instance, which the code currently sets to null. This change 
> creates a new config to specify an implementation, and instantiates that and 
> passes it to SSLContext if provided. This will also let someone select a 
> stronger source of randomness (obviously at a performance cost) if desired.





[jira] [Commented] (KAFKA-4050) Allow configuration of the PRNG used for SSL

2016-08-16 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423400#comment-15423400
 ] 

Todd Palino commented on KAFKA-4050:


It appears to be called every time something needs to be encrypted (have to get 
randomness to run the crypto routines), so yeah, every single packet being sent 
would need a call to get random bytes.

> Allow configuration of the PRNG used for SSL
> 
>
> Key: KAFKA-4050
> URL: https://issues.apache.org/jira/browse/KAFKA-4050
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.10.0.1
>Reporter: Todd Palino
>Assignee: Todd Palino
>  Labels: security, ssl
>
> This change will make the pseudo-random number generator (PRNG) 
> implementation used by the SSLContext configurable. The configuration is not 
> required, and the default is to use whatever the default PRNG for the JDK/JRE 
> is. Providing a string, such as "SHA1PRNG", will cause that specific 
> SecureRandom implementation to get passed to the SSLContext.
> When enabling inter-broker SSL in our certification cluster, we observed 
> severe performance issues. For reference, this cluster can take up to 600 
> MB/sec of inbound produce traffic over SSL, with RF=2, before it gets close 
> to saturation, and the mirror maker normally produces about 400 MB/sec 
> (unless it is lagging). When we enabled inter-broker SSL, we saw persistent 
> replication problems in the cluster at any inbound rate of more than about 6 
> or 7 MB/sec per-broker. This was narrowed down to all the network threads 
> blocking on a single lock in the SecureRandom code.
> It turns out that the default PRNG implementation on Linux is NativePRNG. 
> This uses randomness from /dev/urandom (which, by itself, is a non-blocking 
> read) and mixes it with randomness from SHA1. The problem is that the entire 
> application shares a single SecureRandom instance, and NativePRNG has a 
> global lock within the implNextBytes method. Switching to another 
> implementation (SHA1PRNG, which has better performance characteristics and is 
> still considered secure) completely eliminated the bottleneck and allowed the 
> cluster to work properly at saturation.
> The SSLContext initialization has an optional argument to provide a 
> SecureRandom instance, which the code currently sets to null. This change 
> creates a new config to specify an implementation, and instantiates that and 
> passes it to SSLContext if provided. This will also let someone select a 
> stronger source of randomness (obviously at a performance cost) if desired.





[jira] [Created] (KAFKA-4050) Allow configuration of the PRNG used for SSL

2016-08-16 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-4050:
--

 Summary: Allow configuration of the PRNG used for SSL
 Key: KAFKA-4050
 URL: https://issues.apache.org/jira/browse/KAFKA-4050
 Project: Kafka
  Issue Type: Improvement
  Components: security
Affects Versions: 0.10.0.1
Reporter: Todd Palino
Assignee: Todd Palino


This change will make the pseudo-random number generator (PRNG) implementation 
used by the SSLContext configurable. The configuration is not required, and the 
default is to use whatever the default PRNG for the JDK/JRE is. Providing a 
string, such as "SHA1PRNG", will cause that specific SecureRandom 
implementation to get passed to the SSLContext.

When enabling inter-broker SSL in our certification cluster, we observed severe 
performance issues. For reference, this cluster can take up to 600 MB/sec of 
inbound produce traffic over SSL, with RF=2, before it gets close to 
saturation, and the mirror maker normally produces about 400 MB/sec (unless it 
is lagging). When we enabled inter-broker SSL, we saw persistent replication 
problems in the cluster at any inbound rate of more than about 6 or 7 MB/sec 
per-broker. This was narrowed down to all the network threads blocking on a 
single lock in the SecureRandom code.

It turns out that the default PRNG implementation on Linux is NativePRNG. This 
uses randomness from /dev/urandom (which, by itself, is a non-blocking read) 
and mixes it with randomness from SHA1. The problem is that the entire 
application shares a single SecureRandom instance, and NativePRNG has a global 
lock within the implNextBytes method. Switching to another implementation 
(SHA1PRNG, which has better performance characteristics and is still considered 
secure) completely eliminated the bottleneck and allowed the cluster to work 
properly at saturation.

The SSLContext initialization has an optional argument to provide a 
SecureRandom instance, which the code currently sets to null. This change 
creates a new config to specify an implementation, and instantiates that and 
passes it to SSLContext if provided. This will also let someone select a 
stronger source of randomness (obviously at a performance cost) if desired.
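A minimal sketch of that wiring, assuming the chosen implementation arrives as an 
optional config string (config plumbing omitted; when nothing is configured, passing 
null keeps the JRE's default PRNG selection, i.e. the current behavior):

{code:scala}
import java.security.SecureRandom
import javax.net.ssl.{KeyManager, SSLContext, TrustManager}

// Sketch only: if an implementation such as "SHA1PRNG" is configured,
// instantiate it; otherwise pass null so the JRE picks its default PRNG.
def buildSslContext(keyManagers: Array[KeyManager],
                    trustManagers: Array[TrustManager],
                    prngImplementation: Option[String]): SSLContext = {
  val secureRandom = prngImplementation
    .map(name => SecureRandom.getInstance(name))
    .orNull
  val context = SSLContext.getInstance("TLS")
  context.init(keyManagers, trustManagers, secureRandom)
  context
}
{code}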





[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup

2016-08-13 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420091#comment-15420091
 ] 

Todd Palino commented on KAFKA-3959:


If that's the case, then the default should be set to 1. At least having that 
as the default, and returning an error if the RF cannot be satisfied, is a 
reasonable and expected outcome. But having it set up to not enforce the 
configured RF leads to unintended behavior, even if it's documented here.

> __consumer_offsets wrong number of replicas at startup
> --
>
> Key: KAFKA-3959
> URL: https://issues.apache.org/jira/browse/KAFKA-3959
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager, replication
>Affects Versions: 0.9.0.1, 0.10.0.0
> Environment: Brokers of 3 kafka nodes running Red Hat Enterprise 
> Linux Server release 7.2 (Maipo)
>Reporter: Alban Hurtaud
>
> When creating a stack of 3 kafka brokers, the consumer is starting faster 
> than kafka nodes and when trying to read a topic, only one kafka node is 
> available.
> So the __consumer_offsets is created with a replication factor set to 1 
> (instead of configured 3) :
> offsets.topic.replication.factor=3
> default.replication.factor=3
> min.insync.replicas=2
> Then, when the other kafka nodes come up, exceptions are thrown because the 
> replica count for __consumer_offsets is 1 and min.insync.replicas is 2.
> What I missed is: why is __consumer_offsets created with a replication factor of 
> 1 (when only 1 broker is running) whereas in server.properties it is set to 3?
> To reproduce : 
> - Prepare 3 kafka nodes with the 3 lines above added to servers.properties.
> - Run one kafka,
> - Run one consumer (the __consumer_offsets is created with replicas =1)
> - Run 2 more kafka nodes





[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup

2016-08-12 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15419365#comment-15419365
 ] 

Todd Palino commented on KAFKA-3959:


Agree with Onur 100% here. We've been running into this a lot lately, and we 
never know about it until we do a rolling restart of the cluster and consumers 
break when the topic goes offline.

> __consumer_offsets wrong number of replicas at startup
> --
>
> Key: KAFKA-3959
> URL: https://issues.apache.org/jira/browse/KAFKA-3959
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager, replication
>Affects Versions: 0.9.0.1, 0.10.0.0
> Environment: Brokers of 3 kafka nodes running Red Hat Enterprise 
> Linux Server release 7.2 (Maipo)
>Reporter: Alban Hurtaud
>
> When creating a stack of 3 kafka brokers, the consumer is starting faster 
> than kafka nodes and when trying to read a topic, only one kafka node is 
> available.
> So the __consumer_offsets is created with a replication factor set to 1 
> (instead of configured 3) :
> offsets.topic.replication.factor=3
> default.replication.factor=3
> min.insync.replicas=2
> Then, when the other kafka nodes come up, exceptions are thrown because the 
> replica count for __consumer_offsets is 1 and min.insync.replicas is 2.
> What I missed is: why is __consumer_offsets created with a replication factor of 
> 1 (when only 1 broker is running) whereas in server.properties it is set to 3?
> To reproduce : 
> - Prepare 3 kafka nodes with the 3 lines above added to servers.properties.
> - Run one kafka,
> - Run one consumer (the __consumer_offsets is created with replicas =1)
> - Run 2 more kafka nodes





[jira] [Commented] (KAFKA-3797) Improve security of __consumer_offsets topic

2016-06-09 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323930#comment-15323930
 ] 

Todd Palino commented on KAFKA-3797:


Obviously we can't do something like that with a running consumer. But we do it 
all the time with inactive consumers.

I haven't looked at the most recent changes to the offset API yet, so I'm not 
sure whether or not there are restrictions in place that would prevent a client 
that is not part of the group from committing offsets for the group.

> Improve security of __consumer_offsets topic
> 
>
> Key: KAFKA-3797
> URL: https://issues.apache.org/jira/browse/KAFKA-3797
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>
> By default, we allow clients to override committed offsets and group metadata 
> using the Produce API as long as they have Write access to the 
> __consumer_offsets topic. From one perspective, this is fine: administrators 
> can restrict access to this topic to trusted users. From another, it seems 
> less than ideal for Write permission on that topic to subsume Group-level 
> permissions for the full cluster. With this access, a user can cause all 
> kinds of mischief including making the group "lose" data by setting offsets 
> ahead of the actual position. This is probably not obvious to administrators 
> who grant access to topics using a wildcard and it increases the risk from 
> incorrectly applying topic patterns (if we ever add support for them). It 
> seems reasonable to consider safer default behavior:
> 1. A simple option to fix this would be to prevent wildcard topic rules from 
> applying to internal topics. To write to an internal topic, you need a 
> separate rule which explicitly grants authorization to that topic.
> 2. A more extreme and perhaps safer option might be to prevent all writes to 
> this topic (and potentially other internal topics) through the Produce API. 
> Do we have any use cases which actually require writing to 
> __consumer_offsets? The only potential case that comes to mind is replication.





[jira] [Commented] (KAFKA-3797) Improve security of __consumer_offsets topic

2016-06-07 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15318854#comment-15318854
 ] 

Todd Palino commented on KAFKA-3797:


I think the first option is more reasonable, and provides sufficient security. 
I can see cases where you might want to have an external tool for changing a 
group's committed offsets.

> Improve security of __consumer_offsets topic
> 
>
> Key: KAFKA-3797
> URL: https://issues.apache.org/jira/browse/KAFKA-3797
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>
> By default, we allow clients to override committed offsets and group metadata 
> using the Produce API as long as they have Write access to the 
> __consumer_offsets topic. From one perspective, this is fine: administrators 
> can restrict access to this topic to trusted users. From another, it seems 
> less than ideal for Write permission on that topic to subsume Group-level 
> permissions for the full cluster. With this access, a user can cause all 
> kinds of mischief including making the group "lose" data by setting offsets 
> ahead of the actual position. This is probably not obvious to administrators 
> who grant access to topics using a wildcard and it increases the risk from 
> incorrectly applying topic patterns (if we ever add support for them). It 
> seems reasonable to consider safer default behavior:
> 1. A simple option to fix this would be to prevent wildcard topic rules from 
> applying to internal topics. To write to an internal topic, you need a 
> separate rule which explicitly grants authorization to that topic.
> 2. A more extreme and perhaps safer option might be to prevent all writes to 
> this topic (and potentially other internal topics) through the Produce API. 
> Do we have any use cases which actually require writing to 
> __consumer_offsets? The only potential case that comes to mind is replication.





[jira] [Commented] (KAFKA-3725) Update documentation with regards to XFS

2016-06-02 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15312288#comment-15312288
 ] 

Todd Palino commented on KAFKA-3725:


I'll take a look at this and put together a PR with some updates.

> Update documentation with regards to XFS
> 
>
> Key: KAFKA-3725
> URL: https://issues.apache.org/jira/browse/KAFKA-3725
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Todd Palino
>
> Our documentation currently states that only Ext4 has been tried (by 
> LinkedIn):
> "Ext4 may or may not be the best filesystem for Kafka. Filesystems like XFS 
> supposedly handle locking during fsync better. We have only tried Ext4, 
> though."
> http://kafka.apache.org/documentation.html#ext4
> I think this is no longer true, so we should update the documentation.





[jira] [Assigned] (KAFKA-3725) Update documentation with regards to XFS

2016-06-02 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino reassigned KAFKA-3725:
--

Assignee: Todd Palino

> Update documentation with regards to XFS
> 
>
> Key: KAFKA-3725
> URL: https://issues.apache.org/jira/browse/KAFKA-3725
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Todd Palino
>
> Our documentation currently states that only Ext4 has been tried (by 
> LinkedIn):
> "Ext4 may or may not be the best filesystem for Kafka. Filesystems like XFS 
> supposedly handle locking during fsync better. We have only tried Ext4, 
> though."
> http://kafka.apache.org/documentation.html#ext4
> I think this is no longer true, so we should update the documentation.





[jira] [Commented] (KAFKA-3174) Re-evaluate the CRC32 class performance.

2016-02-01 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126292#comment-15126292
 ] 

Todd Palino commented on KAFKA-3174:


Yeah, definitely no problems with Java 1.8. We've been running 1.8 u5 for quite 
some time, and we're in the process of updating to u40. It's worth noting that 
we have been running into a number of SEGVs with mirror maker (but not the 
broker) under u5, but the problem is supposedly fixed in u40.

> Re-evaluate the CRC32 class performance.
> 
>
> Key: KAFKA-3174
> URL: https://issues.apache.org/jira/browse/KAFKA-3174
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.9.0.1
>
>
> We used org.apache.kafka.common.utils.CRC32 in clients because it has better 
> performance than java.util.zip.CRC32 in Java 1.6.
> In a recent test I ran it looks in Java 1.8 the CRC32 class is 2x as fast as 
> the Crc32 class we are using now. We may want to re-evaluate the performance 
> of Crc32 class and see it makes sense to simply use java CRC32 instead.
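A rough timing sketch for that comparison. Both classes implement 
{{java.util.zip.Checksum}}, so they can be swapped behind the same factory (the 
Kafka class name in the trailing comment is an assumption based on the 0.9-era 
clients jar):

{code:scala}
import java.util.zip.{CRC32, Checksum}

// Rough, single-threaded timing sketch (no JMH, no warmup control) comparing
// Checksum implementations over the same payload.
def timeChecksum(newChecksum: () => Checksum, payload: Array[Byte], rounds: Int): Long = {
  val start = System.nanoTime()
  var i = 0
  while (i < rounds) {
    val c = newChecksum()
    c.update(payload, 0, payload.length)
    c.getValue
    i += 1
  }
  System.nanoTime() - start
}

val payload = Array.fill[Byte](1024)(1)
val jdkNanos = timeChecksum(() => new CRC32(), payload, 1000000)
// With kafka-clients on the classpath (assumed 0.9.x class name):
// val kafkaNanos = timeChecksum(() => new org.apache.kafka.common.utils.Crc32(), payload, 1000000)
{code}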





[jira] [Commented] (KAFKA-3015) Improve JBOD data balancing

2015-12-19 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15065636#comment-15065636
 ] 

Todd Palino commented on KAFKA-3015:


[~jkreps] So yes, I'm essentially saying that I would prefer to see 
optimizations to the current partitioning scheme, and the addition of being 
able to handle single disk failures without terminating the entire broker. I 
would argue that this would have a higher payoff for people who do not have the 
ability to easily swap in new machines (as we do, or those in AWS would), 
because it will allow for more granular failures. Disks tend to fail more than 
any other component, so the ability to survive a disk failure should be 
attractive to anyone.

As noted, there are a lot of benefits of using JBOD, and I would not argue 
against having good support for it. Especially as we've been moving to new 
hardware, we now can't take advantage of all the network we have with 10gig 
interfaces primarily due to limitations on disk capacity and throughput. 
Additionally, we'd like to move to RF=3, but can't stomach it because of the 
cost. If we drop the RAID 10 we will significantly increase throughput and 
double our storage capacity. Then we can easily move to RF=3 (using 50% more 
disk) with little impact.

> Improve JBOD data balancing
> ---
>
> Key: KAFKA-3015
> URL: https://issues.apache.org/jira/browse/KAFKA-3015
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>
> When running with multiple data directories (i.e. JBOD) we currently place 
> partitions entirely within one data directory. This tends to lead to poor 
> balancing across disks as some topics have more throughput/retention and not 
> all disks get data from all topics. You can't fix this problem with smarter 
> partition placement strategies because, when a partition is created, you 
> ultimately don't know when or how heavily it will be used (this is a subtle 
> point, and the tendency is to try to think of some more sophisticated way to 
> place partitions based on current data size but this is actually 
> exceptionally dangerous and can lead to much worse imbalance when creating 
> many partitions at once as they would all go to the disk with the least 
> data). We don't support online rebalancing across directories/disks so this 
> imbalance is a big problem and limits the usefulness of this configuration. 
> Implementing online rebalancing of data across disks without downtime is 
> actually quite hard and requires lots of I/O since you have to actually 
> rewrite full partitions of data.
> An alternative would be to place each partition in *all* directories/drives 
> and round-robin *segments* within the partition across the directories. So 
> the layout would be something like:
>   drive-a/mytopic-0/
>   000.data
>   000.index
>   0024680.data
>   0024680.index
>   drive-b/mytopic-0/
>   0012345.data
>   0012345.index
>   0036912.data
>   0036912.index
> This is a little harder to implement than the current approach but not very 
> hard, and it is a lot easier than implementing online data balancing across 
> disks while retaining the current approach. I think this could easily be done 
> in a backwards compatible way.
> I think the balancing you would get from this in most cases would be good 
> enough to make JBOD the default configuration. Thoughts?
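A minimal sketch of that placement idea, assuming a list of configured data 
directories (the class and names are illustrative only):

{code:scala}
import java.io.File

// Sketch only: rotate new segments for a partition across all data
// directories instead of pinning the whole partition to one of them.
class RoundRobinSegmentPlacer(dataDirs: IndexedSeq[File]) {
  private var next = 0
  def directoryForNewSegment(topicPartition: String): File = {
    val dir = new File(dataDirs(next), topicPartition) // e.g. drive-a/mytopic-0
    next = (next + 1) % dataDirs.size
    dir
  }
}
{code}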





[jira] [Commented] (KAFKA-3015) Improve JBOD data balancing

2015-12-18 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15065239#comment-15065239
 ] 

Todd Palino commented on KAFKA-3015:


While this seems good on the surface, it makes it impossible to continue 
running the broker on a single disk failure. This is one of our primary 
complaints about JBOD, and one of the main reasons we cannot use it (as much as 
we would like to).

> Improve JBOD data balancing
> ---
>
> Key: KAFKA-3015
> URL: https://issues.apache.org/jira/browse/KAFKA-3015
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>
> When running with multiple data directories (i.e. JBOD) we currently place 
> partitions entirely within one data directory. This tends to lead to poor 
> balancing across disks as some topics have more throughput/retention and not 
> all disks get data from all topics. You can't fix this problem with smarter 
> partition placement strategies because, when a partition is created, you 
> ultimately don't know when or how heavily it will be used (this is a subtle 
> point, and the tendency is to try to think of some more sophisticated way to 
> place partitions based on current data size but this is actually 
> exceptionally dangerous and can lead to much worse imbalance when creating 
> many partitions at once as they would all go to the disk with the least 
> data). We don't support online rebalancing across directories/disks so this 
> imbalance is a big problem and limits the usefulness of this configuration. 
> Implementing online rebalancing of data across disks without downtime is 
> actually quite hard and requires lots of I/O since you have to actually 
> rewrite full partitions of data.
> An alternative would be to place each partition in *all* directories/drives 
> and round-robin *segments* within the partition across the directories. So 
> the layout would be something like:
>   drive-a/mytopic-0/
>   000.data
>   000.index
>   0024680.data
>   0024680.index
>   drive-b/mytopic-0/
>   0012345.data
>   0012345.index
>   0036912.data
>   0036912.index
> This is a little harder to implement than the current approach but not very 
> hard, and it is a lot easier than implementing online data balancing across 
> disks while retaining the current approach. I think this could easily be done 
> in a backwards compatible way.
> I think the balancing you would get from this in most cases would be good 
> enough to make JBOD the default configuration. Thoughts?





[jira] [Commented] (KAFKA-2759) Mirror maker can leave gaps of missing messages if the process dies after a partition is reset and before the first offset commit

2015-11-06 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14994590#comment-14994590
 ] 

Todd Palino commented on KAFKA-2759:


The thought we had internally on the situation in KAFKA-1006 was that we should 
differentiate the behavior between what happens if a consumer has no offsets at 
all for a partition and what happens if the consumer thinks it has offsets, but 
is out of range. These are two different situations. In the case of mirror 
maker, we would want "earliest" behavior for the first situation (whether 
default or configured, doesn't matter), as we never want to lose data from new 
topics or partitions. For the second situation we want "closest". That is, if 
your out of range offset is closer to the earliest offset, start there. 
Otherwise, if your out of range offset is closer to the latest offset, start 
there. There's a little more detail involved on this, as there are intricacies 
with the fetch response. [~jjkoshy] has more information on that discussion.
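A minimal sketch of that distinction, simplified to plain offsets (the real consumer 
works in terms of fetch and offset requests):

{code:scala}
// Sketch only: "no offsets at all" resets to earliest so new topics and
// partitions are never skipped, while an out-of-range offset jumps to
// whichever end of the log is closer to where the consumer thought it was.
def resetOffset(committed: Option[Long], earliest: Long, latest: Long): Long =
  committed match {
    case None => earliest
    case Some(outOfRange) =>
      if (math.abs(outOfRange - earliest) <= math.abs(outOfRange - latest)) earliest
      else latest
  }
{code}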


> Mirror maker can leave gaps of missing messages if the process dies after a 
> partition is reset and before the first offset commit
> -
>
> Key: KAFKA-2759
> URL: https://issues.apache.org/jira/browse/KAFKA-2759
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.2
>Reporter: Ewen Cheslack-Postava
>Priority: Minor
>
> Based on investigation of KAFKA-2747. When mirror maker first starts or if it 
> picks up new topics/partitions, it will use the reset policy to choose where 
> to start. By default this uses 'latest'. If it starts reading messages and 
> then dies before committing offsets for the first time, then the mirror maker 
> that takes over that partition will also reset. This can result in some 
> messages making it to the consumer, then a gap that were skipped, and then 
> messages that get processed by the new MM process.
> One solution to this problem would be to make sure that offsets are committed 
> after they are reset but before the first message is passed to the producer. 
> In other words, in the case of a reset, MM should record where it's going to 
> start reading data from before processing any messages. This guarantees all 
> messages after the first one delivered by MM will appear at least once.
> This is minor since it should be very rare, but it does break an assumption 
> that people probably make about the output -- once you start receiving data, 
> you aren't guaranteed all subsequent messages will appear at least once.
> This same issue could affect Copycat as well. In fact, it may be generally 
> useful to allow consumers to know when the offset was reset so they can 
> handle cases like this.





[jira] [Commented] (KAFKA-2235) LogCleaner offset map overflow

2015-10-23 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14971806#comment-14971806
 ] 

Todd Palino commented on KAFKA-2235:


I don't think we can. I have already increased it from 512MB to 1GB, and we 
still hit the same problems. That only provides a 2x increase in the size of 
the map, and I would need almost a 10x increase to solve the problem.
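Back-of-the-envelope numbers behind that, assuming 24 bytes per dedupe-map entry (a 
16-byte hash of the key plus an 8-byte offset) and a 0.9 load factor; both figures 
are assumptions about the implementation, not taken from this thread:

{code:scala}
// Sketch only: approximate capacity of the cleaner's offset map.
val bufferBytes   = 1L * 1024 * 1024 * 1024   // dedupe buffer raised to 1 GB
val bytesPerEntry = 24                        // assumed: 16-byte hash + 8-byte offset
val loadFactor    = 0.9                       // assumed load factor
val maxUniqueKeys = (bufferBytes * loadFactor / bytesPerEntry).toLong
// ≈ 40 million keys: in line with the ~39 million figure cited later in this
// thread, and roughly a tenth of what the small-message workload described
// there would need.
{code}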

> LogCleaner offset map overflow
> --
>
> Key: KAFKA-2235
> URL: https://issues.apache.org/jira/browse/KAFKA-2235
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Affects Versions: 0.8.1, 0.8.2.0
>Reporter: Ivan Simoneko
>Assignee: Ivan Simoneko
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2235_v1.patch, KAFKA-2235_v2.patch
>
>
> We've seen log cleaning generating an error for a topic with lots of small 
> messages. It seems that cleanup map overflow is possible if a log segment 
> contains more unique keys than empty slots in offsetMap. Check for baseOffset 
> and map utilization before processing segment seems to be not enough because 
> it doesn't take into account segment size (number of unique messages in the 
> segment).
> I suggest to estimate upper bound of keys in a segment as a number of 
> messages in the segment and compare it with the number of available slots in 
> the map (keeping in mind desired load factor). It should work in cases where 
> an empty map is capable of holding all the keys for a single segment. If even a 
> single segment is not able to fit into an empty map, the cleanup process will still 
> fail. Probably there should be a limit on the log segment entries count?
> Here is the stack trace for this error:
> 2015-05-19 16:52:48,758 ERROR [kafka-log-cleaner-thread-0] 
> kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to
> java.lang.IllegalArgumentException: requirement failed: Attempt to add a new 
> entry to a full offset map.
>at scala.Predef$.require(Predef.scala:233)
>at kafka.log.SkimpyOffsetMap.put(OffsetMap.scala:79)
>at 
> kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:543)
>at 
> kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:538)
>at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>at kafka.message.MessageSet.foreach(MessageSet.scala:67)
>at 
> kafka.log.Cleaner.kafka$log$Cleaner$$buildOffsetMapForSegment(LogCleaner.scala:538)
>at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:515)
>at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:512)
>at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:512)
>at kafka.log.Cleaner.clean(LogCleaner.scala:307)
>at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
>at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
>at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)





[jira] [Commented] (KAFKA-2235) LogCleaner offset map overflow

2015-10-23 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14971740#comment-14971740
 ] 

Todd Palino commented on KAFKA-2235:


I'm sure [~jjkoshy] will follow along with more detail on this, but we've run 
into a serious problem with this check. Basically, it's impossible to perform 
this kind of check accurately before the offset map is built. We now have 
partitions that should be able to be compacted as the total number of unique 
keys is far below the size of the offset map (currently at ~39 million for our 
configuration) but the messages are very frequent and very small. Even at a 
segment size of 64 MB, we have over 300 million messages in those segments. So 
this check creates a situation where log compaction should succeed, but fails 
because of a speculative check.

While I can play the game of trying to walk back segment sizes, there's no way 
to size segments by number of messages, so it's a guessing game. In addition, 
the check is clearly wrong in that case, so I shouldn't have to config around 
it. Lastly, the check causes the log cleaner thread to exit, which means log 
compaction on the broker fails entirely, rather than just skipping that 
partition.

A better way to handle this would be to cleanly catch the original error you 
are seeing, generate a clear error message in the logs as to what the failure 
is, and allow the log cleaner to continue and handle other partitions. You 
could also maintain a blacklist of partitions in memory in the log cleaner to 
make sure you don't come back around and try and compact the partition again.
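A minimal sketch of that suggestion (names are hypothetical; the real cleaner thread 
structure is more involved):

{code:scala}
import scala.collection.mutable

// Sketch only: catch the full-map failure, mark the partition as uncleanable,
// and keep the cleaner thread alive for the remaining partitions.
val uncleanable = mutable.Set.empty[String]

def cleanOrSkip(topicPartition: String)(clean: => Unit): Unit =
  if (!uncleanable.contains(topicPartition)) {
    try clean
    catch {
      case e: IllegalArgumentException =>
        // e.g. "requirement failed: Attempt to add a new entry to a full offset map."
        println(s"Skipping compaction of $topicPartition: ${e.getMessage}")
        uncleanable += topicPartition
    }
  }
{code}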

> LogCleaner offset map overflow
> --
>
> Key: KAFKA-2235
> URL: https://issues.apache.org/jira/browse/KAFKA-2235
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Affects Versions: 0.8.1, 0.8.2.0
>Reporter: Ivan Simoneko
>Assignee: Ivan Simoneko
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2235_v1.patch, KAFKA-2235_v2.patch
>
>
> We've seen log cleaning generating an error for a topic with lots of small 
> messages. It seems that cleanup map overflow is possible if a log segment 
> contains more unique keys than empty slots in offsetMap. Check for baseOffset 
> and map utilization before processing segment seems to be not enough because 
> it doesn't take into account segment size (number of unique messages in the 
> segment).
> I suggest to estimate upper bound of keys in a segment as a number of 
> messages in the segment and compare it with the number of available slots in 
> the map (keeping in mind desired load factor). It should work in cases where 
> an empty map is capable of holding all the keys for a single segment. If even a 
> single segment is not able to fit into an empty map, the cleanup process will still 
> fail. Probably there should be a limit on the log segment entries count?
> Here is the stack trace for this error:
> 2015-05-19 16:52:48,758 ERROR [kafka-log-cleaner-thread-0] 
> kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to
> java.lang.IllegalArgumentException: requirement failed: Attempt to add a new 
> entry to a full offset map.
>at scala.Predef$.require(Predef.scala:233)
>at kafka.log.SkimpyOffsetMap.put(OffsetMap.scala:79)
>at 
> kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:543)
>at 
> kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:538)
>at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>at kafka.message.MessageSet.foreach(MessageSet.scala:67)
>at 
> kafka.log.Cleaner.kafka$log$Cleaner$$buildOffsetMapForSegment(LogCleaner.scala:538)
>at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:515)
>at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:512)
>at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:512)
>at kafka.log.Cleaner.clean(LogCleaner.scala:307)
>at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
>at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
>at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)





[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover

2015-10-19 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14964228#comment-14964228
 ] 

Todd Palino commented on KAFKA-2017:


I think we definitely need to maintain the ability to get that type of 
information, whether it's within the protocol or via an admin endpoint. Being 
able to tell what consumers exist in a group, as well as what partitions each 
of them owns, is important information to have. And it should be available not 
just from the consumers, but from the coordinator itself. That way you can 
debug issues where they have gone out of sync, and you can provide tools (such 
as burrow) to provide consumer status information independently.

> Persist Coordinator State for Coordinator Failover
> --
>
> Key: KAFKA-2017
> URL: https://issues.apache.org/jira/browse/KAFKA-2017
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: Onur Karaman
>Assignee: Guozhang Wang
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, 
> KAFKA-2017_2015-05-21_19:02:47.patch
>
>
> When a coordinator fails, the group membership protocol tries to failover to 
> a new coordinator without forcing all the consumers rejoin their groups. This 
> is possible if the coordinator persists its state so that the state can be 
> transferred during coordinator failover. This state consists of most of the 
> information in GroupRegistry and ConsumerRegistry.





[jira] [Commented] (KAFKA-2580) Kafka Broker keeps file handles open for all log files (even if its not written to/read from)

2015-10-19 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14964003#comment-14964003
 ] 

Todd Palino commented on KAFKA-2580:


It's about as graceful as an OOM, which is to say "not very". Essentially, it 
hits the limit and falls over and dies with an exception. We've run into it a 
bit with both leaking FDs from an implementation issue, and with runaway 
clients that don't do the right thing. In both situations, you are correct that 
you will generally end up seeing it as a cascading failure through the cluster.

> Kafka Broker keeps file handles open for all log files (even if its not 
> written to/read from)
> -
>
> Key: KAFKA-2580
> URL: https://issues.apache.org/jira/browse/KAFKA-2580
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.1
>Reporter: Vinoth Chandar
>Assignee: Grant Henke
>
> We noticed this in one of our clusters where we stage logs for a longer 
> amount of time. It appears that the Kafka broker keeps file handles open even 
> for non active (not written to or read from) files. (in fact, there are some 
> threads going back to 2013 
> http://grokbase.com/t/kafka/users/132p65qwcn/keeping-logs-forever) 
> Needless to say, this is a problem and forces us to either artificially bump 
> up ulimit (its already at 100K) or expand the cluster (even if we have 
> sufficient IO and everything). 
> Filing this ticket, since I couldn't find anything similar. Very interested to 
> know if there are plans to address this (given how Samza's changelog topic is 
> meant to be a persistent large state use case).  





[jira] [Commented] (KAFKA-2580) Kafka Broker keeps file handles open for all log files (even if its not written to/read from)

2015-10-19 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14963986#comment-14963986
 ] 

Todd Palino commented on KAFKA-2580:


I agree with [~jkreps] here, that having a high FD limit is not a bad thing. As 
[~jjkoshy] noted, we're already running at 400k internally (recently increased 
from 200k). Part of that is to handle growth, and part of that is to have a 
good bit of headroom if something starts to leak FDs so we have some time to 
address it before it kills the process (we alert at 50% utilization).

The LRU cache option is probably the best. You can set it to an arbitrarily 
high number (the best option here might be to cap it near, but below, your 
per-process limit) if you want to effectively disable it, and it would 
generally avoid the process of having to check and act on expiring the FDs in 
the timer option. I can see arguments for setting the default either high or 
low (and I consider 10k to be low). Regardless, as long as it's configurable 
and documented it will be fine.
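A minimal sketch of such an LRU handle cache (not thread-safe, and the cap and 
read-only access mode are illustrative):

{code:scala}
import java.io.RandomAccessFile
import java.util.{LinkedHashMap => JLinkedHashMap}

// Sketch only: keep at most maxOpen handles, closing the least recently used
// one when the cap is exceeded; evicted files are simply reopened on demand.
class FileHandleCache(maxOpen: Int) {
  private val cache =
    new JLinkedHashMap[String, RandomAccessFile](16, 0.75f, true) {
      override def removeEldestEntry(
          eldest: java.util.Map.Entry[String, RandomAccessFile]): Boolean = {
        val evict = size() > maxOpen
        if (evict) eldest.getValue.close()
        evict
      }
    }

  def get(path: String): RandomAccessFile =
    Option(cache.get(path)).getOrElse {
      val handle = new RandomAccessFile(path, "r")
      cache.put(path, handle)
      handle
    }
}
{code}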

> Kafka Broker keeps file handles open for all log files (even if its not 
> written to/read from)
> -
>
> Key: KAFKA-2580
> URL: https://issues.apache.org/jira/browse/KAFKA-2580
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.1
>Reporter: Vinoth Chandar
>Assignee: Grant Henke
>
> We noticed this in one of our clusters where we stage logs for a longer 
> amount of time. It appears that the Kafka broker keeps file handles open even 
> for non active (not written to or read from) files. (in fact, there are some 
> threads going back to 2013 
> http://grokbase.com/t/kafka/users/132p65qwcn/keeping-logs-forever) 
> Needless to say, this is a problem and forces us to either artificially bump 
> up ulimit (its already at 100K) or expand the cluster (even if we have 
> sufficient IO and everything). 
> Filing this ticket, since I couldn't find anything similar. Very interested to 
> know if there are plans to address this (given how Samza's changelog topic is 
> meant to be a persistent large state use case).  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover

2015-10-16 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14960809#comment-14960809
 ] 

Todd Palino commented on KAFKA-2017:


Just to throw in my 2 cents here, I don't think that persisting this state in a 
special topic in Kafka is a bad idea. My only concern is that we have seen 
issues with the offsets already from time to time, and we'll want to make sure 
we take those lessons learned and handle them from the start. The ones I am 
aware of are:

1) Creation of the special topic at cluster initialization. If we specify an RF 
of N for the special topic, then the brokers must make this happen. The first 
broker that comes up can't create it with an RF of 1 and own all the 
partitions. Either it must reject all operations that would use the special 
topic until N brokers are members of the cluster and it can then be created, or 
it must create the topic in such a way that as soon as there are N brokers 
available the RF is corrected to the configured number.

2) Load of the special topic into local cache. Whenever a coordinator loads the 
special topic, there is a period of time while it is loading state where it 
cannot service requests. We've seen problems with this related to log 
compaction, where the partitions were excessively large, but I can see that, as we 
move an increasing number of (group, partition) tuples over to Kafka-committed 
offsets, it could easily become a scale issue. This should not be a big 
deal for group state information, as that should always be smaller than the 
offset information for the group, but we may want to create a longer term plan 
for handling auto-scaling of the special topics (the ability to increase the 
number of partitions and move group information from the partition it used to 
hash to, to the one it hashes to after scaling).
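
On the scaling concern in (2): the offsets topic maps a group to one of its partitions by hashing the group id modulo the partition count, so changing the partition count re-maps groups. A minimal sketch of that mapping (class and method names here are illustrative, not Kafka's exact code):

{code}
public final class GroupPartitioner {
    // Non-negative hash of the group id, modulo the special topic's partition count.
    public static int partitionFor(String groupId, int numPartitions) {
        return (groupId.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String group = "kafka-audit";
        // The same group can land on a different partition once the topic is scaled,
        // which is why existing state would need to be migrated.
        System.out.println(partitionFor(group, 50));   // before scaling
        System.out.println(partitionFor(group, 100));  // after scaling
    }
}
{code}

Any group whose hash lands differently under the new partition count would need its state migrated, which is the longer-term auto-scaling plan mentioned above.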

> Persist Coordinator State for Coordinator Failover
> --
>
> Key: KAFKA-2017
> URL: https://issues.apache.org/jira/browse/KAFKA-2017
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: Onur Karaman
>Assignee: Guozhang Wang
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, 
> KAFKA-2017_2015-05-21_19:02:47.patch
>
>
> When a coordinator fails, the group membership protocol tries to failover to 
> a new coordinator without forcing all the consumers rejoin their groups. This 
> is possible if the coordinator persists its state so that the state can be 
> transferred during coordinator failover. This state consists of most of the 
> information in GroupRegistry and ConsumerRegistry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1566) Kafka environment configuration (kafka-env.sh)

2015-08-22 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14708282#comment-14708282
 ] 

Todd Palino commented on KAFKA-1566:


This makes a lot of sense. We don't use any of the standard start/stop 
components or scripts, just the admin CLI tools, so I don't see this having an 
effect on us.

> Kafka environment configuration (kafka-env.sh)
> --
>
> Key: KAFKA-1566
> URL: https://issues.apache.org/jira/browse/KAFKA-1566
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Cosmin Lehene
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
> Fix For: 0.8.3
>
> Attachments: KAFKA-1566.patch, KAFKA-1566_2015-02-21_21:57:02.patch, 
> KAFKA-1566_2015-03-17_17:01:38.patch, KAFKA-1566_2015-03-17_17:19:23.patch
>
>
> It would be useful (especially for automated deployments) to have an 
> environment configuration file that could be sourced from the launcher files 
> (e.g. kafka-run-server.sh). 
> This is how this could look like kafka-env.sh 
> {code}
> export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseCompressedOops 
> -XX:+DisableExplicitGC -Djava.awt.headless=true -XX:+UseG1GC 
> -XX:PermSize=48m -XX:MaxPermSize=48m -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35" 
> export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" 
> export KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=/var/log/kafka" 
> {code} 
> kafka-server-start.sh 
> {code} 
> ... 
> source $base_dir/config/kafka-env.sh 
> ... 
> {code} 
> This approach is consistent with Hadoop and HBase. However, the idea here is 
> to be able to set these values in a single place without having to edit 
> startup scripts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2252) Socket connection closing is logged, but not corresponding opening of socket

2015-06-15 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14586437#comment-14586437
 ] 

Todd Palino commented on KAFKA-2252:


I moved the normal connection closed message to DEBUG level in KAFKA-2175. In 
general, we should not log either normal connection close or open at INFO 
level. There is far too much noise due to these messages.
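
Until the default levels change, the noise can also be suppressed at runtime. A minimal sketch using the log4j 1.x API (assuming log4j is the broker's logging backend and that the processor logs under its class name):

{code}
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class QuietNetworkLogs {
    public static void main(String[] args) {
        // Raise the threshold for the socket-server processor so that routine
        // "Closing socket connection" messages no longer reach the log at INFO.
        Logger.getLogger("kafka.network.Processor").setLevel(Level.WARN);
    }
}
{code}

The equivalent log4j.properties entry would be log4j.logger.kafka.network.Processor=WARN.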

> Socket connection closing is logged, but not corresponding opening of socket
> 
>
> Key: KAFKA-2252
> URL: https://issues.apache.org/jira/browse/KAFKA-2252
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Rosenberg
>
> (using 0.8.2.1)
> We see a large number of "Closing socket connection" logging to the broker 
> logs, e.g.:
> {code}
> 2015-06-04 16:49:30,262  INFO [kafka-network-thread-27330-2] 
> network.Processor - Closing socket connection to /1.2.3.4.
> 2015-06-04 16:49:30,262  INFO [kafka-network-thread-27330-0] 
> network.Processor - Closing socket connection to /5.6.7.8.
> 2015-06-04 16:49:30,695  INFO [kafka-network-thread-27330-0] 
> network.Processor - Closing socket connection to /9.10.11.12.
> 2015-06-04 16:49:31,465  INFO [kafka-network-thread-27330-1] 
> network.Processor - Closing socket connection to /13.14.15.16.
> 2015-06-04 16:49:31,806  INFO [kafka-network-thread-27330-0] 
> network.Processor - Closing socket connection to /17.18.19.20.
> 2015-06-04 16:49:31,842  INFO [kafka-network-thread-27330-2] 
> network.Processor - Closing socket connection to /21.22.23.24.
> {code}
> However, we have no corresponding logging for when these connections are 
> established.  Consequently, it's not very useful to see a flood of closed 
> connections, etc.  I'd think we'd want to see the corresponding 'connection 
> established' messages, also logged as INFO.
> Occasionally, we see a flood of the above messages, and have no idea as to 
> whether it indicates a problem, etc.  (Sometimes it might be due to an 
> ongoing rolling restart, or a change in the Zookeeper cluster).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-2175) Reduce server log verbosity at info level

2015-05-06 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino reassigned KAFKA-2175:
--

Assignee: Todd Palino  (was: Neha Narkhede)

> Reduce server log verbosity at info level
> -
>
> Key: KAFKA-2175
> URL: https://issues.apache.org/jira/browse/KAFKA-2175
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, zkclient
>Affects Versions: 0.8.3
>Reporter: Todd Palino
>Assignee: Todd Palino
>Priority: Minor
>  Labels: newbie
> Attachments: KAFKA-2175.patch
>
>
> Currently, the broker logs two messages at INFO level that should be at a 
> lower level. This serves only to fill up log files on disk, and can cause 
> performance issues due to synchronous logging as well.
> The first is the "Closing socket connection" message when there is no error. 
> This should be reduced to debug level. The second is the message that ZkUtil 
> writes when updating the partition reassignment JSON. This message contains 
> the entire JSON blob and should never be written at info level. In addition, 
> there is already a message in the controller log stating that the ZK node has 
> been updated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2175) Reduce server log verbosity at info level

2015-05-06 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino updated KAFKA-2175:
---
Attachment: KAFKA-2175.patch

> Reduce server log verbosity at info level
> -
>
> Key: KAFKA-2175
> URL: https://issues.apache.org/jira/browse/KAFKA-2175
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, zkclient
>Affects Versions: 0.8.3
>Reporter: Todd Palino
>Assignee: Neha Narkhede
>Priority: Minor
>  Labels: newbie
> Attachments: KAFKA-2175.patch
>
>
> Currently, the broker logs two messages at INFO level that should be at a 
> lower level. This serves only to fill up log files on disk, and can cause 
> performance issues due to synchronous logging as well.
> The first is the "Closing socket connection" message when there is no error. 
> This should be reduced to debug level. The second is the message that ZkUtil 
> writes when updating the partition reassignment JSON. This message contains 
> the entire JSON blob and should never be written at info level. In addition, 
> there is already a message in the controller log stating that the ZK node has 
> been updated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2175) Reduce server log verbosity at info level

2015-05-06 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino updated KAFKA-2175:
---
Status: Patch Available  (was: Open)

> Reduce server log verbosity at info level
> -
>
> Key: KAFKA-2175
> URL: https://issues.apache.org/jira/browse/KAFKA-2175
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, zkclient
>Affects Versions: 0.8.3
>Reporter: Todd Palino
>Assignee: Todd Palino
>Priority: Minor
>  Labels: newbie
> Attachments: KAFKA-2175.patch
>
>
> Currently, the broker logs two messages at INFO level that should be at a 
> lower level. This serves only to fill up log files on disk, and can cause 
> performance issues due to synchronous logging as well.
> The first is the "Closing socket connection" message when there is no error. 
> This should be reduced to debug level. The second is the message that ZkUtil 
> writes when updating the partition reassignment JSON. This message contains 
> the entire JSON blob and should never be written at info level. In addition, 
> there is already a message in the controller log stating that the ZK node has 
> been updated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2175) Reduce server log verbosity at info level

2015-05-06 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-2175:
--

 Summary: Reduce server log verbosity at info level
 Key: KAFKA-2175
 URL: https://issues.apache.org/jira/browse/KAFKA-2175
 Project: Kafka
  Issue Type: Improvement
  Components: controller, zkclient
Affects Versions: 0.8.3
Reporter: Todd Palino
Assignee: Neha Narkhede
Priority: Minor


Currently, the broker logs two messages at INFO level that should be at a lower 
level. This serves only to fill up log files on disk, and can cause performance 
issues due to synchronous logging as well.

The first is the "Closing socket connection" message when there is no error. 
This should be reduced to debug level. The second is the message that ZkUtil 
writes when updating the partition reassignment JSON. This message contains the 
entire JSON blob and should never be written at info level. In addition, there 
is already a message in the controller log stating that the ZK node has been 
updated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1342) Slow controlled shutdowns can result in stale shutdown requests

2015-04-07 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14484278#comment-14484278
 ] 

Todd Palino commented on KAFKA-1342:


Bump

I think we need to revive this. We have a "safe shutdown" bit of wrapper code 
we use that relies on an external resource, a dependency we would like to eliminate. It would 
be better to provide a safe shutdown option within Kafka itself, without that 
wrapper (i.e. do not shut down unless the under-replicated partition count is 0). 
However, this is not possible without serialized shutdown at the controller 
level. We can't allow a second broker to shut down until the first broker has 
completed its shutdown process. Then the second broker can check the URP count 
and be allowed to proceed.
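
For reference, here is a rough sketch of the kind of external check such a wrapper performs today: polling the broker's UnderReplicatedPartitions gauge over JMX before allowing the shutdown to proceed. The JMX port and the exact MBean name are assumptions that vary by broker version and deployment:

{code}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class SafeShutdownCheck {
    public static void main(String[] args) throws Exception {
        // Assumed JMX endpoint for the broker; adjust host/port for your deployment.
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            ObjectName urp = new ObjectName(
                "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions");
            // Poll until the broker reports zero under-replicated partitions.
            while (((Number) mbs.getAttribute(urp, "Value")).intValue() > 0) {
                Thread.sleep(5000);
            }
            System.out.println("No under-replicated partitions; safe to shut down.");
        }
    }
}
{code}

Doing this check inside the controller, serialized across brokers, would remove the need for the external resource entirely.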

> Slow controlled shutdowns can result in stale shutdown requests
> ---
>
> Key: KAFKA-1342
> URL: https://issues.apache.org/jira/browse/KAFKA-1342
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1
>Reporter: Joel Koshy
>Priority: Blocker
> Fix For: 0.9.0
>
>
> I don't think this is a bug introduced in 0.8.1, but rather one triggered by the fact
> that controlled shutdown seems to have become slower in 0.8.1 (will file a
> separate ticket to investigate that). When doing a rolling bounce, it is
> possible for a bounced broker to stop all its replica fetchers since the
> previous PID's shutdown requests are still being processed.
> - 515 is the controller
> - Controlled shutdown initiated for 503
> - Controller starts controlled shutdown for 503
> - The controlled shutdown takes a long time in moving leaders and moving
>   follower replicas on 503 to the offline state.
> - So 503's read from the shutdown channel times out and a new channel is
>   created. It issues another shutdown request.  This request (since it is a
>   new channel) is accepted at the controller's socket server but then waits
>   on the broker shutdown lock held by the previous controlled shutdown which
>   is still in progress.
> - The above step repeats for the remaining retries (six more requests).
> - 503 hits SocketTimeout exception on reading the response of the last
>   shutdown request and proceeds to do an unclean shutdown.
> - The controller's onBrokerFailure call-back fires and moves 503's replicas
>   to offline (not too important in this sequence).
> - 503 is brought back up.
> - The controller's onBrokerStartup call-back fires and moves its replicas
>   (and partitions) to online state. 503 starts its replica fetchers.
> - Unfortunately, the (phantom) shutdown requests are still being handled and
>   the controller sends StopReplica requests to 503.
> - The first shutdown request finally finishes (after 76 minutes in my case!).
> - The remaining shutdown requests also execute and do the same thing (sends
>   StopReplica requests for all partitions to
>   503).
> - The remaining requests complete quickly because they end up not having to
>   touch zookeeper paths - no leaders left on the broker and no need to
>   shrink ISR in zookeeper since it has already been done by the first
>   shutdown request.
> - So in the end-state 503 is up, but effectively idle due to the previous
>   PID's shutdown requests.
> There are some obvious fixes that can be made to controlled shutdown to help
> address the above issue. E.g., we don't really need to move follower
> partitions to Offline. We did that as an "optimization" so the broker falls
> out of ISR sooner - which is helpful when producers set required.acks to -1.
> However it adds a lot of latency to controlled shutdown. Also, (more
> importantly) we should have a mechanism to abort any stale shutdown process.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2012) Broker should automatically handle corrupt index files

2015-03-09 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-2012:
--

 Summary: Broker should automatically handle corrupt index files
 Key: KAFKA-2012
 URL: https://issues.apache.org/jira/browse/KAFKA-2012
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Todd Palino


We had a bunch of unclean system shutdowns (power failure), which caused 
corruption on our disks holding log segments in many cases. While the broker is 
handling the log segment corruption properly (truncation), it is having 
problems with corruption in the index files. Additionally, this only seems to 
be happening on some startups (while we are upgrading).

The broker should just do what I do when I hit a corrupt index file - remove it 
and rebuild it.

2015/03/09 17:58:53.873 FATAL [KafkaServerStartable] [main] [kafka-server] [] 
Fatal error during KafkaServerStartable startup. Prepare to shutdown
java.lang.IllegalArgumentException: requirement failed: Corrupt index found, 
index file 
(/export/content/kafka/i001_caches/__consumer_offsets-39/.index)
 has non-zero size but the last offset is -2121629628 and the base offset is 0
at scala.Predef$.require(Predef.scala:233)
at kafka.log.OffsetIndex.sanityCheck(OffsetIndex.scala:352)
at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:185)
at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:184)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.log.Log.loadSegments(Log.scala:184)
at kafka.log.Log.<init>(Log.scala:82)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:141)
at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
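
The manual fix described above (remove the corrupt index and let the broker rebuild it from the log segments on startup) can be scripted. A rough sketch, assuming the broker is stopped and that the data directory matches the path in the log excerpt:

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class DropIndexFiles {
    public static void main(String[] args) throws IOException {
        // Run only while the broker is down; it rebuilds missing index files
        // from the log segments during startup.
        Path logDir = Paths.get("/export/content/kafka/i001_caches");
        try (Stream<Path> files = Files.walk(logDir)) {
            files.filter(p -> p.toString().endsWith(".index"))
                 .forEach(p -> {
                     try {
                         Files.delete(p);
                         System.out.println("Removed " + p);
                     } catch (IOException e) {
                         System.err.println("Could not remove " + p + ": " + e);
                     }
                 });
        }
    }
}
{code}

Having the broker perform this removal and rebuild itself on sanity-check failure is exactly what this ticket asks for.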



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1987) Potential race condition in partition creation

2015-02-26 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-1987:
--

 Summary: Potential race condition in partition creation
 Key: KAFKA-1987
 URL: https://issues.apache.org/jira/browse/KAFKA-1987
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Todd Palino
Assignee: Neha Narkhede


I am finding that there appears to be a race condition when creating 
partitions, with replication factor 2 or higher, between the creation of the 
partition on the leader and the follower. What appears to be happening is that 
the follower is processing the command to create the partition before the 
leader does, and when the follower starts the replica fetcher, it fails with an 
UnknownTopicOrPartitionException.

The situation is that I am creating a large number of partitions on a cluster, 
preparing it for data being mirrored from another cluster. So there are a 
sizeable number of create and alter commands being sent sequentially. 
Eventually, the replica fetchers start up properly. But it seems like the 
controller should issue the command to create the partition to the leader, wait 
for confirmation, and then issue the command to create the partition to the 
followers.

2015/02/26 21:11:50.413 INFO [LogManager] [kafka-request-handler-12] 
[kafka-server] [] Created log for partition [topicA,30] in /path_to/i001_caches 
with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 
6, segment.bytes -> 268435456, flush.ms -> 1, delete.retention.ms -> 
8640, index.interval.bytes -> 4096, retention.bytes -> -1, 
min.insync.replicas -> 1, cleanup.policy -> delete, 
unclean.leader.election.enable -> true, segment.ms -> 4320, 
max.message.bytes -> 100, flush.messages -> 2, 
min.cleanable.dirty.ratio -> 0.5, retention.ms -> 8640, segment.jitter.ms 
-> 0}.
2015/02/26 21:11:50.418 WARN [Partition] [kafka-request-handler-12] 
[kafka-server] [] Partition [topicA,30] on broker 1551: No checkpointed 
highwatermark is found for partition [topicA,30]
2015/02/26 21:11:50.418 INFO [ReplicaFetcherManager] [kafka-request-handler-12] 
[kafka-server] [] [ReplicaFetcherManager on broker 1551] Removed fetcher for 
partitions [topicA,30]
2015/02/26 21:11:50.418 INFO [Log] [kafka-request-handler-12] [kafka-server] [] 
Truncating log topicA-30 to offset 0.
2015/02/26 21:11:50.450 INFO [ReplicaFetcherManager] [kafka-request-handler-12] 
[kafka-server] [] [ReplicaFetcherManager on broker 1551] Added fetcher for 
partitions List([[topicA,30], initOffset 0 to broker 
id:1555,host:host1555.example.com,port:10251] )
2015/02/26 21:11:50.615 ERROR [ReplicaFetcherThread] 
[ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], 
Error for partition [topicA,30] to broker 1555:class 
kafka.common.UnknownTopicOrPartitionException
2015/02/26 21:11:50.616 ERROR [ReplicaFetcherThread] 
[ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], 
Error for partition [topicA,30] to broker 1555:class 
kafka.common.UnknownTopicOrPartitionException
2015/02/26 21:11:50.618 ERROR [ReplicaFetcherThread] 
[ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], 
Error for partition [topicA,30] to broker 1555:class 
kafka.common.UnknownTopicOrPartitionException
2015/02/26 21:11:50.620 ERROR [ReplicaFetcherThread] 
[ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], 
Error for partition [topicA,30] to broker 1555:class 
kafka.common.UnknownTopicOrPartitionException
2015/02/26 21:11:50.621 ERROR [ReplicaFetcherThread] 
[ReplicaFetcherThread-0-1555] [kafka-server] [] [ReplicaFetcherThread-0-1555], 
Error for partition [topicA,30] to broker 1555:class 
kafka.common.UnknownTopicOrPartitionException
2



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1648) Round robin consumer balance throws an NPE when there are no topics

2014-09-23 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-1648:
--

 Summary: Round robin consumer balance throws an NPE when there are 
no topics
 Key: KAFKA-1648
 URL: https://issues.apache.org/jira/browse/KAFKA-1648
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Todd Palino
Assignee: Neha Narkhede


If you use the roundrobin rebalance method with a wildcard consumer, and there 
are no topics in the cluster, rebalance throws a NullPointerException in the 
consumer and fails. It retries the rebalance, but will continue to throw the 
NPE.

2014/09/23 17:51:16.147 [ZookeeperConsumerConnector] 
[kafka-audit_lva1-app0007.corp-1411494404908-4e620544], Cleared all relevant 
queues for this fetcher
2014/09/23 17:51:16.147 [ZookeeperConsumerConnector] 
[kafka-audit_lva1-app0007.corp-1411494404908-4e620544], Cleared the data chunks 
in all the consumer message iterators
2014/09/23 17:51:16.148 [ZookeeperConsumerConnector] 
[kafka-audit_lva1-app0007.corp-1411494404908-4e620544], Committing all offsets 
after clearing the fetcher queues
2014/09/23 17:51:46.148 [ZookeeperConsumerConnector] 
[kafka-audit_lva1-app0007.corp-1411494404908-4e620544], begin rebalancing 
consumer kafka-audit_lva1-app0007.corp-1411494404908-4e620544 try #0
2014/09/23 17:51:46.148 ERROR [OffspringServletRuntime] [main] 
[kafka-console-audit] [] Boot listener 
com.linkedin.kafkaconsoleaudit.KafkaConsoleAuditBootListener failed
kafka.common.ConsumerRebalanceFailedException: 
kafka-audit_lva1-app0007.corp-1411494404908-4e620544 can't rebalance after 10 
retries
at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:630)
at 
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:897)
at 
kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:931)
at 
kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:159)
at 
kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:101)
at 
com.linkedin.tracker.consumer.TrackingConsumerImpl.initWildcardIterators(TrackingConsumerImpl.java:88)
at 
com.linkedin.tracker.consumer.TrackingConsumerImpl.getWildcardIterators(TrackingConsumerImpl.java:116)
at 
com.linkedin.kafkaconsoleaudit.KafkaConsoleAudit.createAuditThreads(KafkaConsoleAudit.java:59)
at 
com.linkedin.kafkaconsoleaudit.KafkaConsoleAudit.initializeAudit(KafkaConsoleAudit.java:50)
at 
com.linkedin.kafkaconsoleaudit.KafkaConsoleAuditFactory.createInstance(KafkaConsoleAuditFactory.java:125)
at 
com.linkedin.kafkaconsoleaudit.KafkaConsoleAuditFactory.createInstance(KafkaConsoleAuditFactory.java:20)
at 
com.linkedin.util.factory.SimpleSingletonFactory.createInstance(SimpleSingletonFactory.java:20)
at 
com.linkedin.util.factory.SimpleSingletonFactory.createInstance(SimpleSingletonFactory.java:14)
at com.linkedin.util.factory.Generator.doGetBean(Generator.java:337)
at com.linkedin.util.factory.Generator.getBean(Generator.java:270)
at 
com.linkedin.kafkaconsoleaudit.KafkaConsoleAuditBootListener.onBoot(KafkaConsoleAuditBootListener.java:16)
at 
com.linkedin.offspring.servlet.OffspringServletRuntime.startGenerator(OffspringServletRuntime.java:147)
at 
com.linkedin.offspring.servlet.OffspringServletRuntime.start(OffspringServletRuntime.java:73)
at 
com.linkedin.offspring.servlet.OffspringServletContextListener.contextInitialized(OffspringServletContextListener.java:28)
at 
org.eclipse.jetty.server.handler.ContextHandler.callContextInitialized(ContextHandler.java:771)
at 
org.eclipse.jetty.servlet.ServletContextHandler.callContextInitialized(ServletContextHandler.java:424)
at 
org.eclipse.jetty.server.handler.ContextHandler.startContext(ContextHandler.java:763)
at 
org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:249)
at 
org.eclipse.jetty.webapp.WebAppContext.startContext(WebAppContext.java:1250)
at 
org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:706)
at 
org.eclipse.jetty.webapp.WebAppContext.doStart(WebAppContext.java:492)
at 
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at 
com.linkedin.emweb.ContextBasedHandlerImpl.doStart(ContextBasedHandlerImpl.java:105)
at 
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at 
com.linkedin.emweb.WebappDeployerImpl.start(WebappDeployerImpl.java:333)
at 
com.linkedin.emweb.WebappDeployerImpl.deploy(WebappDeployerImpl.java:187)
at 
c

[jira] [Resolved] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo

2014-08-28 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino resolved KAFKA-1588.


Resolution: Won't Fix

> Offset response does not support two requests for the same topic/partition 
> combo
> 
>
> Key: KAFKA-1588
> URL: https://issues.apache.org/jira/browse/KAFKA-1588
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2
>Reporter: Todd Palino
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
>
> When performing an OffsetRequest, if you request the same topic and partition 
> combination in a single request more than once (for example, if you want to 
> get both the head and tail offsets for a partition in the same request), you 
> will get a response for both, but they will be the same offset.
> We identified that the problem is that when the offset response is assembled, 
> a map is used to store the offset info before it is converted to the response 
> format and sent to the client. Therefore, the second request for a 
> topic/partition combination will overwrite the offset from the first request.
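
The overwrite described above comes down to keying the staged response by topic/partition alone; a minimal sketch with simplified stand-in types (not the broker's actual classes):

{code}
import java.util.HashMap;
import java.util.Map;

public class OffsetResponseOverwrite {
    public static void main(String[] args) {
        // Two requests for the same topic/partition: one for the earliest
        // offset and one for the latest.
        Map<String, Long> responseByTopicPartition = new HashMap<>();
        responseByTopicPartition.put("test-topic-0", 0L);      // earliest
        responseByTopicPartition.put("test-topic-0", 48213L);  // latest overwrites earliest

        // Only one entry survives, so both entries in the wire response end
        // up carrying the same offset.
        System.out.println(responseByTopicPartition); // {test-topic-0=48213}
    }
}
{code}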



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo

2014-08-28 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14114814#comment-14114814
 ] 

Todd Palino commented on KAFKA-1588:


I believe that since this involves significant changes to the protocol, and we 
have a reasonable workaround, we can close it.

> Offset response does not support two requests for the same topic/partition 
> combo
> 
>
> Key: KAFKA-1588
> URL: https://issues.apache.org/jira/browse/KAFKA-1588
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2
>Reporter: Todd Palino
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
>
> When performing an OffsetRequest, if you request the same topic and partition 
> combination in a single request more than once (for example, if you want to 
> get both the head and tail offsets for a partition in the same request), you 
> will get a response for both, but they will be the same offset.
> We identified that the problem is that when the offset response is assembled, 
> a map is used to store the offset info before it is converted to the response 
> format and sent to the client. Therefore, the second request for a 
> topic/partition combination will overwrite the offset from the first request.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo

2014-08-21 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14105601#comment-14105601
 ] 

Todd Palino commented on KAFKA-1588:


Thanks for digging into that [~sriharsha]. I had not dug down into the 
internals of kafka-python on this issue that far, but I know when I reviewed 
the responses there were multiples. It's possible that when decoding the 
response, kafka-python is duplicating the returned entries to assure the number 
of responses matches the number of requests.

> Offset response does not support two requests for the same topic/partition 
> combo
> 
>
> Key: KAFKA-1588
> URL: https://issues.apache.org/jira/browse/KAFKA-1588
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2
>Reporter: Todd Palino
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
>
> When performing an OffsetRequest, if you request the same topic and partition 
> combination in a single request more than once (for example, if you want to 
> get both the head and tail offsets for a partition in the same request), you 
> will get a response for both, but they will be the same offset.
> We identified that the problem is that when the offset response is assembled, 
> a map is used to store the offset info before it is converted to the response 
> format and sent to the client. Therefore, the second request for a 
> topic/partition combination will overwrite the offset from the first request.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1605) Producer should not require metadata.broker.list

2014-08-20 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14104581#comment-14104581
 ] 

Todd Palino commented on KAFKA-1605:


There are a lot of concerns with this...

1) We are moving consumers away from talking to Zookeeper (with the new 
consumer implementation that puts it all in the broker). Given that, we should 
probably not move another component (producer) towards Zookeeper

2) Building on #1, if we want to restrict access to Zookeeper (to protect the 
Kafka metadata), we cannot have clients, whether consumers or producers, 
relying on it.

3) This requires adding a Zookeeper client to the producer. I don't see a 
benefit in doing this, especially considering #1 above. You can put a DNS round 
robin or a VIP in the metadata broker list.

4) Building on #3, if at some point in the future there is a desire to use an 
alternate metadata store, it will be much simpler to do if only one component 
needs to be changed.

All told, I just don't see any benefit in this proposal.
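
To illustrate point 3, a minimal configuration sketch for the 0.8.x producer's Java API, using a single VIP or round-robin DNS name (the host name and topic here are hypothetical):

{code}
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class VipProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // A VIP or round-robin DNS name; the producer uses it only to bootstrap
        // metadata, then connects to the brokers the metadata describes.
        props.put("metadata.broker.list", "kafka-vip.example.com:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer = new Producer<>(new ProducerConfig(props));
        producer.send(new KeyedMessage<>("test-topic", "hello"));
        producer.close();
    }
}
{code}

Replacing brokers behind the VIP or DNS name then requires no change to producer code or configuration.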


> Producer should not require metadata.broker.list
> 
>
> Key: KAFKA-1605
> URL: https://issues.apache.org/jira/browse/KAFKA-1605
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.1.1
>Reporter: Prashant Deva
>
> Producer should just query the zookeeper and get the list of brokers from it 
> directly.
> With the current requirement the producer explicitly needs to know the ips of 
> all the brokers. So if you decide to replace them, you have to change the 
> Producer's code. 
> Isn't the whole point of using a zookeeper to avoid these kinds of dependencies?



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1605) Producer should not require metadata.broker.list

2014-08-20 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14104110#comment-14104110
 ] 

Todd Palino commented on KAFKA-1605:


Upcoming work for the consumer is going to move clients away from connecting to 
Zookeeper, and I think this is a very good thing. I do not think we should be 
moving producers towards Zookeeper. We've run into issues where it would be 
very nice to be able to restrict access to Zookeeper, and the only effective 
way to do that is if the brokers themselves are the only ones talking to 
Zookeeper (no clients).

> Producer should not require metadata.broker.list
> 
>
> Key: KAFKA-1605
> URL: https://issues.apache.org/jira/browse/KAFKA-1605
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.1.1
>Reporter: Prashant Deva
>
> Producer should just query the zookeeper and get the list of brokers from it 
> directly.
> With the current requirement the producer explicitly needs to know the ips of 
> all the brokers. So if you decide to replace them, you have to change the 
> Producer's code. 
> Isn't the whole point of using a zookeeper to avoid these kinds of dependencies?



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (KAFKA-1599) Change preferred replica election admin command to handle large clusters

2014-08-15 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-1599:
--

 Summary: Change preferred replica election admin command to handle 
large clusters
 Key: KAFKA-1599
 URL: https://issues.apache.org/jira/browse/KAFKA-1599
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2
Reporter: Todd Palino


We ran into a problem with a cluster that has 70k partitions where we could not 
trigger a preferred replica election for all topics and partitions using the 
admin tool. Upon investigation, it was determined that this was because the 
JSON object that was being written to the admin znode to tell the controller to 
start the election was 1.8 MB in size. As the default Zookeeper data size limit 
is 1MB, and it is non-trivial to change, we should come up with a better way to 
represent the list of topics and partitions for this admin command.

I have several thoughts on this so far:
1) Trigger the command for all topics and partitions with a JSON object that 
does not include an explicit list of them (i.e. a flag that says "all 
partitions")

2) Use a more compact JSON representation. Currently, the JSON contains a 
'partitions' key which holds a list of dictionaries that each have a 'topic' 
and 'partition' key, and there must be one list item for each partition. This 
results in a lot of unneeded repetition of key names. Changing this to a format 
like the following would be much more compact (a sketch of building this 
representation follows below):
{'topics': {'topicName1': [0, 1, 2, 3], 'topicName2': [0,1]}, 'version': 1}

3) Use a representation other than JSON. Strings are inefficient. A binary 
format would be the most compact. This does put a greater burden on tools and 
scripts that do not use the inbuilt libraries, but it is not too high.

4) Use a representation that involves multiple znodes. A structured tree in the 
admin command would probably provide the most complete solution. However, we 
would need to make sure to not exceed the data size limit with a wide tree (the 
list of children for any single znode cannot exceed the ZK data size of 1MB)

Obviously, there could be a combination of #1 with a change in the 
representation, which would likely be appropriate as well.
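
A rough sketch of building option 2's compact representation, referenced from the list above (plain string assembly, no particular JSON library assumed):

{code}
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CompactElectionJson {
    // Build a compact form like {"version":1,"topics":{"topicName1":[0, 1, 2, 3],...}}
    // with one key per topic and a plain array of partition ids.
    static String toCompactJson(Map<String, List<Integer>> partitionsByTopic) {
        String topics = partitionsByTopic.entrySet().stream()
            .map(e -> "\"" + e.getKey() + "\":" + e.getValue())
            .collect(Collectors.joining(","));
        return "{\"version\":1,\"topics\":{" + topics + "}}";
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> byTopic = new LinkedHashMap<>();
        byTopic.put("topicName1", Arrays.asList(0, 1, 2, 3));
        byTopic.put("topicName2", Arrays.asList(0, 1));
        System.out.println(toCompactJson(byTopic));
    }
}
{code}

For a 70k-partition cluster this drops the per-partition overhead from a full {"topic": ..., "partition": ...} dictionary to a few bytes per partition id.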



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo

2014-08-13 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14096083#comment-14096083
 ] 

Todd Palino commented on KAFKA-1588:


When working with this, I am sending my requests from a Python application that 
uses the kafka-python library. So I am not using the Java library for creating 
and sending the request across the wire. I was definitely able to put the same 
topic and partition in a single request; it is just that the response I 
received was two OffsetResponses with the same information in both (the same 
offset).

> Offset response does not support two requests for the same topic/partition 
> combo
> 
>
> Key: KAFKA-1588
> URL: https://issues.apache.org/jira/browse/KAFKA-1588
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2
>Reporter: Todd Palino
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
>
> When performing an OffsetRequest, if you request the same topic and partition 
> combination in a single request more than once (for example, if you want to 
> get both the head and tail offsets for a partition in the same request), you 
> will get a response for both, but they will be the same offset.
> We identified that the problem is that when the offset response is assembled, 
> a map is used to store the offset info before it is converted to the response 
> format and sent to the client. Therefore, the second request for a 
> topic/partition combination will overwrite the offset from the first request.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo

2014-08-11 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino updated KAFKA-1588:
---

Description: 
When performing an OffsetRequest, if you request the same topic and partition 
combination in a single request more than once (for example, if you want to get 
both the head and tail offsets for a partition in the same request), you will 
get a response for both, but they will be the same offset.

We identified that the problem is that when the offset response is assembled, a 
map is used to store the offset info before it is converted to the response 
format and sent to the client. Therefore, the second request for a 
topic/partition combination will overwrite the offset from the first request.

  was:
When performing an OffsetFetchRequest, if you request the same topic and 
partition combination in a single request more than once (for example, if you 
want to get both the head and tail offsets for a partition in the same 
request), you will get a response for both, but they will be the same offset.

We identified that the problem is that when the offset response is assembled, a 
map is used to store the offset info before it is converted to the response 
format and sent to the client. Therefore, the second request for a 
topic/partition combination will overwrite the offset from the first request.


Thanks, Jun, I had written that incorrectly. It is the OffsetRequest that 
manifests the problem. I've edited the description.

> Offset response does not support two requests for the same topic/partition 
> combo
> 
>
> Key: KAFKA-1588
> URL: https://issues.apache.org/jira/browse/KAFKA-1588
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2
>Reporter: Todd Palino
>
> When performing an OffsetRequest, if you request the same topic and partition 
> combination in a single request more than once (for example, if you want to 
> get both the head and tail offsets for a partition in the same request), you 
> will get a response for both, but they will be the same offset.
> We identified that the problem is that when the offset response is assembled, 
> a map is used to store the offset info before it is converted to the response 
> format and sent to the client. Therefore, the second request for a 
> topic/partition combination will overwrite the offset from the first request.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (KAFKA-1588) Offset response does not support two requests for the same topic/partition combo

2014-08-10 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-1588:
--

 Summary: Offset response does not support two requests for the 
same topic/partition combo
 Key: KAFKA-1588
 URL: https://issues.apache.org/jira/browse/KAFKA-1588
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Todd Palino


When performing an OffsetFetchRequest, if you request the same topic and 
partition combination in a single request more than once (for example, if you 
want to get both the head and tail offsets for a partition in the same 
request), you will get a response for both, but they will be the same offset.

We identified that the problem is that when the offset response is assembled, a 
map is used to store the offset info before it is converted to the response 
format and sent to the client. Therefore, the second request for a 
topic/partition combination will overwrite the offset from the first request.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1581) Log cleaner should have an option to ignore messages without keys

2014-08-08 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14091622#comment-14091622
 ] 

Todd Palino commented on KAFKA-1581:


I suggest that you should just skip any invalid message (compressed or without 
a key) and make sure to throw a warning in the log when it is encountered.
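
A simplified sketch of that skip-with-warning behaviour (the Message class and logger here are stand-ins, not the cleaner's actual types):

{code}
import java.util.List;
import java.util.logging.Logger;

public class KeylessMessageFilter {
    private static final Logger log = Logger.getLogger("LogCleaner");

    // Simplified stand-in for a log entry: just a key and a value.
    static class Message {
        final byte[] key;
        final byte[] value;
        Message(byte[] key, byte[] value) { this.key = key; this.value = value; }
    }

    static void compact(List<Message> batch) {
        for (Message m : batch) {
            if (m.key == null) {
                // Skip the invalid message rather than abort the cleaner thread.
                log.warning("Skipping message without a key during compaction");
                continue;
            }
            // ... retain the latest value per key as usual ...
        }
    }
}
{code}

The important property is that one bad message degrades a single partition's compaction rather than stopping the cleaner for every compacted topic.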

> Log cleaner should have an option to ignore messages without keys
> -
>
> Key: KAFKA-1581
> URL: https://issues.apache.org/jira/browse/KAFKA-1581
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>
> Right now, there is a strict requirement that compacted topics contain only 
> messages with keys. This makes sense, but the issue with a hard requirement 
> is that if it fails the cleaner quits. We should probably allow ignoring 
> these messages (with a warning). Alternatively, we can catch this scenario 
> (instead of the hard requirement) and just skip compaction for that partition.
> This came up because I saw an invalid message (compressed and without a key) 
> in the offsets topic which broke both log compaction and the offset load 
> process. I filed KAFKA-1580 to prevent that from happening in the first place 
> but KAFKA-1580 is only for internal topics. In the general case (compacted 
> non-internal topics) we would not want the cleaners to exit permanently due 
> to an invalid (key-less) message in one of the partitions since that would 
> prevent compaction for other partitions as well.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1476) Get a list of consumer groups

2014-07-18 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066488#comment-14066488
 ] 

Todd Palino commented on KAFKA-1476:


I tend to agree, [~jkreps]. We currently have kafka-topics.sh that groups a 
number of topic-centric functions in a single command, and I think that's a 
good model to continue with. On the consumer side, I can think of the following 
things we should be able to do:

List groups
Describe group (should include the hosts in the group)
List topics by group (for a given group, what topics does it consume)
Describe topic by group (for a given group and topic, list partitions, host 
owning the partition, and the committed offset)
Set offsets (for a given group and topic, explicitly set the offsets. Should 
allow setting to smallest, largest, and custom, which is explicitly setting the 
value for each partition)
List groups by topic (given a topic name, what groups consume it)

All of these functions have to work with both Zookeeper and group management 
within the brokers (once implemented). I want to know which one the data comes 
from as part of the results, but I don't want to have to specify it in advance.

As far as the offset checker goes, I could go either way on that. Ultimately, 
it combines information from two different areas (consumer and broker), which 
means it doesn't fit cleanly in either one even if we all agree that it's 
really a consumer function. I think it's fine if it stays where it is for now.


> Get a list of consumer groups
> -
>
> Key: KAFKA-1476
> URL: https://issues.apache.org/jira/browse/KAFKA-1476
> Project: Kafka
>  Issue Type: Wish
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ryan Williams
>  Labels: newbie
> Fix For: 0.9.0
>
> Attachments: KAFKA-1476.patch
>
>
> It would be useful to have a way to get a list of consumer groups currently 
> active via some tool/script that ships with kafka. This would be helpful so 
> that the system tools can be explored more easily.
> For example, when running the ConsumerOffsetChecker, it requires a group 
> option
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group 
> ?
> But, when just getting started with kafka, using the console producer and 
> consumer, it is not clear what value to use for the group option.  If a list 
> of consumer groups could be listed, then it would be clear what value to use.
> Background:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (KAFKA-1440) Per-request tracing

2014-05-15 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-1440:
--

 Summary: Per-request tracing
 Key: KAFKA-1440
 URL: https://issues.apache.org/jira/browse/KAFKA-1440
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Todd Palino


Could we have a flag in requests (potentially in the header for all requests, 
but at least for produce and fetch) that would enable tracing for that request. 
Currently, if we want to debug an issue with a request, we need to turn on 
trace level logging for the entire broker. If the client could ask for the 
request to be traced, we could then log detailed information for just that 
request.

Ideally, this would be available as a flag that can be enabled in the client 
via JMX, so it could be done without needing to restart the client application.
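
A minimal sketch of how such a client-side flag could be exposed over JMX, using only standard javax.management plumbing; the MBean name and attribute are hypothetical, not an existing Kafka interface:

{code}
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Standard MBean pattern: the interface must be named <ImplClass>MBean.
interface TraceFlagMBean {
    boolean isTraceRequests();
    void setTraceRequests(boolean enabled);
}

public class TraceFlag implements TraceFlagMBean {
    private volatile boolean traceRequests = false;

    public boolean isTraceRequests() { return traceRequests; }
    public void setTraceRequests(boolean enabled) { this.traceRequests = enabled; }

    public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        TraceFlag flag = new TraceFlag();
        // Hypothetical object name; a client would consult flag.isTraceRequests()
        // when building produce/fetch requests and set the trace bit accordingly.
        mbs.registerMBean(flag, new ObjectName("kafka.client:type=RequestTracing"));
        Thread.sleep(Long.MAX_VALUE); // keep the JVM alive so the flag can be flipped via jconsole
    }
}
{code}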



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (KAFKA-1276) Provide a list of config overrides available when running kafka.topics

2014-02-20 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-1276:
--

 Summary: Provide a list of config overrides available when running 
kafka.topics
 Key: KAFKA-1276
 URL: https://issues.apache.org/jira/browse/KAFKA-1276
 Project: Kafka
  Issue Type: Bug
  Components: tools
Reporter: Todd Palino


It would be helpful to have the help for kafka-topics enumerate a list of the 
per-topic configuration overrides that are available with the --config option.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Commented] (KAFKA-1182) Topic not created if number of live brokers less than # replicas

2014-02-17 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13903790#comment-13903790
 ] 

Todd Palino commented on KAFKA-1182:


I really don't think it needs to be that complicated, Jun. Whether the topic is 
created under-replicated because there aren't enough brokers right now or because 
there never have been, it's the same situation. It has an operational solution 
(add a broker). I'm not sure I see the value in tracking brokers that used to be 
around, and even if I only ever had 3 brokers in the cluster and someone 
specified an RF of 4, I would still want the topic created.

-Todd



> Topic not created if number of live brokers less than # replicas
> 
>
> Key: KAFKA-1182
> URL: https://issues.apache.org/jira/browse/KAFKA-1182
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 0.8.0
> Environment: Centos 6.3
>Reporter: Hanish Bansal
>Assignee: Jun Rao
>
> We have a Kafka cluster of 2 nodes (using Kafka 0.8.0).
> Replication Factor: 2
> Number of partitions: 2
> Actual Behaviour:
> Out of the two nodes, if any one node goes down then the topic is not created in 
> Kafka. 
> Steps to Reproduce:
> 1. Create a 2 node kafka cluster with replication factor 2
> 2. Start the Kafka cluster
> 3. Kill any one node
> 4.  Start the producer to write on a new topic
> 5. Observe the exception stated below:
> 2013-12-12 19:37:19 0 [WARN ] ClientUtils$ - Fetching topic metadata with
> correlation id 3 for topics [Set(test-topic)] from broker
> [id:0,host:122.98.12.11,port:9092] failed
> java.net.ConnectException: Connection refused
> at sun.nio.ch.Net.connect(Native Method)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> at
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at kafka.producer.Producer.send(Producer.scala:76)
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> Expected Behaviour: 
> When there are fewer live brokers than # replicas:
> The topic should still be created so that at least the live brokers can receive the data.
> They can replicate the data to the other broker once any down broker comes back up.
> As it stands, when there are fewer live brokers than # replicas, there is complete
> loss of data.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Commented] (KAFKA-1182) Topic not created if number of live brokers less than # replicas

2014-02-07 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13894781#comment-13894781
 ] 

Todd Palino commented on KAFKA-1182:


I'll have to side with Clark on this one. Given a choice between creating the 
topic in a degraded state (an undesirable, but tolerable problem) and losing 
data that is getting sent into a topic that does not yet exist (a serious 
problem, and one that is quite possibly not recoverable depending on the data), 
I'll take the degraded state.

> Topic not created if number of live brokers less than # replicas
> 
>
> Key: KAFKA-1182
> URL: https://issues.apache.org/jira/browse/KAFKA-1182
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 0.8.0
> Environment: Centos 6.3
>Reporter: Hanish Bansal
>Assignee: Jun Rao
>
> We have a Kafka cluster of 2 nodes (using Kafka 0.8.0).
> Replication Factor: 2
> Number of partitions: 2
> Actual Behaviour:
> Out of the two nodes, if any one node goes down then the topic is not created in 
> Kafka. 
> Steps to Reproduce:
> 1. Create a 2 node kafka cluster with replication factor 2
> 2. Start the Kafka cluster
> 3. Kill any one node
> 4.  Start the producer to write on a new topic
> 5. Observe the exception stated below:
> 2013-12-12 19:37:19 0 [WARN ] ClientUtils$ - Fetching topic metadata with
> correlation id 3 for topics [Set(test-topic)] from broker
> [id:0,host:122.98.12.11,port:9092] failed
> java.net.ConnectException: Connection refused
> at sun.nio.ch.Net.connect(Native Method)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> at
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at kafka.producer.Producer.send(Producer.scala:76)
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> Expected Behaviour: 
> When there are fewer live brokers than # replicas:
> The topic should still be created so that at least the live brokers can receive the data.
> They can replicate the data to the other broker once any down broker comes back up.
> As it stands, when there are fewer live brokers than # replicas, there is complete
> loss of data.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)


[jira] [Commented] (KAFKA-1125) Add options to let admin tools block until finish

2014-02-03 Thread Todd Palino (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13889898#comment-13889898
 ] 

Todd Palino commented on KAFKA-1125:


I believe an option for each of these tools to make them block would be very 
useful. As noted, in scripts where I want to perform tasks in sequence (such as 
moving partitions one topic at a time so as not to overload the cluster), I 
need to check for completion of each command by using the reassignment JSON 
file with --verify and parsing the response.

On the separate issue of creating a queue of admin tasks, this is probably a 
good idea as, for the most part, we do not want to perform two actions at the 
same time. However, I would request that if we have that queue of tasks, tools 
to view and modify the queue (removing tasks or potentially reordering them) 
would be helpful.
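
A rough sketch of that completion check as a polling loop around the reassignment tool's --verify mode; the tool path, ZooKeeper string, JSON file name, and the output text being matched are assumptions about a particular deployment and tool version:

{code}
import java.io.BufferedReader;
import java.io.InputStreamReader;

public class WaitForReassignment {
    public static void main(String[] args) throws Exception {
        while (true) {
            Process p = new ProcessBuilder(
                "bin/kafka-reassign-partitions.sh",
                "--zookeeper", "zk.example.com:2181",
                "--reassignment-json-file", "reassign.json",
                "--verify")
                .redirectErrorStream(true)
                .start();

            boolean stillRunning = false;
            try (BufferedReader out = new BufferedReader(
                    new InputStreamReader(p.getInputStream()))) {
                String line;
                while ((line = out.readLine()) != null) {
                    // Any partition not yet reported as complete means we keep waiting.
                    if (line.contains("in progress")) {
                        stillRunning = true;
                    }
                }
            }
            p.waitFor();
            if (!stillRunning) break;
            Thread.sleep(30_000);  // poll every 30 seconds
        }
        System.out.println("Reassignment complete; safe to start the next topic.");
    }
}
{code}

A blocking flag on the tool itself would replace this kind of output scraping, which is the main appeal of the proposal.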

> Add options to let admin tools block until finish
> -
>
> Key: KAFKA-1125
> URL: https://issues.apache.org/jira/browse/KAFKA-1125
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>
> Topic config change as well as create-topic, add-partition, 
> partition-reassignment and preferred leader election are all asynchronous in 
> the sense that the admin command returns immediately and one has to 
> check separately whether the process has finished. It would be better to add an 
> option to make these commands block until the process is done.
> Also, it would be good to queue admin tasks so that they can be 
> executed sequentially.



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)