[jira] [Commented] (KAFKA-598) decouple fetch size from max message size

2012-12-19 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13536705#comment-13536705
 ] 

Joel Koshy commented on KAFKA-598:
--

33.2: not really, because that would violate the new config's semantics - i.e., 
each thread shouldn't exceed its allocated amount of memory.
That said, I just realized that this implementation has a couple of flaws and 
may need to be refactored or have its scope reduced.
The max memory config is not always respected in this implementation. When we 
do the serial fetches, the queue will have chunks larger than the fair 
partition fetch size. However, the function that computes the fair partition 
fetch size assumes that the blocking queue only has chunks of the fair fetch 
size. I think we can take care of this, but I will think about it a bit more. 
Another problem is that the aggregate fetch size is the fair size * the number 
of partitions assigned to the thread. So if, for example, the partition 
assignment happens to be very skewed and a thread happens to have only one 
partition, a serial refetch will be pointless since it can't use a larger 
fetch size.
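
To make the memory arithmetic above concrete, here is a minimal sketch (in the 
spirit of the discussion, not taken from the patch) of how a fair per-partition 
fetch size could be derived from a per-thread memory bound; the names 
maxMemoryPerThread, queueCapacityInChunks and numPartitions are illustrative 
assumptions:

{code}
// Hedged sketch: divide a per-fetcher-thread memory budget across the chunks
// that can sit in the blocking queue plus one in-flight fetch. The flaw noted
// above is that a serial re-fetch of a large message puts a chunk bigger than
// this fair size on the queue, which this arithmetic ignores.
def fairPartitionFetchSize(maxMemoryPerThread: Long,
                           queueCapacityInChunks: Int,
                           numPartitions: Int): Int = {
  require(numPartitions > 0 && queueCapacityInChunks > 0)
  val chunksToAccountFor = queueCapacityInChunks + 1
  (maxMemoryPerThread / (chunksToAccountFor.toLong * numPartitions)).toInt
}
{code}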



> decouple fetch size from max message size
> -
>
> Key: KAFKA-598
> URL: https://issues.apache.org/jira/browse/KAFKA-598
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Joel Koshy
>Priority: Blocker
> Attachments: KAFKA-598-v1.patch, KAFKA-598-v2.patch, 
> KAFKA-598-v3.patch
>
>
> Currently, a consumer has to set the fetch size larger than the max message 
> size. This increases the memory footprint on the consumer, especially when a 
> large number of topics/partitions are subscribed. By decoupling the fetch 
> size from the max message size, we can use a smaller fetch size for normal 
> consumption, and when hitting a large message (hopefully rare), we 
> automatically increase the fetch size to the max message size temporarily.



[jira] [Updated] (KAFKA-668) Controlled shutdown admin tool should not require controller JMX url/port to be supplied

2012-12-21 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-668:
-

Attachment: KAFKA-668-v1.patch

This is a pretty straightforward change. It's slightly hacky in that I'm 
appending the :jmxport to the zk string, and it is effectively ignored in the 
Broker class. I preferred this over adding a jmxPort field to the Broker class, 
as that would cause widespread edits.
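
For illustration, a minimal sketch of the approach described (the exact 
registration format is an assumption here, not the attached patch):

{code}
// Hedged sketch: append the JMX port to the "host:port" string registered in
// ZooKeeper, and tolerate (ignore) the extra field when parsing it back into
// host and port.
def encodeBrokerInfo(host: String, port: Int, jmxPort: Int): String =
  host + ":" + port + ":" + jmxPort

def decodeBrokerInfo(registered: String): (String, Int, Option[Int]) =
  registered.split(":") match {
    case Array(host, port)          => (host, port.toInt, None)
    case Array(host, port, jmxPort) => (host, port.toInt, Some(jmxPort.toInt))
    case _ => throw new IllegalArgumentException("Unexpected broker info: " + registered)
  }
{code}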

> Controlled shutdown admin tool should not require controller JMX url/port to 
> be supplied
> 
>
> Key: KAFKA-668
> URL: https://issues.apache.org/jira/browse/KAFKA-668
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8, 0.8.1
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-668-v1.patch
>
>
> The controlled shutdown admin command takes a zookeeper string and also 
> requires the user to supply the controller's jmx url/port. This is a bit 
> annoying since the purpose of the zookeeper string is to discover the 
> controller. The tool should require exactly one of these options. If 
> zookeeper is supplied then discover the controller and its jmx port (which 
> means we will need to add the jmx port information to zk).



[jira] [Updated] (KAFKA-668) Controlled shutdown admin tool should not require controller JMX url/port to be supplied

2013-01-02 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-668:
-

Attachment: KAFKA-668-v2.patch

Needed a rebase.

> Controlled shutdown admin tool should not require controller JMX url/port to 
> be supplied
> 
>
> Key: KAFKA-668
> URL: https://issues.apache.org/jira/browse/KAFKA-668
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8, 0.8.1
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-668-v1.patch, KAFKA-668-v2.patch
>
>
> The controlled shutdown admin command takes a zookeeper string and also 
> requires the user to supply the controller's jmx url/port. This is a bit 
> annoying since the purpose of the zookeeper string is to discover the 
> controller. The tool should require exactly one of these options. If 
> zookeeper is supplied then discover the controller and its jmx port (which 
> means we will need to add the jmx port information to zk).



[jira] [Commented] (KAFKA-657) Add an API to commit offsets

2013-01-02 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13542390#comment-13542390
 ] 

Joel Koshy commented on KAFKA-657:
--

Hey David, the patch (and the wiki) looks great.

- For error handling: I think what Jun was referring to is the giant catch 
clause in handle - i.e., the new keys should be added as a case (see the sketch 
below). That junk block of code really needs to be cleaned up :)
- KafkaApis: if(offsetStr == null): I don't think this can happen.
- The default client id should probably be "" in all the requests/responses, 
i.e., to follow convention.
- It would be better to use 1.toShort or val CurrentVersion: Short = 1 (instead 
of 1.shortValue), although it's more or less a non-issue as it's in the object.
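
For reference, a minimal sketch of the first and last points; the key and 
handler names below are illustrative assumptions, not the patch itself:

{code}
// Hedged sketch of the suggestions above; names are assumptions.
// 1) In KafkaApis.handle, dispatch the new request keys as additional cases:
//      case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
//      case RequestKeys.OffsetFetchKey  => handleOffsetFetchRequest(request)
// 2) Declare the version as a Short constant and follow the "" client-id convention:
object OffsetCommitRequest {
  val CurrentVersion: Short = 1          // instead of 1.shortValue at each use site
  val DefaultClientId: String = ""       // empty client id, per convention
}
{code}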


> Add an API to commit offsets
> 
>
> Key: KAFKA-657
> URL: https://issues.apache.org/jira/browse/KAFKA-657
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>  Labels: project
> Attachments: KAFKA-657v1.patch, KAFKA-657v2.patch, KAFKA-657v3.patch, 
> KAFKA-657v4.patch, KAFKA-657v5.patch, KAFKA-657v6.patch
>
>
> Currently the consumer directly writes their offsets to zookeeper. Two 
> problems with this: (1) This is a poor use of zookeeper, and we need to 
> replace it with a more scalable offset store, and (2) it makes it hard to 
> carry over to clients in other languages. A first step towards accomplishing 
> that is to add a proper Kafka API for committing offsets. The initial version 
> of this would just write to zookeeper as we do today, but in the future we 
> would then have the option of changing this.
> This api likely needs to take a sequence of 
> consumer-group/topic/partition/offset entries and commit them all.
> It would be good to do a wiki design on how this would work and consensus on 
> that first.



[jira] [Resolved] (KAFKA-668) Controlled shutdown admin tool should not require controller JMX url/port to be supplied

2013-01-02 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-668.
--

Resolution: Fixed

Thanks for the review. Committed to 0.8.

> Controlled shutdown admin tool should not require controller JMX url/port to 
> be supplied
> 
>
> Key: KAFKA-668
> URL: https://issues.apache.org/jira/browse/KAFKA-668
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8, 0.8.1
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-668-v1.patch, KAFKA-668-v2.patch
>
>
> The controlled shutdown admin command takes a zookeeper string and also 
> requires the user to supply the controller's jmx url/port. This is a bit 
> annoying since the purpose of the zookeeper string is to discover the 
> controller. The tool should require exactly one of these options. If 
> zookeeper is supplied then discover the controller and its jmx port (which 
> means we will need to add the jmx port information to zk).



[jira] [Closed] (KAFKA-668) Controlled shutdown admin tool should not require controller JMX url/port to be supplied

2013-01-02 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-668.



> Controlled shutdown admin tool should not require controller JMX url/port to 
> be supplied
> 
>
> Key: KAFKA-668
> URL: https://issues.apache.org/jira/browse/KAFKA-668
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8, 0.8.1
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-668-v1.patch, KAFKA-668-v2.patch
>
>
> The controlled shutdown admin command takes a zookeeper string and also 
> requires the user to supply the controller's jmx url/port. This is a bit 
> annoying since the purpose of the zookeeper string is to discover the 
> controller. The tool should require exactly one of these options. If 
> zookeeper is supplied then discover the controller and its jmx port (which 
> means we will need to add the jmx port information to zk).



[jira] [Commented] (KAFKA-681) Unclean shutdown testing - truncateAndStartWithNewOffset is not invoked when it is expected to

2013-01-03 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13543601#comment-13543601
 ] 

Joel Koshy commented on KAFKA-681:
--

+1

> Unclean shutdown testing - truncateAndStartWithNewOffset is not invoked when 
> it is expected to
> --
>
> Key: KAFKA-681
> URL: https://issues.apache.org/jira/browse/KAFKA-681
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: John Fung
>Priority: Blocker
>  Labels: bugs
> Attachments: kafka-681-reproduce-issue.patch, kafka-681_v1.patch
>
>




[jira] [Commented] (KAFKA-682) java.lang.OutOfMemoryError: Java heap space

2013-01-04 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13544119#comment-13544119
 ] 

Joel Koshy commented on KAFKA-682:
--

You might need to increase your heap size. What do you have it set to right 
now? Would you be able to run the broker with -XX:+HeapDumpOnOutOfMemoryError 
to get a heap-dump?

In case you are overriding defaults - what's the replication factor for the 
topic, num-required-acks for the producer requests, and producer request 
timeout? Are any requests going through or are the produce requests expiring?


> java.lang.OutOfMemoryError: Java heap space
> ---
>
> Key: KAFKA-682
> URL: https://issues.apache.org/jira/browse/KAFKA-682
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
> Environment: $ uname -a
> Linux rngadam-think 3.5.0-17-generic #28-Ubuntu SMP Tue Oct 9 19:32:08 UTC 
> 2012 i686 i686 i686 GNU/Linux
> $ java -version
> java version "1.7.0_09"
> OpenJDK Runtime Environment (IcedTea7 2.3.3) (7u9-2.3.3-0ubuntu1~12.04.1)
> OpenJDK Server VM (build 23.2-b09, mixed mode)
>Reporter: Ricky Ng-Adam
>
> git pull (commit 32dae955d5e2e2dd45bddb628cb07c874241d856)
> ...build...
> ./sbt update
> ./sbt package
> ...run...
> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties
> ...then configured fluentd with kafka plugin...
> gem install fluentd --no-ri --no-rdoc
> gem install fluent-plugin-kafka
> fluentd -c ./fluent/fluent.conf -vv
> ...then flood fluentd with messages inputted from syslog and outputted to 
> kafka.
> results in (after about 1 messages of 1K each in 3s):
> [2013-01-05 02:00:52,087] ERROR Closing socket for /127.0.0.1 because of 
> error (kafka.network.Processor)
> java.lang.OutOfMemoryError: Java heap space
> at 
> kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:45)
> at 
> kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:42)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
> at scala.collection.immutable.Range$$anon$1.foreach(Range.scala:274)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> at scala.collection.immutable.Range.map(Range.scala:39)
> at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:42)
> at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:38)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227)
> at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
> at scala.collection.immutable.Range$$anon$1.foreach(Range.scala:274)
> at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:227)
> at scala.collection.immutable.Range.flatMap(Range.scala:39)
> at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:38)
> at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:32)
> at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:32)
> at kafka.network.RequestChannel$Request.(RequestChannel.scala:47)
> at kafka.network.Processor.read(SocketServer.scala:298)
> at kafka.network.Processor.run(SocketServer.scala:209)
> at java.lang.Thread.run(Thread.java:722)



[jira] [Commented] (KAFKA-682) java.lang.OutOfMemoryError: Java heap space

2013-01-04 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13544503#comment-13544503
 ] 

Joel Koshy commented on KAFKA-682:
--

I think that fix was merged into trunk (before 32da) so it should be there in 
trunk as well.

> java.lang.OutOfMemoryError: Java heap space
> ---
>
> Key: KAFKA-682
> URL: https://issues.apache.org/jira/browse/KAFKA-682
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
> Environment: $ uname -a
> Linux rngadam-think 3.5.0-17-generic #28-Ubuntu SMP Tue Oct 9 19:32:08 UTC 
> 2012 i686 i686 i686 GNU/Linux
> $ java -version
> java version "1.7.0_09"
> OpenJDK Runtime Environment (IcedTea7 2.3.3) (7u9-2.3.3-0ubuntu1~12.04.1)
> OpenJDK Server VM (build 23.2-b09, mixed mode)
>Reporter: Ricky Ng-Adam
>
> git pull (commit 32dae955d5e2e2dd45bddb628cb07c874241d856)
> ...build...
> ./sbt update
> ./sbt package
> ...run...
> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties
> ...then configured fluentd with kafka plugin...
> gem install fluentd --no-ri --no-rdoc
> gem install fluent-plugin-kafka
> fluentd -c ./fluent/fluent.conf -vv
> ...then flood fluentd with messages inputted from syslog and outputted to 
> kafka.
> results in (after about 1 messages of 1K each in 3s):
> [2013-01-05 02:00:52,087] ERROR Closing socket for /127.0.0.1 because of 
> error (kafka.network.Processor)
> java.lang.OutOfMemoryError: Java heap space
> at 
> kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:45)
> at 
> kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:42)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
> at scala.collection.immutable.Range$$anon$1.foreach(Range.scala:274)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> at scala.collection.immutable.Range.map(Range.scala:39)
> at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:42)
> at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:38)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227)
> at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
> at scala.collection.immutable.Range$$anon$1.foreach(Range.scala:274)
> at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:227)
> at scala.collection.immutable.Range.flatMap(Range.scala:39)
> at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:38)
> at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:32)
> at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:32)
> at kafka.network.RequestChannel$Request.(RequestChannel.scala:47)
> at kafka.network.Processor.read(SocketServer.scala:298)
> at kafka.network.Processor.run(SocketServer.scala:209)
> at java.lang.Thread.run(Thread.java:722)



[jira] [Commented] (KAFKA-682) java.lang.OutOfMemoryError: Java heap space

2013-01-08 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13547129#comment-13547129
 ] 

Joel Koshy commented on KAFKA-682:
--

That's why I asked for the configured "num-required-acks for the producer 
requests". If it is the default (0), then the request shouldn't be added to the 
request purgatory, which would rule out KAFKA-671, no?
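
As a rough illustration of that reasoning (this is not the actual KafkaApis 
logic, just a sketch of the acks semantics being referred to):

{code}
// Hedged illustration: a produce request is only parked in the producer
// request purgatory when the producer asks to wait for replica
// acknowledgements. With the default acks=0 nothing goes into purgatory,
// which is why KAFKA-671 would not apply here.
object ProduceDispatchSketch {
  def handleProduce(requiredAcks: Short): String = {
    if (requiredAcks == 0)
      "append locally; no response expected, nothing added to purgatory"
    else if (requiredAcks == 1)
      "append locally; respond immediately after the leader write"
    else
      "append locally; add a delayed request to purgatory until replicas ack"
  }
}
{code}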

> java.lang.OutOfMemoryError: Java heap space
> ---
>
> Key: KAFKA-682
> URL: https://issues.apache.org/jira/browse/KAFKA-682
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
> Environment: $ uname -a
> Linux rngadam-think 3.5.0-17-generic #28-Ubuntu SMP Tue Oct 9 19:32:08 UTC 
> 2012 i686 i686 i686 GNU/Linux
> $ java -version
> java version "1.7.0_09"
> OpenJDK Runtime Environment (IcedTea7 2.3.3) (7u9-2.3.3-0ubuntu1~12.04.1)
> OpenJDK Server VM (build 23.2-b09, mixed mode)
>Reporter: Ricky Ng-Adam
> Attachments: java_pid22281.hprof.gz, java_pid22281_Leak_Suspects.zip
>
>
> git pull (commit 32dae955d5e2e2dd45bddb628cb07c874241d856)
> ...build...
> ./sbt update
> ./sbt package
> ...run...
> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties
> ...then configured fluentd with kafka plugin...
> gem install fluentd --no-ri --no-rdoc
> gem install fluent-plugin-kafka
> fluentd -c ./fluent/fluent.conf -vv
> ...then flood fluentd with messages inputted from syslog and outputted to 
> kafka.
> results in (after about 1 messages of 1K each in 3s):
> [2013-01-05 02:00:52,087] ERROR Closing socket for /127.0.0.1 because of 
> error (kafka.network.Processor)
> java.lang.OutOfMemoryError: Java heap space
> at 
> kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:45)
> at 
> kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:42)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
> at scala.collection.immutable.Range$$anon$1.foreach(Range.scala:274)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> at scala.collection.immutable.Range.map(Range.scala:39)
> at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:42)
> at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:38)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227)
> at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
> at scala.collection.immutable.Range$$anon$1.foreach(Range.scala:274)
> at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:227)
> at scala.collection.immutable.Range.flatMap(Range.scala:39)
> at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:38)
> at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:32)
> at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:32)
> at kafka.network.RequestChannel$Request.(RequestChannel.scala:47)
> at kafka.network.Processor.read(SocketServer.scala:298)
> at kafka.network.Processor.run(SocketServer.scala:209)
> at java.lang.Thread.run(Thread.java:722)



[jira] [Commented] (KAFKA-598) decouple fetch size from max message size

2013-01-11 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13551383#comment-13551383
 ] 

Joel Koshy commented on KAFKA-598:
--

The full scope should probably move out of 0.8 - i.e., as described above, 
bounding the consumer's memory is basically a packing problem without knowledge 
of the message sizes on the broker. One possibility is for the broker to 
somehow communicate the size of the large message back to the client, but that 
would break our zero-copy property wrt fetches.

So I would suggest we don't do the full patch (i.e., bounding consumer memory 
&& handling large messages). Instead we can go with the simpler implementation 
that requires a new config (which is not ideal, but better IMO than trying to 
half-implement the above packing problem).

I haven't had time to look at this lately, but if people are okay with the 
above, then I can revisit one of the earlier revisions of the patches.
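
For context, a minimal sketch of the "simpler implementation" direction, i.e. 
the fallback described in the ticket summary; the names and the detection 
heuristic are illustrative assumptions, not the attached patches:

{code}
// Hedged sketch: fetch with a small "normal" size, and if a partition returns
// bytes but yields no complete message, retry that partition with the
// (larger) max message size before dropping back to the small footprint.
object FetchSizeFallbackSketch {
  def nextFetchSize(normalFetchSize: Int,
                    maxMessageSize: Int,
                    bytesReturned: Int,
                    completeMessagesParsed: Int): Int = {
    val hitOversizedMessage = bytesReturned > 0 && completeMessagesParsed == 0
    if (hitOversizedMessage) maxMessageSize   // temporary bump for the large message
    else normalFetchSize                      // normal, low-memory consumption
  }
}
{code}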


> decouple fetch size from max message size
> -
>
> Key: KAFKA-598
> URL: https://issues.apache.org/jira/browse/KAFKA-598
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Joel Koshy
>Priority: Blocker
> Attachments: KAFKA-598-v1.patch, KAFKA-598-v2.patch, 
> KAFKA-598-v3.patch
>
>
> Currently, a consumer has to set the fetch size larger than the max message 
> size. This increases the memory footprint on the consumer, especially when a 
> large number of topics/partitions are subscribed. By decoupling the fetch 
> size from the max message size, we can use a smaller fetch size for normal 
> consumption, and when hitting a large message (hopefully rare), we 
> automatically increase the fetch size to the max message size temporarily.



[jira] [Created] (KAFKA-702) Livelock between request handler/processor threads

2013-01-14 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-702:


 Summary: Livelock between request handler/processor threads
 Key: KAFKA-702
 URL: https://issues.apache.org/jira/browse/KAFKA-702
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
Priority: Blocker
 Fix For: 0.8


We have seen this a couple of times in the past few days in a test cluster.

The processor thread enqueues requests into the request queue and dequeues
responses from the response queue. The reverse is true for the request handler
thread. This leads to the following livelock situation (all the 
processor/request
handler threads show this in the thread-dump):

"kafka-processor-10251-7" prio=10 tid=0x7f4a0c3c9800 nid=0x4c39 waiting on 
condition [0x7f46f698e000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x7f48c9dd2698> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at 
java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:107)
at kafka.network.Processor.read(SocketServer.scala:321)
at kafka.network.Processor.run(SocketServer.scala:231)
at java.lang.Thread.run(Thread.java:619)


"kafka-request-handler-7" daemon prio=10 tid=0x7f4a0c57f000 nid=0x4c47 
waiting on condition [0x7f46f5b8]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x7f48c9dd6348> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at 
java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
at kafka.network.RequestChannel.sendResponse(RequestChannel.scala:112)
at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:198)
at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
at java.lang.Thread.run(Thread.java:619)
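
The cycle is easier to see in isolation; the following is a stripped-down 
sketch of the queueing structure involved (simplified, not the actual 
RequestChannel code):

{code}
// Hedged sketch: both sides use blocking put() on bounded queues, so if the
// request queue and the response queue fill up at the same time, every
// processor is stuck in sendRequest() and every handler is stuck in
// sendResponse(), and neither side ever drains the other's queue.
import java.util.concurrent.ArrayBlockingQueue

class ChannelSketch(capacity: Int) {
  private val requestQueue  = new ArrayBlockingQueue[AnyRef](capacity)
  private val responseQueue = new ArrayBlockingQueue[AnyRef](capacity)

  // Called by network processor threads; blocks when requestQueue is full.
  def sendRequest(request: AnyRef): Unit = requestQueue.put(request)

  // Called by request handler threads; blocks when responseQueue is full.
  def sendResponse(response: AnyRef): Unit = responseQueue.put(response)

  def receiveRequest(): AnyRef  = requestQueue.take()
  def receiveResponse(): AnyRef = responseQueue.take()
}
{code}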





[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-01-16 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13555352#comment-13555352
 ] 

Joel Koshy commented on KAFKA-705:
--

I set up a local cluster of three brokers and created a bunch of topics with 
replication factor = 2. I was able to do multiple iterations of rolling bounces 
without issue. Since this was local, I did not use your py script as it kills 
PIDs returned by ps.

Would you by any chance be able to provide a scenario to reproduce this 
locally? That said, I believe John Fung also tried to reproduce this in a 
distributed environment but was unable to do so, so I'll probably need to take 
a look at the logs in your environment.


> Controlled shutdown doesn't seem to work on more than one broker in a cluster
> -
>
> Key: KAFKA-705
> URL: https://issues.apache.org/jira/browse/KAFKA-705
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: bugs
> Attachments: shutdown_brokers_eat.py, shutdown-command
>
>
> I wrote a script (attached here) to basically round robin through the brokers 
> in a cluster doing the following 2 operations on each of them -
> 1. Send the controlled shutdown admin command. If it succeeds
> 2. Restart the broker
> What I've observed is that only one broker is able to finish the above 
> successfully the first time around. For the rest of the iterations, no broker 
> is able to shutdown using the admin command and every single time it fails 
> with the error message stating the same number of leaders on every broker. 



[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-01-18 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13557481#comment-13557481
 ] 

Joel Koshy commented on KAFKA-705:
--

I think this is why it happens:

https://github.com/apache/kafka/blob/03eb903ce223ab55c5acbcf4243ce805aaaf4fad/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala#L150

It could occur as follows. Suppose there's a partition 'P' assigned to brokers 
x and y; leaderAndIsr = y, {x, y}:
1. Controlled shutdown of broker x; leaderAndIsr -> y, {y}.
2. After the above completes, kill -15 and then restart broker x.
3. Immediately do a controlled shutdown of broker y, so now y is in the list of 
shutting-down brokers.

Because of the above check, x will not start its follower fetcher for 'P' to 
broker y.

Adding sufficient wait time between (2) and (3) seems to address the issue (in 
your script there's no sleep), but we should handle it properly in the shutdown 
code. I will think about a fix for that.
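
A simplified sketch of the check in question (paraphrasing the line referenced 
above; this is not the actual controller code):

{code}
// Hedged, simplified sketch. When a replica comes online, a fetcher is only
// started if its leader is considered "alive"; if that check excludes brokers
// that are merely shutting down, the restarted replica never starts following
// a leader that is still serving on a shutting-down broker. The fix committed
// later in this jira was, in effect, to use the live-or-shutting-down set.
case class ControllerContextSketch(liveBrokerIds: Set[Int],
                                   shuttingDownBrokerIds: Set[Int]) {
  def liveOrShuttingDownBrokerIds: Set[Int] = liveBrokerIds ++ shuttingDownBrokerIds
}

def shouldStartFollowerFetcher(ctx: ControllerContextSketch,
                               leader: Int,
                               treatShuttingDownAsAlive: Boolean): Boolean =
  if (treatShuttingDownAsAlive) ctx.liveOrShuttingDownBrokerIds.contains(leader) // the fix
  else ctx.liveBrokerIds.contains(leader)                                        // old behaviour
{code}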


> Controlled shutdown doesn't seem to work on more than one broker in a cluster
> -
>
> Key: KAFKA-705
> URL: https://issues.apache.org/jira/browse/KAFKA-705
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: bugs
> Attachments: shutdown_brokers_eat.py, shutdown-command
>
>
> I wrote a script (attached here) to basically round robin through the brokers 
> in a cluster doing the following 2 operations on each of them -
> 1. Send the controlled shutdown admin command. If it succeeds
> 2. Restart the broker
> What I've observed is that only one broker is able to finish the above 
> successfully the first time around. For the rest of the iterations, no broker 
> is able to shutdown using the admin command and every single time it fails 
> with the error message stating the same number of leaders on every broker. 



[jira] [Created] (KAFKA-712) Controlled shutdown tool should provide a meaningful message if a controller failover occurs during the operation

2013-01-18 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-712:


 Summary: Controlled shutdown tool should provide a meaningful 
message if a controller failover occurs during the operation
 Key: KAFKA-712
 URL: https://issues.apache.org/jira/browse/KAFKA-712
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8, 0.8.1
Reporter: Joel Koshy
Priority: Minor
 Fix For: 0.8.1


If the controller fails over before a jmx connection can be established, the 
tool shows the following
exception:
javax.management.InstanceNotFoundException: 
kafka.controller:type=KafkaController,name=ControllerOps
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1094)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getClassLoaderFor(DefaultMBeanServerInterceptor.java:1438)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.getClassLoaderFor(JmxMBeanServer.java:1276)
at 
javax.management.remote.rmi.RMIConnectionImpl$5.run(RMIConnectionImpl.java:1326)
at java.security.AccessController.doPrivileged(Native Method)
at 
javax.management.remote.rmi.RMIConnectionImpl.getClassLoaderFor(RMIConnectionImpl.java:1323)
at 
javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:771)
at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:305)
at sun.rmi.transport.Transport$1.run(Transport.java:159)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.Transport.serviceCall(Transport.java:155)
at 
sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:535)
at 
sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:790)
at 
sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:649)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
at 
sun.rmi.transport.StreamRemoteCall.exceptionReceivedFromServer(StreamRemoteCall.java:255)
at 
sun.rmi.transport.StreamRemoteCall.executeCall(StreamRemoteCall.java:233)
at sun.rmi.server.UnicastRef.invoke(UnicastRef.java:142)
at com.sun.jmx.remote.internal.PRef.invoke(Unknown Source)
at javax.management.remote.rmi.RMIConnectionImpl_Stub.invoke(Unknown 
Source)
at 
javax.management.remote.rmi.RMIConnector$RemoteMBeanServerConnection.invoke(RMIConnector.java:993)
at 
kafka.admin.ShutdownBroker$.kafka$admin$ShutdownBroker$$invokeShutdown(ShutdownBroker.scala:50)
at kafka.admin.ShutdownBroker$.main(ShutdownBroker.scala:105)
at kafka.admin.ShutdownBroker.main(ShutdownBroker.scala)

Using the retry option on the tool would work, but we should provide a more
meaningful message.



[jira] [Updated] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-01-18 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-705:
-

Attachment: kafka-705-v1.patch

Here's a simple fix.

I don't really see any good reason why we shouldn't allow starting a fetcher to 
a broker that is shutting down but not yet completely shut down, if a leader 
still exists on that broker.

> Controlled shutdown doesn't seem to work on more than one broker in a cluster
> -
>
> Key: KAFKA-705
> URL: https://issues.apache.org/jira/browse/KAFKA-705
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: bugs
> Attachments: kafka-705-v1.patch, shutdown_brokers_eat.py, 
> shutdown-command
>
>
> I wrote a script (attached here) to basically round robin through the brokers 
> in a cluster doing the following 2 operations on each of them -
> 1. Send the controlled shutdown admin command. If it succeeds
> 2. Restart the broker
> What I've observed is that only one broker is able to finish the above 
> successfully the first time around. For the rest of the iterations, no broker 
> is able to shutdown using the admin command and every single time it fails 
> with the error message stating the same number of leaders on every broker. 



[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-01-21 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13559121#comment-13559121
 ] 

Joel Koshy commented on KAFKA-705:
--

I committed the fix to 0.8 with a small edit: used the 
liveOrShuttingDownBrokers field.

Another small issue is that we send a stop-replica request to the shutting-down 
broker even if controlled shutdown did not complete. This "prematurely" forces 
the broker out of the ISR of those partitions. I think it should be safe to 
avoid sending the stop-replica request if controlled shutdown has not 
completely moved leadership of partitions off the shutting-down broker.


> Controlled shutdown doesn't seem to work on more than one broker in a cluster
> -
>
> Key: KAFKA-705
> URL: https://issues.apache.org/jira/browse/KAFKA-705
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: bugs
> Attachments: kafka-705-v1.patch, shutdown_brokers_eat.py, 
> shutdown-command
>
>
> I wrote a script (attached here) to basically round robin through the brokers 
> in a cluster doing the following 2 operations on each of them -
> 1. Send the controlled shutdown admin command. If it succeeds
> 2. Restart the broker
> What I've observed is that only one broker is able to finish the above 
> successfully the first time around. For the rest of the iterations, no broker 
> is able to shutdown using the admin command and every single time it fails 
> with the error message stating the same number of leaders on every broker. 



[jira] [Updated] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-01-21 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-705:
-

Attachment: kafka-705-incremental-v2.patch

Here is what I meant in my last comment.

> Controlled shutdown doesn't seem to work on more than one broker in a cluster
> -
>
> Key: KAFKA-705
> URL: https://issues.apache.org/jira/browse/KAFKA-705
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: bugs
> Attachments: kafka-705-incremental-v2.patch, kafka-705-v1.patch, 
> shutdown_brokers_eat.py, shutdown-command
>
>
> I wrote a script (attached here) to basically round robin through the brokers 
> in a cluster doing the following 2 operations on each of them -
> 1. Send the controlled shutdown admin command. If it succeeds
> 2. Restart the broker
> What I've observed is that only one broker is able to finish the above 
> successfully the first time around. For the rest of the iterations, no broker 
> is able to shutdown using the admin command and every single time it fails 
> with the error message stating the same number of leaders on every broker. 



[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-01-22 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13559968#comment-13559968
 ] 

Joel Koshy commented on KAFKA-705:
--

Thanks for reviewing. I checked in the incremental patch as well. I will leave 
this JIRA open for now until it can be verified.

> Controlled shutdown doesn't seem to work on more than one broker in a cluster
> -
>
> Key: KAFKA-705
> URL: https://issues.apache.org/jira/browse/KAFKA-705
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: bugs
> Attachments: kafka-705-incremental-v2.patch, kafka-705-v1.patch, 
> shutdown_brokers_eat.py, shutdown-command
>
>
> I wrote a script (attached here) to basically round robin through the brokers 
> in a cluster doing the following 2 operations on each of them -
> 1. Send the controlled shutdown admin command. If it succeeds
> 2. Restart the broker
> What I've observed is that only one broker is able to finish the above 
> successfully the first time around. For the rest of the iterations, no broker 
> is able to shutdown using the admin command and every single time it fails 
> with the error message stating the same number of leaders on every broker. 



[jira] [Created] (KAFKA-817) Implement a zookeeper path-based controlled shutdown tool

2013-03-19 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-817:


 Summary: Implement a zookeeper path-based controlled shutdown tool
 Key: KAFKA-817
 URL: https://issues.apache.org/jira/browse/KAFKA-817
 Project: Kafka
  Issue Type: Bug
  Components: controller, tools
Affects Versions: 0.8
Reporter: Joel Koshy
Assignee: Neha Narkhede


The controlled shutdown tool currently depends on jmxremote.port being exposed. 
Apparently, this is often not exposed in production environments and makes the 
script unusable. We can move to a zk-based approach in which the controller 
watches a path that lists shutting down brokers. This will also make it 
consistent with the pattern used in some of the other replication-related tools.
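
A minimal sketch of what the zk-based approach could look like; the path name 
and the use of an ephemeral child node are assumptions, not a committed design:

{code}
// Hedged sketch of the zk-path-based approach described above.
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

object ControlledShutdownViaZkSketch {
  val ShutdownPath = "/admin/shutting_down_brokers"   // assumed path

  // Tool side: request shutdown of a broker by creating a child node.
  def requestShutdown(zkClient: ZkClient, brokerId: Int): Unit = {
    zkClient.createPersistent(ShutdownPath, true)             // ensure parent exists
    zkClient.createEphemeral(ShutdownPath + "/" + brokerId)   // goes away if the tool dies
  }

  // Controller side: watch the path and initiate leader movement for each entry.
  def watch(zkClient: ZkClient)(onShutdownRequested: Int => Unit): Unit = {
    zkClient.subscribeChildChanges(ShutdownPath, new IZkChildListener {
      override def handleChildChange(parent: String, children: java.util.List[String]): Unit = {
        if (children != null) {
          val it = children.iterator()
          while (it.hasNext) onShutdownRequested(it.next().toInt)
        }
      }
    })
  }
}
{code}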



[jira] [Commented] (KAFKA-827) improve list topic output format

2013-03-26 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13614434#comment-13614434
 ] 

Joel Koshy commented on KAFKA-827:
--

While you are touching this, would it be reasonable to also switch from using 
the AdminUtil to a blank TopicMetadataRequest? It runs a lot quicker if there 
are a large number of topics and you run the tool from outside the ZK cluster's 
DC. Also, the topicOpt description has been misleading for a while.

> improve list topic output format
> 
>
> Key: KAFKA-827
> URL: https://issues.apache.org/jira/browse/KAFKA-827
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Jun Rao
>Priority: Blocker
> Attachments: kafka-827.patch
>
>
> We need to make the output of list topic command more readable.



[jira] [Commented] (KAFKA-826) Make Kafka 0.8 depend on metrics 2.2.0 instead of 3.x

2013-03-28 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13616783#comment-13616783
 ] 

Joel Koshy commented on KAFKA-826:
--

Thank you for looking into this. Metrics 2.x had a few minor issues with the 
CsvReporter (which we use in the system tests), and this is why we used 3.x.

The fixes that I'm aware of are:
- https://github.com/codahale/metrics/pull/225
- https://github.com/codahale/metrics/pull/290
- If a CSV file already exists, metrics throws an IOException and does not 
resume CSV reporting. This would be the case on a broker bounce, for example. 
Someone put out a patch for this 
(https://github.com/adagios/metrics/compare/2.x-maintenance...2.x-epoch-in-csv) 
but I'd have to check if that was pulled into metrics-3.x.

Unfortunately, although the above are small fixes, if we want to use the 
official 2.x metrics release we would need to copy over the code of the metrics 
CsvReporter (i.e., into a new implementation of metrics' AbstractReporter), 
patch in those fixes, and plug that into KafkaMetricsCsvReporter. I don't think 
it is difficult, but it is a bit clunky (which is why we preferred using 3.x at 
the time).


> Make Kafka 0.8 depend on metrics 2.2.0 instead of 3.x
> -
>
> Key: KAFKA-826
> URL: https://issues.apache.org/jira/browse/KAFKA-826
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Jun Rao
>Priority: Blocker
>  Labels: build, kafka-0.8, metrics
>
> In order to mavenize Kafka 0.8, we have to depend on metrics 2.2.0 since 
> metrics 3.x is a huge change as well as not an officially supported release.



[jira] [Created] (KAFKA-872) Socket server does not set send/recv buffer sizes

2013-04-23 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-872:


 Summary: Socket server does not set send/recv buffer sizes
 Key: KAFKA-872
 URL: https://issues.apache.org/jira/browse/KAFKA-872
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Joel Koshy
Assignee: Joel Koshy
 Fix For: 0.8


The socket server should set its send and receive socket buffer sizes - this is 
important in cross-DC mirroring setups where large buffer sizes are essential 
to enable the mirror-maker processes to do bulk consumption. 
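
For illustration, the kind of change this amounts to (a sketch, not the 
attached patch; the parameter names are illustrative):

{code}
// Hedged sketch: apply configured send/receive buffer sizes to each accepted
// socket channel in the socket server.
import java.nio.channels.SocketChannel

def configureSocketBuffers(channel: SocketChannel,
                           sendBufferBytes: Int,
                           receiveBufferBytes: Int): Unit = {
  channel.socket().setSendBufferSize(sendBufferBytes)
  channel.socket().setReceiveBufferSize(receiveBufferBytes)
}
{code}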



[jira] [Updated] (KAFKA-872) Socket server does not set send/recv buffer sizes

2013-04-23 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-872:
-

Attachment: KAFKA-872-v1.patch

> Socket server does not set send/recv buffer sizes
> -
>
> Key: KAFKA-872
> URL: https://issues.apache.org/jira/browse/KAFKA-872
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Joel Koshy
>Assignee: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-872-v1.patch
>
>
> The socket server should set its send and receive socket buffer sizes - this 
> is important in cross-DC mirroring setups where large buffer sizes are 
> essential to enable the mirror-maker processes to do bulk consumption. 



[jira] [Resolved] (KAFKA-872) Socket server does not set send/recv buffer sizes

2013-04-23 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-872.
--

Resolution: Fixed

Thanks for the review. Applied to 0.8.

> Socket server does not set send/recv buffer sizes
> -
>
> Key: KAFKA-872
> URL: https://issues.apache.org/jira/browse/KAFKA-872
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Joel Koshy
>Assignee: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-872-v1.patch
>
>
> The socket server should set its send and receive socket buffer sizes - this 
> is important in cross-DC mirroring setups where large buffer sizes are 
> essential to enable the mirror-maker processes to do bulk consumption. 



[jira] [Created] (KAFKA-880) NoLeaderPartitionSet should be cleared before leader finder thread is started up

2013-04-25 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-880:


 Summary: NoLeaderPartitionSet should be cleared before leader 
finder thread is started up
 Key: KAFKA-880
 URL: https://issues.apache.org/jira/browse/KAFKA-880
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8


This was a recent regression.

This could prevent the consumer from progressing because fetchers for the 
currently owned partitions may not be added (depending on the order that the 
map iterator yields).

I think the fix should be simple - just clear the set after stopping the leader 
finder thread and stopping fetchers.

{code}
[2013-04-25 17:06:38,377] WARN 
[sometopic-somehost-1366909575615-f801367d-leader-finder-thread]
, Failed to find leader for Set([sometopic,11], [sometopic,25], [sometopic,24]) 
(kafka.consumer.ConsumerFetcherManager$Lead
erFinderThread)
java.util.NoSuchElementException: key not found: [sometopic,24]
at scala.collection.MapLike$class.default(MapLike.scala:223)
at scala.collection.immutable.Map$Map2.default(Map.scala:110)
at scala.collection.MapLike$class.apply(MapLike.scala:134)
at scala.collection.immutable.Map$Map2.apply(Map.scala:110)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:81)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
{code}
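
A minimal sketch of the proposed ordering (names follow the description above; 
this is not the actual ConsumerFetcherManager code):

{code}
// Hedged sketch of the fix: during a rebalance, clear noLeaderPartitionSet
// only after the leader finder thread and the fetchers have been stopped, so
// stale entries cannot race with the newly started leader finder thread.
import scala.collection.mutable

class RebalanceSketch {
  private val noLeaderPartitionSet = mutable.Set[(String, Int)]()

  def stopConnections(stopLeaderFinder: () => Unit, closeFetchers: () => Unit): Unit = {
    stopLeaderFinder()              // 1. stop the leader finder thread
    closeFetchers()                 // 2. stop all fetcher threads
    noLeaderPartitionSet.clear()    // 3. only now clear the stale set
  }
}
{code}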




[jira] [Updated] (KAFKA-880) NoLeaderPartitionSet should be cleared before leader finder thread is started up

2013-04-25 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-880:
-

Description: 
This was a recent regression.

This could prevent the consumer from progressing because fetchers for the 
currently owned partitions may not be added (depending on the order that the 
map iterator yields).

I think the fix should be simple - just clear the set after stopping the leader 
finder thread and stopping fetchers.

[2013-04-25 17:06:38,377] WARN 
[sometopic-somehost-1366909575615-f801367d-leader-finder-thread]
, Failed to find leader for Set([sometopic,11], [sometopic,25], [sometopic,24]) 
(kafka.consumer.ConsumerFetcherManager$Lead
erFinderThread)
java.util.NoSuchElementException: key not found: [sometopic,24]
at scala.collection.MapLike$class.default(MapLike.scala:223)
at scala.collection.immutable.Map$Map2.default(Map.scala:110)
at scala.collection.MapLike$class.apply(MapLike.scala:134)
at scala.collection.immutable.Map$Map2.apply(Map.scala:110)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:81)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)


  was:
This was a recent regression.

This could prevent the consumer from progressing because fetchers for the 
currently owned partitions may not be added (depending on the order that the 
map iterator yields).

I think the fix should be simple - just clear the set after stopping the leader 
finder thread and stopping fetchers.

{code}
[2013-04-25 17:06:38,377] WARN 
[sometopic-somehost-1366909575615-f801367d-leader-finder-thread]
, Failed to find leader for Set([sometopic,11], [sometopic,25], [sometopic,24]) 
(kafka.consumer.ConsumerFetcherManager$Lead
erFinderThread)
java.util.NoSuchElementException: key not found: [sometopic,24]
at scala.collection.MapLike$class.default(MapLike.scala:223)
at scala.collection.immutable.Map$Map2.default(Map.scala:110)
at scala.collection.MapLike$class.apply(MapLike.scala:134)
at scala.collection.immutable.Map$Map2.apply(Map.scala:110)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:81)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
{code}



> NoLeaderPartitionSet should be cleared before leader finder thread is started 
> up
> 
>
> Key: KAFKA-880
> URL: https://issues.apache.org/jira/browse/KAFKA-880
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
>
> This was a recent regression.
> This could prevent the consumer from progressing because fetchers for the 
> currently owned partitions may not be added (depending on the order that the 
> map iterator yields).
> I think the fix should be simple - just clear the set after stopping the 
> leader finder thread and stopping fetchers.
> [2013-04-25 17:06:38,377] WARN 
> [sometopic-somehost-1366909575615-f801367d-leader-finder-thread]
> , Failed to find leader for Set([sometopic,11], [sometopic,25], 
> [sometopic,24]) (kafka.consumer.C

[jira] [Commented] (KAFKA-890) The list of brokers for fetching metadata should be shuffled

2013-04-30 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13645736#comment-13645736
 ] 

Joel Koshy commented on KAFKA-890:
--

+1

It is worth noting that this is useful even in the presence of a VIP since the 
consumers don't currently use a VIP to look up metadata.
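
For illustration, a sketch of the behaviour being reviewed (simplified, with 
broker entries reduced to strings; not the attached patch):

{code}
// Hedged sketch: shuffle the configured broker list before iterating over it
// for metadata requests, so the first broker in everyone's config does not
// absorb most of the metadata load.
import scala.util.Random

def brokersForMetadata(configuredBrokers: Seq[String]): Seq[String] =
  Random.shuffle(configuredBrokers)

// Usage: try each broker in the shuffled order until one answers, e.g.
// brokersForMetadata(Seq("broker1:9092", "broker2:9092", "broker3:9092"))
{code}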

> The list of brokers for fetching metadata should be shuffled 
> -
>
> Key: KAFKA-890
> URL: https://issues.apache.org/jira/browse/KAFKA-890
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: kafka-0.8, p1
> Attachments: kafka-890.patch
>
>
> The list of brokers in the metadata request is never shuffled. Which means 
> that if some clients are not using a VIP for metadata requests, the first 
> broker ends up servicing most metadata requests, leaving imbalanced load on 
> the brokers. This issue is even more pronounced when there are several 
> thousand clients talking to a cluster each using a broker list to fetch 
> metadata.



[jira] [Commented] (KAFKA-889) Add mbeans to track socket server's response queue size in addition to request queue size

2013-04-30 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13646080#comment-13646080
 ] 

Joel Koshy commented on KAFKA-889:
--

+1

> Add mbeans to track socket server's response queue size in addition to 
> request queue size
> -
>
> Key: KAFKA-889
> URL: https://issues.apache.org/jira/browse/KAFKA-889
> Project: Kafka
>  Issue Type: Improvement
>  Components: network
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Critical
>  Labels: kafka-0.8
> Attachments: kafka-889.patch
>
>
> We only track request queue size of the socket server. However, when response 
> send time is high, it is useful to know the response queue sizes as well

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-886) Update info on Controlled shutdown and Preferred replica election tool

2013-04-30 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13646092#comment-13646092
 ] 

Joel Koshy commented on KAFKA-886:
--

Thanks for the great write-up. Couple of comments:

1) We should probably add a note on the controlled shutdown tool (script) usage 
that it is currently JMX-based and depends on the jmx.remote.port property 
being set (otherwise you won't be able to use the script and will need to 
poke jmx through other means). We can reference KAFKA-817 which will remedy 
this and make it zookeeper-based instead of JMX.
2) Due to the above, if people need to use local JMX operations and essentially 
do manually what the script does automatically, it is best to do a controlled 
shutdown and bounce of the controller last (otherwise there would be 
unnecessary controller re-elections).
3) For the ListTopicCommand tool - maybe we should mention that if there are a 
lot of topics and we list info for all topics it can take a while to run unless 
it is in the same datacenter as the ZK cluster. Actually I think the 
ListTopicCommand should really be using the SimpleConsumer or producer to fetch 
metadata instead of reading ZK directly. That way, people don't have to zip up 
Kafka and copy it over to their production environment. What do you think?
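
To illustrate what "poking jmx through other means" for (1) might look like, here is a rough sketch; the JMX service URL, MBean name, operation name and signature below are placeholders (not the broker's actual ones) and would need to be checked against a running broker:

{code}
import javax.management.ObjectName
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

// Placeholder names throughout -- verify against the MBeans a real broker exposes.
object ControlledShutdownViaJmxSketch {
  def main(args: Array[String]): Unit = {
    // Assumes the controller was started with a remote JMX port (9999 here).
    val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://controller-host:9999/jmxrmi")
    val connector = JMXConnectorFactory.connect(url)
    try {
      val mbsc = connector.getMBeanServerConnection
      val controllerOps = new ObjectName("kafka:type=kafka.ControllerOps") // placeholder name
      mbsc.invoke(controllerOps, "shutdownBroker",   // placeholder operation
        Array[AnyRef](Int.box(1)),                   // id of the broker to shut down
        Array("java.lang.Integer"))
    } finally {
      connector.close()
    }
  }
}
{code}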

> Update info on Controlled shutdown and Preferred replica election tool
> --
>
> Key: KAFKA-886
> URL: https://issues.apache.org/jira/browse/KAFKA-886
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 0.8
>Reporter: Sriram Subramanian
>Assignee: Sriram Subramanian
>Priority: Blocker
>  Labels: p1
>


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-892) Change request log to include request completion not handling

2013-04-30 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13646192#comment-13646192
 ] 

Joel Koshy commented on KAFKA-892:
--

+1

> Change request log to include request completion not handling
> -
>
> Key: KAFKA-892
> URL: https://issues.apache.org/jira/browse/KAFKA-892
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Critical
>  Labels: kafka-0.8
> Attachments: kafka-892.patch
>
>
> While troubleshooting a lot of 0.8 issues, what I've seen is that more often 
> than not, I've wanted the request processing latency breakdown to be part of 
> the request log. There are of course mbeans to expose this information, but 
> when you are trying to troubleshoot the root cause of few slow requests, this 
> is immensely helpful. Currently, we only include request reception in the 
> request log. We could include both but that would double the size of the 
> request log. So I'm proposing adding just the request completion information 
> in the request log. This will not just tell us which request came into the 
> server, but will also give us a breakdown of where it spent most of its 
> processing time.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-901) Kafka server can become unavailable if clients send several metadata requests

2013-05-10 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13654667#comment-13654667
 ] 

Joel Koshy commented on KAFKA-901:
--

Haven't looked at the patch yet, but went through the overview. An alternate 
approach that we may want to consider is to maintain a metadata cache at every 
broker. The cache can be kept consistent by having the controller send a (new) 
update-metadata request to all brokers whenever it sends out a leaderAndIsr 
request. A new request type would avoid needing to "overload" the leader and 
isr request.

This would help avoid the herd effect of multiple clients flooding the 
controller with metadata requests (although these requests should return 
quickly with your patch).
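
To make the alternative concrete, here is a rough sketch (illustrative names only, not a proposed implementation) of a broker-side cache that the controller keeps up to date and that can answer client metadata requests locally:

{code}
import scala.collection.mutable

// Illustrative types/names only.
case class PartitionMeta(leader: Int, isr: Seq[Int], replicas: Seq[Int])

class BrokerMetadataCache {
  private val cache = mutable.Map[(String, Int), PartitionMeta]()

  // Applied when the controller pushes an update-metadata request
  // (sent alongside every leaderAndIsr request).
  def update(topic: String, partition: Int, meta: PartitionMeta): Unit =
    synchronized { cache((topic, partition)) = meta }

  // Used to answer a client metadata request from the local cache,
  // so the request never has to reach the controller.
  def get(topic: String, partition: Int): Option[PartitionMeta] =
    synchronized { cache.get((topic, partition)) }
}
{code}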


> Kafka server can become unavailable if clients send several metadata requests
> -
>
> Key: KAFKA-901
> URL: https://issues.apache.org/jira/browse/KAFKA-901
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Blocker
> Attachments: metadata-request-improvement.patch
>
>
> Currently, if a broker is bounced without controlled shutdown and there are 
> several clients talking to the Kafka cluster, each of the clients realize the 
> unavailability of leaders for some partitions. This leads to several metadata 
> requests sent to the Kafka brokers. Since metadata requests are pretty slow, 
> all the I/O threads quickly become busy serving the metadata requests. This 
> leads to a full request queue, that stalls handling of finished responses 
> since the same network thread handles requests as well as responses. In this 
> situation, clients timeout on metadata requests and send more metadata 
> requests. This quickly makes the Kafka cluster unavailable. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-21 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-914:


 Summary: Deadlock between initial rebalance and watcher-triggered 
rebalances
 Key: KAFKA-914
 URL: https://issues.apache.org/jira/browse/KAFKA-914
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8


Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
code is very complex so it's a bit hard to articulate the following very
clearly. I will try and describe the sequence that results in a deadlock
when starting up a large number of consumers at around the same time:

- When a consumer's createMessageStream method is called, it initiates an
  initial inline rebalance.
- However, before the above initial rebalance actually begins, a ZK watch
  may trigger (due to some other consumers starting up) and initiate a
  rebalance. This happens successfully so fetchers start and start filling
  up the chunk queues.
- Another watch triggers and initiates yet another rebalance. This rebalance
  attempt tries to close the fetchers. Before the fetchers are stopped, we
  shutdown the leader-finder-thread to prevent new fetchers from being
  started.
- The shutdown is accomplished by interrupting the leader-finder-thread and
  then awaiting its shutdown latch.
- If the leader-finder-thread still has a partition without leader to
  process and tries to add a fetcher for it, it will get an exception
  (InterruptedException if acquiring the partitionMapLock or
  ClosedByInterruptException if performing an offset request). If we get an
  InterruptedException the thread's interrupted flag is cleared.
- However, the leader-finder-thread may have mu
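
A stripped-down sketch of the interrupt-plus-latch shutdown pattern involved (illustrative only, not the actual ShutdownableThread code); it shows why a swallowed interrupt can leave the shutdown caller waiting on the latch forever:

{code}
import java.util.concurrent.CountDownLatch

// Stripped-down sketch of an interrupt-plus-latch shutdown; not the actual code.
class WorkerSketch extends Thread {
  private val shutdownLatch = new CountDownLatch(1)
  @volatile private var isRunning = true

  override def run(): Unit = {
    try {
      while (isRunning) {
        try {
          doWork() // stand-in for acquiring partitionMapLock / issuing an offset request
        } catch {
          case _: InterruptedException =>
            // The interrupted flag is now cleared. If the next doWork() blocks on
            // something held by the shutting-down thread, we never reach the latch.
        }
      }
    } finally {
      shutdownLatch.countDown()
    }
  }

  private def doWork(): Unit = Thread.sleep(100)

  def shutdown(): Unit = {
    isRunning = false
    interrupt()
    shutdownLatch.await() // hangs if the worker is stuck after swallowing the interrupt
  }
}
{code}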

[jira] [Commented] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-21 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663146#comment-13663146
 ] 

Joel Koshy commented on KAFKA-914:
--

One more point: [td3] above does not need to originate from a watcher-triggered 
rebalance. The initial rebalance can also run into the same deadlock. i.e., as 
long as one or more watcher-triggered rebalances succeed and start fetchers 
prior to the initial rebalance, we may end up in this wedged state. E.g., on 
another instance I saw [td3] but on the main thread:

2013-05-21_17:07:14.34308 "main" prio=10 tid=0x7f5e34008000 nid=0x4e49 
waiting on condition [0x7f5e3b41]
2013-05-21_17:07:14.34308java.lang.Thread.State: WAITING (parking)
2013-05-21_17:07:14.34309   at sun.misc.Unsafe.park(Native Method)
2013-05-21_17:07:14.34309   - parking to wait for  <0x7f5d36d99fa0> (a 
java.util.concurrent.CountDownLatch$Sync)
2013-05-21_17:07:14.34309   at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-21_17:07:14.34310   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-21_17:07:14.34310   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-21_17:07:14.34310   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-21_17:07:14.34311   at 
java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-21_17:07:14.34312   at 
kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-21_17:07:14.34313   at 
kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
2013-05-21_17:07:14.34313   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:486)
2013-05-21_17:07:14.34313   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
2013-05-21_17:07:14.34314   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:420)
2013-05-21_17:07:14.34314   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
2013-05-21_17:07:14.34315   at 
scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-21_17:07:14.34315   at 
scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-21_17:07:14.34316   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-21_17:07:14.34316   - locked <0x7f5d36d4b2e0> (a 
java.lang.Object)
2013-05-21_17:07:14.34317   at 
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
2013-05-21_17:07:14.34317   at 
kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:712)
2013-05-21_17:07:14.34318   at 
kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
2013-05-21_17:07:14.34318   at 
kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34318   at 
kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34319   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319   at 
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
2013-05-21_17:07:14.34320   at 
scala.collection.immutable.List.foreach(List.scala:45)
2013-05-21_17:07:14.34320   at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
2013-05-21_17:07:14.34320   at 
scala.collection.immutable.List.map(List.scala:45)
2013-05-21_17:07:14.34321   at 
kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
2013-05-21_17:07:14.34322   at 
kafka.tools.MirrorMaker.main(MirrorMaker.scala)


> Deadlock between initial rebalance and watcher-triggered rebalances
> ---
>
> Key: KAFKA-914
> URL: https://issues.apache.org/jira/browse/KAFKA-914
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
>

[jira] [Updated] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-21 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-914:
-

Attachment: KAFKA-914-v1.patch

Patch with the mentioned fix.

1 - I added comments with some detail since the manager/fetcher/connector 
interaction is very tricky.
2 - Passing through throwables while shutting down. The isRunning check is 
probably unnecessary, but safer to keep.
3 - Made the following changes to the mirrormaker - I can put that in a 
separate jira as well.
  a - Currently if no streams are created, the mirrormaker doesn't quit. 
Setting streams to empty/nil fixes that issue.
  b - If a consumer-side exception (e.g., iterator timeout) gets thrown the 
mirror-maker does not exit. Addressed this by awaiting on the consumer threads 
at the end of the main method.
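
For (3b), a minimal illustration (hypothetical names, not the actual MirrorMaker code) of awaiting the consumer threads at the end of main so the process exits when they die:

{code}
// Hypothetical sketch: main blocks on the consumer threads so that a fatal
// consumer-side error (e.g. an iterator timeout) terminates the process.
object MirrorMakerMainSketch {
  def main(args: Array[String]): Unit = {
    val consumerThreads: Seq[Thread] = (1 to 4).map { i =>
      new Thread(new Runnable {
        def run(): Unit = { /* consume and produce until an error or shutdown */ }
      }, "mirrormaker-consumer-" + i)
    }
    consumerThreads.foreach(_.start())
    consumerThreads.foreach(_.join()) // main returns only after every consumer thread has exited
  }
}
{code}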



> Deadlock between initial rebalance and watcher-triggered rebalances
> ---
>
> Key: KAFKA-914
> URL: https://issues.apache.org/jira/browse/KAFKA-914
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-914-v1.patch
>
>
> Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
> code is very complex so it's a bit hard to articulate the following very
> clearly. I will try and describe the sequence that results in a deadlock
> when starting up a large number of consumers at around the same time:
>
> - When a consumer's createMessageStream method is called, it initiates an
>   initial inline rebalance.
> - However, before the above initial rebalance actually begins, a ZK watch
>   may trigger (due to some other consumers starting up) and initiate a
>   rebalance. This happens successfully so fetchers start and start filling
>   up the chunk queues.
> - Another watch triggers and initiates yet another rebalance. This rebalance
>   attempt tries to close the fetchers. Before the fetchers are stopped, we
>   shutdown the leader-finder-thread to prevent new fetchers from being
>   started.
> - The shutdown is accomplished by interrupting the leader-finder-thread and
>   then awaiting its shutdown latch.
> - If the leader-finder-thread still has

[jira] [Created] (KAFKA-916) Deadlock between fetcher shutdown and handling partitions with error

2013-05-22 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-916:


 Summary: Deadlock between fetcher shutdown and handling partitions 
with error
 Key: KAFKA-916
 URL: https://issues.apache.org/jira/browse/KAFKA-916
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8


Here is another consumer deadlock that we encountered. All consumers are
vulnerable to this during a rebalance if there happen to be partitions in
error.

On a rebalance, the fetcher manager closes all fetchers and this holds on to
the fetcher thread map's lock. (mapLock in AbstractFetcherManager). [t1]
While the fetcher manager is iterating over fetchers to stop them, a fetcher
that is yet to be stopped hits an error on a partition and proceeds to
handle partitions with error [t2]. This handling involves looking up the
fetcher for that partition and then removing it from the fetcher's set of
partitions to consume. This requires grabbing the same map lock in [t1],
hence the deadlock.

[t1]
2013-05-22_20:23:11.95767 "main" prio=10 tid=0x7f1b24007800 nid=0x573b 
waiting on condition [0x7f1b2bd38000]
2013-05-22_20:23:11.95767java.lang.Thread.State: WAITING (parking)
2013-05-22_20:23:11.95767   at sun.misc.Unsafe.park(Native Method)
2013-05-22_20:23:11.95767   - parking to wait for  <0x7f1a25780598> (a 
java.util.concurrent.CountDownLatch$Sync)
2013-05-22_20:23:11.95767   at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-22_20:23:11.95767   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-22_20:23:11.95768   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-22_20:23:11.95768   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-22_20:23:11.95768   at 
java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-22_20:23:11.95768   at 
kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-22_20:23:11.95769   at 
kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
2013-05-22_20:23:11.95769   at 
kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
2013-05-22_20:23:11.95769   at 
kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
2013-05-22_20:23:11.95769   at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-22_20:23:11.95769   at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-22_20:23:11.95770   at 
scala.collection.Iterator$class.foreach(Iterator.scala:631)
2013-05-22_20:23:11.95770   at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
2013-05-22_20:23:11.95770   at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
2013-05-22_20:23:11.95770   at 
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
2013-05-22_20:23:11.95771   at 
scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
2013-05-22_20:23:11.95771   at 
kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
---> 2013-05-22_20:23:11.95771  - locked <0x7f1a2ae92510> (a 
java.lang.Object)
2013-05-22_20:23:11.95771   at 
kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
2013-05-22_20:23:11.95771   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
2013-05-22_20:23:11.95772   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
2013-05-22_20:23:11.95772   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
2013-05-22_20:23:11.95772   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
2013-05-22_20:23:11.95772   at 
scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-22_20:23:11.95773   at 
scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-22_20:23:11.95773   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
2013-05-22_20:23:11.95773   - locked <0x7f1a2a29b450> (a 
java.lang.Object)
2013-05-22_20:23:11.95773   at 
kafka.consumer.ZookeeperConsumerConn
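
To distill the lock interaction described above into a few lines (illustrative names only, not the actual Kafka code):

{code}
import java.util.concurrent.CountDownLatch

// Distilled reconstruction of the two call paths; not the actual code.
object FetcherMapLockSketch {
  private val mapLock = new Object
  private val fetcherStopped = new CountDownLatch(1)

  // [t1] rebalance path: holds mapLock while waiting for each fetcher to stop.
  def closeAllFetchers(): Unit = mapLock.synchronized {
    fetcherStopped.await()
  }

  // [t2] fetcher path: a partition hits an error and the fetcher tries to
  // deregister it, which needs mapLock -- already held by [t1] above.
  def handlePartitionsWithError(): Unit = {
    mapLock.synchronized {
      // remove the partition from the fetcher's set
    }
    fetcherStopped.countDown()
  }
}
{code}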

[jira] [Resolved] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-22 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-914.
--

Resolution: Fixed

Thanks for the review. Committed after removing the unnecessary assignment in 
MirrorMaker.

> Deadlock between initial rebalance and watcher-triggered rebalances
> ---
>
> Key: KAFKA-914
> URL: https://issues.apache.org/jira/browse/KAFKA-914
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-914-v1.patch
>
>
> Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
> code is very complex so it's a bit hard to articulate the following very
> clearly. I will try and describe the sequence that results in a deadlock
> when starting up a large number of consumers at around the same time:
>
> - When a consumer's createMessageStream method is called, it initiates an
>   initial inline rebalance.
> - However, before the above initial rebalance actually begins, a ZK watch
>   may trigger (due to some other consumers starting up) and initiate a
>   rebalance. This happens successfully so fetchers start and start filling
>   up the chunk queues.
> - Another watch triggers and initiates yet another rebalance. This rebalance
>   attempt tries to close the fetchers. Before the fetchers are stopped, we
>   shutdown the leader-finder-thread to prevent new fetchers from being
>   started.
> - The shutdown is accomplished by interrupting the leader-finder-thread and
>   then awaiting its shutdown latch.
> - If the leader-finder-thread still has a partition without leader to
>   process and tries to add a fetcher for it, it will get an exception
>   (InterruptedException if acquiring the partitionMapLock or

[jira] [Closed] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-22 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-914.



> Deadlock between initial rebalance and watcher-triggered rebalances
> ---
>
> Key: KAFKA-914
> URL: https://issues.apache.org/jira/browse/KAFKA-914
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-914-v1.patch
>
>
> Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
> code is very complex so it's a bit hard to articulate the following very
> clearly. I will try and describe the sequence that results in a deadlock
> when starting up a large number of consumers at around the same time:
>
> - When a consumer's createMessageStream method is called, it initiates an
>   initial inline rebalance.
> - However, before the above initial rebalance actually begins, a ZK watch
>   may trigger (due to some other consumers starting up) and initiate a
>   rebalance. This happens successfully so fetchers start and start filling
>   up the chunk queues.
> - Another watch triggers and initiates yet another rebalance. This rebalance
>   attempt tries to close the fetchers. Before the fetchers are stopped, we
>   shutdown the leader-finder-thread to prevent new fetchers from being
>   started.
> - The shutdown is accomplished by interrupting the leader-finder-thread and
>   then awaiting its shutdown latch.
> - If the leader-finder-thread still has a partition without leader to
>   process and tries to add a fetcher for it, it will get an exception
>   (InterruptedException if acquiring the partitionMapLock or
>   ClosedByInterruptException if performing an offset request). If we get an

[jira] [Commented] (KAFKA-911) Bug in controlled shutdown logic in controller leads to controller not sending out some state change request

2013-05-24 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13666532#comment-13666532
 ] 

Joel Koshy commented on KAFKA-911:
--

I had to revisit the notes from KAFKA-340. I think this was touched upon. i.e., 
the fact that the current implementation's attempt to shrink ISR may be 
ineffective for partitions whose leadership has been moved from the current 
broker - 
https://issues.apache.org/jira/browse/KAFKA-340?focusedCommentId=13483478&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13483478


> 3.4 What is the point of sending leader and isr request at the end of 
> shutdownBroker, since the OfflineReplica state 
> change would've taken care of that anyway. It seems like you just need to 
> send the stop replica request with the delete 
> partitions flag turned off, no ? 

I still need (as an optimization) to send the leader and isr request to the 
leaders of all partitions that are present 
on the shutting down broker so it can remove the shutting down broker from its 
inSyncReplicas cache 
(in Partition.scala) so it no longer waits for acks from the shutting down 
broker if a producer request's num-acks is 
set to -1. Otherwise, we have to wait for the leader to "organically" shrink 
the ISR. 

This also applies to partitions which are moved (i.e., partitions for which the 
shutting down broker was the leader): 
the ControlledShutdownLeaderSelector needs to send the updated leaderAndIsr 
request to the shutting down broker as well 
(to tell it that it is no longer the leader) at which point it will start up a 
replica fetcher and re-enter the ISR. 
So in fact, there is actually not much point in removing the "current leader" 
from the ISR in the 
ControlledShutdownLeaderSelector.selectLeader. 


and 

https://issues.apache.org/jira/browse/KAFKA-340?focusedCommentId=13484727&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13484727
(I don't think I actually filed that jira though.)


> Bug in controlled shutdown logic in controller leads to controller not 
> sending out some state change request 
> -
>
> Key: KAFKA-911
> URL: https://issues.apache.org/jira/browse/KAFKA-911
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: kafka-0.8, p1
> Attachments: kafka-911-v1.patch
>
>
> The controlled shutdown logic in the controller first tries to move the 
> leaders from the broker being shutdown. Then it tries to remove the broker 
> from the isr list. During that operation, it does not synchronize on the 
> controllerLock. This causes a race condition while dispatching data using the 
> controller's channel manager.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-916) Deadlock between fetcher shutdown and handling partitions with error

2013-05-24 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-916:
-

Attachment: KAFKA-916-v1.patch

Agreed - I think that should fix the issue.

> Deadlock between fetcher shutdown and handling partitions with error
> 
>
> Key: KAFKA-916
> URL: https://issues.apache.org/jira/browse/KAFKA-916
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-916-v1.patch
>
>
> Here is another consumer deadlock that we encountered. All consumers are
> vulnerable to this during a rebalance if there happen to be partitions in
> error.
> On a rebalance, the fetcher manager closes all fetchers and this holds on to
> the fetcher thread map's lock. (mapLock in AbstractFetcherManager). [t1]
> While the fetcher manager is iterating over fetchers to stop them, a fetcher
> that is yet to be stopped hits an error on a partition and proceeds to
> handle partitions with error [t2]. This handling involves looking up the
> fetcher for that partition and then removing it from the fetcher's set of
> partitions to consume. This requires grabbing the same map lock in [t1],
> hence the deadlock.
> [t1]
> 2013-05-22_20:23:11.95767 "main" prio=10 tid=0x7f1b24007800 nid=0x573b 
> waiting on condition [0x7f1b2bd38000]
> 2013-05-22_20:23:11.95767java.lang.Thread.State: WAITING (parking)
> 2013-05-22_20:23:11.95767 at sun.misc.Unsafe.park(Native Method)
> 2013-05-22_20:23:11.95767 - parking to wait for  <0x7f1a25780598> (a 
> java.util.concurrent.CountDownLatch$Sync)
> 2013-05-22_20:23:11.95767 at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-22_20:23:11.95767 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> 2013-05-22_20:23:11.95768 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> 2013-05-22_20:23:11.95768 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> 2013-05-22_20:23:11.95768 at 
> java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> 2013-05-22_20:23:11.95768 at 
> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> 2013-05-22_20:23:11.95769 at 
> kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
> 2013-05-22_20:23:11.95769 at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
> 2013-05-22_20:23:11.95769 at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
> 2013-05-22_20:23:11.95769 at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95769 at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95770 at 
> scala.collection.Iterator$class.foreach(Iterator.scala:631)
> 2013-05-22_20:23:11.95770 at 
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> 2013-05-22_20:23:11.95770 at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> 2013-05-22_20:23:11.95770 at 
> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 2013-05-22_20:23:11.95771 at 
> scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> 2013-05-22_20:23:11.95771 at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
> ---> 2013-05-22_20:23:11.95771- locked <0x7f1a2ae92510> (a 
> java.lang.Object)
> 2013-05-22_20:23:11.95771 at 
> kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
> 2013-05-22_20:23:11.95771 at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
> 2013-05-22_20:23:11.95772 at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
> 2013-05-22_20:23:11.95772 at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
> 2013-05-22_20:23:11.95772 at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
> 2013-05-22_20:23:11.95772 at 
> scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> 2013

[jira] [Closed] (KAFKA-916) Deadlock between fetcher shutdown and handling partitions with error

2013-05-28 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-916.



Thanks for the review. Committed to 0.8

> Deadlock between fetcher shutdown and handling partitions with error
> 
>
> Key: KAFKA-916
> URL: https://issues.apache.org/jira/browse/KAFKA-916
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-916-v1.patch
>
>
> Here is another consumer deadlock that we encountered. All consumers are
> vulnerable to this during a rebalance if there happen to be partitions in
> error.
> On a rebalance, the fetcher manager closes all fetchers and this holds on to
> the fetcher thread map's lock. (mapLock in AbstractFetcherManager). [t1]
> While the fetcher manager is iterating over fetchers to stop them, a fetcher
> that is yet to be stopped hits an error on a partition and proceeds to
> handle partitions with error [t2]. This handling involves looking up the
> fetcher for that partition and then removing it from the fetcher's set of
> partitions to consume. This requires grabbing the same map lock in [t1],
> hence the deadlock.
> [t1]
> 2013-05-22_20:23:11.95767 "main" prio=10 tid=0x7f1b24007800 nid=0x573b 
> waiting on condition [0x7f1b2bd38000]
> 2013-05-22_20:23:11.95767java.lang.Thread.State: WAITING (parking)
> 2013-05-22_20:23:11.95767 at sun.misc.Unsafe.park(Native Method)
> 2013-05-22_20:23:11.95767 - parking to wait for  <0x7f1a25780598> (a 
> java.util.concurrent.CountDownLatch$Sync)
> 2013-05-22_20:23:11.95767 at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-22_20:23:11.95767 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> 2013-05-22_20:23:11.95768 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> 2013-05-22_20:23:11.95768 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> 2013-05-22_20:23:11.95768 at 
> java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> 2013-05-22_20:23:11.95768 at 
> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> 2013-05-22_20:23:11.95769 at 
> kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
> 2013-05-22_20:23:11.95769 at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
> 2013-05-22_20:23:11.95769 at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
> 2013-05-22_20:23:11.95769 at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95769 at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95770 at 
> scala.collection.Iterator$class.foreach(Iterator.scala:631)
> 2013-05-22_20:23:11.95770 at 
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> 2013-05-22_20:23:11.95770 at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> 2013-05-22_20:23:11.95770 at 
> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 2013-05-22_20:23:11.95771 at 
> scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> 2013-05-22_20:23:11.95771 at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
> ---> 2013-05-22_20:23:11.95771- locked <0x7f1a2ae92510> (a 
> java.lang.Object)
> 2013-05-22_20:23:11.95771 at 
> kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
> 2013-05-22_20:23:11.95771 at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
> 2013-05-22_20:23:11.95772 at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
> 2013-05-22_20:23:11.95772 at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
> 2013-05-22_20:23:11.95772 at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
> 2013-05-22_20:23:11.95772 at 
> scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> 2013-05-22_20:23:11.95773 at 
> scala.col

[jira] [Resolved] (KAFKA-916) Deadlock between fetcher shutdown and handling partitions with error

2013-05-28 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-916.
--

Resolution: Fixed

> Deadlock between fetcher shutdown and handling partitions with error
> 
>
> Key: KAFKA-916
> URL: https://issues.apache.org/jira/browse/KAFKA-916
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-916-v1.patch
>
>
> Here is another consumer deadlock that we encountered. All consumers are
> vulnerable to this during a rebalance if there happen to be partitions in
> error.
> On a rebalance, the fetcher manager closes all fetchers and this holds on to
> the fetcher thread map's lock. (mapLock in AbstractFetcherManager). [t1]
> While the fetcher manager is iterating over fetchers to stop them, a fetcher
> that is yet to be stopped hits an error on a partition and proceeds to
> handle partitions with error [t2]. This handling involves looking up the
> fetcher for that partition and then removing it from the fetcher's set of
> partitions to consume. This requires grabbing the same map lock in [t1],
> hence the deadlock.
> [t1]
> 2013-05-22_20:23:11.95767 "main" prio=10 tid=0x7f1b24007800 nid=0x573b 
> waiting on condition [0x7f1b2bd38000]
> 2013-05-22_20:23:11.95767java.lang.Thread.State: WAITING (parking)
> 2013-05-22_20:23:11.95767 at sun.misc.Unsafe.park(Native Method)
> 2013-05-22_20:23:11.95767 - parking to wait for  <0x7f1a25780598> (a 
> java.util.concurrent.CountDownLatch$Sync)
> 2013-05-22_20:23:11.95767 at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-22_20:23:11.95767 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> 2013-05-22_20:23:11.95768 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> 2013-05-22_20:23:11.95768 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> 2013-05-22_20:23:11.95768 at 
> java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> 2013-05-22_20:23:11.95768 at 
> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> 2013-05-22_20:23:11.95769 at 
> kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
> 2013-05-22_20:23:11.95769 at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
> 2013-05-22_20:23:11.95769 at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
> 2013-05-22_20:23:11.95769 at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95769 at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95770 at 
> scala.collection.Iterator$class.foreach(Iterator.scala:631)
> 2013-05-22_20:23:11.95770 at 
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> 2013-05-22_20:23:11.95770 at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> 2013-05-22_20:23:11.95770 at 
> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 2013-05-22_20:23:11.95771 at 
> scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> 2013-05-22_20:23:11.95771 at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
> ---> 2013-05-22_20:23:11.95771- locked <0x7f1a2ae92510> (a 
> java.lang.Object)
> 2013-05-22_20:23:11.95771 at 
> kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
> 2013-05-22_20:23:11.95771 at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
> 2013-05-22_20:23:11.95772 at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
> 2013-05-22_20:23:11.95772 at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
> 2013-05-22_20:23:11.95772 at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
> 2013-05-22_20:23:11.95772 at 
> scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> 2013-05-22_20:23:11.95773 at 
> scala.collection.immutable.Range$$an

[jira] [Created] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers

2013-05-28 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-921:


 Summary: Expose max lag mbean for consumers and replica fetchers
 Key: KAFKA-921
 URL: https://issues.apache.org/jira/browse/KAFKA-921
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8


We have a ton of consumer mbeans with names that are derived from the consumer 
id, broker being fetched from, fetcher id, etc. This makes it difficult to do 
basic monitoring of consumer/replica fetcher lag - since the mbean to monitor 
can change. A more useful metric for monitoring purposes is the maximum lag 
across all fetchers.
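
A minimal sketch of the kind of metric intended (illustrative only, not the eventual patch): fetchers record per-partition lag and a single, stably named max-lag value is derived from it, instead of one mbean per consumer id / broker / fetcher id:

{code}
import scala.collection.mutable

// Illustrative only: per-partition lag rolled up into one "MaxLag" value.
class MaxLagTracker {
  private val lagByPartition = mutable.Map[(String, Int), Long]()

  def recordLag(topic: String, partition: Int, lag: Long): Unit =
    synchronized { lagByPartition((topic, partition)) = lag }

  // Would back a single, stably named "MaxLag" gauge.
  def maxLag: Long = synchronized {
    if (lagByPartition.isEmpty) 0L else lagByPartition.values.max
  }
}
{code}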

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers

2013-05-28 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-921:
-

Attachment: KAFKA-921-v1.patch

This provides a max lag mbean for both consumer fetcher manager and replica 
fetcher manager; although I think it is more useful for monitoring consumers. 
For replica fetchers we need to closely monitor all replica fetchers anyway. 
i.e., the set of mbeans is static. I can reduce the scope to just consumers if 
others agree.

> Expose max lag mbean for consumers and replica fetchers
> ---
>
> Key: KAFKA-921
> URL: https://issues.apache.org/jira/browse/KAFKA-921
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-921-v1.patch
>
>
> We have a ton of consumer mbeans with names that are derived from the 
> consumer id, broker being fetched from, fetcher id, etc. This makes it 
> difficult to do basic monitoring of consumer/replica fetcher lag - since the 
> mbean to monitor can change. A more useful metric for monitoring purposes is 
> the maximum lag across all fetchers.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers

2013-05-29 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-921:
-

Attachment: KAFKA-921-v2.patch

Yes - I think that would be better. Moved it to AbstractFetcherManager. So 
depending on whether you are looking at replica fetchers or consumer fetchers, 
the MaxLag mbean will show up in ReplicaFetcherManager or 
ConsumerFetcherManager respectively.

> Expose max lag mbean for consumers and replica fetchers
> ---
>
> Key: KAFKA-921
> URL: https://issues.apache.org/jira/browse/KAFKA-921
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-921-v1.patch, KAFKA-921-v2.patch
>
>
> We have a ton of consumer mbeans with names that are derived from the 
> consumer id, broker being fetched from, fetcher id, etc. This makes it 
> difficult to do basic monitoring of consumer/replica fetcher lag - since the 
> mbean to monitor can change. A more useful metric for monitoring purposes is 
> the maximum lag across all fetchers.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers

2013-05-30 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-921:
-

Attachment: KAFKA-921-v3.patch

One caveat in this approach is that if a fetcher is wedged for any reason, then 
the reported lag is inaccurate since it depends on getting the high watermark 
from fetch responses. i.e., to check on the health of a consumer you would need 
to look at both the max lag and min fetch rate across all fetchers.


> Expose max lag mbean for consumers and replica fetchers
> ---
>
> Key: KAFKA-921
> URL: https://issues.apache.org/jira/browse/KAFKA-921
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-921-v1.patch, KAFKA-921-v2.patch, 
> KAFKA-921-v3.patch
>
>
> We have a ton of consumer mbeans with names that are derived from the 
> consumer id, broker being fetched from, fetcher id, etc. This makes it 
> difficult to do basic monitoring of consumer/replica fetcher lag - since the 
> mbean to monitor can change. A more useful metric for monitoring purposes is 
> the maximum lag across all fetchers.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers

2013-05-30 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-921:
-

Status: Patch Available  (was: Open)

> Expose max lag mbean for consumers and replica fetchers
> ---
>
> Key: KAFKA-921
> URL: https://issues.apache.org/jira/browse/KAFKA-921
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-921-v1.patch, KAFKA-921-v2.patch, 
> KAFKA-921-v3.patch
>
>
> We have a ton of consumer mbeans with names that are derived from the 
> consumer id, broker being fetched from, fetcher id, etc. This makes it 
> difficult to do basic monitoring of consumer/replica fetcher lag - since the 
> mbean to monitor can change. A more useful metric for monitoring purposes is 
> the maximum lag across all fetchers.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers

2013-05-31 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-921.



> Expose max lag mbean for consumers and replica fetchers
> ---
>
> Key: KAFKA-921
> URL: https://issues.apache.org/jira/browse/KAFKA-921
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-921-v1.patch, KAFKA-921-v2.patch, 
> KAFKA-921-v3.patch
>
>
> We have a ton of consumer mbeans with names that are derived from the 
> consumer id, broker being fetched from, fetcher id, etc. This makes it 
> difficult to do basic monitoring of consumer/replica fetcher lag - since the 
> mbean to monitor can change. A more useful metric for monitoring purposes is 
> the maximum lag across all fetchers.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers

2013-05-31 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-921:
-

Resolution: Fixed
Status: Resolved  (was: Patch Available)

Thanks for the reviews. Committed with the minor change - i.e., Replica instead 
of Replica-

> Expose max lag mbean for consumers and replica fetchers
> ---
>
> Key: KAFKA-921
> URL: https://issues.apache.org/jira/browse/KAFKA-921
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-921-v1.patch, KAFKA-921-v2.patch, 
> KAFKA-921-v3.patch
>
>
> We have a ton of consumer mbeans with names that are derived from the 
> consumer id, broker being fetched from, fetcher id, etc. This makes it 
> difficult to do basic monitoring of consumer/replica fetcher lag - since the 
> mbean to monitor can change. A more useful metric for monitoring purposes is 
> the maximum lag across all fetchers.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-927) Integrate controlled shutdown into kafka shutdown hook

2013-06-04 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13674598#comment-13674598
 ] 

Joel Koshy commented on KAFKA-927:
--

+1 - sorry I got to this late.

Small nit: the scaladoc for shutdown broker needs an edit which we will clean 
up later.
We probably don't need the adminTest's testShutdownBroker given that the 
rolling bounce test exercises the same logic.

Also, I think we can close KAFKA-817 - another approach with similar goals.



> Integrate controlled shutdown into kafka shutdown hook
> --
>
> Key: KAFKA-927
> URL: https://issues.apache.org/jira/browse/KAFKA-927
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sriram Subramanian
>Assignee: Sriram Subramanian
> Fix For: 0.8
>
> Attachments: KAFKA-927.patch, KAFKA-927-v2.patch, 
> KAFKA-927-v2-revised.patch, KAFKA-927-v3.patch, 
> KAFKA-927-v3-removeimports.patch, KAFKA-927-v4.patch
>
>
> The controlled shutdown mechanism should be integrated into the software for 
> better operational benefits. Also few optimizations can be done to reduce 
> unnecessary rpc and zk calls. This patch has been tested on a prod like 
> environment by doing rolling bounces continuously for a day. The average time 
> of doing a rolling bounce with controlled shutdown for a cluster with 7 nodes 
> without this patch is 340 seconds. With this patch it reduces to 220 seconds. 
> Also it ensures correctness in scenarios where the controller shrinks the isr 
> and the new leader could place the broker to be shutdown back into the isr.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-938) High CPU usage when more or less idle

2013-06-12 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13681479#comment-13681479
 ] 

Joel Koshy commented on KAFKA-938:
--

Excellent catch Sam!

One comment: I think the DelayedItem class was intended to support arbitrary 
(non-millisecond) timeunits but that was buggy in two ways:
(i)  getDelay's 'unit' parameter shadowed the DelayedItem's 'unit' member
(ii) The delayMs val assumes that the delay is always in ms (which prevents 
DelayedItem from supporting arbitrary time units).
Also, I think we must have missed the bit of the DelayQueue documentation that 
says getDelay is called with TimeUnit.NANOSECONDS

I think we can tweak this a bit to make it support arbitrary timeunits - 
otherwise, the "unit" parameter of DelayedItem is of no use. I can attach a 
patch to make this clearer.
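
For reference, the tweak I have in mind would look roughly like this (a sketch only, not a committed patch): normalize the delay to milliseconds once at construction, and have getDelay convert to whatever unit the DelayQueue asks for (nanoseconds, per its documentation).

{code}
import java.util.concurrent.{Delayed, TimeUnit}
import scala.math.max

// Rough sketch only; not the committed code.
class DelayedItemSketch(delay: Long, unit: TimeUnit) extends Delayed {
  private val createdMs = System.currentTimeMillis
  private val delayMs = unit.toMillis(delay) // honor the construction-time unit

  def getDelay(requestedUnit: TimeUnit): Long = {
    val elapsedMs = System.currentTimeMillis - createdMs
    requestedUnit.convert(max(delayMs - elapsedMs, 0), TimeUnit.MILLISECONDS)
  }

  def compareTo(other: Delayed): Int = {
    val diff = getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS)
    if (diff < 0) -1 else if (diff > 0) 1 else 0
  }
}
{code}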

> High CPU usage when more or less idle
> -
>
> Key: KAFKA-938
> URL: https://issues.apache.org/jira/browse/KAFKA-938
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Sam Meder
>Priority: Critical
> Fix For: 0.8
>
> Attachments: timeunit.patch
>
>
> We've noticed Kafka using a lot of CPU in a pretty much idle environment and 
> tracked it down to its DelayedItem implementation. In particular, the time 
> conversion for how much longer to wait:
>   def getDelay(unit: TimeUnit): Long = {
> val elapsedMs = (SystemTime.milliseconds - createdMs)
> unit.convert(max(delayMs - elapsedMs, 0), unit)
>   }
> does not actually convert, so Kafka ends up treating a ms value like 
> nanoseconds, e.g. waking up every 100 ns or so. The above code should really 
> be:
>   def getDelay(unit: TimeUnit): Long = {
> val elapsedMs = (SystemTime.milliseconds - createdMs)
> unit.convert(max(delayMs - elapsedMs, 0), TimeUnit.MILLISECONDS)
>   }
> I'll attach a patch.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-938) High CPU usage when more or less idle

2013-06-12 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13681497#comment-13681497
 ] 

Joel Koshy commented on KAFKA-938:
--

Actually, small correction: I overlooked that the delayMs val converts the 
given delay from its source unit to milliseconds. So the only caveat is that 
precision will be lost if the desired timeunit is nanos - which we don't really 
need, so I don't think any further changes are required here. Thanks again!

> High CPU usage when more or less idle
> -
>
> Key: KAFKA-938
> URL: https://issues.apache.org/jira/browse/KAFKA-938
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Sam Meder
>Priority: Critical
> Fix For: 0.8
>
> Attachments: timeunit.patch
>
>
> We've noticed Kafka using a lot of CPU in a pretty much idle environment and 
> tracked it down to its DelayedItem implementation. In particular, the time 
> conversion for how much longer to wait:
>   def getDelay(unit: TimeUnit): Long = {
> val elapsedMs = (SystemTime.milliseconds - createdMs)
> unit.convert(max(delayMs - elapsedMs, 0), unit)
>   }
> does not actually convert, so Kafka ends up treating a ms value like 
> nanoseconds, e.g. waking up every 100 ns or so. The above code should really 
> be:
>   def getDelay(unit: TimeUnit): Long = {
> val elapsedMs = (SystemTime.milliseconds - createdMs)
> unit.convert(max(delayMs - elapsedMs, 0), TimeUnit.MILLISECONDS)
>   }
> I'll attach a patch.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock

2013-06-12 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13681787#comment-13681787
 ] 

Joel Koshy commented on KAFKA-937:
--

+1 on the patch.

Additionally, can you make this small (unrelated) change: make the console 
consumer's autoCommitIntervalOpt default to ConsumerConfig.AutoCommitInterval?

I think it is worth documenting the typical path of getting into the above 
deadlock:
- Assume at least two fetchers F1, F2
- One or more partitions on F1 go into error and leader finder thread L is 
notified
- L unblocks and proceeds to handle partitions without leader. It holds the 
ConsumerFetcherManager's lock at this point.
- All partitions on F2 go into error.
- F2's handlePartitionsWithError removes partitions from its fetcher's 
partitionMap. (At this point, F2 is by definition an idle fetcher thread.)
- L tries to shutdown idle fetcher threads - i.e., tries to shutdown F2.
- However, F2 at this point is trying to addPartitionsWithError which needs to 
acquire the ConsumerFetcherManager's lock (which is currently held by L).

It is relatively rare in the sense that it can happen only if all partitions on 
the fetcher are in error. This could happen for example if all the leaders for 
those partitions move or become unavailable. Another instance where this may be 
seen in practice is mirroring: we ran into it when running the mirror maker 
with a very large number of producers and ran out of file handles. Running out 
of file handles could easily lead to exceptions on most/all fetches and result 
in an error state for all partitions.
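
The pattern boils down to holding a lock while waiting for a thread that needs 
that same lock. A stripped-down sketch (names only loosely mirror the Kafka 
classes; this is not the actual code, and running it will simply hang, which is 
the point):

    import java.util.concurrent.locks.ReentrantLock

    object FetcherDeadlockSketch extends App {
      val managerLock = new ReentrantLock()   // stands in for ConsumerFetcherManager's lock

      // stands in for fetcher F2: addPartitionsWithError needs the manager lock
      val fetcher = new Thread(new Runnable {
        def run(): Unit = {
          managerLock.lock()
          try { /* re-register partitions that lost their leader */ }
          finally { managerLock.unlock() }
        }
      })

      // stands in for the leader finder thread L
      val leaderFinder = new Thread(new Runnable {
        def run(): Unit = {
          managerLock.lock()
          try {
            // shutdownIdleFetcherThreads: waits for F2 to exit while still holding the lock
            fetcher.join()                    // F2 can never acquire the lock, so this never returns
          } finally { managerLock.unlock() }
        }
      })

      leaderFinder.start()
      Thread.sleep(100)                       // let L grab the lock first
      fetcher.start()
    }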


> ConsumerFetcherThread can deadlock
> --
>
> Key: KAFKA-937
> URL: https://issues.apache.org/jira/browse/KAFKA-937
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Jun Rao
> Attachments: kafka-937.patch
>
>
> We have the following access pattern that can introduce a deadlock.
> AbstractFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherThread.processPartitionsWithError() -> 
> ConsumerFetcherManager.addPartitionsWithError() wait for lock ->
> LeaderFinderThread holding lock while calling 
> AbstractFetcherManager.shutdownIdleFetcherThreads() ->
> AbstractFetcherManager calling fetcher.shutdown, which needs to wait until 
> AbstractFetcherThread.processPartitionsWithError() completes.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-940) Scala match error in javaapi.Implicits

2013-06-12 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-940:


 Summary: Scala match error in javaapi.Implicits
 Key: KAFKA-940
 URL: https://issues.apache.org/jira/browse/KAFKA-940
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8


This would affect javaapi users who (correctly) test for null on API calls 
(e.g., if (partitionMetadata.leader == null))

Right now, we actually get a match error:
scala.MatchError: null
at kafka.javaapi.Implicits$.optionToJavaRef(Implicits.scala:38)
at kafka.javaapi.Implicits$.optionToJavaRef(Implicits.scala:40)
at kafka.javaapi.PartitionMetadata.leader(TopicMetadata.scala:51)



--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-940) Scala match error in javaapi.Implicits

2013-06-12 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-940:
-

Attachment: KAFKA-940-v1.patch

Simple fix.
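
For reference, the fix is presumably along these lines (a sketch of the idea, 
not necessarily the exact patch): treat a null Option the same way as None 
instead of letting the match fall through.

    object ImplicitsSketch {
      implicit def optionToJavaRef[T](opt: Option[T]): T =
        opt match {
          case Some(obj) => obj
          case _         => null.asInstanceOf[T]   // covers both None and a null Option
        }
    }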

> Scala match error in javaapi.Implicits
> --
>
> Key: KAFKA-940
> URL: https://issues.apache.org/jira/browse/KAFKA-940
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-940-v1.patch
>
>
> This would affect javaapi users who (correctly) test for null on API calls 
> (e.g., if (partitionMetadata.leader == null))
> Right now, we actually get a match error:
> scala.MatchError: null
>   at kafka.javaapi.Implicits$.optionToJavaRef(Implicits.scala:38)
>   at kafka.javaapi.Implicits$.optionToJavaRef(Implicits.scala:40)
>   at kafka.javaapi.PartitionMetadata.leader(TopicMetadata.scala:51)
> 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-947) isr-expiration-thread may block LeaderAndIsr request for a relatively long period

2013-06-19 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13688719#comment-13688719
 ] 

Joel Koshy commented on KAFKA-947:
--

+1 thanks for the patch!

> isr-expiration-thread may block LeaderAndIsr request for a relatively long 
> period 
> --
>
> Key: KAFKA-947
> URL: https://issues.apache.org/jira/browse/KAFKA-947
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>Assignee: Jun Rao
> Attachments: kafka-947.patch
>
>
> If there are lots of partitions whose isr needs to be shrank, 
> isr-expiration-thread will hold a long lock on leaderPartitionsLock, which 
> will delay LeaderAndIsr requests.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Assigned] (KAFKA-559) Garbage collect old consumer metadata entries

2013-07-02 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy reassigned KAFKA-559:


Assignee: Tejas Patil

Assigning to Tejas, since he has done some work on this recently.



> Garbage collect old consumer metadata entries
> -
>
> Key: KAFKA-559
> URL: https://issues.apache.org/jira/browse/KAFKA-559
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>Assignee: Tejas Patil
>  Labels: project
>
> Many use cases involve transient consumers. These consumers create entries 
> under their consumer group in zk and maintain offsets there as well. There is 
> currently no way to delete these entries. It would be good to have a tool 
> that did something like
>   bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] 
> --zookeeper [zk_connect]
> This would scan through consumer group entries and delete any that had no 
> offset update since the given date.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-559) Garbage collect old consumer metadata entries

2013-07-02 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13698307#comment-13698307
 ] 

Joel Koshy commented on KAFKA-559:
--

One additional recommendation: support a --dry-run option.



> Garbage collect old consumer metadata entries
> -
>
> Key: KAFKA-559
> URL: https://issues.apache.org/jira/browse/KAFKA-559
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>Assignee: Tejas Patil
>  Labels: project
>
> Many use cases involve transient consumers. These consumers create entries 
> under their consumer group in zk and maintain offsets there as well. There is 
> currently no way to delete these entries. It would be good to have a tool 
> that did something like
>   bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] 
> --zookeeper [zk_connect]
> This would scan through consumer group entries and delete any that had no 
> offset update since the given date.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Assigned] (KAFKA-958) Please compile list of key metrics on the broker and client side and put it on a wiki

2013-07-03 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy reassigned KAFKA-958:


Assignee: Joel Koshy

> Please compile list of key metrics on the broker and client side and put it 
> on a wiki
> -
>
> Key: KAFKA-958
> URL: https://issues.apache.org/jira/browse/KAFKA-958
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 0.8
>Reporter: Vadim
>Assignee: Joel Koshy
>Priority: Minor
>
> Please compile list of important metrics that need to be monitored by 
> companies  to insure healthy operation of the kafka service

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x

2013-07-03 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13699211#comment-13699211
 ] 

Joel Koshy commented on KAFKA-960:
--

Given that there are API changes and mbean name changes between 2.x and 3.x my 
preference would be to defer this to a few months later (after the official 3.x 
release has proven to be stable).

> Upgrade Metrics to 3.x
> --
>
> Key: KAFKA-960
> URL: https://issues.apache.org/jira/browse/KAFKA-960
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8
>Reporter: Cosmin Lehene
> Fix For: 0.8
>
>
> Now that metrics 3.0 has been released 
> (http://metrics.codahale.com/about/release-notes/) we can upgrade back

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x

2013-07-03 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13699545#comment-13699545
 ] 

Joel Koshy commented on KAFKA-960:
--

Not really. However, my point is that since going in either direction (upgrade 
or downgrade) is a bit painful due to the API and mbean changes, we should let 
3.x prove itself to be stable in other contexts for a period of time before we 
switch to it.


> Upgrade Metrics to 3.x
> --
>
> Key: KAFKA-960
> URL: https://issues.apache.org/jira/browse/KAFKA-960
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8
>Reporter: Cosmin Lehene
> Fix For: 0.8
>
>
> Now that metrics 3.0 has been released 
> (http://metrics.codahale.com/about/release-notes/) we can upgrade back

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-961) state.change.logger: Error on broker 1 while processing LeaderAndIsr request correlationId 6 received from controller 1 epoch 1 for partition (page_visits,0)

2013-07-03 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13699567#comment-13699567
 ] 

Joel Koshy commented on KAFKA-961:
--

Passing in null for time would definitely lead to that NPE, as you found. I 
think we only needed a time interface to support a mock time in tests. Also, we 
probably didn't anticipate that KafkaServer would need to be embedded in Java 
code. If you are okay with your work-around, then great. Another (ugly) way to 
do it would be to pass in a dynamically instantiated SystemTime - so something 
like (Time) Class.forName(SystemTime.class.getName()).newInstance() - not sure 
if it will work though. We can also provide an explicit constructor without the 
time argument and get rid of the Scala default arg.
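
For the last option, the change would be roughly the following (a sketch, not 
the actual KafkaServer code). It lets Java callers write new KafkaServer(kconf) 
without having to reach for the Scala singleton (kafka.utils.SystemTime$.MODULE$ 
from Java):

    import kafka.utils.{SystemTime, Time}

    // Drop the Scala default argument and add a plain auxiliary constructor instead.
    class KafkaServer(val config: kafka.server.KafkaConfig, time: Time) {
      def this(config: kafka.server.KafkaConfig) = this(config, SystemTime)  // Java-friendly
      // ... rest of the server unchanged ...
    }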


> state.change.logger: Error on broker 1 while processing LeaderAndIsr request 
> correlationId 6 received from controller 1 epoch 1 for partition 
> (page_visits,0)
> -
>
> Key: KAFKA-961
> URL: https://issues.apache.org/jira/browse/KAFKA-961
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
> Environment: Linux gman-minty 3.8.0-19-generic #29-Ubuntu SMP Wed Apr 
> 17 18:16:28 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Garrett Barton
>
> Been having issues embedding 0.8 servers into some Yarn stuff I'm doing. I 
> just pulled the latest from git, did a ./sbt +package, followed by ./sbt 
> assembly-package-dependency. And pushed 
> core/target/scala-2.8.0/kafka_2.8.0-0.8.0-beta1.jar into my local mvn repo.  
> Here is sample code ripped out to little classes to show my error:
> Starting up a broker embedded in java, with the following code:
> ...
>   Properties props = new Properties();
>   // dont set so it binds to all interfaces
>   // props.setProperty("hostname", hostName);
>   props.setProperty("port", );
>   props.setProperty("broker.id", "1");
>   props.setProperty("log.dir", "/tmp/embeddedkafka/" + 
> randId);
>   // TODO: hardcode bad
>   props.setProperty("zookeeper.connect", 
> "localhost:2181/" + randId);
>   KafkaConfig kconf = new KafkaConfig(props);
>   
>   server = new KafkaServer(kconf, null);
>   server.startup();
>   LOG.info("Broker online");
> Sample Producer has the following code:
> ...
>   Properties props = new Properties();
>   props.put("metadata.broker.list", "gman-minty:");
>   props.put("serializer.class", "kafka.serializer.StringEncoder");
>   props.put("partitioner.class", 
> "com.gman.broker.SimplePartitioner");
>   props.put("request.required.acks", "1");
>   ProducerConfig config = new ProducerConfig(props);
>   
>   Producer<String, String> producer = new Producer<String, String>(config);
>   LOG.info("producer created");
>   KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", "key1", "value1");
>   producer.send(data);
>   LOG.info("wrote message: " + data);
> And here is the server log:
> INFO  2013-07-03 13:47:30,538 [Thread-0] kafka.utils.VerifiableProperties: 
> Verifying properties
> INFO  2013-07-03 13:47:30,568 [Thread-0] kafka.utils.VerifiableProperties: 
> Property port is overridden to 
> INFO  2013-07-03 13:47:30,568 [Thread-0] kafka.utils.VerifiableProperties: 
> Property broker.id is overridden to 1
> INFO  2013-07-03 13:47:30,568 [Thread-0] kafka.utils.VerifiableProperties: 
> Property zookeeper.connect is overridden to localhost:2181/kafkatest
> INFO  2013-07-03 13:47:30,569 [Thread-0] kafka.utils.VerifiableProperties: 
> Property log.dir is overridden to \tmp\embeddedkafka\1372873650268
> INFO  2013-07-03 13:47:30,574 [Thread-0] kafka.server.KafkaServer: [Kafka 
> Server 1], Starting
> INFO  2013-07-03 13:47:30,609 [Thread-0] kafka.log.LogManager: [Log Manager 
> on Broker 1] Log directory 
> '/home/gman/workspace/distributed_parser/\tmp\embeddedkafka\1372873650268' 
> not found, creating it.
> INFO  2013-07-03 13:47:30,619 [Thread-0] kafka.log.LogManager: [Log Manager 
> on Broker 1] Starting log cleaner every 60 ms
> INFO  2013-07-03 13:47:30,630 [Thread-0] kafka.log.LogManager: [Log Manager 
> on Broker 1] Starting log flusher every 3000 ms with the following overrides 
> Map()
> INFO  2013-07-03 13:47:30,687 [Thread-0] kafka.network.Acceptor: Awaiting 
> socket connections on 0.0.0.0:.
> INFO  2013-07-03 13:47:30,688 [Thread-0] kafka.network.SocketServer: [Socket

[jira] [Commented] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-07-03 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13699641#comment-13699641
 ] 

Joel Koshy commented on KAFKA-915:
--

This failure is due to the fact that the leaderAndIsr request does not make it 
to the brokers until after the mirror maker's rebalance completes. This is 
related to the issue reported in KAFKA-956. Previously (before we started 
caching metadata at the brokers) the partition information was retrieved 
directly from zk.

The fix for now would be to use the create-topic admin tool before starting the 
mirror maker (or move the producer performance start-up to well before the 
mirror maker startup).


> System Test - Mirror Maker testcase_5001 failed
> ---
>
> Key: KAFKA-915
> URL: https://issues.apache.org/jira/browse/KAFKA-915
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Fung
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: kafka-0.8, replication-testing
> Attachments: testcase_5001_debug_logs.tar.gz
>
>
> This case passes if brokers are set to partition = 1, replicas = 1
> It fails if brokers are set to partition = 5, replicas = 3 (consistently 
> reproducible)
> This test case is set up as shown below.
> 1. Start 2 ZK as a cluster in Source
> 2. Start 2 ZK as a cluster in Target
> 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
> 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
> 5. Start 1 MM
> 6. Start ProducerPerformance to send some data
> 7. After Producer is done, start ConsoleConsumer to consume data
> 8. Stop all processes and validate if there is any data loss.
> 9. No failure is introduced to any process in this test
> Attached a tar file which contains the logs and system test output for both 
> cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-559) Garbage collect old consumer metadata entries

2013-07-09 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13703007#comment-13703007
 ] 

Joel Koshy commented on KAFKA-559:
--

Thanks for the patch. Overall, looks good. A couple of comments, mostly minor, 
in no particular order:

* I think dry-run does not need any further qualifier such as withOptionalArg, 
describedAs, ofType - it's just a flag.
* For --since, I prefer seconds since the epoch over some fixed input format, 
which brings in ambiguity such as timezone, 24h vs 12h, etc. A better 
alternative would be to accept date strings and use the DateFormat class with 
lenient parsing turned on, or something like that. --before may be more 
intuitive than --since.
* Can use CommandLineUtils.checkRequiredArgs.
* deleteBy matching - prefer to use a case match and thereby avoid the explicit 
check on valid values. Also, the message on an invalid value of deleteBy should 
state what the valid values are (see the sketch after this list).
* Right now you support the following modes: delete stale topics across all 
groups, delete stale topics in a specific group. I think it would be useful to 
make deleteBy optional - if unspecified, it scans all groups and gets rid of 
stale groups.
* line 75: warn("msg", e)
* line 101: should provide a reason for aborting
* line 110: doesn't groupDirs have an offset path? If not, maybe we should add it.
* Logging should include the last mtime, as that may be useful information 
reported by the dry-run.
* No need to add a wrapper shell script for the tool.
* Make all of the methods except main private.
* The return statements can be dropped - i.e., just write the return value.
* Several vars can be vals instead.
* removeBrokerPartitionpairs: I don't think you would want to do a partial 
delete under a topic directory. You can check that all the partition offset 
paths are <= since and, if so, just delete the topic path. With that, the method 
would be better named something like deleteUnconsumedTopicsFromGroup.
* Finally, you are probably aware that there are a bunch of race conditions - 
e.g., checkIfLiveConsumers is a helpful check to have but is not guaranteed to 
be correct, as some consumers may creep in while the tool is running. However, I 
think it is reasonable for a tool like this to ignore that, since a "since" 
value way back would mean the probability of that occurring is very low. Similar 
note for deleteGroupIfNoTopicExists.
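
To make the dry-run and deleteBy points above concrete, a hypothetical fragment 
(option and variable names here are made up for illustration, not taken from the 
patch):

    import joptsimple.OptionParser

    def parseAndDispatch(args: Array[String]): Unit = {
      val parser = new OptionParser
      val dryRunOpt = parser.accepts("dry-run", "only print what would be deleted")  // plain flag, no qualifiers
      val deleteByOpt = parser.accepts("delete-by", "what to delete: 'topic' or 'group'")
        .withRequiredArg().ofType(classOf[String])
      val options = parser.parse(args: _*)
      val dryRun = options.has(dryRunOpt)

      options.valueOf(deleteByOpt) match {
        case null    => // deleteBy unspecified: scan all groups and drop stale ones
        case "topic" => // delete stale topics (honoring dryRun)
        case "group" => // delete stale groups (honoring dryRun)
        case other   => throw new IllegalArgumentException(
          "Invalid --delete-by value '" + other + "'; valid values are 'topic' and 'group'")
      }
    }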


> Garbage collect old consumer metadata entries
> -
>
> Key: KAFKA-559
> URL: https://issues.apache.org/jira/browse/KAFKA-559
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>Assignee: Tejas Patil
>  Labels: project
> Attachments: KAFKA-559.v1.patch, KAFKA-559.v2.patch
>
>
> Many use cases involve transient consumers. These consumers create entries 
> under their consumer group in zk and maintain offsets there as well. There is 
> currently no way to delete these entries. It would be good to have a tool 
> that did something like
>   bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] 
> --zookeeper [zk_connect]
> This would scan through consumer group entries and delete any that had no 
> offset update since the given date.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-966) Allow high level consumer to 'nak' a message and force Kafka to close the KafkaStream without losing that message

2013-07-09 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13703755#comment-13703755
 ] 

Joel Koshy commented on KAFKA-966:
--

One way to accomplish this is to turn off autocommit and checkpoint offsets 
only after a message (or batch of messages) has been written to the DB.

One caveat, though, is that rebalances (e.g., if a new consumer instance shows 
up) will result in offsets being committed, so there would be an issue if the DB 
is unavailable, a rebalance occurs at the same time, and there are unprocessed 
messages that have already been pulled out of the iterator.
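
A sketch of that approach with the high-level consumer (writeToDb is a 
placeholder for the application's own persistence call; the topic, group and ZK 
settings are made up):

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    object ManualCommitSketch extends App {
      def writeToDb(bytes: Array[Byte]): Unit = { /* application-specific write */ }

      val props = new Properties()
      props.put("zookeeper.connect", "localhost:2181")
      props.put("group.id", "db-writer")
      props.put("auto.commit.enable", "false")          // checkpoint manually instead

      val connector = Consumer.create(new ConsumerConfig(props))
      val stream = connector.createMessageStreams(Map("my-topic" -> 1))("my-topic").head

      for (msgAndMeta <- stream) {
        writeToDb(msgAndMeta.message)                   // may throw if the DB is down
        connector.commitOffsets                         // checkpoint only after a successful write
      }
    }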


> Allow high level consumer to 'nak' a message and force Kafka to close the 
> KafkaStream without losing that message
> -
>
> Key: KAFKA-966
> URL: https://issues.apache.org/jira/browse/KAFKA-966
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 0.8
>Reporter: Chris Curtin
>Assignee: Neha Narkhede
>Priority: Minor
>
> Enhancement request.
> The high level consumer is very close to handling a lot of situations a 
> 'typical' client would need. Except for when the message received from Kafka 
> is valid, but the business logic that wants to consume it has a problem.
> For example if I want to write the value to a MongoDB or Cassandra database 
> and the database is not available. I won't know until I go to do the write 
> that the database isn't available, but by then it is too late to NOT read the 
> message from Kafka. Thus if I call shutdown() to stop reading, that message 
> is lost since the offset Kafka writes to ZooKeeper is the next offset.
> Ideally I'd like to be able to tell Kafka: close the KafkaStream but set the 
> next offset to read for this partition to this message when I start up again. 
> And if there are any messages in the BlockingQueue for other partitions, find 
> the lowest # and use it for that partitions offset since I haven't consumed 
> them yet.
> Thus I can cleanly shutdown my processing, resolve whatever the issue is and 
> restart the process.
> Another idea might be to allow a 'peek' into the next message and if I 
> succeed in writing to the database call 'next' to remove it from the queue. 
> I understand this won't deal with a 'kill -9' or hard failure of the JVM 
> leading to the latest offsets not being written to ZooKeeper but it addresses 
> a likely common scenario for consumers. Nor will it add true transactional 
> support since the ZK update could fail.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-966) Allow high level consumer to 'nak' a message and force Kafka to close the KafkaStream without losing that message

2013-07-10 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13705272#comment-13705272
 ] 

Joel Koshy commented on KAFKA-966:
--

Yes, if you need to implement support for transactions across partitions that 
are potentially owned by different consumer instances, then this approach 
wouldn't work. Not sure if it is feasible in your case, but if there is a group 
of messages that need to be committed together, then you could send them with a 
common key so that those messages are partitioned into the same partition. That 
way, exactly one consumer thread will be responsible for those messages.
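
Roughly, on the producer side (a sketch; the topic, broker list, and key are 
made up): every message that must be committed together gets the same key, so 
the batch lands in one partition and therefore on one consumer thread.

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    object KeyedBatchSketch extends App {
      val props = new Properties()
      props.put("metadata.broker.list", "localhost:9092")
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      val producer = new Producer[String, String](new ProducerConfig(props))

      val groupKey = "order-42"   // everything sharing this key goes to the same partition
      val batch = Seq("begin", "item-a", "item-b", "commit")
        .map(m => new KeyedMessage[String, String]("my-topic", groupKey, m))
      producer.send(batch: _*)
      producer.close()
    }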


> Allow high level consumer to 'nak' a message and force Kafka to close the 
> KafkaStream without losing that message
> -
>
> Key: KAFKA-966
> URL: https://issues.apache.org/jira/browse/KAFKA-966
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 0.8
>Reporter: Chris Curtin
>Assignee: Neha Narkhede
>Priority: Minor
>
> Enhancement request.
> The high level consumer is very close to handling a lot of situations a 
> 'typical' client would need. Except for when the message received from Kafka 
> is valid, but the business logic that wants to consume it has a problem.
> For example if I want to write the value to a MongoDB or Cassandra database 
> and the database is not available. I won't know until I go to do the write 
> that the database isn't available, but by then it is too late to NOT read the 
> message from Kafka. Thus if I call shutdown() to stop reading, that message 
> is lost since the offset Kafka writes to ZooKeeper is the next offset.
> Ideally I'd like to be able to tell Kafka: close the KafkaStream but set the 
> next offset to read for this partition to this message when I start up again. 
> And if there are any messages in the BlockingQueue for other partitions, find 
> the lowest # and use it for that partitions offset since I haven't consumed 
> them yet.
> Thus I can cleanly shutdown my processing, resolve whatever the issue is and 
> restart the process.
> Another idea might be to allow a 'peek' into the next message and if I 
> succeed in writing to the database call 'next' to remove it from the queue. 
> I understand this won't deal with a 'kill -9' or hard failure of the JVM 
> leading to the latest offsets not being written to ZooKeeper but it addresses 
> a likely common scenario for consumers. Nor will it add true transactional 
> support since the ZK update could fail.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-969) Need to prevent failure of rebalance when there are no brokers available when consumer comes up

2013-07-10 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13705340#comment-13705340
 ] 

Joel Koshy commented on KAFKA-969:
--

This seems reasonable, but I'm not fully convinced about it. E.g., a test
framework should ensure external dependencies are up before attempting to
make service calls to those dependencies. That said, it is perhaps also
reasonable from a consumer's perspective to expect that returned streams be
empty at first, and whenever brokers and topics show up, then events should
just show up.

I'm +1 on this patch except for the if-else formatting issue.  Also, I think
this patch alone would be insufficient to meet the above.  There are two
other issues:

- We should register a watcher under the topics path (currently done only if
  a wildcard is specified)
- KAFKA-956 is also related. I need to give that one some thought.




> Need to prevent failure of rebalance when there are no brokers available when 
> consumer comes up
> ---
>
> Key: KAFKA-969
> URL: https://issues.apache.org/jira/browse/KAFKA-969
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sriram Subramanian
>Assignee: Sriram Subramanian
> Attachments: emptybrokeronrebalance-1.patch
>
>
> There are some rare instances when a consumer would be up before bringing up 
> the Kafka brokers. This would usually happen in a test scenario. In such 
> conditions, during rebalance instead of failing the rebalance we just log the 
> error and subscribe to broker changes. When the broker comes back up, we 
> trigger the rebalance.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-969) Need to prevent failure of rebalance when there are no brokers available when consumer comes up

2013-07-10 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13705490#comment-13705490
 ] 

Joel Koshy commented on KAFKA-969:
--

As I already said, I'm +1 on this patch for what it intends to address. Those 
two issues I mentioned are orthogonal. By "above" in my comment I was referring 
to the possible expectation from consumers: ".. from a consumer's perspective 
to expect that returned streams be empty at first, and whenever brokers and 
topics show up, then events should just show up." - not the "failed to 
rebalance issue".


> Need to prevent failure of rebalance when there are no brokers available when 
> consumer comes up
> ---
>
> Key: KAFKA-969
> URL: https://issues.apache.org/jira/browse/KAFKA-969
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sriram Subramanian
>Assignee: Sriram Subramanian
> Fix For: 0.8
>
> Attachments: emptybrokeronrebalance-1.patch
>
>
> There are some rare instances when a consumer would be up before bringing up 
> the Kafka brokers. This would usually happen in a test scenario. In such 
> conditions, during rebalance instead of failing the rebalance we just log the 
> error and subscribe to broker changes. When the broker comes back up, we 
> trigger the rebalance.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-07-15 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-705.
--

Resolution: Fixed

> Controlled shutdown doesn't seem to work on more than one broker in a cluster
> -
>
> Key: KAFKA-705
> URL: https://issues.apache.org/jira/browse/KAFKA-705
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: bugs
> Attachments: kafka-705-incremental-v2.patch, kafka-705-v1.patch, 
> shutdown_brokers_eat.py, shutdown-command
>
>
> I wrote a script (attached here) to basically round robin through the brokers 
> in a cluster doing the following 2 operations on each of them -
> 1. Send the controlled shutdown admin command. If it succeeds
> 2. Restart the broker
> What I've observed is that only one broker is able to finish the above 
> successfully the first time around. For the rest of the iterations, no broker 
> is able to shutdown using the admin command and every single time it fails 
> with the error message stating the same number of leaders on every broker. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-07-15 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13709525#comment-13709525
 ] 

Joel Koshy commented on KAFKA-705:
--

Yes we can close this.

> Controlled shutdown doesn't seem to work on more than one broker in a cluster
> -
>
> Key: KAFKA-705
> URL: https://issues.apache.org/jira/browse/KAFKA-705
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: bugs
> Attachments: kafka-705-incremental-v2.patch, kafka-705-v1.patch, 
> shutdown_brokers_eat.py, shutdown-command
>
>
> I wrote a script (attached here) to basically round robin through the brokers 
> in a cluster doing the following 2 operations on each of them -
> 1. Send the controlled shutdown admin command. If it succeeds
> 2. Restart the broker
> What I've observed is that only one broker is able to finish the above 
> successfully the first time around. For the rest of the iterations, no broker 
> is able to shutdown using the admin command and every single time it fails 
> with the error message stating the same number of leaders on every broker. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-07-15 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-705.



> Controlled shutdown doesn't seem to work on more than one broker in a cluster
> -
>
> Key: KAFKA-705
> URL: https://issues.apache.org/jira/browse/KAFKA-705
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: bugs
> Attachments: kafka-705-incremental-v2.patch, kafka-705-v1.patch, 
> shutdown_brokers_eat.py, shutdown-command
>
>
> I wrote a script (attached here) to basically round robin through the brokers 
> in a cluster doing the following 2 operations on each of them -
> 1. Send the controlled shutdown admin command. If it succeeds
> 2. Restart the broker
> What I've observed is that only one broker is able to finish the above 
> successfully the first time around. For the rest of the iterations, no broker 
> is able to shutdown using the admin command and every single time it fails 
> with the error message stating the same number of leaders on every broker. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-973) Messages From Producer Not being Partitioned

2013-07-15 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13709531#comment-13709531
 ] 

Joel Koshy commented on KAFKA-973:
--

Can you try sending more messages? The default partitioner is random so all the 
partitions should get messages (as long as you send enough messages - three 
messages ending up on one partition can happen).
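
For example, something along these lines (a sketch, not the reporter's code; 
broker host/ports are made up) reuses a single producer and sends enough unkeyed 
messages that both partitions should end up with data:

    import java.util.{Properties, UUID}
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    object SendManySketch extends App {
      val props = new Properties()
      props.put("metadata.broker.list", "kafka1:9092,kafka2:9092")
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      val producer = new Producer[String, String](new ProducerConfig(props))

      // With no key, the partitioner picks partitions (pseudo-)randomly, so a few hundred
      // messages should be spread across both partitions of the topic.
      (1 to 1000).foreach { _ =>
        producer.send(new KeyedMessage[String, String]("dnslog", UUID.randomUUID().toString))
      }
      producer.close()
    }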

> Messages From Producer Not being Partitioned 
> -
>
> Key: KAFKA-973
> URL: https://issues.apache.org/jira/browse/KAFKA-973
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8
> Environment: Linux
>Reporter: Subbu Srinivasan
>Assignee: Neha Narkhede
>  Labels: newbie
>
> I created a two node cluster.
> 2 zoo keepers
> 2 brokers
> 1 topic with replication factor (2) and no of partition 2.
> my consumer group has two threads
> 1) From my Java client - I send few  messages to the topic. I have set 
> multiple brokers
> kafka2:9092,kafka1:9092.
> Only one thread in my consumer always gets the messages. It looks like 
> producer is not
> partitioning the requests properly.
> 2) However if I send some sample using the simple console producer, I see 
> multiple threads getting
> requests and is load balanced.
> What am I doing wrong in my client?
> public class KafkaProducer {
> 
> private final Properties props = new Properties();
> private static AtomicLong counter = new AtomicLong(0);
> kafka.javaapi.producer.Producer producer = null;
> 
> public KafkaProducer() 
> {
>   props.put("serializer.class", "kafka.serializer.StringEncoder");
>   props.put("metadata.broker.list", 
> ConfigurationUtility.getKafkaHost());
>   producer = new kafka.javaapi.producer.Producer(new 
> ProducerConfig(props));
> } 
> 
> public void sendMessage(String msg) throws Exception
> {
> producer.send(new KeyedMessage<String, String>(ConfigurationUtility.getTopicName(), msg));
> }   
> 
> 
> public static void main(String arg[]) throws Exception
> {
> 
> ConfigurationUtility.setKafkaHost("kafka2:9092,kafka1:9092");
>   ConfigurationUtility.setTopicName("dnslog");
>   
> ConfigurationUtility.setZooKeeperHost("kafka1:2181,kafka2:2181");
>   ConfigurationUtility.setConsumerGroupId("dnslog");
>   
>   for(int i = 0 ; i < 2 ; ++i)
>   {
>   (new 
> KafkaProducer()).sendMessage(UUID.randomUUID().toString());
>   }
> }
> }

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-957) MirrorMaker needs to preserve the key in the source cluster

2013-07-16 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13710571#comment-13710571
 ] 

Joel Koshy commented on KAFKA-957:
--

Thanks for incorporating 5 and 6. A couple of additional comments:
- For the two match statements you have, it is probably sufficient and
  clearer to just use if (key == null) and if (props.contains(..))
- I'm not so sure the trace is required, but it could be useful. I would
  prefer the following format: "Sending message with key " - no need to
  show the payload. Also, you may want to use java.util.Arrays.toString on the
  byte array.
- Per Jay's offline comments, hashCode in general is a bit unsafe to "rely" on.
  E.g., it could be a non-uniform distribution, or the underlying
  function could change. That said, your usage is safe. Still, it should be
  straightforward to write a custom hash function that we can rely on for
  consistency.
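
For example, something like the following would give us a hash that is under our 
control (an illustration only, not the MirrorMaker patch; it assumes the 0.8 
Partitioner[T] trait and the convention of a VerifiableProperties constructor 
argument):

    import kafka.producer.Partitioner
    import kafka.utils.VerifiableProperties

    class StableBytesPartitioner(props: VerifiableProperties = null) extends Partitioner[Array[Byte]] {
      // A simple FNV-1a style hash over the key bytes: deterministic and independent of
      // Array#hashCode or String#hashCode, so its distribution and stability are ours to control.
      def partition(key: Array[Byte], numPartitions: Int): Int = {
        var h = 2166136261L
        key.foreach(b => h = (h ^ (b & 0xff)) * 16777619L)
        ((h & 0x7fffffffL) % numPartitions).toInt
      }
    }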


> MirrorMaker needs to preserve the key in the source cluster
> ---
>
> Key: KAFKA-957
> URL: https://issues.apache.org/jira/browse/KAFKA-957
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Guozhang Wang
> Attachments: KAFKA-957.v1.patch, KAFKA-957.v2.patch, 
> KAFKA-957.v2.patch
>
>
> Currently, MirrorMaker only propagates the message to the target cluster, but 
> not the associated key.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-967) Use key range in ProducerPerformance

2013-07-16 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13710589#comment-13710589
 ] 

Joel Koshy commented on KAFKA-967:
--

+1 - thanks for the patch.

> Use key range in ProducerPerformance
> 
>
> Key: KAFKA-967
> URL: https://issues.apache.org/jira/browse/KAFKA-967
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Attachments: KAFKA-967.v1.patch, KAFKA-967.v2.patch
>
>
> Currently in ProducerPerformance, the key of the message is set to MessageID. 
> It would better to set it to a specific key within a key range (Integer type) 
> so that we can test the semantic partitioning case. This is related to 
> KAFKA-957.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-347) change number of partitions of a topic online

2013-07-18 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13713045#comment-13713045
 ] 

Joel Koshy commented on KAFKA-347:
--

Thank you for the patch. Couple of comments, all very minor:

AddPartitionsCommand:
- IMO it is more intuitive for the option to be: "total partitions desired"
  as opposed to "num partitions to add"
- It is a bit odd that we can allow some partitions with a different
  replication factor from what's already there. I don't see any issues with
  it though. I just think it's a bit odd. One potential issue is if
  producers explicitly want to set acks to say 3 when there are some
  partitions with a replication factor of 2 and some with 3 (However,
  producers really should be using -1 in which case it would be fine).
- I think the command can currently allow an unintentional reassignment of
  replicas since the persistent path is always updated. (or no?) I think
  this can be easily checked for and avoided.
- Apart from start partition id I think getManualReplicaAssignment is
  identical to CreateTopicCommand's - maybe that code can be moved into
  AdminUtils?

KafkaController:
- nitpick: ZkUtils.getAllTopics(zkClient).foreach(p =>
  partitionStateMachine.registerPartitionChangeListener(p)) (can you change
  p to t :) - p really looks like a partition but it is a topic )

AdminUtils:
- the //"for testing only" comment is now misplaced.
- This code is pre-existing, but would prefer changing secondReplicaShift to
  nextReplicaShift.

- Any reason why AddPartitionsTest should not be within AdminTest?
- Finally, can you rebase? Sorry for not getting to this patch sooner :(



> change number of partitions of a topic online
> -
>
> Key: KAFKA-347
> URL: https://issues.apache.org/jira/browse/KAFKA-347
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Sriram Subramanian
>  Labels: features
> Fix For: 0.8.1
>
> Attachments: kafka-347.patch
>
>
> We will need an admin tool to change the number of partitions of a topic 
> online.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-347) change number of partitions of a topic online

2013-07-19 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13714259#comment-13714259
 ] 

Joel Koshy commented on KAFKA-347:
--

Thanks for patch v2. I'm +1 on this as is, but if you can address some of these 
minor comments that would be great.

v2.1 - For "num partitions to add" vs "partitions desired" - all I meant was 
that most of the time users would think
of "desired number of partitions" vs "how many more to add". E.g., I have eight 
partitions for a topic, I now want
20 instead. It is more convenient to just say I want "20" partitions instead of 
thinking of how many to add. OTOH since
we don't support reducing partitions treating it as a "num partitions to add" 
is safer. So I don't feel very strongly
about it either way.

v2.2 - Re: unintentional reassignment of partitions. Yes you are right.

v2.3 - Your patch still has ZookeeperConsumerConnector changes in it, so it did 
not apply cleanly.

v2.4 - On checking the replication factor: if we don't allow having a different 
replication factor for the new partitions
we should not even expose it as an option.

v2.5 - AddPartitionsListener: no need to change it now, but just a comment: we 
can directly parse the replica assignment
from the data object (instead of reading from zookeeper again) right?

v2.6 - On moving getManualReplicaAssignment to AdminUtils - I think it would be 
good to do that here, but either way is
fine.


> change number of partitions of a topic online
> -
>
> Key: KAFKA-347
> URL: https://issues.apache.org/jira/browse/KAFKA-347
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Sriram Subramanian
>  Labels: features
> Fix For: 0.8.1
>
> Attachments: kafka-347.patch, kafka-347-v2.patch
>
>
> We will need an admin tool to change the number of partitions of a topic 
> online.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-982) Logo for Kafka

2013-07-23 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13716285#comment-13716285
 ] 

Joel Koshy commented on KAFKA-982:
--

+1 for 298, and I like Jakob's recursive suggestion as well (can you add 
feedback on that on the 99designs contest?).

294 seems interesting/deep (pen, two k's, I kind of see a person with hands 
raised, etc.), but I just prefer 298 wrt overall appearance. 296 is also good, 
but between 296 and 298 I prefer 298.


> Logo for Kafka
> --
>
> Key: KAFKA-982
> URL: https://issues.apache.org/jira/browse/KAFKA-982
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
> Attachments: 289.jpeg, 294.jpeg, 296.png, 298.jpeg
>
>
> We should have a logo for kafka.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-925) Add optional partition key override in producer

2013-07-23 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13716593#comment-13716593
 ] 

Joel Koshy commented on KAFKA-925:
--

+1 , looks good to me.

DefaultPartitioner: Do we need the type parameter anymore?

Guozhang has a good point about tools such as mirror maker not having access to 
the original partitioning key.
However, I can see that it would be clunky as we would then need a partition 
key serializer as well. Also,
for something like offset-preserving mirrors we would anyway have the source 
cluster's partition available,
so I don't see it as a major issue.

ConsoleProducer: the enqueue timeout change seems reasonable - I'm assuming it 
was done to avoid dropping
messages when piping into ConsoleProducer. Correct?


> Add optional partition key override in producer
> ---
>
> Key: KAFKA-925
> URL: https://issues.apache.org/jira/browse/KAFKA-925
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>Affects Versions: 0.8.1
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-925-v1.patch, KAFKA-925-v2.patch
>
>
> We have a key that is used for partitioning in the producer and stored with 
> the message. Actually these uses, though often the same, could be different. 
> The two meanings are effectively:
> 1. Assignment to a partition
> 2. Deduplication within a partition
> In cases where we want to allow the client to take advantage of both of these 
> and they aren't the same it would be nice to allow them to be specified 
> separately.
> To implement this I added an optional partition key to KeyedMessage. When 
> specified this key is used for partitioning rather than the message key. This 
> key is of type Any and the parametric typing is removed from the partitioner 
> to allow it to work with either key.
> An alternative would be to allow the partition id to specified in the 
> KeyedMessage. This would be slightly more convenient in the case where there 
> is no partition key but instead you know a priori the partition number--this 
> case must be handled by giving the partition id as the partition key and 
> using an identity partitioner which is slightly more roundabout. However this 
> is inconsistent with the normal partitioning which requires a key in the case 
> where the partition is determined by a key--in that case you would be 
> manually calling your partitioner in user code. It seems best to me to either 
> use a key or always a partition and since we currently take a key I stuck 
> with that.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-957) MirrorMaker needs to preserve the key in the source cluster

2013-07-24 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-957:
-

Resolution: Fixed
Status: Resolved  (was: Patch Available)

Committed to 0.8

> MirrorMaker needs to preserve the key in the source cluster
> ---
>
> Key: KAFKA-957
> URL: https://issues.apache.org/jira/browse/KAFKA-957
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Guozhang Wang
> Attachments: KAFKA-957.v1.patch, KAFKA-957.v2.patch, 
> KAFKA-957.v2.patch, KAFKA-957.v3.patch, KAFKA-957.v4.patch
>
>
> Currently, MirrorMaker only propagates the message to the target cluster, but 
> not the associated key.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-957) MirrorMaker needs to preserve the key in the source cluster

2013-07-24 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-957.



> MirrorMaker needs to preserve the key in the source cluster
> ---
>
> Key: KAFKA-957
> URL: https://issues.apache.org/jira/browse/KAFKA-957
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Guozhang Wang
> Attachments: KAFKA-957.v1.patch, KAFKA-957.v2.patch, 
> KAFKA-957.v2.patch, KAFKA-957.v3.patch, KAFKA-957.v4.patch
>
>
> Currently, MirrorMaker only propagates the message to the target cluster, but 
> not the associated key.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-957) MirrorMaker needs to preserve the key in the source cluster

2013-07-24 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13718690#comment-13718690
 ] 

Joel Koshy commented on KAFKA-957:
--

+1

> MirrorMaker needs to preserve the key in the source cluster
> ---
>
> Key: KAFKA-957
> URL: https://issues.apache.org/jira/browse/KAFKA-957
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Guozhang Wang
> Attachments: KAFKA-957.v1.patch, KAFKA-957.v2.patch, 
> KAFKA-957.v2.patch, KAFKA-957.v3.patch, KAFKA-957.v4.patch
>
>
> Currently, MirrorMaker only propagates the message to the target cluster, but 
> not the associated key.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-983) Expose cleanshutdown method in MirrorMaker at the object level

2013-07-24 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13719121#comment-13719121
 ] 

Joel Koshy commented on KAFKA-983:
--

+1 - can you rebase? Also, it may be better to have a null check in the 
shutdown statements.

> Expose cleanshutdown method in MirrorMaker at the object level
> --
>
> Key: KAFKA-983
> URL: https://issues.apache.org/jira/browse/KAFKA-983
> Project: Kafka
>  Issue Type: Bug
>Reporter: Swapnil Ghike
>Assignee: Swapnil Ghike
>  Labels: bugs
> Attachments: KAFKA-983.patch
>
>
> Making clean shutdown in MirrorMaker public at the object level will be 
> useful. Currently if MirrorMaker is run in a container process, the only way 
> to stop it seems to be triggering the shutdown hook (System.exit(0)) which 
> may have unwarranted side effects on the other threads running in that 
> container process. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-983) Expose cleanshutdown method in MirrorMaker at the object level

2013-07-24 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-983:
-

Resolution: Fixed
Status: Resolved  (was: Patch Available)

> Expose cleanshutdown method in MirrorMaker at the object level
> --
>
> Key: KAFKA-983
> URL: https://issues.apache.org/jira/browse/KAFKA-983
> Project: Kafka
>  Issue Type: Bug
>Reporter: Swapnil Ghike
>Assignee: Swapnil Ghike
>  Labels: bugs
> Attachments: KAFKA-983.patch, KAFKA-983-rebased.patch
>
>
> Making clean shutdown in MirrorMaker public at the object level will be 
> useful. Currently if MirrorMaker is run in a container process, the only way 
> to stop it seems to be triggering the shutdown hook (System.exit(0)) which 
> may have unwarranted side effects on the other threads running in that 
> container process. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-983) Expose cleanshutdown method in MirrorMaker at the object level

2013-07-24 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-983.



Thanks for the patch. Committed to 0.8

> Expose cleanshutdown method in MirrorMaker at the object level
> --
>
> Key: KAFKA-983
> URL: https://issues.apache.org/jira/browse/KAFKA-983
> Project: Kafka
>  Issue Type: Bug
>Reporter: Swapnil Ghike
>Assignee: Swapnil Ghike
>  Labels: bugs
> Attachments: KAFKA-983.patch, KAFKA-983-rebased.patch
>
>
> Making clean shutdown in MirrorMaker public at the object level will be 
> useful. Currently if MirrorMaker is run in a container process, the only way 
> to stop it seems to be triggering the shutdown hook (System.exit(0)) which 
> may have unwarranted side effects on the other threads running in that 
> container process. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-991) Reduce the queue size in hadoop producer

2013-08-01 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13726958#comment-13726958
 ] 

Joel Koshy commented on KAFKA-991:
--

+1

Committed to 0.8

Minor comments:
- queue size is unintuitive: it sounds like a number of messages, but it is
actually bytes.
- The totalSize > queueSize check should ideally be done before adding the
message to msgList (see the sketch below).
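
For illustration, a rough sketch of the suggested ordering. The class and
method names are made up; this is not the actual KafkaRecordWriter code:

    import scala.collection.mutable.ArrayBuffer

    // The budget is in bytes, and the batch is flushed *before* the new
    // message is appended, so a single write never overshoots the limit.
    class RecordBufferSketch(queueSizeBytes: Int) {
      private val msgList = new ArrayBuffer[Array[Byte]]
      private var totalBytes = 0

      def write(msg: Array[Byte]) {
        if (totalBytes + msg.length > queueSizeBytes)
          sendMsgList()
        msgList += msg
        totalBytes += msg.length
      }

      private def sendMsgList() {
        // hand msgList to the producer here, then reset the counters
        msgList.clear()
        totalBytes = 0
      }
    }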

> Reduce the queue size in hadoop producer
> 
>
> Key: KAFKA-991
> URL: https://issues.apache.org/jira/browse/KAFKA-991
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Swapnil Ghike
>Assignee: Swapnil Ghike
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-991-v1.patch
>
>
> Currently the queue.size in hadoop producer is 10MB. This means that the 
> KafkaRecordWriter will hit the send button on kafka producer after the size 
> of uncompressed queued messages becomes greater than 10MB. (The other 
> condition on which the messages are sent is if their number exceeds 
> SHORT.MAX_VALUE).
> Considering that the server accepts a (compressed) batch of messages of 
> size up to 1 million bytes minus the log overhead, we should probably reduce 
> the queue size in hadoop producer. We should do two things:
> 1. change max message size on the broker to 1 million + log overhead, because 
> that will make the client message size easy to remember. Right now the 
> maximum number of bytes that can be accepted from a client in a batch of 
> messages is an awkward 88. (I don't have a stronger reason). We have set 
> fetch size on the consumer to 1MB, this gives us a lot of room even if the 
> log overhead increased with further versions.
> 2. Set the default number of bytes on hadoop producer to 1 million bytes. 
> Anyone who wants higher throughput can override this config using 
> kafka.output.queue.size

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-991) Reduce the queue size in hadoop producer

2013-08-01 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13727092#comment-13727092
 ] 

Joel Koshy commented on KAFKA-991:
--

Thanks for the follow-up patch. totalBytes is set to zero in sendMsgList, so the
next batch's totalBytes will come up short (incorrect) by valBytes.
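
To spell out one way of reading the issue (a hypothetical shape, not the
actual follow-up patch):

    // If valBytes is added to totalBytes before sendMsgList() zeroes the
    // counter, the message that rides into the next batch is never counted,
    // so that batch's totalBytes is short by valBytes.
    object TotalBytesSketch {
      var totalBytes = 0
      val queueSize = 1000000
      def sendMsgList() { totalBytes = 0 }          // stand-in for the real flush

      def writeProblematic(valBytes: Int) {
        totalBytes += valBytes                      // counted against the current batch
        if (totalBytes > queueSize) sendMsgList()   // reset drops valBytes on the floor
        // the message itself is appended to the (new) msgList here
      }

      def writeFixed(valBytes: Int) {
        if (totalBytes + valBytes > queueSize) sendMsgList()
        totalBytes += valBytes                      // counted toward the batch the message joins
      }
    }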

> Reduce the queue size in hadoop producer
> 
>
> Key: KAFKA-991
> URL: https://issues.apache.org/jira/browse/KAFKA-991
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Swapnil Ghike
>Assignee: Swapnil Ghike
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-991-followup.patch, kafka-991-followup-v2.patch, 
> kafka-991-v1.patch
>
>
> Currently the queue.size in hadoop producer is 10MB. This means that the 
> KafkaRecordWriter will hit the send button on kafka producer after the size 
> of uncompressed queued messages becomes greater than 10MB. (The other 
> condition on which the messages are sent is if their number exceeds 
> SHORT.MAX_VALUE).
> Considering that the server accepts a (compressed) batch of messages of 
> size up to 1 million bytes minus the log overhead, we should probably reduce 
> the queue size in hadoop producer. We should do two things:
> 1. change max message size on the broker to 1 million + log overhead, because 
> that will make the client message size easy to remember. Right now the 
> maximum number of bytes that can be accepted from a client in a batch of 
> messages is an awkward 88. (I don't have a stronger reason). We have set 
> fetch size on the consumer to 1MB, this gives us a lot of room even if the 
> log overhead increased with further versions.
> 2. Set the default number of bytes on hadoop producer to 1 million bytes. 
> Anyone who wants higher throughput can override this config using 
> kafka.output.queue.size

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-991) Reduce the queue size in hadoop producer

2013-08-01 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-991:
-

Resolution: Fixed
Status: Resolved  (was: Patch Available)

> Reduce the queue size in hadoop producer
> 
>
> Key: KAFKA-991
> URL: https://issues.apache.org/jira/browse/KAFKA-991
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Swapnil Ghike
>Assignee: Swapnil Ghike
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-991-followup-3.patch, kafka-991-followup.patch, 
> kafka-991-followup-v2.patch, kafka-991-v1.patch
>
>
> Currently the queue.size in hadoop producer is 10MB. This means that the 
> KafkaRecordWriter will hit the send button on kafka producer after the size 
> of uncompressed queued messages becomes greater than 10MB. (The other 
> condition on which the messages are sent is if their number exceeds 
> SHORT.MAX_VALUE).
> Considering that the server accepts a (compressed) batch of messages of 
> size up to 1 million bytes minus the log overhead, we should probably reduce 
> the queue size in hadoop producer. We should do two things:
> 1. change max message size on the broker to 1 million + log overhead, because 
> that will make the client message size easy to remember. Right now the 
> maximum number of bytes that can be accepted from a client in a batch of 
> messages is an awkward 88. (I don't have a stronger reason). We have set 
> fetch size on the consumer to 1MB, this gives us a lot of room even if the 
> log overhead increased with further versions.
> 2. Set the default number of bytes on hadoop producer to 1 million bytes. 
> Anyone who wants higher throughput can override this config using 
> kafka.output.queue.size

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-991) Reduce the queue size in hadoop producer

2013-08-01 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-991.



+1

Committed to 0.8

> Reduce the queue size in hadoop producer
> 
>
> Key: KAFKA-991
> URL: https://issues.apache.org/jira/browse/KAFKA-991
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Swapnil Ghike
>Assignee: Swapnil Ghike
>  Labels: bugs
> Fix For: 0.8
>
> Attachments: kafka-991-followup-3.patch, kafka-991-followup.patch, 
> kafka-991-followup-v2.patch, kafka-991-v1.patch
>
>
> Currently the queue.size in hadoop producer is 10MB. This means that the 
> KafkaRecordWriter will hit the send button on kafka producer after the size 
> of uncompressed queued messages becomes greater than 10MB. (The other 
> condition on which the messages are sent is if their number exceeds 
> SHORT.MAX_VALUE).
> Considering that the server accepts a (compressed) batch of messages of 
> size up to 1 million bytes minus the log overhead, we should probably reduce 
> the queue size in hadoop producer. We should do two things:
> 1. change max message size on the broker to 1 million + log overhead, because 
> that will make the client message size easy to remember. Right now the 
> maximum number of bytes that can be accepted from a client in a batch of 
> messages is an awkward 88. (I don't have a stronger reason). We have set 
> fetch size on the consumer to 1MB, this gives us a lot of room even if the 
> log overhead increased with further versions.
> 2. Set the default number of bytes on hadoop producer to 1 million bytes. 
> Anyone who wants higher throughput can override this config using 
> kafka.output.queue.size

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-984) Avoid a full rebalance in cases when a new topic is discovered but container/broker set stay the same

2013-08-02 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13728209#comment-13728209
 ] 

Joel Koshy commented on KAFKA-984:
--

Thanks for the patch - this will help a *lot* especially for mirroring.
However, I share Jun's concern about making such a non-trivial change to
0.8. In any event, here are some comments on
scala.kafka.consumer.ZookeeperConsumerConnector

- We should definitely abstract out the common code - syncedPartialRebalance
  and WildcardStreamsHandler. I think with some thought we can refactor it;
  otherwise we end up with copies of relatively complex code.
- The filters on lines 432/433 will not have any effect (I think) since the
  maps are immutable; the filter should probably be applied to the assignments
  on lines 428/429 instead (see the short example below). As it stands,
  metadata for other topics will be fetched unnecessarily, and fetchers for
  other topics may be stopped unnecessarily.
- Also, there are topic variables inside the method that shadow the
  parameter, which makes it harder to tell which variable is in effect in
  which scope.
- Logging can be improved and made more concise: there are a few typos and
  inconsistencies in capitalization.
- Why do this only if the number of added topics == 1? It could accept a list
  of topics to rebalance instead, right? I do see your note on Sriram's
  comments, but I don't see them in this jira - can you include them?
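
A small self-contained example of the point about the immutable maps (the
topic names and shapes are made up; this is not the connector's code):

    // filter on an immutable Map returns a new map, so discarding the
    // result leaves the original untouched
    object FilterExample extends App {
      val assignment = Map("topicA" -> Seq(0, 1), "topicB" -> Seq(0))

      assignment.filter { case (topic, _) => topic == "topicB" }   // result unused: no effect

      // the filtered view has to be captured and used downstream instead
      val onlyNewTopic = assignment.filter { case (topic, _) => topic == "topicB" }
      println(onlyNewTopic)   // Map(topicB -> List(0))
    }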



> Avoid a full rebalance in cases when a new topic is discovered but 
> container/broker set stay the same
> -
>
> Key: KAFKA-984
> URL: https://issues.apache.org/jira/browse/KAFKA-984
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.8
>
> Attachments: KAFKA-984.v1.patch, KAFKA-984.v2.patch, 
> KAFKA-984.v2.patch
>
>
> Currently a full rebalance will be triggered on high level consumers even 
> when just a new topic is added to ZK. Better avoid this behavior but only 
> rebalance on this newly added topic.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-08-02 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13728247#comment-13728247
 ] 

Joel Koshy commented on KAFKA-915:
--

+1 on the patch. I actually could not reproduce the other failures, so I'll 
check this in.



_test_case_name  :  testcase_5001
_test_class_name  :  MirrorMakerTest
arg : bounce_leader  :  false
arg : bounce_mirror_maker  :  false
arg : message_producing_free_time_sec  :  15
arg : num_iteration  :  1
arg : num_messages_to_produce_per_producer_call  :  50
arg : num_partition  :  1
arg : replica_factor  :  3
arg : sleep_seconds_between_producer_calls  :  1
validation_status  :
 Unique messages from consumer on [test_1]  :  500
 Unique messages from producer on [test_1]  :  500
 Validate for data matched on topic [test_1]  :  PASSED
 Validate for merged log segment checksum in cluster [source]  :  PASSED
 Validate for merged log segment checksum in cluster [target]  :  PASSED



_test_case_name  :  testcase_5002
_test_class_name  :  MirrorMakerTest
validation_status  :



_test_case_name  :  testcase_5003
_test_class_name  :  MirrorMakerTest
arg : bounce_leader  :  false
arg : bounce_mirror_maker  :  true
arg : bounced_entity_downtime_sec  :  30
arg : message_producing_free_time_sec  :  15
arg : num_iteration  :  1
arg : num_messages_to_produce_per_producer_call  :  50
arg : num_partition  :  1
arg : replica_factor  :  3
arg : sleep_seconds_between_producer_calls  :  1
validation_status  :
 Unique messages from consumer on [test_1]  :  2200
 Unique messages from producer on [test_1]  :  2200
 Validate for data matched on topic [test_1]  :  PASSED
 Validate for merged log segment checksum in cluster [source]  :  PASSED
 Validate for merged log segment checksum in cluster [target]  :  PASSED



_test_case_name  :  testcase_5004
_test_class_name  :  MirrorMakerTest
validation_status  :



_test_case_name  :  testcase_5005
_test_class_name  :  MirrorMakerTest
arg : bounce_leader  :  false
arg : bounce_mirror_maker  :  true
arg : bounced_entity_downtime_sec  :  30
arg : message_producing_free_time_sec  :  15
arg : num_iteration  :  1
arg : num_messages_to_produce_per_producer_call  :  50
arg : num_partition  :  2
arg : replica_factor  :  3
arg : sleep_seconds_between_producer_calls  :  1
validation_status  :
 Unique messages from consumer on [test_1]  :  1400
 Unique messages from consumer on [test_2]  :  1400
 Unique messages from producer on [test_1]  :  1400
 Unique messages from producer on [test_2]  :  1400
 Validate for data matched on topic [test_1]  :  PASSED
 Validate for data matched on topic [test_2]  :  PASSED
 Validate for merged log segment checksum in cluster [source]  :  PASSED
 Validate for merged log segment checksum in cluster [target]  :  PASSED




> System Test - Mirror Maker testcase_5001 failed
> ---
>
> Key: KAFKA-915
> URL: https://issues.apache.org/jira/browse/KAFKA-915
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Fung
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: kafka-0.8, replication-testing
> Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz
>
>
> This case passes if brokers are set to partition = 1, replicas = 1
> It fails if brokers are set to partition = 5, replicas = 3 (consistently 
> reproducible)
> This test case is set up as shown below.
> 1. Start 2 ZK as a cluster in Source
> 2. Start 2 ZK as a cluster in Target
> 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
> 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
> 5. Start 1 MM
> 6. Start ProducerPerformance to send some data
> 7. After Producer is done, start ConsoleConsumer to consume data
> 8. Stop all processes and validate if there is any data loss.
> 9. No failure is introduced to any process in this test
> Attached a tar file which contains the logs and system test output for both 
> cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-08-02 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-915.



> System Test - Mirror Maker testcase_5001 failed
> ---
>
> Key: KAFKA-915
> URL: https://issues.apache.org/jira/browse/KAFKA-915
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Fung
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: kafka-0.8, replication-testing
> Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz
>
>
> This case passes if brokers are set to partition = 1, replicas = 1
> It fails if brokers are set to partition = 5, replicas = 3 (consistently 
> reproducible)
> This test case is set up as shown below.
> 1. Start 2 ZK as a cluster in Source
> 2. Start 2 ZK as a cluster in Target
> 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
> 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
> 5. Start 1 MM
> 6. Start ProducerPerformance to send some data
> 7. After Producer is done, start ConsoleConsumer to consume data
> 8. Stop all processes and validate if there is any data loss.
> 9. No failure is introduced to any process in this test
> Attached a tar file which contains the logs and system test output for both 
> cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-08-02 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-915.
--

Resolution: Fixed

> System Test - Mirror Maker testcase_5001 failed
> ---
>
> Key: KAFKA-915
> URL: https://issues.apache.org/jira/browse/KAFKA-915
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Fung
>Assignee: Joel Koshy
>Priority: Critical
>  Labels: kafka-0.8, replication-testing
> Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz
>
>
> This case passes if brokers are set to partition = 1, replicas = 1
> It fails if brokers are set to partition = 5, replicas = 3 (consistently 
> reproducible)
> This test case is set up as shown below.
> 1. Start 2 ZK as a cluster in Source
> 2. Start 2 ZK as a cluster in Target
> 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
> 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
> 5. Start 1 MM
> 6. Start ProducerPerformance to send some data
> 7. After Producer is done, start ConsoleConsumer to consume data
> 8. Stop all processes and validate if there is any data loss.
> 9. No failure is introduced to any process in this test
> Attached a tar file which contains the logs and system test output for both 
> cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-998) Producer should not retry on non-recoverable error codes

2013-08-02 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-998:


 Summary: Producer should not retry on non-recoverable error codes
 Key: KAFKA-998
 URL: https://issues.apache.org/jira/browse/KAFKA-998
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8, 0.8.1
Reporter: Joel Koshy


Based on a discussion with Guozhang. The producer currently retries on all 
error codes (including messagesizetoolarge which is pointless to retry on). 
This can slow down the producer unnecessarily.

If at all we want to retry on that error code we would need to retry with a 
smaller batch size, but that's a separate discussion.
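
A rough sketch of the idea. The names and numeric codes below are stand-ins
for the entries in kafka.common.ErrorMapping, and this is not the producer's
actual retry loop:

    // Only requeue partitions whose error could plausibly succeed on a
    // plain retry; drop the rest immediately.
    object RetrySketch {
      val MessageSizeTooLargeCode: Short = 10   // assumed value, for illustration
      val RequestTimedOutCode: Short = 7        // assumed value, for illustration

      val nonRecoverable: Set[Short] = Set(MessageSizeTooLargeCode)

      // failed: partition id -> error code; returns only the sends worth retrying
      def retriable(failed: Map[Int, Short]): Map[Int, Short] =
        failed.filter { case (_, code) => !nonRecoverable.contains(code) }
    }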

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-06 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13731187#comment-13731187
 ] 

Joel Koshy commented on KAFKA-992:
--

Delayed review - looks good to me, although I still don't see a benefit in
storing the timestamp. i.e., the approach to retry on nodeexists if the host
and port are the same would remain the same. i.e., it seems more for
informative purposes. Let me know if I'm missing something.

@Jun, you have a point about the controller. It seems it may not be a
problem there since controller re-election will happen only after the data
is actually deleted. For consumers it may not be an issue either given that
the consumer id string includes a random uuid.
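
A sketch of the retry-on-NodeExists approach discussed above. The ZkLike
trait and its method names are placeholders, not the real ZkClient/ZkUtils
API:

    import org.apache.zookeeper.KeeperException.NodeExistsException

    // placeholder interface standing in for the real ZooKeeper helpers
    trait ZkLike {
      def createEphemeral(path: String, data: String): Unit
      def readData(path: String): String
      def waitForDeletion(path: String): Unit
    }

    object BrokerRegistrationSketch {
      def register(zk: ZkLike, path: String, hostPort: String) {
        try {
          zk.createEphemeral(path, hostPort)
        } catch {
          case e: NodeExistsException =>
            if (zk.readData(path) == hostPort) {
              // our own stale node from an expired session: wait for
              // ZooKeeper to delete it, then re-create the registration
              zk.waitForDeletion(path)
              zk.createEphemeral(path, hostPort)
            } else
              throw e   // a different broker genuinely registered this id
        }
      }
    }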



> Double Check on Broker Registration to Avoid False NodeExist Exception
> --
>
> Key: KAFKA-992
> URL: https://issues.apache.org/jira/browse/KAFKA-992
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neha Narkhede
>Assignee: Guozhang Wang
> Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch, 
> KAFKA-992.v3.patch, KAFKA-992.v4.patch
>
>
> The current behavior of zookeeper for ephemeral nodes is that session 
> expiration and ephemeral node deletion is not an atomic operation. 
> The side-effect of the above zookeeper behavior in Kafka, for certain corner 
> cases, is that ephemeral nodes can be lost even if the session is not 
> expired. The sequence of events that can lead to lossy ephemeral nodes is as 
> follows -
> 1. The session expires on the client, it assumes the ephemeral nodes are 
> deleted, so it establishes a new session with zookeeper and tries to 
> re-create the ephemeral nodes. 
> 2. However, when it tries to re-create the ephemeral node, zookeeper throws 
> back a NodeExists error code. Now this is legitimate during a session 
> disconnect event (since zkclient automatically retries the
> operation and raises a NodeExists error). Also by design, Kafka server 
> doesn't have multiple zookeeper clients create the same ephemeral node, so 
> Kafka server assumes the NodeExists is normal. 
> 3. However, after a few seconds zookeeper deletes that ephemeral node. So 
> from the client's perspective, even though the client has a new valid 
> session, its ephemeral node is gone.
> This behavior is triggered due to very long fsync operations on the zookeeper 
> leader. When the leader wakes up from such a long fsync operation, it has 
> several sessions to expire. And the time between the session expiration and 
> the ephemeral node deletion is magnified. Between these 2 operations, a 
> zookeeper client can issue a ephemeral node creation operation, that could've 
> appeared to have succeeded, but the leader later deletes the ephemeral node 
> leading to permanent ephemeral node loss from the client's perspective. 
> Thread from zookeeper mailing list: 
> http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-06 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13731282#comment-13731282
 ] 

Joel Koshy commented on KAFKA-992:
--

OK, never mind the comment about the timestamp - I had forgotten that NodeExists
wouldn't be thrown if the data is the same.

> Double Check on Broker Registration to Avoid False NodeExist Exception
> --
>
> Key: KAFKA-992
> URL: https://issues.apache.org/jira/browse/KAFKA-992
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neha Narkhede
>Assignee: Guozhang Wang
> Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch, 
> KAFKA-992.v3.patch, KAFKA-992.v4.patch
>
>
> The current behavior of zookeeper for ephemeral nodes is that session 
> expiration and ephemeral node deletion is not an atomic operation. 
> The side-effect of the above zookeeper behavior in Kafka, for certain corner 
> cases, is that ephemeral nodes can be lost even if the session is not 
> expired. The sequence of events that can lead to lossy ephemeral nodes is as 
> follows -
> 1. The session expires on the client, it assumes the ephemeral nodes are 
> deleted, so it establishes a new session with zookeeper and tries to 
> re-create the ephemeral nodes. 
> 2. However, when it tries to re-create the ephemeral node, zookeeper throws 
> back a NodeExists error code. Now this is legitimate during a session 
> disconnect event (since zkclient automatically retries the
> operation and raises a NodeExists error). Also by design, Kafka server 
> doesn't have multiple zookeeper clients create the same ephemeral node, so 
> Kafka server assumes the NodeExists is normal. 
> 3. However, after a few seconds zookeeper deletes that ephemeral node. So 
> from the client's perspective, even though the client has a new valid 
> session, its ephemeral node is gone.
> This behavior is triggered due to very long fsync operations on the zookeeper 
> leader. When the leader wakes up from such a long fsync operation, it has 
> several sessions to expire. And the time between the session expiration and 
> the ephemeral node deletion is magnified. Between these 2 operations, a 
> zookeeper client can issue a ephemeral node creation operation, that could've 
> appeared to have succeeded, but the leader later deletes the ephemeral node 
> leading to permanent ephemeral node loss from the client's perspective. 
> Thread from zookeeper mailing list: 
> http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-06 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13731337#comment-13731337
 ] 

Joel Koshy commented on KAFKA-992:
--

And never mind my comments about the controller/consumers as well. For
consumers, we don't regenerate the consumer id string.

For the controller, what can end up happening is:
- the controller's session expires and it becomes the controller again (with
  the stale ephemeral node);
- another broker (whose session may not have expired) receives a watch when
  the stale ephemeral node is actually deleted;
- so we can end up with two controllers in this scenario.



> Double Check on Broker Registration to Avoid False NodeExist Exception
> --
>
> Key: KAFKA-992
> URL: https://issues.apache.org/jira/browse/KAFKA-992
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neha Narkhede
>Assignee: Guozhang Wang
> Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch, 
> KAFKA-992.v3.patch, KAFKA-992.v4.patch
>
>
> The current behavior of zookeeper for ephemeral nodes is that session 
> expiration and ephemeral node deletion is not an atomic operation. 
> The side-effect of the above zookeeper behavior in Kafka, for certain corner 
> cases, is that ephemeral nodes can be lost even if the session is not 
> expired. The sequence of events that can lead to lossy ephemeral nodes is as 
> follows -
> 1. The session expires on the client, it assumes the ephemeral nodes are 
> deleted, so it establishes a new session with zookeeper and tries to 
> re-create the ephemeral nodes. 
> 2. However, when it tries to re-create the ephemeral node, zookeeper throws 
> back a NodeExists error code. Now this is legitimate during a session 
> disconnect event (since zkclient automatically retries the
> operation and raises a NodeExists error). Also by design, Kafka server 
> doesn't have multiple zookeeper clients create the same ephemeral node, so 
> Kafka server assumes the NodeExists is normal. 
> 3. However, after a few seconds zookeeper deletes that ephemeral node. So 
> from the client's perspective, even though the client has a new valid 
> session, its ephemeral node is gone.
> This behavior is triggered due to very long fsync operations on the zookeeper 
> leader. When the leader wakes up from such a long fsync operation, it has 
> several sessions to expire. And the time between the session expiration and 
> the ephemeral node deletion is magnified. Between these 2 operations, a 
> zookeeper client can issue a ephemeral node creation operation, that could've 
> appeared to have succeeded, but the leader later deletes the ephemeral node 
> leading to permanent ephemeral node loss from the client's perspective. 
> Thread from zookeeper mailing list: 
> http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-990) Fix ReassignPartitionCommand and improve usability

2013-08-08 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13733837#comment-13733837
 ] 

Joel Koshy commented on KAFKA-990:
--

- The topics-to-move JSON file format seems unnecessarily complicated. Why not
just a JSON array? (See the sketch below.)
- Use CommandLineUtils.checkRequiredArgs.
- It may be helpful to also print out the existing partition assignment and the
final assignment.
- Rename "dryrun" to "dry-run", which I think is the spelling unix tools like
patch tend to use.
- Line 88: use head instead of assuming partition 0 exists (the start partition
id could be != 0).

I did not finish going through all the changes in the controller, but thought I
would put in my comments so far :)
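
For illustration only (not the tool's actual input handling), a sketch of
what the simpler input and the head-based start partition could look like:

    // A plain JSON array such as ["topicA", "topicB"] would parse to a
    // simple list of topic names; the start partition comes from the head
    // of the existing (sorted) partition list instead of assuming 0 exists.
    object ReassignInputSketch extends App {
      val topicsToMove = List("topicA", "topicB")

      val existingPartitions = List(3, 4, 5)       // partition ids need not start at 0
      val startPartition = existingPartitions.sorted.headOption.getOrElse(
        sys.error("topic has no partitions"))
      println(startPartition)                      // 3, not a lookup of partition 0
    }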


> Fix ReassignPartitionCommand and improve usability
> --
>
> Key: KAFKA-990
> URL: https://issues.apache.org/jira/browse/KAFKA-990
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sriram Subramanian
>Assignee: Sriram Subramanian
> Attachments: KAFKA-990-v1.patch, KAFKA-990-v1-rebased.patch
>
>
> 1. The tool does not register for IsrChangeListener on controller failover.
> 2. There is a race condition where the previous listener can fire on 
> controller failover and the replicas can be in ISR. Even after re-registering 
> the ISR listener after failover, it will never be triggered.
> 3. The input to the tool is a static list, which is very hard to use. To improve 
> this, as a first step the tool needs to take a list of topics and list of 
> brokers to do the assignment to and then generate the reassignment plan.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-990) Fix ReassignPartitionCommand and improve usability

2013-08-09 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13735120#comment-13735120
 ] 

Joel Koshy commented on KAFKA-990:
--

Can you elaborate on the change to shutdownBroker in KafkaController? I
think we need to include brokers that are already shutting down, because the
previous shutdown attempt may have been incomplete (e.g., no other broker in
the ISR for some partition, which would have prevented leader movement), and
subsequent attempts would now be rejected.

Good catches on the controller failover. I agree with Neha that #2 is not a
problem for replicas that are in ISR; however, we do need to re-register the
ISR change listener for those replicas that are in ISR.

Finally, we should probably open a separate jira to implement a feature to
cancel an ongoing reassignment, given that it is a long-running operation.
The dry-run option reduces the need for this, but I nevertheless think it is
a good feature to support in the future.
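
A very rough sketch of the concern; the names and structure are illustrative
and are not KafkaController's actual shutdownBroker:

    // If brokers already marked as shutting down were rejected outright, a
    // retry after a partially successful attempt (say, a partition with no
    // other replica in ISR blocked leader movement) could never finish.
    object ControlledShutdownSketch {
      private val shuttingDown = scala.collection.mutable.Set[Int]()

      // returns the partitions still led by the broker after this attempt
      def shutdownBroker(brokerId: Int): Set[String] = {
        shuttingDown += brokerId        // idempotent: repeated attempts are allowed
        val remaining = partitionsLedBy(brokerId)
        // ... try to move leadership for each remaining partition ...
        remaining
      }

      // placeholder for the controller's real bookkeeping
      private def partitionsLedBy(brokerId: Int): Set[String] = Set.empty
    }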



> Fix ReassignPartitionCommand and improve usability
> --
>
> Key: KAFKA-990
> URL: https://issues.apache.org/jira/browse/KAFKA-990
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sriram Subramanian
>Assignee: Sriram Subramanian
> Attachments: KAFKA-990-v1.patch, KAFKA-990-v1-rebased.patch
>
>
> 1. The tool does not register for IsrChangeListener on controller failover.
> 2. There is a race condition where the previous listener can fire on 
> controller failover and the replicas can be in ISR. Even after re-registering 
> the ISR listener after failover, it will never be triggered.
> 3. The input to the tool is a static list, which is very hard to use. To improve 
> this, as a first step the tool needs to take a list of topics and list of 
> brokers to do the assignment to and then generate the reassignment plan.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-990) Fix ReassignPartitionCommand and improve usability

2013-08-09 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13735653#comment-13735653
 ] 

Joel Koshy commented on KAFKA-990:
--

Looks like I might have looked at the wrong patch. I'll review this again this 
weekend.

> Fix ReassignPartitionCommand and improve usability
> --
>
> Key: KAFKA-990
> URL: https://issues.apache.org/jira/browse/KAFKA-990
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sriram Subramanian
>Assignee: Sriram Subramanian
> Attachments: KAFKA-990-v1.patch, KAFKA-990-v1-rebased.patch
>
>
> 1. The tool does not register for IsrChangeListener on controller failover.
> 2. There is a race condition where the previous listener can fire on 
> controller failover and the replicas can be in ISR. Even after re-registering 
> the ISR listener after failover, it will never be triggered.
> 3. The input to the tool is a static list, which is very hard to use. To improve 
> this, as a first step the tool needs to take a list of topics and list of 
> brokers to do the assignment to and then generate the reassignment plan.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-990) Fix ReassignPartitionCommand and improve usability

2013-08-12 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13737010#comment-13737010
 ] 

Joel Koshy commented on KAFKA-990:
--

The rebased patch looks good - the shutdown changes I was referring to were in 
v1.

+1 on the rebased patch - we can fix the minor comments either on check-in or 
in a separate jira.

> Fix ReassignPartitionCommand and improve usability
> --
>
> Key: KAFKA-990
> URL: https://issues.apache.org/jira/browse/KAFKA-990
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sriram Subramanian
>Assignee: Sriram Subramanian
> Attachments: KAFKA-990-v1.patch, KAFKA-990-v1-rebased.patch
>
>
> 1. The tool does not register for IsrChangeListener on controller failover.
> 2. There is a race condition where the previous listener can fire on 
> controller failover and the replicas can be in ISR. Even after re-registering 
> the ISR listener after failover, it will never be triggered.
> 3. The input to the tool is a static list, which is very hard to use. To improve 
> this, as a first step the tool needs to take a list of topics and list of 
> brokers to do the assignment to and then generate the reassignment plan.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


<    3   4   5   6   7   8   9   >