Re: Review Request 30763: KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, try again.

2015-02-23 Thread Jay Kreps
I handle this by synchronizing flush()--only one thread can be flushing at
a time. This isn't too much of a drawback: since the first flush will drain
everything anyway, the second flush likely won't have much to do, so
sequencing them shouldn't hurt.

However, your idea of using a counter may actually be better and could
possibly remove the synchronization entirely. Let's both think that through
and see if we can find any corner cases. If not, I'll change to that.
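A minimal sketch of the synchronized approach, with hypothetical names
(drainAll is illustrative, not the actual patch):

    import java.util.concurrent.CountDownLatch;

    // Sketch: making flush() synchronized means only one thread flushes at
    // a time; a second concurrent flush() blocks until the first completes,
    // which is cheap because the first has already drained the accumulator.
    public class SynchronizedFlushSketch {
        public synchronized void flush() throws InterruptedException {
            CountDownLatch inFlight = drainAll(); // hand all batches to the sender
            inFlight.await();                     // block until they complete
        }

        // Stand-in for waking the sender thread; returns a completed latch
        // so the sketch is runnable on its own.
        private CountDownLatch drainAll() {
            return new CountDownLatch(0);
        }
    }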

-Jay

On Mon, Feb 23, 2015 at 8:46 PM, Jiangjie Qin wrote:

>
>
> > On Feb. 24, 2015, 4:22 a.m., Jiangjie Qin wrote:
> > > LGTM. Thanks, Jay.
> > > I actually tried just putting a synchronized block around the line
> > > where we copy the incomplete set and it seems to work. Maybe we can do
> > > that if you prefer less code.
> >
> > Jay Kreps wrote:
> > I think that depends on the lock the add/remove uses in the internals
> > of Collections.synchronizedSet, which could vary by JVM and version. I
> > also think that whenever possible ad hoc synchronization should be
> > encapsulated in a small class rather than sprinkled here and there in a
> > larger class, just so it is easy to verify correctness, even when that
> > is slightly more code.
>
> Makes sense. It just occurred to me that the current approach might cause
> a flush() to wait up to linger.ms.
>
> Imagine there are two threads with the following sequence:
> 1. Thread 1 calls flush().
> 2. accumulator.flushing = true.
> 3. The sender thread wakes up and does one drain.
> 4. Thread 1 starts waiting on callback 1.
> 5. Thread 2 calls send(), followed by a flush().
> 6. The sender thread finishes callback 1 and thread 1 sets flushing to false.
> 7. The sender thread will not be able to continue honoring the flush for
> thread 2 because the flushing flag has been turned off.
> The message sent by thread 2 in step 5 will sit in the accumulator for
> linger.ms and thread 2 will be blocked.
>
> I think we can make flushing an atomic integer instead of a boolean, so
> each thread increments it when it begins a flush and decrements it after
> the flush finishes. As long as flushing > 0 the accumulator should flush
> the data.
>
>
> - Jiangjie
>
>
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/#review73749
> ---
>
>
> On Feb. 24, 2015, 2:31 a.m., Jay Kreps wrote:
> >
> > ---
> > This is an automatically generated e-mail. To reply, visit:
> > https://reviews.apache.org/r/30763/
> > ---
> >
> > (Updated Feb. 24, 2015, 2:31 a.m.)
> >
> >
> > Review request for kafka.
> >
> >
> > Bugs: KAFKA-1865
> > https://issues.apache.org/jira/browse/KAFKA-1865
> >
> >
> > Repository: kafka
> >
> >
> > Description
> > ---
> >
> > KAFKA-1865 Add a flush() method to the producer.
> >
> >
> > Diffs
> > -
> >
> >   clients/src/main/java/org/apache/kafka/clients/Metadata.java
> > e8afecda956303a6ee116499fd443a54c018e17d
> >   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
> > 1fd6917c8a5131254c740abad7f7228a47e3628c
> >   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
> > 84530f2b948f9abd74203db48707e490dd9c81a5
> >   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
> > 17fe541588d462c68c33f6209717cc4015e9b62f
> >   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
> > 4990692efa6f01c62e1d7b05fbf31bec50e398c9
> >   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
> > 4a2da41f47994f778109e3c4107ffd90195f0bae
> >   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
> > ecfe2144d778a5d9b614df5278b9f0a15637f10b
> >   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
> > dd0af8aee98abed5d4a0dc50989e37888bb353fe
> >   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
> > PRE-CREATION
> >   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
> > d682bd46ec3826f0a72388cc4ec30e1b1223d0f3
> >   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
> > 4ae43ed47e31ad8052b4348a731da11120968508
> >   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
> > 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8
> >   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
> > 75513b0bdd439329c5771d87436ef83fda853bfb
> >   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
> > 29c8417422c0cf0d29bf2405c77fd05e35350259
> >   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
> > 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2
> >   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
> > 558942aaecd1b9f7098435d39aa4b362cd16ff0a

Re: Review Request 30763: KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, try again.

2015-02-23 Thread Jiangjie Qin


> On Feb. 24, 2015, 4:22 a.m., Jiangjie Qin wrote:
> > LGTM. Thanks, Jay.
> > I actually tried just putting a synchronized block around the line where we 
> > copy the incomplete set and it seems to work. Maybe we can do that if you 
> > prefer less code.
> 
> Jay Kreps wrote:
> I think that depends on the lock the add/remove uses in the internals of 
> Collections.synchronizedSet, which could vary by JVM and version. I also think 
> that whenever possible ad hoc synchronization should be encapsulated in a 
> small class rather than sprinkled here and there in a larger class, just so it 
> is easy to verify correctness, even when that is slightly more code.

Makes sense. It just occurred to me that the current approach might cause a 
flush() to wait up to linger.ms.

Imagine there are two threads with the following sequence:
1. Thread 1 calls flush().
2. accumulator.flushing = true.
3. The sender thread wakes up and does one drain.
4. Thread 1 starts waiting on callback 1.
5. Thread 2 calls send(), followed by a flush().
6. The sender thread finishes callback 1 and thread 1 sets flushing to false.
7. The sender thread will not be able to continue honoring the flush for 
thread 2 because the flushing flag has been turned off.
The message sent by thread 2 in step 5 will sit in the accumulator for 
linger.ms and thread 2 will be blocked.

I think we can make flushing an atomic integer instead of a boolean, so each 
thread increments it when it begins a flush and decrements it after the flush 
finishes. As long as flushing > 0 the accumulator should flush the data.
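A minimal sketch of the counter idea (the names, like flushesInProgress, are
illustrative, not necessarily what the patch will use):

    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch: track the number of in-progress flushes instead of a boolean,
    // so overlapping flush() calls from different threads keep the
    // accumulator in flush mode until the last one finishes.
    public class FlushCounterSketch {
        private final AtomicInteger flushesInProgress = new AtomicInteger(0);

        public void beginFlush()  { flushesInProgress.incrementAndGet(); }
        public void finishFlush() { flushesInProgress.decrementAndGet(); }

        // The sender consults this instead of the boolean flag; linger.ms
        // is ignored while any flush is outstanding.
        public boolean flushInProgress() {
            return flushesInProgress.get() > 0;
        }
    }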


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73749
---


On Feb. 24, 2015, 2:31 a.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> ---
> 
> (Updated Feb. 24, 2015, 2:31 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
> https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java 
> e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
> 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
>  4a2da41f47994f778109e3c4107ffd90195f0bae 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   
> clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java 
> d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 
> 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 
> 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
> 75513b0bdd439329c5771d87436ef83fda853bfb 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 
> 29c8417422c0cf0d29bf2405c77fd05e35350259 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 
> 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> ---
> 
> New patch addresses feedback. Also (1) comments out the consumer tests so I 
> could verify everything else passes and (2) moves some unit tests I found 
> that were in the wrong packages.
> 
> 
> Thanks,
> 
> Jay Kreps

[jira] [Updated] (KAFKA-1757) Can not delete Topic index on Windows

2015-02-23 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1757:
--
Resolution: Fixed
Status: Resolved  (was: Patch Available)

> Can not delete Topic index on Windows
> -
>
> Key: KAFKA-1757
> URL: https://issues.apache.org/jira/browse/KAFKA-1757
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.0
>Reporter: Lukáš Vyhlídka
>Assignee: Sriharsha Chintalapani
>Priority: Minor
> Fix For: 0.8.3
>
> Attachments: KAFKA-1757.patch, lucky-v.patch
>
>
> When running Kafka 0.8.2-Beta (Scala 2.10) on Windows, an attempt to 
> delete the topic threw an error:
> ERROR [KafkaApi-1] error when handling request Name: StopReplicaRequest; 
> Version: 0; CorrelationId: 38; ClientId: ; DeletePartitions: true; 
> ControllerId: 0; ControllerEpoch: 3; Partitions: [test,0] 
> (kafka.server.KafkaApis)
> kafka.common.KafkaStorageException: Delete of index 
> .index failed.
> at kafka.log.LogSegment.delete(LogSegment.scala:283)
> at kafka.log.Log$$anonfun$delete$1.apply(Log.scala:608)
> at kafka.log.Log$$anonfun$delete$1.apply(Log.scala:608)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at kafka.log.Log.delete(Log.scala:608)
> at kafka.log.LogManager.deleteLog(LogManager.scala:375)
> at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:144)
> at 
> kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:139)
> at 
> kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:139)
> at kafka.utils.Utils$.inLock(Utils.scala:535)
> at kafka.utils.Utils$.inWriteLock(Utils.scala:543)
> at kafka.cluster.Partition.delete(Partition.scala:139)
> at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:158)
> at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$3.apply(ReplicaManager.scala:191)
> at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$3.apply(ReplicaManager.scala:190)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:190)
> at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:96)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
> at java.lang.Thread.run(Thread.java:744)
> When I investigated the issue I figured out that the index file (in my 
> environment it was 
> C:\tmp\kafka-logs\----0014-0\.index)
>  was locked by the Kafka process and the OS did not allow that file to be 
> deleted.
> I tried to fix the problem in the source code, and when I added a close() 
> method call to LogSegment.delete(), topic deletion started to work.
> I will add here (not sure how to upload the file during issue creation) a 
> diff with the changes I have made so you can take a look at whether it is 
> reasonable or not. It would be perfect if it could make it into the 
> product...
> In the end I would like to say that on Linux the deletion works just fine...
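As a standalone illustration of the failure mode (this is not Kafka code; the
file names are arbitrary), Windows refuses to delete a file that still has an
open handle, which is why the close() call is needed before the delete:

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class DeleteOpenFileDemo {
        public static void main(String[] args) throws IOException {
            File index = File.createTempFile("segment", ".index");
            RandomAccessFile handle = new RandomAccessFile(index, "rw");
            // On Windows this prints false: the open handle blocks deletion
            // (on Linux the delete succeeds immediately).
            System.out.println("delete while open:  " + index.delete());
            handle.close(); // the fix: close the handle first
            // On Windows the delete now succeeds.
            System.out.println("delete after close: " + index.delete());
        }
    }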



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1757) Can not delete Topic index on Windows

2015-02-23 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334389#comment-14334389
 ] 

Sriharsha Chintalapani commented on KAFKA-1757:
---

Patch merged by [~jkreps]. Closing this as fixed.

> Can not delete Topic index on Windows
> -
>
> Key: KAFKA-1757
> URL: https://issues.apache.org/jira/browse/KAFKA-1757
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.0
>Reporter: Lukáš Vyhlídka
>Assignee: Sriharsha Chintalapani
>Priority: Minor
> Fix For: 0.8.3
>
> Attachments: KAFKA-1757.patch, lucky-v.patch
>
>
> When running Kafka 0.8.2-Beta (Scala 2.10) on Windows, an attempt to 
> delete the topic threw an error:
> ERROR [KafkaApi-1] error when handling request Name: StopReplicaRequest; 
> Version: 0; CorrelationId: 38; ClientId: ; DeletePartitions: true; 
> ControllerId: 0; ControllerEpoch: 3; Partitions: [test,0] 
> (kafka.server.KafkaApis)
> kafka.common.KafkaStorageException: Delete of index 
> .index failed.
> at kafka.log.LogSegment.delete(LogSegment.scala:283)
> at kafka.log.Log$$anonfun$delete$1.apply(Log.scala:608)
> at kafka.log.Log$$anonfun$delete$1.apply(Log.scala:608)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at kafka.log.Log.delete(Log.scala:608)
> at kafka.log.LogManager.deleteLog(LogManager.scala:375)
> at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:144)
> at 
> kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:139)
> at 
> kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:139)
> at kafka.utils.Utils$.inLock(Utils.scala:535)
> at kafka.utils.Utils$.inWriteLock(Utils.scala:543)
> at kafka.cluster.Partition.delete(Partition.scala:139)
> at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:158)
> at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$3.apply(ReplicaManager.scala:191)
> at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$3.apply(ReplicaManager.scala:190)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:190)
> at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:96)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
> at java.lang.Thread.run(Thread.java:744)
> When I investigated the issue I figured out that the index file (in my 
> environment it was 
> C:\tmp\kafka-logs\----0014-0\.index)
>  was locked by the Kafka process and the OS did not allow that file to be 
> deleted.
> I tried to fix the problem in the source code, and when I added a close() 
> method call to LogSegment.delete(), topic deletion started to work.
> I will add here (not sure how to upload the file during issue creation) a 
> diff with the changes I have made so you can take a look at whether it is 
> reasonable or not. It would be perfect if it could make it into the 
> product...
> In the end I would like to say that on Linux the deletion works just fine...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 30763: KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, try again.

2015-02-23 Thread Jay Kreps


> On Feb. 24, 2015, 4:22 a.m., Jiangjie Qin wrote:
> > LGTM. Thanks, Jay.
> > I actually tried just putting a synchronized block around the line where we 
> > copy the incomplete set and it seems to work. Maybe we can do that if you 
> > prefer less code.

I think that depends on the lock the add/remove uses in the internals of 
Collections.synchronizedSet, which could vary by JVM and version. I also think 
that whenever possible ad hoc synchronization should be encapsulated in a small 
class rather than sprinkled here and there in a larger class, just so it is easy 
to verify correctness, even when that is slightly more code.


- Jay


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73749
---


On Feb. 24, 2015, 2:31 a.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> ---
> 
> (Updated Feb. 24, 2015, 2:31 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
> https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java 
> e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
> 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
>  4a2da41f47994f778109e3c4107ffd90195f0bae 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   
> clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java 
> d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 
> 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 
> 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
> 75513b0bdd439329c5771d87436ef83fda853bfb 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 
> 29c8417422c0cf0d29bf2405c77fd05e35350259 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 
> 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> ---
> 
> New patch addresses feedback. Also (1) comments out the consumer tests so I 
> could verify everything else passes and (2) moves some unit tests I found 
> that were in the wrong packages.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>



Re: Review Request 30763: KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, try again.

2015-02-23 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73749
---

Ship it!


LGTM. Thanks, Jay.
I actually tried just putting a synchronized block around the line where we 
copy the incomplete set and it seems to work. Maybe we can do that if you prefer 
less code.

- Jiangjie Qin


On Feb. 24, 2015, 2:31 a.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> ---
> 
> (Updated Feb. 24, 2015, 2:31 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
> https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java 
> e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
> 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
>  4a2da41f47994f778109e3c4107ffd90195f0bae 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   
> clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java 
> d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 
> 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 
> 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
> 75513b0bdd439329c5771d87436ef83fda853bfb 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 
> 29c8417422c0cf0d29bf2405c77fd05e35350259 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 
> 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> ---
> 
> New patch addresses feedback. Also (1) comments out the consumer tests so I 
> could verify everything else passes and (2) moves some unit tests I found 
> that were in the wrong packages.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>



Re: Review Request 30763: KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, try again.

2015-02-23 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/
---

(Updated Feb. 24, 2015, 2:31 a.m.)


Review request for kafka.


Bugs: KAFKA-1865
https://issues.apache.org/jira/browse/KAFKA-1865


Repository: kafka


Description
---

KAFKA-1865 Add a flush() method to the producer.


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/Metadata.java 
e8afecda956303a6ee116499fd443a54c018e17d 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
1fd6917c8a5131254c740abad7f7228a47e3628c 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
84530f2b948f9abd74203db48707e490dd9c81a5 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
17fe541588d462c68c33f6209717cc4015e9b62f 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
4990692efa6f01c62e1d7b05fbf31bec50e398c9 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
 4a2da41f47994f778109e3c4107ffd90195f0bae 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 ecfe2144d778a5d9b614df5278b9f0a15637f10b 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
 dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java 
d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
  clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 
4ae43ed47e31ad8052b4348a731da11120968508 
  clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 
743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
  clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
75513b0bdd439329c5771d87436ef83fda853bfb 
  clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 
29c8417422c0cf0d29bf2405c77fd05e35350259 
  
clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 
558942aaecd1b9f7098435d39aa4b362cd16ff0a 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
2802a399bf599e9530f53b7df72f12702a10d3c4 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
b15237b76def3b234924280fa3fdb25dbb0cc0dc 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
21d0ed2cb7c9459261d3cdc7c21dece5e2079698 

Diff: https://reviews.apache.org/r/30763/diff/


Testing (updated)
---

New patch addresses feedback. Also (1) comments out the consumer tests so I 
could verify everything else passes and (2) moves some unit tests I found that 
were in the wrong packages.


Thanks,

Jay Kreps



[jira] [Updated] (KAFKA-1865) Add a flush() call to the new producer API

2015-02-23 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1865:
-
Attachment: KAFKA-1865_2015-02-23_18:29:16.patch

> Add a flush() call to the new producer API
> --
>
> Key: KAFKA-1865
> URL: https://issues.apache.org/jira/browse/KAFKA-1865
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1865.patch, KAFKA-1865_2015-02-21_15:36:54.patch, 
> KAFKA-1865_2015-02-22_16:26:46.patch, KAFKA-1865_2015-02-23_18:29:16.patch
>
>
> The postconditions of this would be that any record enqueued prior to flush() 
> would have completed being sent (either successfully or not).
> An open question is whether you can continue sending new records while this 
> call is executing (on other threads).
> We should only do this if it doesn't add inefficiencies for people who don't 
> use it.
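A usage sketch of the proposed contract (broker address and topic are
placeholders; flush() is the method this ticket adds):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class FlushContractExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer =
                new KafkaProducer<String, String>(props);
            for (int i = 0; i < 100; i++)
                producer.send(new ProducerRecord<String, String>(
                    "my-topic", Integer.toString(i)));
            // Postcondition: every record sent above has completed
            // (successfully or not) once flush() returns.
            producer.flush();
            producer.close();
        }
    }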



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1865) Add a flush() call to the new producer API

2015-02-23 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334281#comment-14334281
 ] 

Jay Kreps commented on KAFKA-1865:
--

Updated reviewboard https://reviews.apache.org/r/30763/diff/
 against branch trunk

> Add a flush() call to the new producer API
> --
>
> Key: KAFKA-1865
> URL: https://issues.apache.org/jira/browse/KAFKA-1865
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1865.patch, KAFKA-1865_2015-02-21_15:36:54.patch, 
> KAFKA-1865_2015-02-22_16:26:46.patch, KAFKA-1865_2015-02-23_18:29:16.patch
>
>
> The postconditions of this would be that any record enqueued prior to flush() 
> would have completed being sent (either successfully or not).
> An open question is whether you can continue sending new records while this 
> call is executing (on other threads).
> We should only do this if it doesn't add inefficiencies for people who don't 
> use it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 30763: KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, try again.

2015-02-23 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/
---

(Updated Feb. 24, 2015, 2:29 a.m.)


Review request for kafka.


Summary (updated)
-

KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, 
try again.


Bugs: KAFKA-1865
https://issues.apache.org/jira/browse/KAFKA-1865


Repository: kafka


Description
---

KAFKA-1865 Add a flush() method to the producer.


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/Metadata.java 
e8afecda956303a6ee116499fd443a54c018e17d 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
1fd6917c8a5131254c740abad7f7228a47e3628c 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
84530f2b948f9abd74203db48707e490dd9c81a5 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
17fe541588d462c68c33f6209717cc4015e9b62f 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
4990692efa6f01c62e1d7b05fbf31bec50e398c9 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
 4a2da41f47994f778109e3c4107ffd90195f0bae 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 ecfe2144d778a5d9b614df5278b9f0a15637f10b 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
 dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java 
d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
  clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 
4ae43ed47e31ad8052b4348a731da11120968508 
  clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 
743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
  clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
75513b0bdd439329c5771d87436ef83fda853bfb 
  clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 
29c8417422c0cf0d29bf2405c77fd05e35350259 
  
clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 
558942aaecd1b9f7098435d39aa4b362cd16ff0a 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
2802a399bf599e9530f53b7df72f12702a10d3c4 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
b15237b76def3b234924280fa3fdb25dbb0cc0dc 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
21d0ed2cb7c9459261d3cdc7c21dece5e2079698 

Diff: https://reviews.apache.org/r/30763/diff/


Testing
---


Thanks,

Jay Kreps



Re: Review Request 30763: Second attempt at flush()

2015-02-23 Thread Jay Kreps


> On Feb. 23, 2015, 5 a.m., Jiangjie Qin wrote:
> > clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java,
> >  line 210
> > 
> >
> > It probably does not matter, but here we are only sending 10 messages, 
> > which can be put into one batch. Should we test the case where the 
> > accumulator has more than one batch for a partition?

Fair point.


- Jay


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73505
---


On Feb. 23, 2015, 12:26 a.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> ---
> 
> (Updated Feb. 23, 2015, 12:26 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
> https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java 
> e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
> 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
>  4a2da41f47994f778109e3c4107ffd90195f0bae 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   
> clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java 
> d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 
> 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 
> 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
> 75513b0bdd439329c5771d87436ef83fda853bfb 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>



Re: Review Request 30763: Second attempt at flush()

2015-02-23 Thread Jay Kreps


> On Feb. 23, 2015, 8:53 p.m., Jiangjie Qin wrote:
> > Hi Jay, I applied the patch and tried to run it in our test environment. I 
> > got this exception:
> > 
> > java.util.ConcurrentModificationException
> > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
> > at java.util.HashMap$KeyIterator.next(HashMap.java:1453)
> > at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
> > at java.util.HashSet.<init>(HashSet.java:119)
> > at 
> > org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:348)
> > at 
> > org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:498)
> > at 
> > kafka.tools.MirrorMaker$MirrorMakerProducer.flush(MirrorMaker.scala:373)
> > at 
> > kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:314)
> > 
> > It looks like we should synchronize access to incomplete, i.e. when 
> > one thread is making a copy of the incomplete set, other threads should not 
> > add batches into it.
> 
> Jiangjie Qin wrote:
> From the javadoc of Collections.synchronizedSet():
> 
> It is imperative that the user manually synchronize on the returned set 
> when iterating over it:
> Set s = Collections.synchronizedSet(new HashSet());
> ...
> synchronized (s) {
>     Iterator i = s.iterator(); // Must be in the synchronized block
>     while (i.hasNext())
>         foo(i.next());
> }

Ack, nice catch. Didn't realize that. Will fix.
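A sketch of the encapsulation discussed earlier in this thread, where every
access to the incomplete set, including the copy, goes through one small class
holding the lock (the class and method names are illustrative):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    final class IncompleteBatches<T> {
        private final Set<T> incomplete = new HashSet<T>();

        public synchronized void add(T batch)    { incomplete.add(batch); }
        public synchronized void remove(T batch) { incomplete.remove(batch); }

        // The copy is made under the same lock the mutators use, which is
        // what prevents the ConcurrentModificationException above.
        public synchronized List<T> copyAll() {
            return new ArrayList<T>(incomplete);
        }
    }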


- Jay


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73665
---


On Feb. 23, 2015, 12:26 a.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> ---
> 
> (Updated Feb. 23, 2015, 12:26 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
> https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java 
> e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
> 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
>  4a2da41f47994f778109e3c4107ffd90195f0bae 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   
> clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java 
> d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 
> 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 
> 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
> 75513b0bdd439329c5771d87436ef83fda853bfb 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>



Re: Review Request 30763: Second attempt at flush()

2015-02-23 Thread Jay Kreps


> On Feb. 23, 2015, 6:44 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
> > line 376
> > 
> >
> > Add @throws KafkaException

I added the other exceptions I know about. I didn't add KafkaException, as I'm 
not sure we ever directly throw that, and I just want to document things the 
user may need to be aware of (i.e. there are many programming errors, like 
invalid partitions, that will give IllegalArgumentException, but I don't cover 
those). Let me know if you see an exception I missed.
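A sketch of the kind of javadoc being discussed (illustrative wording, not the
committed text; InterruptException is the new unchecked wrapper listed as
PRE-CREATION in the diff):

    import org.apache.kafka.common.errors.InterruptException;

    interface FlushDocSketch {
        /**
         * Blocks until all previously sent records have completed.
         *
         * @throws InterruptException if the calling thread is interrupted
         *         while blocked
         */
        void flush();
    }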


> On Feb. 23, 2015, 6:44 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java,
> >  line 91
> > 
> >
> > One possible optimization is to keep a RecordMetadata field in the 
> > FutureRecordMetadata, and value() call will then only create the object 
> > once. Here we could then call
> > 
> > callback.onCompletion(thunk.future.value());

I tried to do this but it gets ugly, because you have to split done() too and 
add another volatile access. I think in practice you either use the callback or 
the future, so the work should generally not be duplicated.


- Jay


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73475
---


On Feb. 23, 2015, 12:26 a.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> ---
> 
> (Updated Feb. 23, 2015, 12:26 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
> https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java 
> e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
> 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
>  4a2da41f47994f778109e3c4107ffd90195f0bae 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   
> clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java 
> d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 
> 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 
> 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
> 75513b0bdd439329c5771d87436ef83fda853bfb 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>



[jira] [Commented] (KAFKA-1664) Kafka does not properly parse multiple ZK nodes with non-root chroot

2015-02-23 Thread Ashish Kumar Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334266#comment-14334266
 ] 

Ashish Kumar Singh commented on KAFKA-1664:
---

[~nehanarkhede], could you review this when you get a chance?

> Kafka does not properly parse multiple ZK nodes with non-root chroot
> 
>
> Key: KAFKA-1664
> URL: https://issues.apache.org/jira/browse/KAFKA-1664
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Ricky Saltzer
>Assignee: Ashish Kumar Singh
>Priority: Minor
>  Labels: newbie
> Attachments: KAFKA-1664.1.patch, KAFKA-1664.2.patch, 
> KAFKA-1664.patch, KAFKA-1664_2015-01-29_10:26:20.patch
>
>
> When using a non-root ZK directory for Kafka, if you specify multiple ZK 
> servers, Kafka does not seem to properly parse the connection string. 
> *Error*
> {code}
> [root@hodor-001 bin]# ./kafka-console-consumer.sh --zookeeper 
> baelish-001.edh.cloudera.com:2181/kafka,baelish-002.edh.cloudera.com:2181/kafka,baelish-003.edh.cloudera.com:2181/kafka
>  --topic test-topic
> [2014-10-01 15:31:04,629] ERROR Error processing message, stopping consumer:  
> (kafka.consumer.ConsoleConsumer$)
> java.lang.IllegalArgumentException: Path length must be > 0
>   at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48)
>   at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35)
>   at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:766)
>   at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
>   at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
>   at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>   at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
>   at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213)
>   at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
>   at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
>   at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
>   at kafka.utils.ZkUtils$.createParentPath(ZkUtils.scala:245)
>   at kafka.utils.ZkUtils$.createEphemeralPath(ZkUtils.scala:256)
>   at 
> kafka.utils.ZkUtils$.createEphemeralPathExpectConflict(ZkUtils.scala:268)
>   at 
> kafka.utils.ZkUtils$.createEphemeralPathExpectConflictHandleZKBug(ZkUtils.scala:306)
>   at 
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$registerConsumerInZK(ZookeeperConsumerConnector.scala:226)
>   at 
> kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:755)
>   at 
> kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:145)
>   at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
>   at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}
> *Working*
> {code}
> [root@hodor-001 bin]# ./kafka-console-consumer.sh --zookeeper 
> baelish-001.edh.cloudera.com:2181/kafka --topic test-topic
> {code}
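For context, ZooKeeper accepts the chroot only once, at the end of the whole
connect string. A hedged sketch of a normalization (not the committed fix) that
turns the failing form above into the accepted one:

    public class ZkConnectSketch {
        // Moves a per-host chroot suffix to the end of the connect string:
        // "zk1:2181/kafka,zk2:2181/kafka" -> "zk1:2181,zk2:2181/kafka".
        public static String normalize(String connect) {
            String chroot = "";
            StringBuilder hosts = new StringBuilder();
            for (String part : connect.split(",")) {
                int slash = part.indexOf('/');
                if (slash >= 0) {
                    chroot = part.substring(slash); // keep the last chroot seen
                    part = part.substring(0, slash);
                }
                if (hosts.length() > 0)
                    hosts.append(',');
                hosts.append(part);
            }
            return hosts.toString() + chroot;
        }

        public static void main(String[] args) {
            System.out.println(normalize(
                "zk1:2181/kafka,zk2:2181/kafka,zk3:2181/kafka"));
        }
    }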



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31226: KAFKA-1615 Generating group ID in ZookeeperConsumerConnector shouldn't require local hostname to resolve

2015-02-23 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31226/#review73733
---


Thank you!

Give me a few days to reproduce the issue and verify that the patch indeed 
fixes it.


core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala


I think hostname will not be null because you set it to "localhost" when 
defining it?



core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala


Since this test can't fail, why do we need it?


- Gwen Shapira


On Feb. 20, 2015, 3:07 p.m., Jonathan Rafalski wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31226/
> ---
> 
> (Updated Feb. 20, 2015, 3:07 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Adds a try/catch around getting the hostname, to allow using the first 
> non-loopback IP address when the hostname is not resolvable.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 3e1718b 
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
> 8c4687b 
> 
> Diff: https://reviews.apache.org/r/31226/diff/
> 
> 
> Testing
> ---
> 
> For testing this code I included a unit test that forces the generation of 
> the ID; however, if you cannot get your local hostname to not resolve, you 
> will need to manually throw the exception in the try/catch in order to test.
> 
> 
> Thanks,
> 
> Jonathan Rafalski
> 
>
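A hedged sketch of the fallback described in the request (the actual patch may
differ; the method name localName is illustrative):

    import java.net.Inet4Address;
    import java.net.InetAddress;
    import java.net.NetworkInterface;
    import java.net.SocketException;
    import java.net.UnknownHostException;
    import java.util.Enumeration;

    public class HostnameFallbackSketch {
        // Try the local hostname first; if it does not resolve, fall back
        // to the first non-loopback IPv4 address on any interface.
        public static String localName() throws SocketException {
            try {
                return InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
                Enumeration<NetworkInterface> ifaces =
                    NetworkInterface.getNetworkInterfaces();
                while (ifaces.hasMoreElements()) {
                    Enumeration<InetAddress> addrs =
                        ifaces.nextElement().getInetAddresses();
                    while (addrs.hasMoreElements()) {
                        InetAddress addr = addrs.nextElement();
                        if (!addr.isLoopbackAddress() && addr instanceof Inet4Address)
                            return addr.getHostAddress();
                    }
                }
                return "localhost"; // last resort
            }
        }
    }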



[jira] [Commented] (KAFKA-313) Add JSON/CSV output and looping options to ConsumerGroupCommand

2015-02-23 Thread Ashish Kumar Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334264#comment-14334264
 ] 

Ashish Kumar Singh commented on KAFKA-313:
--

[~gwenshap], [~nehanarkhede], I have updated the patch and the JIRA. Could you 
take a look?

> Add JSON/CSV output and looping options to ConsumerGroupCommand
> ---
>
> Key: KAFKA-313
> URL: https://issues.apache.org/jira/browse/KAFKA-313
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dave DeMaagd
>Assignee: Ashish Kumar Singh
>Priority: Minor
>  Labels: newbie, patch
> Fix For: 0.8.3
>
> Attachments: KAFKA-313-2012032200.diff, KAFKA-313.1.patch, 
> KAFKA-313.patch, KAFKA-313_2015-02-23_18:11:32.patch
>
>
> Adds:
> * '--loop N' - causes the program to loop forever, sleeping for up to N 
> seconds between loops (loop time minus collection time, unless that's less 
> than 0, at which point it will just run again immediately)
> * '--asjson' - display as a JSON string instead of the more human readable 
> output format.
> Neither of the above depends on the other (you can loop in the human 
> readable output, or do a single shot execution with JSON output). Existing 
> behavior/output is maintained if neither of the above is used. Diff attached.
> Impacted files:
> core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
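A hedged sketch of the '--loop N' timing rule described above (this is not the
tool's code; collectAndPrint stands in for the real collection):

    public class LoopTimingSketch {
        public static void main(String[] args) throws InterruptedException {
            long loopMs = 10000L; // e.g. --loop 10
            while (true) {
                long start = System.currentTimeMillis();
                collectAndPrint();
                long elapsed = System.currentTimeMillis() - start;
                // Sleep for loop time minus collection time; if collection
                // took longer than the interval, run again immediately.
                Thread.sleep(Math.max(0L, loopMs - elapsed));
            }
        }

        private static void collectAndPrint() {
            System.out.println("offsets..."); // placeholder for the real work
        }
    }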



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-313) Add JSON/CSV output and looping options to ConsumerGroupCommand

2015-02-23 Thread Ashish Kumar Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish Kumar Singh updated KAFKA-313:
-
Description: 
Adds:
* '--loop N' - causes the program to loop forever, sleeping for up to N seconds 
between loops (loop time minus collection time, unless that's less than 0, at 
which point it will just run again immediately)
* '--asjson' - display as a JSON string instead of the more human readable 
output format.

Neither of the above depends on the other (you can loop in the human readable 
output, or do a single shot execution with JSON output). Existing 
behavior/output is maintained if neither of the above is used. Diff attached.

Impacted files:

core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala

  was:
Adds:
* '--loop N' - causes the program to loop forever, sleeping for up to N seconds 
between loops (loop time minus collection time, unless that's less than 0, at 
which point it will just run again immediately)
* '--asjson' - display as a JSON string instead of the more human readable 
output format.

Neither of the above depends on the other (you can loop in the human readable 
output, or do a single shot execution with JSON output). Existing 
behavior/output is maintained if neither of the above is used. Diff attached.

Impacted files:

core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala


> Add JSON/CSV output and looping options to ConsumerGroupCommand
> ---
>
> Key: KAFKA-313
> URL: https://issues.apache.org/jira/browse/KAFKA-313
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dave DeMaagd
>Assignee: Ashish Kumar Singh
>Priority: Minor
>  Labels: newbie, patch
> Fix For: 0.8.3
>
> Attachments: KAFKA-313-2012032200.diff, KAFKA-313.1.patch, 
> KAFKA-313.patch, KAFKA-313_2015-02-23_18:11:32.patch
>
>
> Adds:
> * '--loop N' - causes the program to loop forever, sleeping for up to N 
> seconds between loops (loop time minus collection time, unless that's less 
> than 0, at which point it will just run again immediately)
> * '--asjson' - display as a JSON string instead of the more human readable 
> output format.
> Neither of the above depends on the other (you can loop in the human 
> readable output, or do a single shot execution with JSON output). Existing 
> behavior/output is maintained if neither of the above is used. Diff attached.
> Impacted files:
> core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-313) Add JSON/CSV output and looping options to ConsumerGroupCommand

2015-02-23 Thread Ashish Kumar Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish Kumar Singh updated KAFKA-313:
-
Summary: Add JSON/CSV output and looping options to ConsumerGroupCommand  
(was: Add JSON/CSV output and looping options to ConsumerOffsetChecker)

> Add JSON/CSV output and looping options to ConsumerGroupCommand
> ---
>
> Key: KAFKA-313
> URL: https://issues.apache.org/jira/browse/KAFKA-313
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dave DeMaagd
>Assignee: Ashish Kumar Singh
>Priority: Minor
>  Labels: newbie, patch
> Fix For: 0.8.3
>
> Attachments: KAFKA-313-2012032200.diff, KAFKA-313.1.patch, 
> KAFKA-313.patch, KAFKA-313_2015-02-23_18:11:32.patch
>
>
> Adds:
> * '--loop N' - causes the program to loop forever, sleeping for up to N 
> seconds between loops (loop time minus collection time, unless that's less 
> than 0, at which point it will just run again immediately)
> * '--asjson' - display as a JSON string instead of the more human readable 
> output format.
> Neither of the above depends on the other (you can loop in the human 
> readable output, or do a single shot execution with JSON output). Existing 
> behavior/output is maintained if neither of the above is used. Diff attached.
> Impacted files:
> core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-313) Add JSON/CSV output and looping options to ConsumerOffsetChecker

2015-02-23 Thread Ashish Kumar Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish Kumar Singh updated KAFKA-313:
-
Attachment: KAFKA-313_2015-02-23_18:11:32.patch

> Add JSON/CSV output and looping options to ConsumerOffsetChecker
> 
>
> Key: KAFKA-313
> URL: https://issues.apache.org/jira/browse/KAFKA-313
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dave DeMaagd
>Assignee: Ashish Kumar Singh
>Priority: Minor
>  Labels: newbie, patch
> Fix For: 0.8.3
>
> Attachments: KAFKA-313-2012032200.diff, KAFKA-313.1.patch, 
> KAFKA-313.patch, KAFKA-313_2015-02-23_18:11:32.patch
>
>
> Adds:
> * '--loop N' - causes the program to loop forever, sleeping for up to N 
> seconds between loops (loop time minus collection time, unless that's less 
> than 0, at which point it will just run again immediately)
> * '--asjson' - display as a JSON string instead of the more human readable 
> output format.
> Neither of the above depends on the other (you can loop in the human 
> readable output, or do a single shot execution with JSON output). Existing 
> behavior/output is maintained if neither of the above is used. Diff attached.
> Impacted files:
> core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-313) Add JSON/CSV output and looping options to ConsumerOffsetChecker

2015-02-23 Thread Ashish Kumar Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334260#comment-14334260
 ] 

Ashish Kumar Singh commented on KAFKA-313:
--

Updated reviewboard https://reviews.apache.org/r/28096/
 against branch trunk

> Add JSON/CSV output and looping options to ConsumerOffsetChecker
> 
>
> Key: KAFKA-313
> URL: https://issues.apache.org/jira/browse/KAFKA-313
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dave DeMaagd
>Assignee: Ashish Kumar Singh
>Priority: Minor
>  Labels: newbie, patch
> Fix For: 0.8.3
>
> Attachments: KAFKA-313-2012032200.diff, KAFKA-313.1.patch, 
> KAFKA-313.patch, KAFKA-313_2015-02-23_18:11:32.patch
>
>
> Adds:
> * '--loop N' - causes the program to loop forever, sleeping for up to N 
> seconds between loops (loop time minus collection time, unless that's less 
> than 0, at which point it will just run again immediately)
> * '--asjson' - display as a JSON string instead of the more human readable 
> output format.
> Neither of the above depends on the other (you can loop in the human-readable 
> output, or do a single-shot execution with JSON output). Existing 
> behavior/output is maintained if neither option is used. Diff attached.
> Impacted files:
> core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 28096: Patch for KAFKA-313

2015-02-23 Thread Ashish Singh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28096/
---

(Updated Feb. 24, 2015, 2:11 a.m.)


Review request for kafka, Gwen Shapira, Jarek Cecho, and Joel Koshy.


Summary (updated)
-

Patch for KAFKA-313


Bugs: KAFKA-313
https://issues.apache.org/jira/browse/KAFKA-313


Repository: kafka


Description (updated)
---

KAFKA-313: Add JSON/CSV output and looping options to ConsumerGroupCommand


Diffs (updated)
-

  core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
89fa29a882ae1f2be512e1ae469631c02adeeddb 

Diff: https://reviews.apache.org/r/28096/diff/


Testing
---

Ran ConsumerOffsetChecker with different combinations of --output.format and 
--loop options.


Thanks,

Ashish Singh



[jira] [Commented] (KAFKA-1379) Partition reassignment resets clock for time-based retention

2015-02-23 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334253#comment-14334253
 ] 

Joel Koshy commented on KAFKA-1379:
---

We have been thinking through various alternatives and this is included in a 
proposal here: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

> Partition reassignment resets clock for time-based retention
> 
>
> Key: KAFKA-1379
> URL: https://issues.apache.org/jira/browse/KAFKA-1379
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>
> Since retention is driven off mod-times, reassigning partitions will result in
> data that has already aged on a leader being retained for another full retention
> cycle. E.g., if retention is seven days and you reassign partitions on the
> sixth day, then those partitions will remain on the replicas for another
> seven days.
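To illustrate the mod-time dependence, a toy Scala sketch (not Kafka code) of retention eligibility; a segment copied to a new replica during reassignment gets a fresh mtime, so its retention clock restarts:

{code}
import java.io.File

// Toy illustration: retention keyed off the file's last-modified time.
// A segment copied during reassignment has lastModified ~= now, so the
// full retention window starts over on the new replica.
def eligibleForDeletion(segment: File, retentionMs: Long, nowMs: Long): Boolean =
  nowMs - segment.lastModified > retentionMs
{code}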



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1982) change kafka.examples.Producer to use the new java producer

2015-02-23 Thread Ashish Kumar Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish Kumar Singh reassigned KAFKA-1982:
-

Assignee: Ashish Kumar Singh

> change kafka.examples.Producer to use the new java producer
> ---
>
> Key: KAFKA-1982
> URL: https://issues.apache.org/jira/browse/KAFKA-1982
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.3
>Reporter: Jun Rao
>Assignee: Ashish Kumar Singh
>  Labels: newbie
>
> We need to change the example to use the new java producer.
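For reference, a minimal sketch (written from Scala; broker address, topic, and values are placeholders) of what an example based on the new java producer might look like:

{code}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Minimal new-producer usage; configuration values are illustrative.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("topic1", "key", "value"))
producer.close()
{code}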



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1984) java producer may miss an available partition

2015-02-23 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-1984:
--

 Summary: java producer may miss an available partition
 Key: KAFKA-1984
 URL: https://issues.apache.org/jira/browse/KAFKA-1984
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.0
Reporter: Jun Rao
Assignee: Jun Rao


In Partitioner, we cycle through each partition to find one whose leader is 
available. However, since the counter is shared among different caller threads, 
the logic may not iterate through every partition. The impact is that we could 
return an unavailable partition to the caller even when partitions are 
available. If the partition is unavailable for a long time, the producer may 
block due to the buffer pool being full.
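A toy Scala model of the fix idea (names are hypothetical, not the actual Partitioner code): read the shared counter once per call and scan from that local starting point, so concurrent increments by other threads cannot cause a call to skip partitions:

{code}
import java.util.concurrent.atomic.AtomicInteger

// Toy model, not the real Partitioner: each call snapshots the shared
// counter once and then scans all numPartitions slots from that point.
class RoundRobinChooser(leaderAvailable: Int => Boolean) {
  private val counter = new AtomicInteger(0)

  def choose(numPartitions: Int): Option[Int] = {
    val start = counter.getAndIncrement()
    (0 until numPartitions).iterator
      .map(i => (((start + i) % numPartitions) + numPartitions) % numPartitions)
      .find(leaderAvailable) // None if no partition currently has a leader
  }
}
{code}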



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1724) Errors after reboot in single node setup

2015-02-23 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334148#comment-14334148
 ] 

Sriharsha Chintalapani commented on KAFKA-1724:
---

[~junrao] Yes. We have isStarted in KafkaScheduler, which gets set after it is 
started, and in shutdown we check isStarted before going through the shutdown 
process. Tested it in a cluster to reproduce; I don't see any errors.

> Errors after reboot in single node setup
> 
>
> Key: KAFKA-1724
> URL: https://issues.apache.org/jira/browse/KAFKA-1724
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.0
>Reporter: Ciprian Hacman
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
> Fix For: 0.8.3
>
> Attachments: KAFKA-1724.patch
>
>
> In a single node setup, after reboot, Kafka logs show the following:
> {code}
> [2014-10-22 16:37:22,206] INFO [Controller 0]: Controller starting up 
> (kafka.controller.KafkaController)
> [2014-10-22 16:37:22,419] INFO [Controller 0]: Controller startup complete 
> (kafka.controller.KafkaController)
> [2014-10-22 16:37:22,554] INFO conflict in /brokers/ids/0 data: 
> {"jmx_port":-1,"timestamp":"1413995842465","host":"ip-10-91-142-54.eu-west-1.compute.internal","version":1,"port":9092}
>  stored data: 
> {"jmx_port":-1,"timestamp":"1413994171579","host":"ip-10-91-142-54.eu-west-1.compute.internal","version":1,"port":9092}
>  (kafka.utils.ZkUtils$)
> [2014-10-22 16:37:22,736] INFO I wrote this conflicted ephemeral node 
> [{"jmx_port":-1,"timestamp":"1413995842465","host":"ip-10-91-142-54.eu-west-1.compute.internal","version":1,"port":9092}]
>  at /brokers/ids/0 a while back in a different session, hence I will backoff 
> for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> [2014-10-22 16:37:25,010] ERROR Error handling event ZkEvent[Data of 
> /controller changed sent to 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener@a6af882] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.IllegalStateException: Kafka scheduler has not been started
> at kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114)
> at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
> at 
> kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
> at 
> kafka.controller.KafkaController$$anonfun$2.apply$mcV$sp(KafkaController.scala:162)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:138)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
> at kafka.utils.Utils$.inLock(Utils.scala:535)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:134)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> [2014-10-22 16:37:28,757] INFO Registered broker 0 at path /brokers/ids/0 
> with address ip-10-91-142-54.eu-west-1.compute.internal:9092. 
> (kafka.utils.ZkUtils$)
> [2014-10-22 16:37:28,849] INFO [Kafka Server 0], started 
> (kafka.server.KafkaServer)
> [2014-10-22 16:38:56,718] INFO Closing socket connection to /127.0.0.1. 
> (kafka.network.Processor)
> [2014-10-22 16:38:56,850] INFO Closing socket connection to /127.0.0.1. 
> (kafka.network.Processor)
> [2014-10-22 16:38:56,985] INFO Closing socket connection to /127.0.0.1. 
> (kafka.network.Processor)
> {code}
> The last log line repeats forever and is correlated with errors on the app 
> side.
> Restarting Kafka fixes the errors.
> Steps to reproduce (with help from the mailing list):
> # start zookeeper
> # start kafka-broker
> # create topic or start a producer writing to a topic
> # stop zookeeper
> # stop kafka-broker (the kafka broker shutdown goes into WARN Session
> 0x14938d9dc010001 for server null, unexpected error, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) 
> java.net.ConnectException: Connection refused)
> # kill -9 kafka-broker
> # restart zookeeper and then kafka-broker, which leads to the error above



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: Kafka-trunk #404

2015-02-23 Thread Apache Jenkins Server
See 

Changes:

[junrao] kafka-1971; starting a broker with a conflicting id will delete the 
previous broker registration; patched by Jun Rao; reviewed by Neha Narkhede

--
[...truncated 273 lines...]
kafka.admin.AdminTest > testReassigningNonExistingPartition PASSED

kafka.admin.AdminTest > testResumePartitionReassignmentThatWasCompleted PASSED

kafka.admin.AdminTest > testPreferredReplicaJsonData PASSED

kafka.admin.AdminTest > testBasicPreferredReplicaElection PASSED

kafka.admin.AdminTest > testShutdownBroker PASSED

kafka.admin.AdminTest > testTopicConfigChange PASSED

kafka.admin.DeleteConsumerGroupTest > testGroupWideDeleteInZK PASSED

kafka.admin.DeleteConsumerGroupTest > 
testGroupWideDeleteInZKDoesNothingForActiveConsumerGroup PASSED

kafka.admin.DeleteConsumerGroupTest > 
testGroupTopicWideDeleteInZKForGroupConsumingOneTopic PASSED

kafka.admin.DeleteConsumerGroupTest > 
testGroupTopicWideDeleteInZKForGroupConsumingMultipleTopics PASSED

kafka.admin.DeleteConsumerGroupTest > 
testGroupTopicWideDeleteInZKDoesNothingForActiveGroupConsumingMultipleTopics 
PASSED

kafka.admin.DeleteConsumerGroupTest > testTopicWideDeleteInZK PASSED

kafka.admin.DeleteConsumerGroupTest > 
testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh 
PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow 
PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh 
PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow PASSED

kafka.integration.RollingBounceTest > testRollingBounce PASSED

kafka.integration.TopicMetadataTest > testTopicMetadataRequest PASSED

kafka.integration.TopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.TopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.TopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.FetcherTest > testFetcher PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionEnabled 
PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionDisabled 
PASSED

kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionEnabledByTopicOverride PASSED

kafka.integration.UncleanLeaderElectionTest > 
testCleanLeaderElectionDisabledByTopicOverride PASSED

kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionInvalidTopicOverride PASSED

kafka.zk.ZKEphemeralTest > testEphemeralNodeCleanup PASSED

kafka.metrics.KafkaTimerTest > testKafkaTimer PASSED

kafka.message.MessageCompressionTest > testSimpleCompressDecompress PASSED

kafka.message.ByteBufferMessageSetTest > testIterator PASSED

kafka.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.message.ByteBufferMessageSetTest > testWriteTo PASSED

kafka.message.ByteBufferMessageSetTest > testValidBytes PASSED

kafka.message.ByteBufferMessageSetTest > testValidBytesWithCompression PASSED

kafka.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.message.ByteBufferMessageSetTest > testOffsetAssignment PASSED

kafka.message.MessageTest > testFieldValues PASSED

kafka.message.MessageTest > testChecksum PASSED

kafka.message.MessageTest > testEquality PASSED

kafka.message.MessageTest > testIsHashable PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeHoursProvided PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeMinutesProvided PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeMsProvided PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeNoConfigProvided PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeBothMinutesAndHoursProvided 
PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeBothMinutesAndMsProvided 
PASSED

kafka.server.KafkaConfigTest > testAdvertiseDefaults PASSED

kafka.server.KafkaConfigTest > testAdvertiseConfigured PASSED

kafka.server.KafkaConfigTest > testUncleanLeaderElectionDefault PASSED

kafka.server.KafkaConfigTest > testUncleanElectionDisabled PASSED

kafka.server.KafkaConfigTest > testUncleanElectionEnabled PASSED

kafka.server.KafkaConfigTest > testUnc

[jira] [Commented] (KAFKA-1724) Errors after reboot in single node setup

2015-02-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334130#comment-14334130
 ] 

Jun Rao commented on KAFKA-1724:


[~sriharsha], so, this is fixed as part of KAFKA-1760?

> Errors after reboot in single node setup
> 
>
> Key: KAFKA-1724
> URL: https://issues.apache.org/jira/browse/KAFKA-1724
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.0
>Reporter: Ciprian Hacman
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
> Fix For: 0.8.3
>
> Attachments: KAFKA-1724.patch
>
>
> In a single node setup, after reboot, Kafka logs show the following:
> {code}
> [2014-10-22 16:37:22,206] INFO [Controller 0]: Controller starting up 
> (kafka.controller.KafkaController)
> [2014-10-22 16:37:22,419] INFO [Controller 0]: Controller startup complete 
> (kafka.controller.KafkaController)
> [2014-10-22 16:37:22,554] INFO conflict in /brokers/ids/0 data: 
> {"jmx_port":-1,"timestamp":"1413995842465","host":"ip-10-91-142-54.eu-west-1.compute.internal","version":1,"port":9092}
>  stored data: 
> {"jmx_port":-1,"timestamp":"1413994171579","host":"ip-10-91-142-54.eu-west-1.compute.internal","version":1,"port":9092}
>  (kafka.utils.ZkUtils$)
> [2014-10-22 16:37:22,736] INFO I wrote this conflicted ephemeral node 
> [{"jmx_port":-1,"timestamp":"1413995842465","host":"ip-10-91-142-54.eu-west-1.compute.internal","version":1,"port":9092}]
>  at /brokers/ids/0 a while back in a different session, hence I will backoff 
> for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> [2014-10-22 16:37:25,010] ERROR Error handling event ZkEvent[Data of 
> /controller changed sent to 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener@a6af882] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.IllegalStateException: Kafka scheduler has not been started
> at kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114)
> at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
> at 
> kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
> at 
> kafka.controller.KafkaController$$anonfun$2.apply$mcV$sp(KafkaController.scala:162)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:138)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
> at kafka.utils.Utils$.inLock(Utils.scala:535)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:134)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> [2014-10-22 16:37:28,757] INFO Registered broker 0 at path /brokers/ids/0 
> with address ip-10-91-142-54.eu-west-1.compute.internal:9092. 
> (kafka.utils.ZkUtils$)
> [2014-10-22 16:37:28,849] INFO [Kafka Server 0], started 
> (kafka.server.KafkaServer)
> [2014-10-22 16:38:56,718] INFO Closing socket connection to /127.0.0.1. 
> (kafka.network.Processor)
> [2014-10-22 16:38:56,850] INFO Closing socket connection to /127.0.0.1. 
> (kafka.network.Processor)
> [2014-10-22 16:38:56,985] INFO Closing socket connection to /127.0.0.1. 
> (kafka.network.Processor)
> {code}
> The last log line repeats forever and is correlated with errors on the app 
> side.
> Restarting Kafka fixes the errors.
> Steps to reproduce (with help from the mailing list):
> # start zookeeper
> # start kafka-broker
> # create topic or start a producer writing to a topic
> # stop zookeeper
> # stop kafka-broker (the kafka broker shutdown goes into WARN Session
> 0x14938d9dc010001 for server null, unexpected error, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) 
> java.net.ConnectException: Connection refused)
> # kill -9 kafka-broker
> # restart zookeeper and then kafka-broker, which leads to the error above



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1476) Get a list of consumer groups

2015-02-23 Thread Ashish Kumar Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334110#comment-14334110
 ] 

Ashish Kumar Singh commented on KAFKA-1476:
---

[~nehanarkhede] I guess you missed {{bin/kafka-consumer-groups.sh}} while 
committing the patch.

> Get a list of consumer groups
> -
>
> Key: KAFKA-1476
> URL: https://issues.apache.org/jira/browse/KAFKA-1476
> Project: Kafka
>  Issue Type: Wish
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ryan Williams
>Assignee: Onur Karaman
>  Labels: newbie
> Fix For: 0.8.3
>
> Attachments: ConsumerCommand.scala, KAFKA-1476-LIST-GROUPS.patch, 
> KAFKA-1476-RENAME.patch, KAFKA-1476-REVIEW-COMMENTS.patch, KAFKA-1476.patch, 
> KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476.patch, 
> KAFKA-1476_2014-11-10_11:58:26.patch, KAFKA-1476_2014-11-10_12:04:01.patch, 
> KAFKA-1476_2014-11-10_12:06:35.patch, KAFKA-1476_2014-12-05_12:00:12.patch, 
> KAFKA-1476_2015-01-12_16:22:26.patch, KAFKA-1476_2015-01-12_16:31:20.patch, 
> KAFKA-1476_2015-01-13_10:36:18.patch, KAFKA-1476_2015-01-15_14:30:04.patch, 
> KAFKA-1476_2015-01-22_02:32:52.patch, KAFKA-1476_2015-01-30_11:09:59.patch, 
> KAFKA-1476_2015-02-04_15:41:50.patch, KAFKA-1476_2015-02-04_18:03:15.patch, 
> KAFKA-1476_2015-02-05_03:01:09.patch, KAFKA-1476_2015-02-09_14:37:30.patch, 
> sample-kafka-consumer-groups-sh-output-1-23-2015.txt, 
> sample-kafka-consumer-groups-sh-output-2-5-2015.txt, 
> sample-kafka-consumer-groups-sh-output-2-9-2015.txt, 
> sample-kafka-consumer-groups-sh-output.txt
>
>
> It would be useful to have a way to get a list of consumer groups currently 
> active via some tool/script that ships with kafka. This would be helpful so 
> that the system tools can be explored more easily.
> For example, when running the ConsumerOffsetChecker, it requires a group 
> option
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group 
> ?
> But, when just getting started with kafka, using the console producer and 
> consumer, it is not clear what value to use for the group option. If the 
> consumer groups could be listed, then it would be clear what value to use.
> Background:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1983) TestEndToEndLatency can be unreliable after hard kill

2015-02-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334096#comment-14334096
 ] 

Jun Rao commented on KAFKA-1983:


An easy solution is probably to avoid committing offsets at all. This way, the 
consumer will always resume consumption from the tail of the log on restart.
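A minimal sketch of that suggestion, assuming the old high-level consumer (property values are illustrative):

{code}
import java.util.Properties

// Never checkpoint offsets, so no stale offset can survive a hard kill;
// with no committed offset, consumption resumes from the log tail.
val props = new Properties()
props.put("zookeeper.connect", "localhost:2181")
props.put("group.id", "test-end-to-end-latency")
props.put("auto.commit.enable", "false")
props.put("auto.offset.reset", "largest")
{code}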

> TestEndToEndLatency can be unreliable after hard kill
> -
>
> Key: KAFKA-1983
> URL: https://issues.apache.org/jira/browse/KAFKA-1983
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>  Labels: newbie
>
> If you hard kill TestEndToEndLatency, the committed offset remains the last 
> checkpointed one. However, more messages are now appended after the last 
> checkpointed offset. When restarting TestEndToEndLatency, the consumer 
> resumes from the last checkpointed offset and will report really low latency 
> since it doesn't need to wait for a new message to be produced to read the 
> next message.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1983) TestEndToEndLatency can be unreliable after hard kill

2015-02-23 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-1983:
--

 Summary: TestEndToEndLatency can be unreliable after hard kill
 Key: KAFKA-1983
 URL: https://issues.apache.org/jira/browse/KAFKA-1983
 Project: Kafka
  Issue Type: Improvement
Reporter: Jun Rao


If you hard kill TestEndToEndLatency, the committed offset remains the last 
checkpointed one. However, more messages are now appended after the last 
checkpointed offset. When restarting TestEndToEndLatency, the consumer resumes 
from the last checkpointed offset and will report really low latency since it 
doesn't need to wait for a new message to be produced to read the next message.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1978) Replication test_0131 system test has been failing.

2015-02-23 Thread Abhishek Nigam (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhishek Nigam updated KAFKA-1978:
--
Status: Open  (was: Patch Available)

> Replication test_0131 system test has been failing.
> ---
>
> Key: KAFKA-1978
> URL: https://issues.apache.org/jira/browse/KAFKA-1978
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Reporter: Abhishek Nigam
>Assignee: Abhishek Nigam
>
> The issue is an out-of-bounds exception due to a misconfiguration of the test.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1978) Replication test_0131 system test has been failing.

2015-02-23 Thread Abhishek Nigam (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhishek Nigam updated KAFKA-1978:
--
Reviewer: Guozhang Wang
  Status: Patch Available  (was: Open)

> Replication test_0131 system test has been failing.
> ---
>
> Key: KAFKA-1978
> URL: https://issues.apache.org/jira/browse/KAFKA-1978
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Reporter: Abhishek Nigam
>Assignee: Abhishek Nigam
>
> The issue is an out-of-bounds exception due to a misconfiguration of the test.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1724) Errors after reboot in single node setup

2015-02-23 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334088#comment-14334088
 ] 

Sriharsha Chintalapani commented on KAFKA-1724:
---

[~junrao] Thanks for the comments on the patch. So it looks like this is 
already fixed in the trunk.  We can close this JIRA.

> Errors after reboot in single node setup
> 
>
> Key: KAFKA-1724
> URL: https://issues.apache.org/jira/browse/KAFKA-1724
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.0
>Reporter: Ciprian Hacman
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
> Fix For: 0.8.3
>
> Attachments: KAFKA-1724.patch
>
>
> In a single node setup, after reboot, Kafka logs show the following:
> {code}
> [2014-10-22 16:37:22,206] INFO [Controller 0]: Controller starting up 
> (kafka.controller.KafkaController)
> [2014-10-22 16:37:22,419] INFO [Controller 0]: Controller startup complete 
> (kafka.controller.KafkaController)
> [2014-10-22 16:37:22,554] INFO conflict in /brokers/ids/0 data: 
> {"jmx_port":-1,"timestamp":"1413995842465","host":"ip-10-91-142-54.eu-west-1.compute.internal","version":1,"port":9092}
>  stored data: 
> {"jmx_port":-1,"timestamp":"1413994171579","host":"ip-10-91-142-54.eu-west-1.compute.internal","version":1,"port":9092}
>  (kafka.utils.ZkUtils$)
> [2014-10-22 16:37:22,736] INFO I wrote this conflicted ephemeral node 
> [{"jmx_port":-1,"timestamp":"1413995842465","host":"ip-10-91-142-54.eu-west-1.compute.internal","version":1,"port":9092}]
>  at /brokers/ids/0 a while back in a different session, hence I will backoff 
> for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> [2014-10-22 16:37:25,010] ERROR Error handling event ZkEvent[Data of 
> /controller changed sent to 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener@a6af882] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.IllegalStateException: Kafka scheduler has not been started
> at kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114)
> at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
> at 
> kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
> at 
> kafka.controller.KafkaController$$anonfun$2.apply$mcV$sp(KafkaController.scala:162)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:138)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
> at kafka.utils.Utils$.inLock(Utils.scala:535)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:134)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> [2014-10-22 16:37:28,757] INFO Registered broker 0 at path /brokers/ids/0 
> with address ip-10-91-142-54.eu-west-1.compute.internal:9092. 
> (kafka.utils.ZkUtils$)
> [2014-10-22 16:37:28,849] INFO [Kafka Server 0], started 
> (kafka.server.KafkaServer)
> [2014-10-22 16:38:56,718] INFO Closing socket connection to /127.0.0.1. 
> (kafka.network.Processor)
> [2014-10-22 16:38:56,850] INFO Closing socket connection to /127.0.0.1. 
> (kafka.network.Processor)
> [2014-10-22 16:38:56,985] INFO Closing socket connection to /127.0.0.1. 
> (kafka.network.Processor)
> {code}
> The last log line repeats forever and is correlated with errors on the app 
> side.
> Restarting Kafka fixes the errors.
> Steps to reproduce (with help from the mailing list):
> # start zookeeper
> # start kafka-broker
> # create topic or start a producer writing to a topic
> # stop zookeeper
> # stop kafka-broker (the kafka broker shutdown goes into WARN Session
> 0x14938d9dc010001 for server null, unexpected error, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) 
> java.net.ConnectException: Connection refused)
> # kill -9 kafka-broker
> # restart zookeeper and then kafka-broker, which leads to the error above



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1982) change kafka.examples.Producer to use the new java producer

2015-02-23 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-1982:
--

 Summary: change kafka.examples.Producer to use the new java 
producer
 Key: KAFKA-1982
 URL: https://issues.apache.org/jira/browse/KAFKA-1982
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.3
Reporter: Jun Rao


We need to change the example to use the new java producer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1460) NoReplicaOnlineException: No replica for partition

2015-02-23 Thread Stevo Slavic (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334066#comment-14334066
 ] 

Stevo Slavic commented on KAFKA-1460:
-

Just encountered this issue in an integration test, which starts embedded 
ZooKeeper and Kafka on the same node.

> NoReplicaOnlineException: No replica for partition
> --
>
> Key: KAFKA-1460
> URL: https://issues.apache.org/jira/browse/KAFKA-1460
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1.1
>Reporter: Artur Denysenko
>Priority: Critical
> Attachments: state-change.log
>
>
> We have a standalone kafka server.
> After several days of running we get:
> {noformat}
> kafka.common.NoReplicaOnlineException: No replica for partition 
> [gk.q.module,1] is alive. Live brokers are: [Set()], Assigned replicas are: 
> [List(0)]
>   at 
> kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
>   at 
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:336)
>   at 
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:185)
>   at 
> kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99)
>   at 
> kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>   at 
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>   at 
> kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:96)
>   at 
> kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:68)
>   at 
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:312)
>   at 
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:162)
>   at 
> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:63)
>   at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068)
>   at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1066)
>   at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1066)
>   at kafka.utils.Utils$.inLock(Utils.scala:538)
>   at 
> kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1066)
>   at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}
> Please see attached [state-change.log]
> You can find all server logs (450mb) here: 
> http://46.4.114.35:/deploy/kafka-logs.2014-05-14-16.tgz
> On client we get:
> {noformat}
> 16:28:36,843 [ool-12-thread-2] WARN  ZookeeperConsumerConnector - 
> [dev_dev-1400257716132-e7b8240c], no brokers found when trying to rebalance.
> {noformat}
> If we try to send message using 'kafka-console-producer.sh':
> {noformat}
> [root@dev kafka]# /srv/kafka/bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic test
> message
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> [2014-05-16 19:45:30,950] WARN Fetching topic metadata with correlation id 0 
> for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed 
> (kafka.client.ClientUtils$)
> java.net.SocketTimeoutException
> at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
> at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> at kafka.utils.Utils$.read(Utils.scala:375)
> at 
> kafka.network.BoundedByteBufferRec

[jira] [Commented] (KAFKA-1856) Add PreCommit Patch Testing

2015-02-23 Thread Ashish Kumar Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334046#comment-14334046
 ] 

Ashish Kumar Singh commented on KAFKA-1856:
---

Sounds good [~charmalloc]!

> Add PreCommit Patch Testing
> ---
>
> Key: KAFKA-1856
> URL: https://issues.apache.org/jira/browse/KAFKA-1856
> Project: Kafka
>  Issue Type: Task
>Reporter: Ashish Kumar Singh
>Assignee: Ashish Kumar Singh
> Attachments: KAFKA-1845.result.txt, KAFKA-1856.patch, 
> KAFKA-1856_2015-01-18_21:43:56.patch, KAFKA-1856_2015-02-04_14:57:05.patch, 
> KAFKA-1856_2015-02-04_15:44:47.patch
>
>
> h1. Kafka PreCommit Patch Testing - *Don't wait for it to break*
> h2. Motivation
> *With great power comes great responsibility* - Uncle Ben. As the Kafka user 
> list grows, a mechanism to ensure the quality of the product is required. 
> Quality becomes hard to measure and maintain in an open source project with a 
> wide community of contributors. Luckily, Kafka is not the first open source 
> project and can benefit from the learnings of prior projects.
> PreCommit tests are the tests that are run for each patch that gets attached 
> to an open JIRA. Based on the test results, the test execution framework (the 
> test bot) +1s or -1s the patch. Having PreCommit tests takes the load off 
> committers having to look at or test each patch.
> h2. Tests in Kafka
> h3. Unit and Integration Tests
> [Unit and Integration 
> tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Unit+and+Integration+Tests]
>  are essential to help contributors avoid breaking existing functionality 
> while adding new functionality or fixing older issues. These tests, at least 
> the ones relevant to the changes, must be run by contributors before 
> attaching a patch to a JIRA.
> h3. System Tests
> [System 
> tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+System+Tests] 
> are much wider tests that, unlike unit tests, focus on end-to-end scenarios 
> and not some specific method or class.
> h2. Apache PreCommit tests
> Apache provides a mechanism to automatically build a project and run a series 
> of tests whenever a patch is uploaded to a JIRA. Based on test execution, the 
> test framework will comment with a +1 or -1 on the JIRA.
> You can read more about the framework here:
> http://wiki.apache.org/general/PreCommitBuilds
> h2. Plan
> # Create a test-patch.py script (similar to the one used in Flume, Sqoop and 
> other projects) that will take a JIRA as a parameter, apply the patch on the 
> appropriate branch, build the project, run tests, and report results. This 
> script should be committed into the Kafka code-base. To begin with, this will 
> only run unit tests. We can add code sanity checks, system_tests, etc in the 
> future.
> # Create a jenkins job for running the test (as described in 
> http://wiki.apache.org/general/PreCommitBuilds) and validate that it works 
> manually. This must be done by a committer with Jenkins access.
> # Ask someone with access to https://builds.apache.org/job/PreCommit-Admin/ 
> to add Kafka to the list of projects PreCommit-Admin triggers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: KafkaPreCommit #17

2015-02-23 Thread Apache Jenkins Server
See 

Changes:

[junrao] kafka-1971; starting a broker with a conflicting id will delete the 
previous broker registration; patched by Jun Rao; reviewed by Neha Narkhede

--
[...truncated 1024 lines...]
kafka.producer.AsyncProducerTest > testProducerQueueSize PASSED

kafka.producer.AsyncProducerTest > testProduceAfterClosed PASSED

kafka.producer.AsyncProducerTest > testBatchSize PASSED

kafka.producer.AsyncProducerTest > testQueueTimeExpired PASSED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents PASSED

kafka.producer.AsyncProducerTest > testSerializeEvents PASSED

kafka.producer.AsyncProducerTest > testInvalidPartition PASSED

kafka.producer.AsyncProducerTest > testNoBroker PASSED

kafka.producer.AsyncProducerTest > testIncompatibleEncoder PASSED

kafka.producer.AsyncProducerTest > testRandomPartitioner PASSED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic PASSED

kafka.producer.AsyncProducerTest > testJavaProducer PASSED

kafka.producer.AsyncProducerTest > testInvalidConfiguration PASSED

kafka.log.CleanerTest > testCleanSegments PASSED

kafka.log.CleanerTest > testCleaningWithDeletes PASSED

kafka.log.CleanerTest > testCleanSegmentsWithAbort PASSED

kafka.log.CleanerTest > testSegmentGrouping PASSED

kafka.log.CleanerTest > testBuildOffsetMap PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[0] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[1] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[2] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[3] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[4] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[5] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[6] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[7] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[8] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[9] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[10] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[11] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[12] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[13] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[14] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[15] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[16] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[17] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[18] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[19] PASSED

kafka.log.LogManagerTest > testCreateLog PASSED

kafka.log.LogManagerTest > testGetNonExistentLog PASSED

kafka.log.LogManagerTest > testCleanupExpiredSegments PASSED

kafka.log.LogManagerTest > testCleanupSegmentsToMaintainSize PASSED

kafka.log.LogManagerTest > testTimeBasedFlush PASSED

kafka.log.LogManagerTest > testLeastLoadedAssignment PASSED

kafka.log.LogManagerTest > testTwoLogManagersUsingSameDirFails PASSED

kafka.log.LogManagerTest > testCheckpointRecoveryPoints PASSED

kafka.log.LogManagerTest > testRecoveryDirectoryMappingWithTrailingSlash PASSED

kafka.log.LogManagerTest > testRecoveryDirectoryMappingWithRelativeDirectory 
PASSED

kafka.log.LogConfigTest > testFromPropsDefaults PASSED

kafka.log.LogConfigTest > testFromPropsEmpty PASSED

kafka.log.LogConfigTest > testFromPropsToProps PASSED

kafka.log.LogConfigTest > testFromPropsInvalid PASSED

kafka.log.OffsetIndexTest > truncate PASSED

kafka.log.OffsetIndexTest > randomLookupTest PASSED

kafka.log.OffsetIndexTest > lookupExtremeCases PASSED

kafka.log.OffsetIndexTest > appendTooMany PASSED

kafka.log.OffsetIndexTest > appendOutOfOrder PASSED

kafka.log.OffsetIndexTest > testReopen PASSED

kafka.log.FileMessageSetTest > testWrittenEqualsRead PASSED

kafka.log.FileMessageSetTest > testIteratorIsConsistent PASSED

kafka.log.FileMessageSetTest > testSizeInBytes PASSED

kafka.log.FileMessageSetTest > testWriteTo PASSED

kafka.log.FileMessageSetTest > testFileSize PASSED

kafka.log.FileMessageSetTest > testIterationOverPartialAndTruncation PASSED

kafka.log.FileMessageSetTest > testIterationDoesntChangePosition PASSED

kafka.log.FileMessageSetTest > testRead PASSED

kafka.log.FileMessageSetTest > testSearch PASSED

kafka.log.FileMessageSetTest > testIteratorWithLimits PASSED

kafka.log.FileMessageSetTest > testTruncate PASSED

kafka.log.LogCleanerIntegrationTest > cleanerTest PASSED

kafka.log.OffsetMapTest > testBasicValidation PASSED

kafka.log.OffsetMapTest > testClear PASSED

kafka.log.LogTest > testTimeBasedLogRoll PASSED

kafka.log.LogTest > testTimeBasedLogRollJitter PASSED

kafka.log.LogTest > testSizeBasedLogRoll PASSED

kafka.log.LogTest > testLoadEmptyLo

Re: [kafka-clients] Re: [VOTE] 0.8.2.1 Candidate 1

2015-02-23 Thread Jun Rao
Hi, Joe,

Left a comment on KAFKA-1724. Not sure if it's a true blocker. However, we
can probably include it if it can be fixed in the next couple of days.

Thanks,

Jun

On Sun, Feb 22, 2015 at 2:12 PM, Joe Stein  wrote:

> Jun,
>
> Can we also add https://issues.apache.org/jira/browse/KAFKA-1724 to the
> next RC please?
>
> Thanks!
>
> ~ Joe Stein
> - - - - - - - - - - - - - - - - -
>
>   http://www.stealth.ly
> - - - - - - - - - - - - - - - - -
>
> On Sun, Feb 22, 2015 at 11:59 AM, Jun Rao  wrote:
>
>> We identified at least one more blocker issue KAFKA-1971 during testing.
>> So, we will have to roll another RC for 0.8.2.1.
>>
>> Thanks,
>>
>> Jun
>>
>> On Sat, Feb 21, 2015 at 6:04 PM, Joe Stein  wrote:
>>
>>> Source verified, tests pass, quick start ok.
>>>
>>> Binaries verified, tests on scala
>>> https://github.com/stealthly/scala-kafka/pull/27 and go clients
>>> https://github.com/stealthly/go_kafka_client/pull/55 passing.
>>>
>>> If the release passes we should update the release notes to include the
>>> change from KAFKA-1729 please.
>>>
>>> +1 (binding)
>>>
>>> ~ Joe Stein
>>>
>>> On Fri, Feb 20, 2015 at 9:08 PM, ted won  wrote:
>>>
 +1

 On Friday, February 20, 2015, Guozhang Wang  wrote:

 > +1 binding.
 >
 > Checked the md5, and quick start.
 >
 > Some minor comments:
 >
 > 1. The quickstart section should include the build step after the
 > download and before starting the server.
 >
 > 2. There seems to be a bug in Gradle 1.1x with Java 8 causing the
 "gradle"
 > initialization to fail:
 >
 > -
 >
 > FAILURE: Build failed with an exception.
 >
 > * Where:
 > Build file '/home/guwang/Workspace/temp/kafka/build.gradle' line: 199
 >
 > * What went wrong:
 > A problem occurred evaluating root project 'kafka'.
 > > Could not create task of type 'ScalaDoc'.
 > --
 >
 > Downgrading Java to 1.7 resolves this issue.
 >
 > Guozhang
 >
 > On Wed, Feb 18, 2015 at 7:56 PM, Connie Yang >>> > > wrote:
 >
 > > +1
 > > On Feb 18, 2015 7:23 PM, "Matt Narrell" >>> > > wrote:
 > >
 > > > +1
 > > >
 > > > > On Feb 18, 2015, at 7:56 PM, Jun Rao >>> > > wrote:
 > > > >
 > > > > This is the first candidate for release of Apache Kafka
 0.8.2.1. This
 > > > > only fixes one critical issue (KAFKA-1952) in 0.8.2.0.
 > > > >
 > > > > Release Notes for the 0.8.2.1 release
 > > > >
 > > >
 > >
 >
 https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/RELEASE_NOTES.html
 > > > >
 > > > > *** Please download, test and vote by Saturday, Feb 21, 7pm PT
 > > > >
 > > > > Kafka's KEYS file containing PGP keys we use to sign the
 release:
 > > > > http://kafka.apache.org/KEYS in addition to the md5, sha1
 > > > > and sha2 (SHA256) checksum.
 > > > >
 > > > > * Release artifacts to be voted upon (source and binary):
 > > > > https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/
 > > > >
 > > > > * Maven artifacts to be voted upon prior to release:
 > > > > https://repository.apache.org/content/groups/staging/
 > > > >
 > > > > * scala-doc
 > > > >
 https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/scaladoc/
 > > > >
 > > > > * java-doc
 > > > >
 https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/javadoc/
 > > > >
 > > > > * The tag to be voted upon (off the 0.8.2 branch) is the
 0.8.2.1 tag
 > > > >
 > > >
 > >
 >
 https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=c1b4c58531343dce80232e0122d085fc687633f6
 > > > >
 > > > > /***
 > > > >
 > > > > Thanks,
 > > > >
 > > > > Jun
 > > >
 > > >
 > >
 >
 >
 >
 > --
 > -- Guozhang
 >

>>>
>>>
>>
>>
>


Re: Review Request 28027: Patch for KAFKA-1724

2015-02-23 Thread Jun Rao


> On Nov. 14, 2014, 2:39 a.m., Jun Rao wrote:
> > Thanks for the patch. I am not sure if that's the right place to fix it 
> > though. The issue is that when KafkaController.onControllerResignation() is 
> > called, the controller wasn't actually active. A better fix is probably to 
> > guard this in onControllerResignation() and only go through the logic if 
> > the controller is active.
> 
> Sriharsha Chintalapani wrote:
> Jun, thanks for the review. When KafkaController.startup() is called, it 
> sets isRunning to true before calling controllerElector.startup(), which 
> calls elect(); if the /controller node from the previous run still exists, 
> elect() sets leaderId and returns. Meanwhile, the /controller node gets 
> deleted, triggering handleDataDeleted(), which calls 
> onControllerResignation(). As far as these events are concerned, the 
> KafkaController is started. I tried setting kafkaController.isRunning to 
> false by default (not sure why it defaults to true) and only doing 
> onControllerResignation() if isRunning is true. This doesn't work because 
> KafkaController.startup() sets it to true before calling 
> controllerElector.startup(), which is going to trigger 
> onControllerResignation().
> Regarding the patch, I don't think KafkaScheduler should throw an 
> exception when shutdown is called.
> On a side note, onControllerResignation() should be wrapped in if 
> (isRunning), and isRunning should default to false.
> Please let me know your thoughts.

Sorry for the late response. It seems that currently onControllerResignation() 
can be called even when the broker is not a controller (e.g. from 
SessionExpirationListener). So, we probably should just make the logic in 
onControllerResignation() more robust. I was thinking that before we shut down 
the KafkaScheduler, we can first check if it's started already (trunk already 
has such a method) and only shut down the scheduler if it's started. This is 
probably better than making the change at the scheduler level since in some 
other use cases, we do expect the shutdown call to only happen after startup.
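A minimal sketch of that guard (isStarted is the trunk method referred to above; the surrounding structure is illustrative, not the actual controller code):

{code}
def onControllerResignation(): Unit = {
  // ... deregister listeners, etc. ...
  // Only shut down the scheduler if it was actually started, so a
  // resignation callback on a broker that never became controller is a
  // no-op for the scheduler.
  if (kafkaScheduler.isStarted)
    kafkaScheduler.shutdown()
  // ... remaining resignation logic ...
}
{code}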


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28027/#review61391
---


On Nov. 14, 2014, 1:34 a.m., Sriharsha Chintalapani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28027/
> ---
> 
> (Updated Nov. 14, 2014, 1:34 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1724
> https://issues.apache.org/jira/browse/KAFKA-1724
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1724. Errors after reboot in single node setup.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/utils/KafkaScheduler.scala 
> 9a16343d2ff7192b741f0b23a6bdf58d8f2bbd3e 
> 
> Diff: https://reviews.apache.org/r/28027/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Sriharsha Chintalapani
> 
>



Re: Review Request 31306: Patch for KAFKA-1755

2015-02-23 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31306/#review73692
---



core/src/main/scala/kafka/message/ByteBufferMessageSet.scala


(I added this because we were using the sizeInBytes just to check the 
buffer limit above and it looked confusing)



core/src/test/scala/unit/kafka/log/LogTest.scala


(unintentional)


- Joel Koshy


On Feb. 23, 2015, 10:29 p.m., Joel Koshy wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31306/
> ---
> 
> (Updated Feb. 23, 2015, 10:29 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1755
> https://issues.apache.org/jira/browse/KAFKA-1755
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1755 v2
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/log/Log.scala 
> 846023bb98d0fa0603016466360c97071ac935ea 
>   core/src/main/scala/kafka/log/LogCleaner.scala 
> f8e7cd5fabce78c248a9027c4bb374a792508675 
>   core/src/main/scala/kafka/log/LogCleanerManager.scala 
> fd87d90597981c867a9b23731fca3b555bf85b7f 
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
> f46ad5cbbbad77d8d1f490d1f8aac97858da9b06 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 83d52643028c5628057dc0aa29819becfda61fdb 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 
> d10e4f4ccbca5e50d81a243d3ab30cc7314b7fef 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 
> c2dd8eb69da8c0982a0dd20231c6f8bd58eb623e 
>   core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala 
> 73a26377eb63ab9989698e0491049434f032cba2 
> 
> Diff: https://reviews.apache.org/r/31306/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Joel Koshy
> 
>



[jira] [Updated] (KAFKA-1755) Improve error handling in log cleaner

2015-02-23 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1755:
--
Attachment: KAFKA-1755_2015-02-23_14:29:54.patch

> Improve error handling in log cleaner
> -
>
> Key: KAFKA-1755
> URL: https://issues.apache.org/jira/browse/KAFKA-1755
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Assignee: Joel Koshy
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1755.patch, KAFKA-1755_2015-02-23_14:29:54.patch
>
>
> The log cleaner is a critical process when using compacted topics.
> However, if there is any error in any topic (notably if a key is missing) 
> then the cleaner exits and all other compacted topics will also be adversely 
> affected - i.e., compaction stops across the board.
> This can be improved by just aborting compaction for a topic on any error and 
> keeping the thread from exiting.
> Another improvement would be to reject messages without keys that are sent to 
> compacted topics although this is not enough by itself.
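A minimal sketch of the first improvement, with hypothetical names (cleanable and the skip-marking step are illustrative stand-ins for the real cleaner internals):

{code}
// Catch per-log failures so one bad topic cannot kill the cleaner thread;
// the failing partition is skipped rather than halting compaction globally.
def cleanAll(cleanable: Iterable[(String, () => Unit)]): Unit =
  for ((topicPartition, clean) <- cleanable) {
    try clean()
    catch {
      case e: Exception =>
        System.err.println(s"Aborting compaction of $topicPartition: $e")
        // hypothetical: mark the partition uncleanable so later rounds skip it
    }
  }
{code}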



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1979) LogCleaner delete file asynchronously and can cause inconsistency between log cleaner checkpoint and first dirty log segment file.

2015-02-23 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved KAFKA-1979.
-
Resolution: Invalid

> LogCleaner delete file asynchronously and can cause inconsistency between log 
> cleaner checkpoint and first dirty log segment file.
> --
>
> Key: KAFKA-1979
> URL: https://issues.apache.org/jira/browse/KAFKA-1979
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>
> In LogCleaner we delete the log segment file asynchronously and then 
> checkpoint the cleaned offsets. If the broker exits after the checkpoint is 
> written but before the log segment is deleted, then when the broker starts 
> again, its first dirty log offset will differ from the check-pointed offset.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1755) Improve error handling in log cleaner

2015-02-23 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334000#comment-14334000
 ] 

Joel Koshy commented on KAFKA-1755:
---

Updated reviewboard https://reviews.apache.org/r/31306/diff/
 against branch origin/trunk

> Improve error handling in log cleaner
> -
>
> Key: KAFKA-1755
> URL: https://issues.apache.org/jira/browse/KAFKA-1755
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Assignee: Joel Koshy
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1755.patch, KAFKA-1755_2015-02-23_14:29:54.patch
>
>
> The log cleaner is a critical process when using compacted topics.
> However, if there is any error in any topic (notably if a key is missing) 
> then the cleaner exits and all other compacted topics will also be adversely 
> affected - i.e., compaction stops across the board.
> This can be improved by just aborting compaction for a topic on any error and 
> keeping the thread from exiting.
> Another improvement would be to reject messages without keys that are sent to 
> compacted topics although this is not enough by itself.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31306: Patch for KAFKA-1755

2015-02-23 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31306/
---

(Updated Feb. 23, 2015, 10:29 p.m.)


Review request for kafka.


Bugs: KAFKA-1755
https://issues.apache.org/jira/browse/KAFKA-1755


Repository: kafka


Description (updated)
---

KAFKA-1755 v2


Diffs (updated)
-

  core/src/main/scala/kafka/log/Log.scala 
846023bb98d0fa0603016466360c97071ac935ea 
  core/src/main/scala/kafka/log/LogCleaner.scala 
f8e7cd5fabce78c248a9027c4bb374a792508675 
  core/src/main/scala/kafka/log/LogCleanerManager.scala 
fd87d90597981c867a9b23731fca3b555bf85b7f 
  core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
f46ad5cbbbad77d8d1f490d1f8aac97858da9b06 
  core/src/main/scala/kafka/server/OffsetManager.scala 
83d52643028c5628057dc0aa29819becfda61fdb 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 
d10e4f4ccbca5e50d81a243d3ab30cc7314b7fef 
  core/src/test/scala/unit/kafka/log/LogTest.scala 
c2dd8eb69da8c0982a0dd20231c6f8bd58eb623e 
  core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala 
73a26377eb63ab9989698e0491049434f032cba2 

Diff: https://reviews.apache.org/r/31306/diff/


Testing
---


Thanks,

Joel Koshy



[jira] [Resolved] (KAFKA-1980) Console consumer throws OutOfMemoryError with large max-messages

2015-02-23 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede resolved KAFKA-1980.
--
Resolution: Won't Fix

Best to discuss this on the mailing list first. 
(http://kafka.apache.org/contact.html)

> Console consumer throws OutOfMemoryError with large max-messages
> 
>
> Key: KAFKA-1980
> URL: https://issues.apache.org/jira/browse/KAFKA-1980
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1, 0.8.2.0
>Reporter: Håkon Hitland
>Priority: Minor
>
> Tested on kafka_2.11-0.8.2.0
> Steps to reproduce:
> - Have any topic with at least 1 GB of data.
> - Use kafka-console-consumer.sh on the topic passing a large number to 
> --max-messages, e.g.:
> $ bin/kafka-console-consumer.sh --zookeeper localhost --topic test.large 
> --from-beginning --max-messages  | head -n 40
> Expected result:
> Should stream messages up to max-messages
> Result:
> Out of memory error:
> [2015-02-23 19:41:35,006] ERROR OOME with size 1048618 
> (kafka.network.BoundedByteBufferReceive)
> java.lang.OutOfMemoryError: Java heap space
>   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>   at 
> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71)
>   at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> As a first guess I'd say that this is caused by slice() taking more memory 
> than expected. Perhaps because it is called on an Iterable and not an 
> Iterator?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1979) LogCleaner delete file asynchronously and can cause inconsistency between log cleaner checkpoint and first dirty log segment file.

2015-02-23 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333995#comment-14333995
 ] 

Jiangjie Qin commented on KAFKA-1979:
-

Hey Jay. I checked it again. You are right, I missed that .deleted suffix. I 
have some other theories, but I'll close this ticket for now and open another 
one after I verify them. Thanks.

> LogCleaner delete file asynchronously and can cause inconsistency between log 
> cleaner checkpoint and first dirty log segment file.
> --
>
> Key: KAFKA-1979
> URL: https://issues.apache.org/jira/browse/KAFKA-1979
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>
> In LogCleaner we delete the log segment file asynchronously and then 
> checkpoint the cleaned offsets. If the broker exits after the checkpoint is 
> written but before the log segment is deleted, then when the broker starts 
> again its first dirty log offset will differ from the check-pointed offset.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31306: Patch for KAFKA-1755

2015-02-23 Thread Joel Koshy


> On Feb. 23, 2015, 7:05 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/message/ByteBufferMessageSet.scala, line 209
> > 
> >
> > In doing !compactedTopic here I'm forcing iteration over the messages 
> > below. I can also do an in-place verification here to avoid iteration (and 
> > creation of message objects).

I'll upload another patch in a minute which explains this more clearly. Not 
sure if it is worth it - let me know what you guys think.


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31306/#review73629
---


On Feb. 23, 2015, 2:43 p.m., Joel Koshy wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31306/
> ---
> 
> (Updated Feb. 23, 2015, 2:43 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1755
> https://issues.apache.org/jira/browse/KAFKA-1755
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Add compacted topic constraint checks - reject unkeyed messages; reject 
> compressed messages if topic's broker-side compression is not uncompressed
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/log/Log.scala 
> 846023bb98d0fa0603016466360c97071ac935ea 
>   core/src/main/scala/kafka/log/LogCleaner.scala 
> f8e7cd5fabce78c248a9027c4bb374a792508675 
>   core/src/main/scala/kafka/log/LogCleanerManager.scala 
> fd87d90597981c867a9b23731fca3b555bf85b7f 
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
> f46ad5cbbbad77d8d1f490d1f8aac97858da9b06 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 83d52643028c5628057dc0aa29819becfda61fdb 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 
> d10e4f4ccbca5e50d81a243d3ab30cc7314b7fef 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 
> c2dd8eb69da8c0982a0dd20231c6f8bd58eb623e 
>   core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala 
> 73a26377eb63ab9989698e0491049434f032cba2 
> 
> Diff: https://reviews.apache.org/r/31306/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Joel Koshy
> 
>



Re: Review Request 30763: Second attempt at flush()

2015-02-23 Thread Jiangjie Qin


> On Feb. 23, 2015, 8:53 p.m., Jiangjie Qin wrote:
> > Hi Jay, I applied the patch and tried to run it in our test environment. I 
> > got this exception:
> > 
> > java.util.ConcurrentModificationException
> > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
> > at java.util.HashMap$KeyIterator.next(HashMap.java:1453)
> > at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
> > at java.util.HashSet.<init>(HashSet.java:119)
> > at 
> > org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:348)
> > at 
> > org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:498)
> > at 
> > kafka.tools.MirrorMaker$MirrorMakerProducer.flush(MirrorMaker.scala:373)
> > at 
> > kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:314)
> > 
> > It looks like we should synchronize access to the incomplete set, i.e. 
> > while one thread is making a copy of the incomplete set, other threads 
> > should not add batches to it.

From the javadoc of Collections.synchronizedSet():

It is imperative that the user manually synchronize on the returned set when 
iterating over it:
Set s = Collections.synchronizedSet(new HashSet());
...
synchronized (s) {
    Iterator i = s.iterator(); // Must be in the synchronized block
    while (i.hasNext())
        foo(i.next());
}
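
One fix in that spirit is to guard both mutation and the snapshot copy with the
same lock in a small helper (a sketch only; the class name and element type are
illustrative, not the actual RecordAccumulator internals):

    import java.util.HashSet;
    import java.util.Set;

    // All access goes through one monitor, so copying the set for
    // awaitFlushCompletion() can never race with a concurrent add() from send().
    final class IncompleteBatches<T> {
        private final Set<T> incomplete = new HashSet<T>();

        public synchronized void add(T batch)    { incomplete.add(batch); }
        public synchronized void remove(T batch) { incomplete.remove(batch); }
        public synchronized Set<T> copyAll()     { return new HashSet<T>(incomplete); }
    }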


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73665
---


On Feb. 23, 2015, 12:26 a.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> ---
> 
> (Updated Feb. 23, 2015, 12:26 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
> https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java 
> e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
> 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
>  4a2da41f47994f778109e3c4107ffd90195f0bae 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   
> clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java 
> d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 
> 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 
> 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
> 75513b0bdd439329c5771d87436ef83fda853bfb 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>



Re: Review Request 30763: Second attempt at flush()

2015-02-23 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73665
---


Hi Jay, I applied the patch and tried to run it in our test environment. I got 
this exception:

java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
at java.util.HashMap$KeyIterator.next(HashMap.java:1453)
at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
at java.util.HashSet.<init>(HashSet.java:119)
at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:348)
at 
org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:498)
at 
kafka.tools.MirrorMaker$MirrorMakerProducer.flush(MirrorMaker.scala:373)
at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:314)

It looks like we should synchronize access to the incomplete set, i.e. while 
one thread is making a copy of the incomplete set, other threads should not 
add batches to it.

- Jiangjie Qin


On Feb. 23, 2015, 12:26 a.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> ---
> 
> (Updated Feb. 23, 2015, 12:26 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
> https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java 
> e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
> 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
>  4a2da41f47994f778109e3c4107ffd90195f0bae 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   
> clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java 
> d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 
> 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 
> 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
> 75513b0bdd439329c5771d87436ef83fda853bfb 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>



Anyone interested in speaking at Bay Area Kafka meetup @ LinkedIn on March 24?

2015-02-23 Thread Ed Yakabosky
Hi Kafka Open Source -

LinkedIn will host another Bay Area Kafka meetup in Mountain View on March 24.  
We are planning to present on Offset Management but are looking for additional 
speakers.  If you’re interested in presenting a use case, operational plan, or 
your experience with a particular feature (REST interface, WebConsole), please 
reply-all to let us know.

[BCC: Open Source lists]

Thanks,
Ed


[jira] [Updated] (KAFKA-1971) starting a broker with a conflicting id will delete the previous broker registration

2015-02-23 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1971:
---
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Thanks for the review. Double committed to 0.8.2 and trunk.

> starting a broker with a conflicting id will delete the previous broker 
> registration
> 
>
> Key: KAFKA-1971
> URL: https://issues.apache.org/jira/browse/KAFKA-1971
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.0
>Reporter: Jun Rao
>Assignee: Jun Rao
>Priority: Blocker
> Fix For: 0.8.2.1
>
> Attachments: kafka-1971_2015-02-22_21:11:52.patch
>
>
> This issue can be easily reproduced by the following steps.
> 1. Start broker 1.
> 2. Start broker 2 with the same id as broker 1 (configure different port, log 
> dir).
> Broker 2's registration will fail. However, broker 1's registration in ZK is 
> now deleted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1974) NPE in SelectorTest on trunk

2015-02-23 Thread Aditya Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333735#comment-14333735
 ] 

Aditya Auradkar commented on KAFKA-1974:


[~nehanarkhede] I agree that a null check is not sufficient, but I think we 
should add one anyway. The Selector throws a KafkaException and the EchoServer 
throws a raw Exception. In either of those cases, the @After method will throw 
an NPE, so we should guard against a known issue.
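
A guarded teardown along those lines might look like this (a sketch; the field
names are illustrative, not the actual SelectorTest code):

    import org.junit.After;

    // If setup failed partway (e.g. the Selector threw a KafkaException),
    // some fields may still be null; guard them so teardown itself cannot NPE.
    @After
    public void teardown() throws Exception {
        if (selector != null)
            selector.close();
        if (server != null)
            server.close();
    }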

> NPE in SelectorTest on trunk
> 
>
> Key: KAFKA-1974
> URL: https://issues.apache.org/jira/browse/KAFKA-1974
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Neha Narkhede
>  Labels: newbie
>
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.network.SelectorTest.teardown(SelectorTest.java:57)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:37)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
>   at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355)
>   at 
> org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1981) Make log compaction point configurable

2015-02-23 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1981:
-
Labels: newbie++  (was: )

> Make log compaction point configurable
> --
>
> Key: KAFKA-1981
> URL: https://issues.apache.org/jira/browse/KAFKA-1981
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.2.0
>Reporter: Jay Kreps
>  Labels: newbie++
>
> Currently if you enable log compaction the compactor will kick in whenever 
> you hit a certain "dirty ratio", i.e. when 50% of your data is uncompacted. 
> Beyond that we don't give you fine-grained control over when compaction 
> occurs. In addition we never compact the active segment (since it is still 
> being written to).
> The result is that you can't really guarantee that a consumer will get every 
> update to a compacted topic--if the consumer falls behind a bit it might 
> just get the compacted version.
> This is usually fine, but it would be nice to make this more configurable so 
> you could set either a message-count, size, or time bound for compaction.
> This would let you say, for example, "any consumer that is no more than 1 
> hour behind will get every message."
> This should be relatively easy to implement since it just impacts the 
> end-point the compactor considers available for compaction. I think we 
> already have that concept, so this would just be some other overrides to add 
> in when calculating that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1981) Make log compaction point configurable

2015-02-23 Thread Jay Kreps (JIRA)
Jay Kreps created KAFKA-1981:


 Summary: Make log compaction point configurable
 Key: KAFKA-1981
 URL: https://issues.apache.org/jira/browse/KAFKA-1981
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2.0
Reporter: Jay Kreps


Currently if you enable log compaction the compactor will kick in whenever you 
hit a certain "dirty ratio", i.e. when 50% of your data is uncompacted. Beyond 
that we don't give you fine-grained control over when compaction occurs. In 
addition we never compact the active segment (since it is still being written 
to).

The result is that you can't really guarantee that a consumer will get every 
update to a compacted topic--if the consumer falls behind a bit it might just 
get the compacted version.

This is usually fine, but it would be nice to make this more configurable so 
you could set either a message-count, size, or time bound for compaction.

This would let you say, for example, "any consumer that is no more than 1 hour 
behind will get every message."

This should be relatively easy to implement since it just impacts the end-point 
the compactor considers available for compaction. I think we already have that 
concept, so this would just be some other overrides to add in when calculating 
that.
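
For example, the end-point could be computed as the most restrictive of
whichever bounds are configured (a sketch with hypothetical parameter names,
not actual Kafka configs):

    // The cleaner may compact only up to the tightest configured bound and,
    // as today, never into the active segment.
    long compactionEndOffset(long logEndOffset,
                             long activeSegmentBaseOffset,
                             long minLagMessages,          // hypothetical message-count bound
                             long firstOffsetWithinTimeLag // hypothetical time bound
    ) {
        long end = activeSegmentBaseOffset;
        end = Math.min(end, logEndOffset - minLagMessages);
        end = Math.min(end, firstOffsetWithinTimeLag);
        return Math.max(end, 0L);
    }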



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1979) LogCleaner delete file asynchronously and can cause inconsistency between log cleaner checkpoint and first dirty log segment file.

2015-02-23 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333703#comment-14333703
 ] 

Jay Kreps commented on KAFKA-1979:
--

Can you expand on this a bit? The delete protocol is to first rename the 
segment to .deleted, so if we crash and recover we should just delete it then. 
But I'm not sure if that covers the issue you see.

> LogCleaner delete file asynchronously and can cause inconsistency between log 
> cleaner checkpoint and first dirty log segment file.
> --
>
> Key: KAFKA-1979
> URL: https://issues.apache.org/jira/browse/KAFKA-1979
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>
> In LogCleaner we delete the log segment file asynchronously and then 
> checkpoint the cleaned offsets. If the broker exits after the checkpoint is 
> written but before the log segment is deleted, then when the broker starts 
> again its first dirty log offset will differ from the check-pointed offset.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31306: Patch for KAFKA-1755

2015-02-23 Thread Joel Koshy


> On Feb. 23, 2015, 6:36 p.m., Mayuresh Gharat wrote:
> > core/src/main/scala/kafka/message/ByteBufferMessageSet.scala, line 221
> > 
> >
> > We may also need to check this right:
> > sourceCodec != NoCompressionCodec

Thanks for checking. I actually think this is okay since the target codec is 
what matters - i.e., if target==uncompressed then source codec does not matter.
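
In other words, the check only needs to consult the target codec (a sketch of
the constraint being discussed, with illustrative names, not the actual patch):

    // The broker rewrites messages with the target codec, so for a compacted
    // topic it is the broker-side target compression that must be uncompressed;
    // whatever codec the producer used is irrelevant.
    if (compactedTopic && targetCodec != NoCompressionCodec)
        throw new InvalidMessageException(
            "Compacted topic " + topic + " requires broker-side compression " +
            "to be uncompressed in order to accept compressed message sets");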


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31306/#review73618
---


On Feb. 23, 2015, 2:43 p.m., Joel Koshy wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31306/
> ---
> 
> (Updated Feb. 23, 2015, 2:43 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1755
> https://issues.apache.org/jira/browse/KAFKA-1755
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Add compacted topic constraint checks - reject unkeyed messages; reject 
> compressed messages if topic's broker-side compression is not uncompressed
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/log/Log.scala 
> 846023bb98d0fa0603016466360c97071ac935ea 
>   core/src/main/scala/kafka/log/LogCleaner.scala 
> f8e7cd5fabce78c248a9027c4bb374a792508675 
>   core/src/main/scala/kafka/log/LogCleanerManager.scala 
> fd87d90597981c867a9b23731fca3b555bf85b7f 
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
> f46ad5cbbbad77d8d1f490d1f8aac97858da9b06 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 83d52643028c5628057dc0aa29819becfda61fdb 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 
> d10e4f4ccbca5e50d81a243d3ab30cc7314b7fef 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 
> c2dd8eb69da8c0982a0dd20231c6f8bd58eb623e 
>   core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala 
> 73a26377eb63ab9989698e0491049434f032cba2 
> 
> Diff: https://reviews.apache.org/r/31306/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Joel Koshy
> 
>



Re: Review Request 31306: Patch for KAFKA-1755

2015-02-23 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31306/#review73629
---



core/src/main/scala/kafka/message/ByteBufferMessageSet.scala


In doing !compactedTopic here I'm forcing iteration over the messages 
below. I can also do an in-place verification here to avoid iteration (and 
creation of message objects).


- Joel Koshy


On Feb. 23, 2015, 2:43 p.m., Joel Koshy wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31306/
> ---
> 
> (Updated Feb. 23, 2015, 2:43 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1755
> https://issues.apache.org/jira/browse/KAFKA-1755
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Add compacted topic constraint checks - reject unkeyed messages; reject 
> compressed messages if topic's broker-side compression is not uncompressed
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/log/Log.scala 
> 846023bb98d0fa0603016466360c97071ac935ea 
>   core/src/main/scala/kafka/log/LogCleaner.scala 
> f8e7cd5fabce78c248a9027c4bb374a792508675 
>   core/src/main/scala/kafka/log/LogCleanerManager.scala 
> fd87d90597981c867a9b23731fca3b555bf85b7f 
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
> f46ad5cbbbad77d8d1f490d1f8aac97858da9b06 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 83d52643028c5628057dc0aa29819becfda61fdb 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 
> d10e4f4ccbca5e50d81a243d3ab30cc7314b7fef 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 
> c2dd8eb69da8c0982a0dd20231c6f8bd58eb623e 
>   core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala 
> 73a26377eb63ab9989698e0491049434f032cba2 
> 
> Diff: https://reviews.apache.org/r/31306/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Joel Koshy
> 
>



[jira] [Created] (KAFKA-1980) Console consumer throws OutOfMemoryError with large max-messages

2015-02-23 Thread JIRA
Håkon Hitland created KAFKA-1980:


 Summary: Console consumer throws OutOfMemoryError with large 
max-messages
 Key: KAFKA-1980
 URL: https://issues.apache.org/jira/browse/KAFKA-1980
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1, 0.8.2.0
Reporter: Håkon Hitland
Priority: Minor


Tested on kafka_2.11-0.8.2.0
Steps to reproduce:
- Have any topic with at least 1 GB of data.
- Use kafka-console-consumer.sh on the topic passing a large number to 
--max-messages, e.g.:
$ bin/kafka-console-consumer.sh --zookeeper localhost --topic test.large 
--from-beginning --max-messages  | head -n 40

Expected result:
Should stream messages up to max-messages

Result:
Out of memory error:
[2015-02-23 19:41:35,006] ERROR OOME with size 1048618 
(kafka.network.BoundedByteBufferReceive)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at 
kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at 
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)


As a first guess I'd say that this is caused by slice() taking more memory than 
expected. Perhaps because it is called on an Iterable and not an Iterator?
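
To illustrate the hypothesis (a Java analogy only; allMessages, n, and handle
are assumed stand-ins, and the consumer code in question is Scala):

    // Eager: copying before truncating materializes the whole source.
    List<byte[]> eagerPrefix = new ArrayList<byte[]>(allMessages).subList(0, n);

    // Lazy: an iterator touches one element at a time, so memory stays bounded.
    Iterator<byte[]> it = allMessages.iterator();
    for (int i = 0; i < n && it.hasNext(); i++)
        handle(it.next());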



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31306: Patch for KAFKA-1755

2015-02-23 Thread Mayuresh Gharat

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31306/#review73618
---



core/src/main/scala/kafka/message/ByteBufferMessageSet.scala


We may also need to check this right:
sourceCodec != NoCompressionCodec


- Mayuresh Gharat


On Feb. 23, 2015, 2:43 p.m., Joel Koshy wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31306/
> ---
> 
> (Updated Feb. 23, 2015, 2:43 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1755
> https://issues.apache.org/jira/browse/KAFKA-1755
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Add compacted topic constraint checks - reject unkeyed messages; reject 
> compressed messages if topic's broker-side compression is not uncompressed
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/log/Log.scala 
> 846023bb98d0fa0603016466360c97071ac935ea 
>   core/src/main/scala/kafka/log/LogCleaner.scala 
> f8e7cd5fabce78c248a9027c4bb374a792508675 
>   core/src/main/scala/kafka/log/LogCleanerManager.scala 
> fd87d90597981c867a9b23731fca3b555bf85b7f 
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
> f46ad5cbbbad77d8d1f490d1f8aac97858da9b06 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 83d52643028c5628057dc0aa29819becfda61fdb 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 
> d10e4f4ccbca5e50d81a243d3ab30cc7314b7fef 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 
> c2dd8eb69da8c0982a0dd20231c6f8bd58eb623e 
>   core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala 
> 73a26377eb63ab9989698e0491049434f032cba2 
> 
> Diff: https://reviews.apache.org/r/31306/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Joel Koshy
> 
>



Re: Review Request 31306: Patch for KAFKA-1755

2015-02-23 Thread Joel Koshy


> On Feb. 23, 2015, 5:08 p.m., Neha Narkhede wrote:
> > It also makes a lot of sense to disallow setting a compacted topic to 
> > uncompacted and vice versa without deleting the topic. Was there a reason 
> > to not include that change here or are you planning on including it in your 
> > follow-up patch?

Thanks for the quick review. Re: preventing compact<->non-compact retention 
changes: I think it is better to do that as a separate patch or a separate jira 
altogether.


> On Feb. 23, 2015, 5:08 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 257
> > 
> >
> > Is it necessary to log this in WARN? It seems like if you hit this 
> > issue on the broker, you will know through the jmx value anyway and the 
> > WARN message will just keep polluting the logs till the issue is fixed. 
> > Maybe turn it down to DEBUG?

I think it is better to keep this at warn or even make it error - for better 
visibility for operations. This only warns if there are invalid messages, and 
it only logs one line at the end of each cleaner run.

(BTW, which jmx value are you referring to?)


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31306/#review73584
---


On Feb. 23, 2015, 2:43 p.m., Joel Koshy wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31306/
> ---
> 
> (Updated Feb. 23, 2015, 2:43 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1755
> https://issues.apache.org/jira/browse/KAFKA-1755
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Add compacted topic constraint checks - reject unkeyed messages; reject 
> compressed messages if topic's broker-side compression is not uncompressed
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/log/Log.scala 
> 846023bb98d0fa0603016466360c97071ac935ea 
>   core/src/main/scala/kafka/log/LogCleaner.scala 
> f8e7cd5fabce78c248a9027c4bb374a792508675 
>   core/src/main/scala/kafka/log/LogCleanerManager.scala 
> fd87d90597981c867a9b23731fca3b555bf85b7f 
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
> f46ad5cbbbad77d8d1f490d1f8aac97858da9b06 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 83d52643028c5628057dc0aa29819becfda61fdb 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 
> d10e4f4ccbca5e50d81a243d3ab30cc7314b7fef 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 
> c2dd8eb69da8c0982a0dd20231c6f8bd58eb623e 
>   core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala 
> 73a26377eb63ab9989698e0491049434f032cba2 
> 
> Diff: https://reviews.apache.org/r/31306/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Joel Koshy
> 
>



Apache Samza Meetup Announced (March 4 @6PM hosted at LinkedIn¹s campus in Mountain View CA)

2015-02-23 Thread Ed Yakabosky
Hi all -

I would like to announce the first Bay Area Apache Samza Meetup, hosted at 
LinkedIn in Mountain View, CA on March 4, 2015 @6PM. We plan to host the event 
every 2 months to encourage knowledge sharing & collaboration in Samza’s usage 
and open source community.

The agenda for the meetup is:

  *   6:00 – 6:15PM: Doors open, sign NDAs, networking, food & drinks
  *   6:15 - 6:45PM: Naveen Somasundaram (LinkedIn) – Getting Started with 
Apache Samza
  *   6:45 - 7:30PM: Shekar Tippur (Intuit) – Powering Contextual Alerts for 
Intuit’s Operations Center with Apache Samza
  *   7:30 - 8:00PM: Chris Riccomini (LinkedIn) – Where we’re headed next: 
Apache Samza Roadmap

We plan to provide food & drinks, so please RSVP here to help us with 
estimation. Please let me know if you have any questions or ideas for future 
meetups.

We plan to announce a live stream the day of the event for remote attendance.

Excited to see you there!
Ed Yakabosky

[BCC:
Kafka Open Source
Samza Open Source
LinkedIn’s DDS and DAI teams
Linkedin’s Samza customers]


[jira] [Created] (KAFKA-1979) LogCleaner delete file asynchronously and can cause inconsistency between log cleaner checkpoint and first dirty log segment file.

2015-02-23 Thread Jiangjie Qin (JIRA)
Jiangjie Qin created KAFKA-1979:
---

 Summary: LogCleaner delete file asynchronously and can cause 
inconsistency between log cleaner checkpoint and first dirty log segment file.
 Key: KAFKA-1979
 URL: https://issues.apache.org/jira/browse/KAFKA-1979
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin


In LogCleaner we delete the log segment file asynchronously and then checkpoint 
the cleaned offsets. If the broker exits after the checkpoint is written but 
before the log segment is deleted, then when the broker starts again its first 
dirty log offset will differ from the check-pointed offset.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1978) Replication test_0131 system test has been failing.

2015-02-23 Thread Abhishek Nigam (JIRA)
Abhishek Nigam created KAFKA-1978:
-

 Summary: Replication test_0131 system test has been failing.
 Key: KAFKA-1978
 URL: https://issues.apache.org/jira/browse/KAFKA-1978
 Project: Kafka
  Issue Type: Bug
  Components: system tests
Reporter: Abhishek Nigam
Assignee: Abhishek Nigam


The issue is an out-of-bounds exception caused by a misconfiguration of the test.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1856) Add PreCommit Patch Testing

2015-02-23 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333560#comment-14333560
 ] 

Joe Stein commented on KAFKA-1856:
--

[~singhashish] I didn't want to make these changes while the trunk build is 
currently breaking in Jenkins. There is a new Jenkins job ready to go; once 
that is fixed we can do this.

> Add PreCommit Patch Testing
> ---
>
> Key: KAFKA-1856
> URL: https://issues.apache.org/jira/browse/KAFKA-1856
> Project: Kafka
>  Issue Type: Task
>Reporter: Ashish Kumar Singh
>Assignee: Ashish Kumar Singh
> Attachments: KAFKA-1845.result.txt, KAFKA-1856.patch, 
> KAFKA-1856_2015-01-18_21:43:56.patch, KAFKA-1856_2015-02-04_14:57:05.patch, 
> KAFKA-1856_2015-02-04_15:44:47.patch
>
>
> h1. Kafka PreCommit Patch Testing - *Don't wait for it to break*
> h2. Motivation
> *With great power comes great responsibility* - Uncle Ben. As Kafka user list 
> is growing, mechanism to ensure quality of the product is required. Quality 
> becomes hard to measure and maintain in an open source project, because of a 
> wide community of contributors. Luckily, Kafka is not the first open source 
> project and can benefit from learnings of prior projects.
> PreCommit tests are the tests that are run for each patch that gets attached 
> to an open JIRA. Based on test results, the test execution framework (a test 
> bot) will +1 or -1 the patch. Having PreCommit tests takes the load off 
> committers having to look at or test each patch.
> h2. Tests in Kafka
> h3. Unit and Integration Tests
> [Unit and Integration 
> tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Unit+and+Integration+Tests]
>  are essential in helping contributors avoid breaking existing functionality 
> while adding new functionality or fixing older issues. These tests, at least 
> the ones relevant to the changes, must be run by contributors before 
> attaching a patch to a JIRA.
> h3. System Tests
> [System 
> tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+System+Tests] 
> are much wider tests that, unlike unit tests, focus on end-to-end scenarios 
> and not some specific method or class.
> h2. Apache PreCommit tests
> Apache provides a mechanism to automatically build a project and run a series 
> of tests whenever a patch is uploaded to a JIRA. Based on test execution, the 
> test framework will comment with a +1 or -1 on the JIRA.
> You can read more about the framework here:
> http://wiki.apache.org/general/PreCommitBuilds
> h2. Plan
> # Create a test-patch.py script (similar to the one used in Flume, Sqoop and 
> other projects) that will take a jira as a parameter, apply on the 
> appropriate branch, build the project, run tests and report results. This 
> script should be committed into the Kafka code-base. To begin with, this will 
> only run unit tests. We can add code sanity checks, system_tests, etc in the 
> future.
> # Create a jenkins job for running the test (as described in 
> http://wiki.apache.org/general/PreCommitBuilds) and validate that it works 
> manually. This must be done by a committer with Jenkins access.
> # Ask someone with access to https://builds.apache.org/job/PreCommit-Admin/ 
> to add Kafka to the list of projects PreCommit-Admin triggers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31260: Patch for kafka-1971

2015-02-23 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31260/#review73592
---

Ship it!


+1 if the comment below is fixed.


core/src/test/scala/unit/kafka/server/ConflictBrokerRegistrationTest.scala


I'd move this to existing tests for server startup/shutdown. For example, 
ServerStartupTest. Let's not create an entire test suite just for this purpose.


- Neha Narkhede


On Feb. 23, 2015, 5:11 a.m., Jun Rao wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31260/
> ---
> 
> (Updated Feb. 23, 2015, 5:11 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: kafka-1971
> https://issues.apache.org/jira/browse/kafka-1971
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> address review comments
> 
> 
> remove unused util method
> 
> 
> add unit test
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
> 4acdd70fe9c1ee78d6510741006c2ece65450671 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> 7e5ddcb9be8fcef3df6ebc82a13ef44ef95f73ae 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> c78a1b6ff4213e13cabccd21a7b40cfeddbfb237 
>   core/src/test/scala/unit/kafka/server/ConflictBrokerRegistrationTest.scala 
> PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31260/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jun Rao
> 
>



Re: Review Request 30196: Patch for KAFKA-1886

2015-02-23 Thread Aditya Auradkar


> On Feb. 7, 2015, 4:22 p.m., Neha Narkhede wrote:
> > core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala, line 295
> > 
> >
> > Why do you need the sleep here? We try to avoid blindly sleeping in 
> > Kafka tests since it almost always leads to transient test failures. 
> > Consider using TestUtils.waitUntilTrue().

Thanks Neha. I missed this review comment.

I agree sleeping isn't ideal here, but I don't think there is a condition I 
can wait on to trigger this specific exception. The client has to be waiting 
on a response from the server. I'm not even sure that this test case needs to 
exist in PrimitiveApiTest since it isn't testing an API. Can you suggest a 
better place to put it, if it makes sense to keep it at all?
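
For reference, the polling pattern being suggested looks roughly like this (a
generic Java sketch, not the actual TestUtils.waitUntilTrue signature):

    import java.util.function.Supplier;

    // Poll a condition until it holds or the deadline passes, instead of one
    // blind sleep; the test finishes as soon as the condition becomes true.
    static void waitUntilTrue(Supplier<Boolean> condition, String message,
                              long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.get()) {
            if (System.currentTimeMillis() > deadline)
                throw new AssertionError(message);
            Thread.sleep(100);
        }
    }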


- Aditya


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30196/#review71556
---


On Feb. 2, 2015, 9:57 p.m., Aditya Auradkar wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30196/
> ---
> 
> (Updated Feb. 2, 2015, 9:57 p.m.)
> 
> 
> Review request for kafka and Joel Koshy.
> 
> 
> Bugs: KAFKA-1886
> https://issues.apache.org/jira/browse/KAFKA-1886
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Fixing KAFKA-1886. SimpleConsumer should not swallow 
> ClosedByInterruptException
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
> cbef84ac76e62768981f74e71d451f2bda995275 
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
> aeb7a19acaefabcc161c2ee6144a56d9a8999a81 
> 
> Diff: https://reviews.apache.org/r/30196/diff/
> 
> 
> Testing
> ---
> 
> Added an integration test to PrimitiveAPITest.scala.
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>



[jira] [Commented] (KAFKA-1796) Sanity check partition command line tools

2015-02-23 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333536#comment-14333536
 ] 

Mayuresh Gharat commented on KAFKA-1796:


[~nehanarkhede] Sure. Actually, I went through the code and saw that the case 
of duplicate replicas for the same topic partition has been handled (we were 
using the older version and came across this bug). But of course we can test 
this new reassignment tool once it's ready. Thanks.

> Sanity check partition command line tools
> -
>
> Key: KAFKA-1796
> URL: https://issues.apache.org/jira/browse/KAFKA-1796
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Mayuresh Gharat
>  Labels: newbie
> Fix For: 0.8.3
>
>
> We need to sanity check that the input JSON has valid values before triggering 
> the admin process. For example, we have seen a scenario where the JSON input 
> for the partition reassignment tool had partition replica info like {broker-1, 
> broker-1, broker-2}; it was still accepted in ZK and eventually led to an 
> under-replicated count, etc. This is partially because we use a Map rather 
> than a Set when reading the JSON input for this case; but in general we need 
> to make sure input parameters like the JSON are valid before writing them 
> to ZK.
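
The duplicate-replica case above reduces to a simple set check when parsing
the JSON (an illustrative Java sketch):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;

    // Reading replicas into a Set exposes duplicates like
    // {broker-1, broker-1, broker-2} before anything is written to ZK.
    List<Integer> replicas = Arrays.asList(1, 1, 2);
    if (new HashSet<Integer>(replicas).size() != replicas.size())
        throw new IllegalArgumentException(
            "Reassignment JSON contains duplicate replicas: " + replicas);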



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1856) Add PreCommit Patch Testing

2015-02-23 Thread Ashish Kumar Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333528#comment-14333528
 ] 

Ashish Kumar Singh commented on KAFKA-1856:
---

[~charmalloc] were you able to get this to work? I do not have access to see 
the configuration of the job, https://builds.apache.org/job/KafkaPreCommit/.

> Add PreCommit Patch Testing
> ---
>
> Key: KAFKA-1856
> URL: https://issues.apache.org/jira/browse/KAFKA-1856
> Project: Kafka
>  Issue Type: Task
>Reporter: Ashish Kumar Singh
>Assignee: Ashish Kumar Singh
> Attachments: KAFKA-1845.result.txt, KAFKA-1856.patch, 
> KAFKA-1856_2015-01-18_21:43:56.patch, KAFKA-1856_2015-02-04_14:57:05.patch, 
> KAFKA-1856_2015-02-04_15:44:47.patch
>
>
> h1. Kafka PreCommit Patch Testing - *Don't wait for it to break*
> h2. Motivation
> *With great power comes great responsibility* - Uncle Ben. As Kafka user list 
> is growing, mechanism to ensure quality of the product is required. Quality 
> becomes hard to measure and maintain in an open source project, because of a 
> wide community of contributors. Luckily, Kafka is not the first open source 
> project and can benefit from learnings of prior projects.
> PreCommit tests are the tests that are run for each patch that gets attached 
> to an open JIRA. Based on test results, the test execution framework (a test 
> bot) will +1 or -1 the patch. Having PreCommit tests takes the load off 
> committers having to look at or test each patch.
> h2. Tests in Kafka
> h3. Unit and Integration Tests
> [Unit and Integration 
> tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Unit+and+Integration+Tests]
>  are essential in helping contributors avoid breaking existing functionality 
> while adding new functionality or fixing older issues. These tests, at least 
> the ones relevant to the changes, must be run by contributors before 
> attaching a patch to a JIRA.
> h3. System Tests
> [System 
> tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+System+Tests] 
> are much wider tests that, unlike unit tests, focus on end-to-end scenarios 
> and not some specific method or class.
> h2. Apache PreCommit tests
> Apache provides a mechanism to automatically build a project and run a series 
> of tests whenever a patch is uploaded to a JIRA. Based on test execution, the 
> test framework will comment with a +1 or -1 on the JIRA.
> You can read more about the framework here:
> http://wiki.apache.org/general/PreCommitBuilds
> h2. Plan
> # Create a test-patch.py script (similar to the one used in Flume, Sqoop and 
> other projects) that will take a jira as a parameter, apply on the 
> appropriate branch, build the project, run tests and report results. This 
> script should be committed into the Kafka code-base. To begin with, this will 
> only run unit tests. We can add code sanity checks, system_tests, etc in the 
> future.
> # Create a jenkins job for running the test (as described in 
> http://wiki.apache.org/general/PreCommitBuilds) and validate that it works 
> manually. This must be done by a committer with Jenkins access.
> # Ask someone with access to https://builds.apache.org/job/PreCommit-Admin/ 
> to add Kafka to the list of projects PreCommit-Admin triggers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1964) testPartitionReassignmentCallback hangs occasionally

2015-02-23 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333520#comment-14333520
 ] 

Guozhang Wang commented on KAFKA-1964:
--

Another reason for this issue is that the IllegalGeneration error is not 
implemented; hence when the coordinator migrates, the heartbeat sent to the 
new coordinator will not return this error and thus will not trigger a new 
rebalance.

>  testPartitionReassignmentCallback hangs occasionally
> -
>
> Key: KAFKA-1964
> URL: https://issues.apache.org/jira/browse/KAFKA-1964
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.8.3
>Reporter: Jun Rao
>  Labels: newbie++
> Attachments: stack.out
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31306: Patch for KAFKA-1755

2015-02-23 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31306/#review73584
---


It also makes a lot of sense to disallow setting a compacted topic to 
uncompacted and vice versa without deleting the topic. Was there a reason to 
not include that change here or are you planning on including it in your 
follow-up patch?


core/src/main/scala/kafka/log/LogCleaner.scala


Is it necessary to log this in WARN? It seems like if you hit this issue on 
the broker, you will know through the jmx value anyway and the WARN message 
will just keep polluting the logs till the issue is fixed. Maybe turn it down 
to DEBUG?


- Neha Narkhede


On Feb. 23, 2015, 2:43 p.m., Joel Koshy wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31306/
> ---
> 
> (Updated Feb. 23, 2015, 2:43 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1755
> https://issues.apache.org/jira/browse/KAFKA-1755
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Add compacted topic constraint checks - reject unkeyed messages; reject 
> compressed messages if topic's broker-side compression is not uncompressed
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/log/Log.scala 
> 846023bb98d0fa0603016466360c97071ac935ea 
>   core/src/main/scala/kafka/log/LogCleaner.scala 
> f8e7cd5fabce78c248a9027c4bb374a792508675 
>   core/src/main/scala/kafka/log/LogCleanerManager.scala 
> fd87d90597981c867a9b23731fca3b555bf85b7f 
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
> f46ad5cbbbad77d8d1f490d1f8aac97858da9b06 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 83d52643028c5628057dc0aa29819becfda61fdb 
>   core/src/test/scala/unit/kafka/log/CleanerTest.scala 
> d10e4f4ccbca5e50d81a243d3ab30cc7314b7fef 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 
> c2dd8eb69da8c0982a0dd20231c6f8bd58eb623e 
>   core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala 
> 73a26377eb63ab9989698e0491049434f032cba2 
> 
> Diff: https://reviews.apache.org/r/31306/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Joel Koshy
> 
>



[jira] [Commented] (KAFKA-1971) starting a broker with a conflicting id will delete the previous broker registration

2015-02-23 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333482#comment-14333482
 ] 

Neha Narkhede commented on KAFKA-1971:
--

Excellent, thanks for looking into this!

> starting a broker with a conflicting id will delete the previous broker 
> registration
> 
>
> Key: KAFKA-1971
> URL: https://issues.apache.org/jira/browse/KAFKA-1971
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.0
>Reporter: Jun Rao
>Assignee: Jun Rao
>Priority: Blocker
> Fix For: 0.8.2.1
>
> Attachments: kafka-1971_2015-02-22_21:11:52.patch
>
>
> This issue can be easily reproduced by the following steps.
> 1. Start broker 1.
> 2. Start broker 2 with the same id as broker 1 (configure different port, log 
> dir).
> Broker 2's registration will fail. However, broker 1's registration in ZK is 
> now deleted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1907) ZkClient can block controlled shutdown indefinitely

2015-02-23 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333474#comment-14333474
 ] 

Neha Narkhede commented on KAFKA-1907:
--

Thanks for the update! No rush.

> ZkClient can block controlled shutdown indefinitely
> ---
>
> Key: KAFKA-1907
> URL: https://issues.apache.org/jira/browse/KAFKA-1907
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 0.8.2.0
>Reporter: Ewen Cheslack-Postava
>Assignee: jaikiran pai
> Attachments: KAFKA-1907.patch
>
>
> There are some calls to ZkClient via ZkUtils in 
> KafkaServer.controlledShutdown() that can block indefinitely because they 
> internally call waitUntilConnected. The ZkClient API doesn't provide an 
> alternative with timeouts, so fixing this will require enforcing timeouts in 
> some other way.
> This may be a more general issue if there are any non daemon threads that 
> also call ZkUtils methods.
> Stacktrace showing the issue:
> {code}
> "Thread-2" prio=10 tid=0xb3305000 nid=0x4758 waiting on condition [0x6ad69000]
>java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x70a93368> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:267)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2130)
> at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> at kafka.utils.ZkUtils$.readDataMaybeNull(ZkUtils.scala:456)
> at kafka.utils.ZkUtils$.getController(ZkUtils.scala:65)
> at 
> kafka.server.KafkaServer.kafka$server$KafkaServer$$controlledShutdown(KafkaServer.scala:194)
> at 
> kafka.server.KafkaServer$$anonfun$shutdown$1.apply$mcV$sp(KafkaServer.scala:269)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
> at kafka.utils.Logging$class.swallow(Logging.scala:94)
> at kafka.utils.Utils$.swallow(Utils.scala:45)
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:269)
> at 
> kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:42)
> at kafka.Kafka$$anon$1.run(Kafka.scala:42)
> {code}
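
One way to enforce a timeout despite the ZkClient API is to run the blocking 
call on a separate thread and bound the wait externally. A minimal sketch of 
that idea (illustrative only, not the attached patch):
{code}
import java.util.concurrent.*;

public class BoundedZkCall {
    private static final ExecutorService EXECUTOR =
        Executors.newSingleThreadExecutor();

    // Submit the blocking ZkClient call and give up after timeoutMs.
    public static <T> T callWithTimeout(Callable<T> zkCall, long timeoutMs)
            throws Exception {
        Future<T> future = EXECUTOR.submit(zkCall);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt a thread stuck in waitUntilConnected
            throw e;
        }
    }
}
{code}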



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1865) Add a flush() call to the new producer API

2015-02-23 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333471#comment-14333471
 ] 

Neha Narkhede commented on KAFKA-1865:
--

+1 :-)

> Add a flush() call to the new producer API
> --
>
> Key: KAFKA-1865
> URL: https://issues.apache.org/jira/browse/KAFKA-1865
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1865.patch, KAFKA-1865_2015-02-21_15:36:54.patch, 
> KAFKA-1865_2015-02-22_16:26:46.patch
>
>
> The postconditions of this would be that any record enqueued prior to flush() 
> would have completed being sent (either successfully or not).
> An open question is whether you can continue sending new records while this 
> call is executing (on other threads).
> We should only do this if it doesn't add inefficiencies for people who don't 
> use it.
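
For reference, the intended usage pattern would look something like this 
(a sketch assuming the API lands as described; the config values and payload 
are placeholders):
{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FlushExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        Producer<byte[], byte[]> producer = new KafkaProducer<>(props);

        producer.send(new ProducerRecord<byte[], byte[]>("my-topic",
            "some-value".getBytes()));
        // flush() should return only once every record enqueued before the
        // call has completed (successfully or not); per the discussion,
        // sends from other threads may continue while it runs.
        producer.flush();
        producer.close();
    }
}
{code}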



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-527) Compression support does numerous byte copies

2015-02-23 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-527:

Component/s: compression

> Compression support does numerous byte copies
> -
>
> Key: KAFKA-527
> URL: https://issues.apache.org/jira/browse/KAFKA-527
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Jay Kreps
> Attachments: KAFKA-527.message-copy.history, 
> java.hprof.no-compression.txt, java.hprof.snappy.text
>
>
> The data path for compressing or decompressing messages is extremely 
> inefficient. We do something like 7 (?) complete copies of the data, often 
> for simple things like adding a 4-byte size to the front. I am not sure how 
> this went unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk 
> recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the 
> Message/MessageSet interfaces, which are based on byte buffers, is the cause 
> of many of these.
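
To illustrate the kind of copy in question: prefixing a payload with its 
4-byte size can be done in place on a ByteBuffer instead of via 
stream-to-stream copying. A hypothetical sketch, not the actual code path:
{code}
import java.nio.ByteBuffer;

// Prefix a payload with its 4-byte size using one allocation and one copy,
// instead of building intermediate byte arrays via streams.
static ByteBuffer sizePrefixed(byte[] payload) {
    ByteBuffer buffer = ByteBuffer.allocate(4 + payload.length);
    buffer.putInt(payload.length); // write the size directly into the buffer
    buffer.put(payload);           // single copy of the payload itself
    buffer.flip();                 // ready to write to a channel
    return buffer;
}
{code}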



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1977) Make logEndOffset available in the Zookeeper consumer

2015-02-23 Thread Will Funnell (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Will Funnell updated KAFKA-1977:

Status: Patch Available  (was: Open)

> Make logEndOffset available in the Zookeeper consumer
> -
>
> Key: KAFKA-1977
> URL: https://issues.apache.org/jira/browse/KAFKA-1977
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Will Funnell
>Priority: Minor
> Attachments: 
> Make_logEndOffset_available_in_the_Zookeeper_consumer.patch
>
>
> The requirement is to create a snapshot from the Kafka topic but NOT do 
> continual reads after that point. For example you might be creating a backup 
> of the data to a file.
> In order to achieve that, a recommended solution by Joel Koshy and Jay Kreps 
> was to expose the high watermark, as maxEndOffset, from the FetchResponse 
> object through to each MessageAndMetadata object in order to be aware when 
> the consumer has reached the end of each partition.
> The submitted patch achieves this by adding the maxEndOffset to the 
> PartitionTopicInfo, which is updated when a new message arrives in the 
> ConsumerFetcherThread and then exposed in MessageAndMetadata.
> See here for discussion:
> http://search-hadoop.com/m/4TaT4TpJy71



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1977) Make logEndOffset available in the Zookeeper consumer

2015-02-23 Thread Will Funnell (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Will Funnell updated KAFKA-1977:

Attachment: Make_logEndOffset_available_in_the_Zookeeper_consumer.patch

> Make logEndOffset available in the Zookeeper consumer
> -
>
> Key: KAFKA-1977
> URL: https://issues.apache.org/jira/browse/KAFKA-1977
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Will Funnell
>Priority: Minor
> Attachments: 
> Make_logEndOffset_available_in_the_Zookeeper_consumer.patch
>
>
> The requirement is to create a snapshot from the Kafka topic but NOT do 
> continual reads after that point. For example you might be creating a backup 
> of the data to a file.
> In order to achieve that, a recommended solution by Joel Koshy and Jay Kreps 
> was to expose the high watermark, as maxEndOffset, from the FetchResponse 
> object through to each MessageAndMetadata object in order to be aware when 
> the consumer has reached the end of each partition.
> The submitted patch achieves this by adding the maxEndOffset to the 
> PartitionTopicInfo, which is updated when a new message arrives in the 
> ConsumerFetcherThread and then exposed in MessageAndMetadata.
> See here for discussion:
> http://search-hadoop.com/m/4TaT4TpJy71



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1977) Make logEndOffset available in the Zookeeper consumer

2015-02-23 Thread Will Funnell (JIRA)
Will Funnell created KAFKA-1977:
---

 Summary: Make logEndOffset available in the Zookeeper consumer
 Key: KAFKA-1977
 URL: https://issues.apache.org/jira/browse/KAFKA-1977
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Will Funnell
Priority: Minor


The requirement is to create a snapshot from the Kafka topic but NOT do 
continual reads after that point. For example you might be creating a backup of 
the data to a file.

In order to achieve that, a recommended solution by Joel Koshy and Jay Kreps 
was to expose the high watermark, as maxEndOffset, from the FetchResponse 
object through to each MessageAndMetadata object in order to be aware when the 
consumer has reached the end of each partition.

The submitted patch achieves this by adding the maxEndOffset to the 
PartitionTopicInfo, which is updated when a new message arrives in the 
ConsumerFetcherThread and then exposed in MessageAndMetadata.

See here for discussion:
http://search-hadoop.com/m/4TaT4TpJy71
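
Usage would presumably look something like this (a sketch; maxEndOffset() is 
the accessor name assumed from the patch description and may differ):
{code}
// stream is an existing KafkaStream<byte[], byte[]>.
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
    MessageAndMetadata<byte[], byte[]> msg = it.next();
    writeToBackup(msg.message()); // placeholder snapshot sink
    if (msg.offset() >= msg.maxEndOffset() - 1)
        break; // reached the high watermark: this partition's snapshot is done
}
{code}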




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1964) testPartitionReassignmentCallback hangs occasionally

2015-02-23 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333437#comment-14333437
 ] 

Guozhang Wang commented on KAFKA-1964:
--

I think this is related to KAFKA-1948, and my submitted patch does not fully 
solve the problem. We should really create the topic with replication, so that 
shutting down the broker hosting the topic will not block the test. I will try 
to fix this as part of KAFKA-1910.

>  testPartitionReassignmentCallback hangs occasionally
> -
>
> Key: KAFKA-1964
> URL: https://issues.apache.org/jira/browse/KAFKA-1964
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.8.3
>Reporter: Jun Rao
>  Labels: newbie++
> Attachments: stack.out
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1755) Improve error handling in log cleaner

2015-02-23 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1489#comment-1489
 ] 

Joel Koshy commented on KAFKA-1755:
---

Also, I have an incremental patch that prevents the log cleaner from quitting 
due to uncaught errors while cleaning a specific partition. It basically moves 
that partition to a permanent failed state and allows the cleaner to keep 
compacting other partitions. The failed partition is still included when 
computing the max dirty ratio, so you can still alert accurately on that 
metric. We can discuss whether we want to add that or not.
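
The failure-isolation idea, roughly (the cleaner itself is Scala; this 
Java-flavored sketch uses assumed names like clean() and failedPartitions):
{code}
// Instead of letting one bad partition kill the cleaner thread, mark the
// partition failed and move on. Failed partitions are still counted when
// computing the max dirty ratio metric.
for (TopicPartition tp : dirtyPartitions) {
    try {
        clean(tp);
    } catch (Exception e) {
        log.error("Aborting cleaning of " + tp + " and marking it failed", e);
        failedPartitions.add(tp);
    }
}
{code}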

> Improve error handling in log cleaner
> -
>
> Key: KAFKA-1755
> URL: https://issues.apache.org/jira/browse/KAFKA-1755
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Assignee: Joel Koshy
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1755.patch
>
>
> The log cleaner is a critical process when using compacted topics.
> However, if there is any error in any topic (notably if a key is missing) 
> then the cleaner exits and all other compacted topics will also be adversely 
> affected - i.e., compaction stops across the board.
> This can be improved by just aborting compaction for a topic on any error and 
> keeping the thread from exiting.
> Another improvement would be to reject messages without keys that are sent to 
> compacted topics although this is not enough by itself.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1755) Improve error handling in log cleaner

2015-02-23 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1487#comment-1487
 ] 

Joel Koshy commented on KAFKA-1755:
---

I thought a bit more about this, and here is a patch that summarizes my 
thoughts.

This patch does message validation on arrival and drops unkeyed messages 
during log compaction.

I actually think it is better to reject invalid messages (unkeyed and, for 
now, compressed) up front, as opposed to accepting those messages and only 
dropping/warning during compaction. This way the producer gets an early 
indication, via a client-side error, that it is doing something wrong, which 
is better than just a broker-side warning/invalid metric. We still need to 
deal with unkeyed messages that may already be in the log, but I think that is 
orthogonal - it includes the case where you change a non-compacted topic to be 
compacted. That is perhaps an invalid operation - i.e., you should ideally 
delete the topic before doing that - but in any event this patch handles that 
case by deleting invalid messages during log compaction.

Case in point: at LinkedIn we use Kafka-based offset management for some of 
our consumers. We recently discovered compressed messages in the offsets 
topic, which caused the log cleaner to quit. We saw this issue in the past 
with Samza checkpoint topics and suspected that Samza was doing something 
wrong. However, after seeing it in the __consumer_offsets topic, it is more 
likely to be an actual bug in the broker - either in the log cleaner itself or 
at the lower-level byte-buffer message-set API. We do not yet know. If we at 
least reject invalid messages on arrival, we can rule out the clients as the 
source.
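
The up-front check being proposed is essentially this shape (a sketch with 
assumed names, not the patch itself):
{code}
// Reject invalid messages at append time so the producer sees a
// client-side error instead of the cleaner dying later.
void validateCompactedTopicMessage(Message message) {
    if (message.key() == null)
        throw new InvalidMessageException(
            "Compacted topics only accept messages with keys");
    if (message.isCompressed())
        throw new InvalidMessageException(
            "Compacted topics do not accept compressed messages");
}
{code}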

> Improve error handling in log cleaner
> -
>
> Key: KAFKA-1755
> URL: https://issues.apache.org/jira/browse/KAFKA-1755
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Assignee: Joel Koshy
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1755.patch
>
>
> The log cleaner is a critical process when using compacted topics.
> However, if there is any error in any topic (notably if a key is missing) 
> then the cleaner exits and all other compacted topics will also be adversely 
> affected - i.e., compaction stops across the board.
> This can be improved by just aborting compaction for a topic on any error and 
> keeping the thread from exiting.
> Another improvement would be to reject messages without keys that are sent to 
> compacted topics although this is not enough by itself.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1755) Improve error handling in log cleaner

2015-02-23 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1755:
--
Attachment: KAFKA-1755.patch

> Improve error handling in log cleaner
> -
>
> Key: KAFKA-1755
> URL: https://issues.apache.org/jira/browse/KAFKA-1755
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Assignee: Joel Koshy
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1755.patch
>
>
> The log cleaner is a critical process when using compacted topics.
> However, if there is any error in any topic (notably if a key is missing) 
> then the cleaner exits and all other compacted topics will also be adversely 
> affected - i.e., compaction stops across the board.
> This can be improved by just aborting compaction for a topic on any error and 
> keeping the thread from exiting.
> Another improvement would be to reject messages without keys that are sent to 
> compacted topics although this is not enough by itself.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 31306: Patch for KAFKA-1755

2015-02-23 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31306/
---

Review request for kafka.


Bugs: KAFKA-1755
https://issues.apache.org/jira/browse/KAFKA-1755


Repository: kafka


Description
---

Add compacted-topic constraint checks: reject unkeyed messages; reject 
compressed messages if the topic's broker-side compression is not 
'uncompressed'.


Diffs
-

  core/src/main/scala/kafka/log/Log.scala 
846023bb98d0fa0603016466360c97071ac935ea 
  core/src/main/scala/kafka/log/LogCleaner.scala 
f8e7cd5fabce78c248a9027c4bb374a792508675 
  core/src/main/scala/kafka/log/LogCleanerManager.scala 
fd87d90597981c867a9b23731fca3b555bf85b7f 
  core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
f46ad5cbbbad77d8d1f490d1f8aac97858da9b06 
  core/src/main/scala/kafka/server/OffsetManager.scala 
83d52643028c5628057dc0aa29819becfda61fdb 
  core/src/test/scala/unit/kafka/log/CleanerTest.scala 
d10e4f4ccbca5e50d81a243d3ab30cc7314b7fef 
  core/src/test/scala/unit/kafka/log/LogTest.scala 
c2dd8eb69da8c0982a0dd20231c6f8bd58eb623e 
  core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala 
73a26377eb63ab9989698e0491049434f032cba2 

Diff: https://reviews.apache.org/r/31306/diff/


Testing
---


Thanks,

Joel Koshy



[jira] [Updated] (KAFKA-1755) Improve error handling in log cleaner

2015-02-23 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1755:
--
Status: Patch Available  (was: Open)

> Improve error handling in log cleaner
> -
>
> Key: KAFKA-1755
> URL: https://issues.apache.org/jira/browse/KAFKA-1755
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Assignee: Joel Koshy
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1755.patch
>
>
> The log cleaner is a critical process when using compacted topics.
> However, if there is any error in any topic (notably if a key is missing) 
> then the cleaner exits and all other compacted topics will also be adversely 
> affected - i.e., compaction stops across the board.
> This can be improved by just aborting compaction for a topic on any error and 
> keeping the thread from exiting.
> Another improvement would be to reject messages without keys that are sent to 
> compacted topics although this is not enough by itself.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1755) Improve error handling in log cleaner

2015-02-23 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1460#comment-1460
 ] 

Joel Koshy commented on KAFKA-1755:
---

Created reviewboard https://reviews.apache.org/r/31306/
 against branch origin/trunk

> Improve error handling in log cleaner
> -
>
> Key: KAFKA-1755
> URL: https://issues.apache.org/jira/browse/KAFKA-1755
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Assignee: Joel Koshy
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1755.patch
>
>
> The log cleaner is a critical process when using compacted topics.
> However, if there is any error in any topic (notably if a key is missing) 
> then the cleaner exits and all other compacted topics will also be adversely 
> affected - i.e., compaction stops across the board.
> This can be improved by just aborting compaction for a topic on any error and 
> keeping the thread from exiting.
> Another improvement would be to reject messages without keys that are sent to 
> compacted topics although this is not enough by itself.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1379) Partition reassignment resets clock for time-based retention

2015-02-23 Thread Moritz Siuts (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1438#comment-1438
 ] 

Moritz Siuts commented on KAFKA-1379:
-

This also happens when a broker dies and loses its data.

When the broker comes back without any data, it will use more and more disk 
space, up to roughly double the normal usage, until retention kicks in and 
usage drops back to normal.

IMHO this is pretty bad for disaster scenarios, so I would like to see a 
higher priority on this.


> Partition reassignment resets clock for time-based retention
> 
>
> Key: KAFKA-1379
> URL: https://issues.apache.org/jira/browse/KAFKA-1379
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>
> Since retention is driven off mod-times, reassigned partitions will result in
> data that has already been on a leader being retained for another full
> retention cycle. E.g., if retention is seven days and you reassign partitions
> on the sixth day, then those partitions will remain on the replicas for
> another seven days.
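
The mechanism, schematically (a sketch with assumed names, not the broker 
code): retention deletes a segment only once its file modification time falls 
outside the retention window, and a reassigned replica's segment files are 
freshly written, so their mod-times start over.
{code}
import java.io.File;

// A segment is eligible for deletion only once its file mod-time is old
// enough; files written during reassignment get brand-new mod-times, so
// the retention clock restarts.
static boolean shouldDelete(File segmentFile, long retentionMs) {
    return System.currentTimeMillis() - segmentFile.lastModified() > retentionMs;
}
{code}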



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: How to get a JIRA assigned

2015-02-23 Thread Joe Stein
Jay,

I thought it was the same issue as with Confluence and comments, and why we
have to grant rights for that: bots coming in and reassigning everything to
themselves, or something similar, in JIRA.

We could ask/open a ticket with INFRA; if nothing else, they may help come up
with a different way to solve it.

~ Joestein

On Sun, Feb 22, 2015 at 7:31 PM, Jay Kreps  wrote:

> Anyone know if there a way to turn this off? Is it possible to configure
> JIRA to let anyone assign them? Unlike the other lockdown stuff which
> prevents spam this doesn't seem like it could be a spam vector and it would
> be awesome to make it easier for people.
>
> -Jay
>
> On Sun, Feb 22, 2015 at 3:43 PM, Guozhang Wang  wrote:
>
> > Hi Jonathan,
> >
> > You need to be added to the "contributor" list before you can be assigned
> > to JIRAs, and only committers can do that for you.
> >
> > I have just added you to the list, so you should be able to assign yourself
> > now.
> >
> > Guozhang
> >
> > On Sun, Feb 22, 2015 at 3:08 PM, Jonathan Rafalski <
> > jonathan.rafal...@gmail.com> wrote:
> >
> > > Hello,
> > >
> > >   I was wondering whether I need any rights to be able to assign JIRA
> > > tickets to myself?  I found what I think is a bug while working on 1679,
> > > so I opened a ticket and was going to post a review board for both with
> > > my solution, but now someone else has attempted a patch.  I just want to
> > > be able to assign the ticket to myself so time isn't wasted.
> > >
> > > If it is something I need to be granted after submitting a few accepted
> > > patches, can someone at least assign 1679 and 1972 to me so nobody else
> > > attempts to work on them while I am?
> > >
> > > Thanks!
> > >
> > > Jonathan.
> > >
> > > Sent from my iPhone
> >
> >
> >
> >
> > --
> > -- Guozhang
> >
>


[jira] [Commented] (KAFKA-1680) JmxTool exits if no arguments are given

2015-02-23 Thread Jonathan Rafalski (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333188#comment-14333188
 ] 

Jonathan Rafalski commented on KAFKA-1680:
--

I understand what you are saying here, and usually I would agree that if we 
have working default values for all arguments, we should be able to run the 
tool with no actual args. In this case, though, the "default" value for the 
JMX URL will never work: it doesn't even specify a host name, and the port 
number it uses is random.

For a user to turn on remote JMX on their JVM, they need to explicitly pass 
the remote-JMX options when starting the JVM, giving the hostname and port 
they want to use to allow remote connections.

Since there is no JVM default that automatically turns on remote JMX on a 
well-known port, the tool's default JMX URL cannot be considered a valid 
default setting; it really only exists to prevent an exception when the 
option is omitted.

In my opinion, if you believe we SHOULD provide a default value and allow 
running with no args, then we should change the default to use the hostname 
of the local machine, and print a message saying that no JMX URL was given 
along with the default that was used. That would let the user properly 
troubleshoot why it might not be working, and/or turn on JMX on their local 
machine with our default values so it would work.
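
For example, the suggested behavior might look something like this (a sketch 
with assumed defaults, not the tool's actual code; the port is a placeholder 
since remote JMX has no true default port):
{code}
import java.net.InetAddress;
import javax.management.remote.JMXServiceURL;

public class JmxUrlDefault {
    public static void main(String[] args) throws Exception {
        // Default to the local hostname and an assumed example port, and
        // tell the user which URL was guessed so they can troubleshoot.
        String host = InetAddress.getLocalHost().getHostName();
        int port = 9999; // placeholder; there is no standard remote-JMX port
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        System.out.println("No JMX URL given; trying " + url);
    }
}
{code}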

> JmxTool exits if no arguments are given
> ---
>
> Key: KAFKA-1680
> URL: https://issues.apache.org/jira/browse/KAFKA-1680
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.0
>Reporter: Ryan Berdeen
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
>  Labels: newbie
> Attachments: KAFKA-1680.patch
>
>
> JmxTool has no required arguments, but it exits if no arguments are provided. 
> You can work around this by passing a non-option argument, which will be 
> ignored, e.g. {{./bin/kafka-run-class.sh kafka.tools.JmxTool xxx}}.
> It looks like this was broken in KAFKA-1291 / 
> 6b0ae4bba0d0f8e4c8da19de65a8f03f162bec39



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1679) JmxTool outputs nothing if any mbean attributes can't be retrieved

2015-02-23 Thread Jonathan Rafalski (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Rafalski reassigned KAFKA-1679:


Assignee: Jonathan Rafalski

> JmxTool outputs nothing if any mbean attributes can't be retrieved
> --
>
> Key: KAFKA-1679
> URL: https://issues.apache.org/jira/browse/KAFKA-1679
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Ryan Berdeen
>Assignee: Jonathan Rafalski
>Priority: Minor
>  Labels: newbie
>
> JmxTool counts the number of attributes for all MBeans and if the number of 
> attributes retrieved does not equal this number, nothing is printed.
> Several {{java.lang:type=MemoryPool}} MBeans have unsupported attributes (see 
> HADOOP-8027, for example), so running JmxTool with no arguments fails to 
> fetch these metrics and outputs nothing while continuing to run.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1972) JMX Tool output for CSV format does not handle attributes with comma in their value

2015-02-23 Thread Jonathan Rafalski (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Rafalski reassigned KAFKA-1972:


Assignee: Jonathan Rafalski

> JMX Tool output for CSV format does not handle attributes with comma in their 
> value
> ---
>
> Key: KAFKA-1972
> URL: https://issues.apache.org/jira/browse/KAFKA-1972
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Jonathan Rafalski
>Assignee: Jonathan Rafalski
>Priority: Minor
>  Labels: newbie
>
> When JmxTool outputs all attributes using a comma delimiter, it has no 
> escape character or any other way to handle attributes that contain commas 
> in their value.  This could potentially limit the use of the output to 
> single-value attributes only.
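
A conventional fix would be RFC 4180-style quoting (a sketch, not what the 
tool currently does):
{code}
// Quote any value containing the delimiter, quotes, or newlines, doubling
// embedded quotes, so multi-valued attributes survive a CSV parse.
static String csvEscape(String value) {
    if (value.contains(",") || value.contains("\"") || value.contains("\n"))
        return "\"" + value.replace("\"", "\"\"") + "\"";
    return value;
}
{code}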



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)