[jira] [Commented] (KAFKA-260) Add audit trail to kafka

2014-10-27 Thread Stas Levin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14185220#comment-14185220
 ] 

Stas Levin commented on KAFKA-260:
--

Hi guys,

We've adopted the data model above in Aletheia 
(https://github.com/outbrain/Aletheia), an open source data delivery framework 
we've been working on here at Outbrain. 
In Aletheia we call these audit trails "Breadcrumbs", and have them generated 
on both the producer and consumer sides. We're working towards integrating the 
above-mentioned patch in order to provide a client-side dashboard.

Aletheia is by no means meant to replace Kafka; rather, it is an abstraction 
layer on top of Kafka and other messaging systems, as we point out in the wiki.
Having audit capabilities built into Kafka would be really great. In the 
meantime, you're most welcome to check out Aletheia; perhaps you'll find it 
useful, as it provides Breadcrumb generation out of the box.

-Stas

> Add audit trail to kafka
> 
>
> Key: KAFKA-260
> URL: https://issues.apache.org/jira/browse/KAFKA-260
> Project: Kafka
>  Issue Type: New Feature
>Affects Versions: 0.8.0
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: Picture 18.png, kafka-audit-trail-draft.patch
>
>
> LinkedIn has a system that does monitoring on top of our data flow to ensure 
> all data is delivered to all consumers of data. This works by having each 
> logical "tier" through which data passes produce messages to a central 
> "audit-trail" topic; these messages give a time period and the number of 
> messages that passed through that tier in that time period. Example of tiers 
> for data might be "producer", "broker", "hadoop-etl", etc. This makes it 
> possible to compare the total events for a given time period to ensure that 
> all events that are produced are consumed by all consumers.
> This turns out to be extremely useful. We also have an application that 
> "balances the books" and checks that all data is consumed in a timely 
> fashion. This gives graphs for each topic and shows any data loss and the lag 
> at which the data is consumed (if any).
> This would be an optional feature that would allow you to do this kind of 
> reconciliation automatically for all the topics kafka hosts against all the 
> tiers of applications that interact with the data.
> Some details, the proposed format of the data is JSON using the following 
> format for messages:
> {
>   "time":1301727060032,  // the timestamp at which this audit message is sent
>   "topic": "my_topic_name", // the topic this audit data is for
>   "tier":"producer", // a user-defined "tier" name
>   "bucket_start": 130172640, // the beginning of the time bucket this 
> data applies to
>   "bucket_end": 130172700, // the end of the time bucket this data 
> applies to
>   "host":"my_host_name.datacenter.linkedin.com", // the server that this was 
> sent from
>   "datacenter":"hlx32", // the datacenter this occurred in
>   "application":"newsfeed_service", // a user-defined application name
>   "guid":"51656274-a86a-4dff-b824-8e8e20a6348f", // a unique identifier for 
> this message
>   "count":43634
> }
> DISCUSSION
> Time is complex:
> 1. The audit data must be based on a timestamp in the events, not the time on 
> the machine processing the event. Using this timestamp means that all downstream 
> consumers will report audit data on the right time bucket. This means that 
> there must be a timestamp in the event, which we don't currently require. 
> Arguably we should just add a timestamp to the events, but I think it is 
> sufficient for now just to allow the user to provide a function to extract 
> the time from their events.
> 2. For counts to reconcile exactly we can only do analysis at a granularity 
> based on the least common multiple of the bucket size used by all tiers. The 
> simplest is just to configure them all to use the same bucket size. We 
> currently use a bucket size of 10 mins, but anything from 1-60 mins is 
> probably reasonable.
> For analysis purposes one tier is designated as the source tier and we do 
> reconciliation against this count (e.g. if another tier has less, that is 
> treated as lost, if another tier has more that is duplication).
> Note that this system makes false positives possible since you can lose an 
> audit message. It also makes false negatives possible since if you lose both 
> normal messages and the associated audit messages it will appear that 
> everything adds up. The latter problem is astronomically unlikely to happen 
> exactly, though.
> This would integrate into the client (producer and consumer both) in the 
> following way:
> 1. The user provides a way to get timestamps from messages (required)
> 2. The user configures the tier name, host name, datacenter name, and 
> application name as part of
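 
As a rough, illustrative sketch of the time-bucketing idea in the quoted 
description above (class and method names here are hypothetical, not part of 
the proposed patch), each tier would round the event's own timestamp down to a 
fixed bucket and count events per bucket before emitting an audit message:

{code}
// Hypothetical sketch of per-bucket counting based on the event timestamp,
// not the local clock; names are illustrative only.
import java.util.HashMap;
import java.util.Map;

public class AuditBucketCounter {
    private static final long BUCKET_MS = 10 * 60 * 1000L; // e.g. 10-minute buckets
    private final Map<Long, Long> countsByBucketStart = new HashMap<Long, Long>();

    // eventTimestampMs is extracted from the event itself (user-provided function).
    public synchronized void record(long eventTimestampMs) {
        long bucketStart = (eventTimestampMs / BUCKET_MS) * BUCKET_MS;
        Long current = countsByBucketStart.get(bucketStart);
        countsByBucketStart.put(bucketStart, current == null ? 1L : current + 1L);
    }

    // A periodic task would then emit one audit message per completed bucket
    // carrying topic, tier, bucket_start, bucket_end and the accumulated count.
}
{code}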

Re: Review Request 26885: Patch for KAFKA-1642

2014-10-27 Thread Ewen Cheslack-Postava


> On Oct. 27, 2014, 12:13 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 122
> > 
> >
> > The comments "When connecting or connected, this handles slow/stalled 
> > connections" here are a bit misleading: after checking the code I realize 
> > connectionDelay is only triggered to determine the delay in millis after which 
> > we can re-check connectivity for a node that is not connected; hence, if the 
> > node is connected again while we are determining its delay, we just set it 
> > to MAX.
> > 
> > Instead of making it general to the KafkaClient interface, shall we 
> > just add this to the code block of line 155?

It gets triggered any time NetworkClient.ready returns false for a node. The 
obvious case is that it will return "not ready" when disconnected, but it also 
does so when connecting, or when connected but inFlightRequests.canSendMore() 
returns false (thus the mention of "slow/stalled connections"). The important 
thing is that the value returned *is* MAX_VALUE in those latter cases because 
neither one will be resolved by polling -- they both require an external event 
(connection established/failed, or an outstanding request receiving a response) 
which should wake up the event loop when there's something to do. That keeps us 
from polling unnecessarily. Previously there were conditions in which 
connections in these states could trigger busy waiting in the poll loop.

I don't think we can get the same effect just inlining the code because it uses 
state that's only available through ClusterConnectionStates, which is private 
to NetworkClient. The KafkaClient only exposes the higher level concept of 
"ready".


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/#review58575
---


On Oct. 23, 2014, 11:19 p.m., Ewen Cheslack-Postava wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/26885/
> ---
> 
> (Updated Oct. 23, 2014, 11:19 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1642
> https://issues.apache.org/jira/browse/KAFKA-1642
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Fixes two issues with the computation of ready nodes and poll timeouts in
> Sender/RecordAccumulator:
> 
> 1. The timeout was computed incorrectly because it took into account all 
> nodes,
> even if they had data to send such that their timeout would be 0. However, 
> nodes
> were then filtered based on whether it was possible to send (i.e. their
> connection was still good) which could result in nothing to send and a 0
> timeout, resulting in busy looping. Instead, the timeout needs to be computed
> only using data that cannot be immediately sent, i.e. where the timeout will 
> be
> greater than 0. This timeout is only used if, after filtering by whether
> connections are ready for sending, there is no data to be sent. Other events 
> can
> wake the thread up earlier, e.g. a client reconnects and becomes ready again.
> 
> 2. One of the conditions indicating whether data is sendable is whether a
> timeout has expired -- either the linger time or the retry backoff. This
> condition wasn't accounting for both cases properly, always using the linger
> time. This means the retry backoff was probably not being respected.
> 
> KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but 
> none can send data because they are in a connection backoff period.
> 
> 
> Addressing Jun's comments.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
> d304660f29246e9600efe3ddb28cfcc2b074bed3 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> 29658d4a15f112dc0af5ce517eaab93e6f00134b 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> eea270abb16f40c9f3b47c4ea96be412fb4fdc8b 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  c5d470011d334318d5ee801021aadd0c000974a6 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
> aae8d4a1e98279470587d397cc779a9baf6fee6c 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  0762b35abba0551f23047348c5893bb8c9acff14 
> 
> Diff: https://reviews.apache.org/r/26885/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>



Review Request 27232: Patch for KAFKA-559

2014-10-27 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27232/
---

Review request for kafka.


Bugs: KAFKA-559
https://issues.apache.org/jira/browse/KAFKA-559


Repository: kafka


Description
---

Addressing Joel's comments.


Fix naming: entires -> entries.


Only remove partitions from a group if all partitions were last modified before 
the threshold date.
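
As a rough illustration of that rule (hypothetical types and names; the actual 
change is the Scala tool listed under Diffs below), a consumer group is only 
eligible for deletion if every one of its partition entries was last modified 
before the threshold:

{code}
// Hypothetical sketch: delete a group only if all of its partition entries
// are older than the threshold date.
import java.util.Map;

public class GroupCleanupRule {
    /** lastModifiedMs maps each partition path to its last-modified time in ZK. */
    public static boolean eligibleForDeletion(Map<String, Long> lastModifiedMs, long thresholdMs) {
        for (long mtime : lastModifiedMs.values()) {
            if (mtime >= thresholdMs)
                return false; // at least one partition was updated recently: keep the group
        }
        return true;
    }
}
{code}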


Diffs
-

  core/src/main/scala/kafka/tools/CleanupObsoleteZkEntries.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/27232/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Updated] (KAFKA-559) Garbage collect old consumer metadata entries

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-559:

Attachment: KAFKA-559.patch

> Garbage collect old consumer metadata entries
> -
>
> Key: KAFKA-559
> URL: https://issues.apache.org/jira/browse/KAFKA-559
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>Assignee: Tejas Patil
>  Labels: newbie, project
> Attachments: KAFKA-559.patch, KAFKA-559.v1.patch, KAFKA-559.v2.patch
>
>
> Many use cases involve transient consumers. These consumers create entries 
> under their consumer group in zk and maintain offsets there as well. There is 
> currently no way to delete these entries. It would be good to have a tool 
> that did something like
>   bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] 
> --zookeeper [zk_connect]
> This would scan through consumer group entries and delete any that had no 
> offset update since the given date.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-559) Garbage collect old consumer metadata entries

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-559:

Assignee: Ewen Cheslack-Postava  (was: Tejas Patil)
  Status: Patch Available  (was: Open)

> Garbage collect old consumer metadata entries
> -
>
> Key: KAFKA-559
> URL: https://issues.apache.org/jira/browse/KAFKA-559
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>Assignee: Ewen Cheslack-Postava
>  Labels: newbie, project
> Attachments: KAFKA-559.patch, KAFKA-559.v1.patch, KAFKA-559.v2.patch
>
>
> Many use cases involve transient consumers. These consumers create entries 
> under their consumer group in zk and maintain offsets there as well. There is 
> currently no way to delete these entries. It would be good to have a tool 
> that did something like
>   bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] 
> --zookeeper [zk_connect]
> This would scan through consumer group entries and delete any that had no 
> offset update since the given date.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-559) Garbage collect old consumer metadata entries

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14185377#comment-14185377
 ] 

Ewen Cheslack-Postava commented on KAFKA-559:
-

Created reviewboard https://reviews.apache.org/r/27232/diff/
 against branch origin/trunk

> Garbage collect old consumer metadata entries
> -
>
> Key: KAFKA-559
> URL: https://issues.apache.org/jira/browse/KAFKA-559
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>Assignee: Tejas Patil
>  Labels: newbie, project
> Attachments: KAFKA-559.patch, KAFKA-559.v1.patch, KAFKA-559.v2.patch
>
>
> Many use cases involve transient consumers. These consumers create entries 
> under their consumer group in zk and maintain offsets there as well. There is 
> currently no way to delete these entries. It would be good to have a tool 
> that did something like
>   bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] 
> --zookeeper [zk_connect]
> This would scan through consumer group entries and delete any that had no 
> offset update since the given date.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-559) Garbage collect old consumer metadata entries

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14185380#comment-14185380
 ] 

Ewen Cheslack-Postava commented on KAFKA-559:
-

This is an updated version of the patch by [~tejas.patil]. I'm pretty sure I've 
addressed all the issues [~jjkoshy] brought up.

> Garbage collect old consumer metadata entries
> -
>
> Key: KAFKA-559
> URL: https://issues.apache.org/jira/browse/KAFKA-559
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jay Kreps
>Assignee: Ewen Cheslack-Postava
>  Labels: newbie, project
> Attachments: KAFKA-559.patch, KAFKA-559.v1.patch, KAFKA-559.v2.patch
>
>
> Many use cases involve transient consumers. These consumers create entries 
> under their consumer group in zk and maintain offsets there as well. There is 
> currently no way to delete these entries. It would be good to have a tool 
> that did something like
>   bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] 
> --zookeeper [zk_connect]
> This would scan through consumer group entries and delete any that had no 
> offset update since the given date.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-1732:


 Summary: DumpLogSegments tool fails when path has a '.'
 Key: KAFKA-1732
 URL: https://issues.apache.org/jira/browse/KAFKA-1732
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ewen Cheslack-Postava
Priority: Minor


Using DumpLogSegments in a directory that has a '.' that isn't part of the file 
extension causes an exception:

{code}
16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh 
kafka.tools.DumpLogSegments  --file 
/Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
 --verify-index-only
Dumping 
/Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
Exception in thread "main" java.io.FileNotFoundException: 
/Users/ewencp/kafka.log (No such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.<init>(FileInputStream.java:146)
at kafka.utils.Utils$.openChannel(Utils.scala:162)
at kafka.log.FileMessageSet.<init>(FileMessageSet.scala:74)
at 
kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109)
at 
kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80)
at 
kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73)
at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
{code}
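
The truncated path in the exception ("/Users/ewencp/kafka.log") suggests the 
tool is splitting the full path on the first '.' when deriving the companion 
.log file. A rough sketch of the safer derivation (illustrative only, not the 
actual patch): strip only the final extension of the file name itself:

{code}
// Illustrative sketch: derive the companion .log file from the index file's
// name rather than splitting the whole path on '.'.
import java.io.File;

public class LogFileFromIndex {
    public static File logFileFor(File indexFile) {
        String name = indexFile.getName();          // e.g. "00016895.index"
        int dot = name.lastIndexOf('.');
        String base = (dot >= 0) ? name.substring(0, dot) : name;
        return new File(indexFile.getParentFile(), base + ".log");
    }
}
{code}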



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1733) Producer.send will block indeterminately when broker is unavailable.

2014-10-27 Thread Marc Chung (JIRA)
Marc Chung created KAFKA-1733:
-

 Summary: Producer.send will block indeterminately when broker is 
unavailable.
 Key: KAFKA-1733
 URL: https://issues.apache.org/jira/browse/KAFKA-1733
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Reporter: Marc Chung
Assignee: Jun Rao


This is a follow up to the conversation here:

https://mail-archives.apache.org/mod_mbox/kafka-dev/201409.mbox/%3ccaog_4qymoejhkbo0n31+a-ujx0z5unsisd5wbrmn-xtx7gi...@mail.gmail.com%3E

During ClientUtils.fetchTopicMetadata, if the broker is unavailable, 
socket.connect will block indeterminately. Any retry policy 
(message.send.max.retries) further increases the time spent waiting for the 
socket to connect.

The root fix is to add a connection timeout value to the BlockingChannel's 
socket configuration, like so:

{noformat}
-channel.socket.connect(new InetSocketAddress(host, port))
+channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
{noformat}
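
For reference, java.net.Socket already provides a connect overload that takes a 
timeout, so a minimal sketch of the intended behavior (connectTimeoutMs being 
the proposed, not-yet-existing configuration value) looks like this:

{code}
// Sketch: connect with a timeout instead of blocking indefinitely.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ConnectWithTimeout {
    public static Socket connect(String host, int port, int connectTimeoutMs) throws IOException {
        Socket socket = new Socket();
        try {
            // Throws java.net.SocketTimeoutException if the connection cannot
            // be established within connectTimeoutMs milliseconds.
            socket.connect(new InetSocketAddress(host, port), connectTimeoutMs);
            return socket;
        } catch (IOException e) {
            socket.close();
            throw e;
        }
    }
}
{code}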

The simplest thing to do here would be to have a constant, default value that 
would be applied to every socket configuration. 

Is that acceptable? 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1733) Producer.send will block indeterminately when broker is unavailable.

2014-10-27 Thread Marc Chung (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14185539#comment-14185539
 ] 

Marc Chung commented on KAFKA-1733:
---

I have a patch (work in progress) here: 
https://github.com/mchung/kafka/commit/87b8ddbfe23dc887f56fa6f9ea3669733933c49b

> Producer.send will block indeterminately when broker is unavailable.
> 
>
> Key: KAFKA-1733
> URL: https://issues.apache.org/jira/browse/KAFKA-1733
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Reporter: Marc Chung
>Assignee: Jun Rao
>
> This is a follow up to the conversation here:
> https://mail-archives.apache.org/mod_mbox/kafka-dev/201409.mbox/%3ccaog_4qymoejhkbo0n31+a-ujx0z5unsisd5wbrmn-xtx7gi...@mail.gmail.com%3E
> During ClientUtils.fetchTopicMetadata, if the broker is unavailable, 
> socket.connect will block indeterminately. Any retry policy 
> (message.send.max.retries) further increases the time spent waiting for the 
> socket to connect.
> The root fix is to add a connection timeout value to the BlockingChannel's 
> socket configuration, like so:
> {noformat}
> -channel.socket.connect(new InetSocketAddress(host, port))
> +channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
> {noformat}
> The simplest thing to do here would be to have a constant, default value that 
> would be applied to every socket configuration. 
> Is that acceptable? 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 27238: Patch for KAFKA-1732

2014-10-27 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27238/
---

Review request for kafka.


Bugs: KAFKA-1732
https://issues.apache.org/jira/browse/KAFKA-1732


Repository: kafka


Description
---

KAFKA-1732 Handle paths with '.' properly in DumpLogSegments.


Diffs
-

  core/src/main/scala/kafka/tools/DumpLogSegments.scala 
8e9d47b8d4adc5754ed8861aa04ddd3c6b629e3d 

Diff: https://reviews.apache.org/r/27238/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Commented] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14185586#comment-14185586
 ] 

Ewen Cheslack-Postava commented on KAFKA-1732:
--

Created reviewboard https://reviews.apache.org/r/27238/diff/
 against branch origin/trunk

> DumpLogSegments tool fails when path has a '.'
> --
>
> Key: KAFKA-1732
> URL: https://issues.apache.org/jira/browse/KAFKA-1732
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ewen Cheslack-Postava
>Priority: Minor
> Attachments: KAFKA-1732.patch
>
>
> Using DumpLogSegments in a directory that has a '.' that isn't part of the 
> file extension causes an exception:
> {code}
> 16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments  --file 
> /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
>  --verify-index-only
> Dumping 
> /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
> Exception in thread "main" java.io.FileNotFoundException: 
> /Users/ewencp/kafka.log (No such file or directory)
>   at java.io.FileInputStream.open(Native Method)
>   at java.io.FileInputStream.<init>(FileInputStream.java:146)
>   at kafka.utils.Utils$.openChannel(Utils.scala:162)
>   at kafka.log.FileMessageSet.<init>(FileMessageSet.scala:74)
>   at 
> kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109)
>   at 
> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80)
>   at 
> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>   at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73)
>   at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1732:
-
Assignee: Ewen Cheslack-Postava
  Status: Patch Available  (was: Open)

> DumpLogSegments tool fails when path has a '.'
> --
>
> Key: KAFKA-1732
> URL: https://issues.apache.org/jira/browse/KAFKA-1732
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
> Attachments: KAFKA-1732.patch
>
>
> Using DumpLogSegments in a directory that has a '.' that isn't part of the 
> file extension causes an exception:
> {code}
> 16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments  --file 
> /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
>  --verify-index-only
> Dumping 
> /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
> Exception in thread "main" java.io.FileNotFoundException: 
> /Users/ewencp/kafka.log (No such file or directory)
>   at java.io.FileInputStream.open(Native Method)
>   at java.io.FileInputStream.<init>(FileInputStream.java:146)
>   at kafka.utils.Utils$.openChannel(Utils.scala:162)
>   at kafka.log.FileMessageSet.<init>(FileMessageSet.scala:74)
>   at 
> kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109)
>   at 
> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80)
>   at 
> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>   at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73)
>   at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'

2014-10-27 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1732:
-
Attachment: KAFKA-1732.patch

> DumpLogSegments tool fails when path has a '.'
> --
>
> Key: KAFKA-1732
> URL: https://issues.apache.org/jira/browse/KAFKA-1732
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ewen Cheslack-Postava
>Priority: Minor
> Attachments: KAFKA-1732.patch
>
>
> Using DumpLogSegments in a directory that has a '.' that isn't part of the 
> file extension causes an exception:
> {code}
> 16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments  --file 
> /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
>  --verify-index-only
> Dumping 
> /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
> Exception in thread "main" java.io.FileNotFoundException: 
> /Users/ewencp/kafka.log (No such file or directory)
>   at java.io.FileInputStream.open(Native Method)
>   at java.io.FileInputStream.<init>(FileInputStream.java:146)
>   at kafka.utils.Utils$.openChannel(Utils.scala:162)
>   at kafka.log.FileMessageSet.<init>(FileMessageSet.scala:74)
>   at 
> kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109)
>   at 
> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80)
>   at 
> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>   at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73)
>   at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Kafka Security?

2014-10-27 Thread Stephenson, John L
List Users,
Does anyone know when/if Kafka security features are being planned?   I 
haven't seen much on the net outside of the following proposal:  
https://cwiki.apache.org/confluence/display/KAFKA/Security.

Thanks!
john


[jira] [Created] (KAFKA-1734) System test metric plotting nonexistent file warnings

2014-10-27 Thread Andrew Olson (JIRA)
Andrew Olson created KAFKA-1734:
---

 Summary: System test metric plotting nonexistent file warnings
 Key: KAFKA-1734
 URL: https://issues.apache.org/jira/browse/KAFKA-1734
 Project: Kafka
  Issue Type: Bug
Reporter: Andrew Olson
Priority: Minor


Running the system tests (trunk code), there are many "The file ... does not 
exist for plotting (metrics)" warning messages, for example,

{noformat}
2014-10-27 14:47:58,478 - WARNING - The file 
/opt/kafka/system_test/replication_testsuite/testcase_0007/logs/broker-3/metrics/kafka.network.RequestMetrics.Produce-RemoteTimeMs.csv
 does not exist for plotting (metrics)
{noformat}

Looks like the generated metric file names only include the last part of the 
metric, e.g. "Produce-RemoteTimeMs.csv" not 
"kafka.network.RequestMetrics.Produce-RemoteTimeMs.csv".

{noformat}
$ ls 
/opt/kafka/system_test/replication_testsuite/testcase_0007/logs/broker-3/metrics/*Produce*
/opt/kafka/system_test/replication_testsuite/testcase_0007/logs/broker-3/metrics/Produce-RemoteTimeMs.csv
{noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Kafka Security?

2014-10-27 Thread Gwen Shapira
This is very much work in progress.
You can follow the Jira here to see how it goes:
https://issues.apache.org/jira/browse/KAFKA-1682

On Mon, Oct 27, 2014 at 11:49 AM, Stephenson, John L
 wrote:
> List Users,
> Does anyone know when/if Kafka security features are being planned?   I 
> haven't seen much on the net outside of the following proposal:  
> https://cwiki.apache.org/confluence/display/KAFKA/Security.
>
> Thanks!
> john


[jira] [Updated] (KAFKA-1731) add config/jmx changes in 0.8.2 doc

2014-10-27 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1731:
---
Fix Version/s: 0.8.2
 Assignee: Jun Rao

I made a pass on the site doc to add the new broker-side configs (offset 
management related configs will be added in KAFKA-1729) and the important JMX 
metrics. This is already committed to svn. I will leave this ticket open for a 
few more days for comments.

> add config/jmx changes in 0.8.2 doc
> ---
>
> Key: KAFKA-1731
> URL: https://issues.apache.org/jira/browse/KAFKA-1731
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jun Rao
>Assignee: Jun Rao
> Fix For: 0.8.2
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1731) add config/jmx changes in 0.8.2 doc

2014-10-27 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14185818#comment-14185818
 ] 

Gwen Shapira commented on KAFKA-1731:
-

Any chance you can upload a patch so we can see what changed?

> add config/jmx changes in 0.8.2 doc
> ---
>
> Key: KAFKA-1731
> URL: https://issues.apache.org/jira/browse/KAFKA-1731
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jun Rao
>Assignee: Jun Rao
> Fix For: 0.8.2
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1731) add config/jmx changes in 0.8.2 doc

2014-10-27 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1731:
---
Attachment: config-jmx_082.patch

Attached please find the patch.

> add config/jmx changes in 0.8.2 doc
> ---
>
> Key: KAFKA-1731
> URL: https://issues.apache.org/jira/browse/KAFKA-1731
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jun Rao
>Assignee: Jun Rao
> Fix For: 0.8.2
>
> Attachments: config-jmx_082.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1733) Producer.send will block indeterminately when broker is unavailable.

2014-10-27 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1733:
-
Reviewer: Jun Rao
Assignee: (was: Jun Rao)

> Producer.send will block indeterminately when broker is unavailable.
> 
>
> Key: KAFKA-1733
> URL: https://issues.apache.org/jira/browse/KAFKA-1733
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Reporter: Marc Chung
>
> This is a follow up to the conversation here:
> https://mail-archives.apache.org/mod_mbox/kafka-dev/201409.mbox/%3ccaog_4qymoejhkbo0n31+a-ujx0z5unsisd5wbrmn-xtx7gi...@mail.gmail.com%3E
> During ClientUtils.fetchTopicMetadata, if the broker is unavailable, 
> socket.connect will block indeterminately. Any retry policy 
> (message.send.max.retries) further increases the time spent waiting for the 
> socket to connect.
> The root fix is to add a connection timeout value to the BlockingChannel's 
> socket configuration, like so:
> {noformat}
> -channel.socket.connect(new InetSocketAddress(host, port))
> +channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
> {noformat}
> The simplest thing to do here would be to have a constant, default value that 
> would be applied to every socket configuration. 
> Is that acceptable? 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 27238: Patch for KAFKA-1732

2014-10-27 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27238/#review58723
---

Ship it!


Ship It!

- Neha Narkhede


On Oct. 27, 2014, 6:41 p.m., Ewen Cheslack-Postava wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27238/
> ---
> 
> (Updated Oct. 27, 2014, 6:41 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1732
> https://issues.apache.org/jira/browse/KAFKA-1732
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1732 Handle paths with '.' properly in DumpLogSegments.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/tools/DumpLogSegments.scala 
> 8e9d47b8d4adc5754ed8861aa04ddd3c6b629e3d 
> 
> Diff: https://reviews.apache.org/r/27238/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>



[jira] [Updated] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'

2014-10-27 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1732:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Thanks for the patch. Pushed to trunk and 0.8.2

> DumpLogSegments tool fails when path has a '.'
> --
>
> Key: KAFKA-1732
> URL: https://issues.apache.org/jira/browse/KAFKA-1732
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
> Attachments: KAFKA-1732.patch
>
>
> Using DumpLogSegments in a directory that has a '.' that isn't part of the 
> file extension causes an exception:
> {code}
> 16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments  --file 
> /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
>  --verify-index-only
> Dumping 
> /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
> Exception in thread "main" java.io.FileNotFoundException: 
> /Users/ewencp/kafka.log (No such file or directory)
>   at java.io.FileInputStream.open(Native Method)
>   at java.io.FileInputStream.<init>(FileInputStream.java:146)
>   at kafka.utils.Utils$.openChannel(Utils.scala:162)
>   at kafka.log.FileMessageSet.<init>(FileMessageSet.scala:74)
>   at 
> kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109)
>   at 
> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80)
>   at 
> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>   at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73)
>   at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26755: Patch for KAFKA-1706

2014-10-27 Thread Joel Koshy


> On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 109
> > 
> >
> > getAndDecrement(sizeFunction.get(e))
> 
> Jiangjie Qin wrote:
> It seems getAndDecrement() does not take an argument and will always 
> decrement by 1.

ah yes you are right
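
For the record, decrementing by the element's size would need something like 
AtomicInteger.addAndGet with a negative delta; for example (illustrative only, 
the field and function names here are hypothetical):

{code}
// Illustrative only: getAndDecrement() always decrements by exactly 1, so a
// by-size decrement has to go through addAndGet with a negative delta.
currentByteSize.addAndGet(-sizeFunction(element));
{code}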


> On Oct. 25, 2014, 7:52 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala, line 82
> > 
> >
> > One significant caveat to this approach (and in the timed variant 
> > above) is that if a single large element needs to be enqueued it could 
> > potentially block a number of smaller elements from being enqueued. This 
> > may be okay in the case of mirror maker, though it would make it less 
> > useful as a generic utility.
> 
> Jiangjie Qin wrote:
> I'm not sure why the big put could block small ones... It is possible 
> that a very large item is put into the queue and pushes it well past the 
> byte limit. In that case, all subsequent puts will be blocked until a 
> bunch of small messages are taken out of the queue, but that seems to be the 
> purpose of having a byte limit for the queue.

I looked again. Yes you are right. It should not block smaller puts. Now I'm 
going to ask the question from the other side of the table: since you are just 
notifying waiting threads, it is possible for a large put to get starved if 
there are a lot of smaller puts that get notified earlier. To the best of my 
knowledge the JVM does not guarantee fairness in unblocking multiple contending 
threads. Ideally there should be some notion of maximum wait before a put 
attempt takes priority over others. i.e., these are nuances that may be a 
compelling reason to make it a specialized utility within MirrorMaker itself 
since it is not general enough (yet).
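
To make the fairness point concrete, here is a small sketch (illustrative only, 
not a proposal for this patch): a fair ReentrantLock hands the lock to waiters 
in roughly arrival order, which is the kind of ordering plain notify/notifyAll 
does not guarantee. Even so, a large put that wakes up and still does not fit 
goes back to waiting, so a real fix would need an explicit maximum-wait or 
reservation scheme on top of this.

{code}
// Sketch of a byte-bounded gate with fair lock acquisition; illustrative only,
// not the MirrorMaker queue implementation.
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class FairByteGate {
    private final long capacityBytes;
    private long usedBytes = 0;
    private final ReentrantLock lock = new ReentrantLock(true); // fair: FIFO-ish handoff
    private final Condition spaceFreed = lock.newCondition();

    public FairByteGate(long capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    public void acquire(long bytes) throws InterruptedException {
        lock.lock();
        try {
            // Note: an element larger than capacityBytes would wait forever here;
            // a real implementation must special-case oversized elements.
            while (usedBytes + bytes > capacityBytes)
                spaceFreed.await();
            usedBytes += bytes;
        } finally {
            lock.unlock();
        }
    }

    public void release(long bytes) {
        lock.lock();
        try {
            usedBytes -= bytes;
            spaceFreed.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
{code}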


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26755/#review58497
---


On Oct. 27, 2014, 6:50 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/26755/
> ---
> 
> (Updated Oct. 27, 2014, 6:50 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1706
> https://issues.apache.org/jira/browse/KAFKA-1706
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> changed arguments name
> 
> 
> correct typo.
> 
> 
> Incorporated Joel's comments. Also fixed negative queue size problem.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/26755/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[DISCUSSION] Nested compression in Kafka?

2014-10-27 Thread Guozhang Wang
Hello folks,

I came across "testComplexCompressDecompress" in
kafka.message.MessageCompressionTest while working on some consumer
decompression optimization. This test checks whether nested compression is
supported.

I vaguely remember that some time ago we decided not to support nested
compression in Kafka, and in the new producer's MemoryRecords I also make
this assumption in the iterator implementation. Is that still the case? If
so, shall we remove this test case?

-- Guozhang


[jira] [Created] (KAFKA-1735) MemoryRecords.Iterator needs to handle partial reads from compressed stream

2014-10-27 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-1735:


 Summary: MemoryRecords.Iterator needs to handle partial reads from 
compressed stream
 Key: KAFKA-1735
 URL: https://issues.apache.org/jira/browse/KAFKA-1735
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0


Found a bug in the MemoryRecords.Iterator implementation, where 

{code}
stream.read(recordBuffer, 0, size)
{code}

can read fewer than size bytes, leaving the rest of the recordBuffer set to 
"\0".



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 27256: Fix KAFKA-1735

2014-10-27 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27256/
---

Review request for kafka.


Bugs: KAFKA-1735
https://issues.apache.org/jira/browse/KAFKA-1735


Repository: kafka


Description
---

Handle partial reads from compressed stream


Diffs
-

  clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
040e5b91005edb8f015afdfa76fd94e0bf3cb4ca 

Diff: https://reviews.apache.org/r/27256/diff/


Testing
---


Thanks,

Guozhang Wang



[jira] [Updated] (KAFKA-1735) MemoryRecords.Iterator needs to handle partial reads from compressed stream

2014-10-27 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1735:
-
Status: Patch Available  (was: Open)

> MemoryRecords.Iterator needs to handle partial reads from compressed stream
> ---
>
> Key: KAFKA-1735
> URL: https://issues.apache.org/jira/browse/KAFKA-1735
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.9.0
>
> Attachments: KAFKA-1735.patch
>
>
> Found a bug in the MemoryRecords.Iterator implementation, where 
> {code}
> stream.read(recordBuffer, 0, size)
> {code}
> can read fewer than size bytes, leaving the rest of the recordBuffer set to 
> "\0".



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1735) MemoryRecords.Iterator needs to handle partial reads from compressed stream

2014-10-27 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1735:
-
Attachment: KAFKA-1735.patch

> MemoryRecords.Iterator needs to handle partial reads from compressed stream
> ---
>
> Key: KAFKA-1735
> URL: https://issues.apache.org/jira/browse/KAFKA-1735
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.9.0
>
> Attachments: KAFKA-1735.patch
>
>
> Found a bug in the MemoryRecords.Iterator implementation, where 
> {code}
> stream.read(recordBuffer, 0, size)
> {code}
> can read fewer than size bytes, leaving the rest of the recordBuffer set to 
> "\0".



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1735) MemoryRecords.Iterator needs to handle partial reads from compressed stream

2014-10-27 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186072#comment-14186072
 ] 

Guozhang Wang commented on KAFKA-1735:
--

Created reviewboard https://reviews.apache.org/r/27256/diff/
 against branch origin/trunk

> MemoryRecords.Iterator needs to handle partial reads from compressed stream
> ---
>
> Key: KAFKA-1735
> URL: https://issues.apache.org/jira/browse/KAFKA-1735
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.9.0
>
> Attachments: KAFKA-1735.patch
>
>
> Found a bug in the MemoryRecords.Iterator implementation, where 
> {code}
> stream.read(recordBuffer, 0, size)
> {code}
> can read fewer than size bytes, leaving the rest of the recordBuffer set to 
> "\0".



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26373: Patch for KAFKA-1647

2014-10-27 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/
---

(Updated Oct. 28, 2014, 12:19 a.m.)


Review request for kafka.


Bugs: KAFKA-1647
https://issues.apache.org/jira/browse/KAFKA-1647


Repository: kafka


Description (updated)
---

Addressed Joel's comments.


The version 2 code seems to have been submitted by mistake... This should be the 
code for review that addresses Joel's comments.


Addressed Jun's comments. Will do tests to verify if it works.


Addressed Joel's comments; we do not need to check whether the leader exists 
when adding the fetcher.


Diffs (updated)
-

  core/src/main/scala/kafka/server/ReplicaManager.scala 
78b7514cc109547c562e635824684fad581af653 

Diff: https://reviews.apache.org/r/26373/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Commented] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts

2014-10-27 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186104#comment-14186104
 ] 

Jiangjie Qin commented on KAFKA-1647:
-

Updated reviewboard https://reviews.apache.org/r/26373/diff/
 against branch origin/trunk

> Replication offset checkpoints (high water marks) can be lost on hard kills 
> and restarts
> 
>
> Key: KAFKA-1647
> URL: https://issues.apache.org/jira/browse/KAFKA-1647
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2
>Reporter: Joel Koshy
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: newbie++
> Fix For: 0.8.2
>
> Attachments: KAFKA-1647.patch, KAFKA-1647_2014-10-13_16:38:39.patch, 
> KAFKA-1647_2014-10-18_00:26:51.patch, KAFKA-1647_2014-10-21_23:08:43.patch, 
> KAFKA-1647_2014-10-27_17:19:07.patch
>
>
> We ran into this scenario recently in a production environment. This can 
> happen when enough brokers in a cluster are taken down. i.e., a rolling 
> bounce done properly should not cause this issue. It can occur if all 
> replicas for any partition are taken down.
> Here is a sample scenario:
> * Cluster of three brokers: b0, b1, b2
> * Two partitions (of some topic) with replication factor two: p0, p1
> * Initial state:
> p0: leader = b0, ISR = {b0, b1}
> p1: leader = b1, ISR = {b0, b1}
> * Do a parallel hard-kill of all brokers
> * Bring up b2, so it is the new controller
> * b2 initializes its controller context and populates its leader/ISR cache 
> (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last 
> known leaders are b0 (for p0) and b1 (for p1)
> * Bring up b1
> * The controller's onBrokerStartup procedure initiates a replica state change 
> for all replicas on b1 to become online. As part of this replica state change 
> it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 
> (for p0 and p1). This LeaderAndIsr request contains: {p0: leader=b0; p1: 
> leader=b1}; leaders={b1}. b0 is indicated as the leader of p0 but it is not 
> included in the leaders field because b0 is down.
> * On receiving the LeaderAndIsrRequest, b1's replica manager will 
> successfully make itself (b1) the leader for p1 (and create the local replica 
> object corresponding to p1). It will however abort the become follower 
> transition for p0 because the designated leader b0 is offline. So it will not 
> create the local replica object for p0.
> * It will then start the high water mark checkpoint thread. Since only p1 has 
> a local replica object, only p1's high water mark will be checkpointed to 
> disk. p0's previously written checkpoint, if any, will be lost.
> So in summary it seems we should always create the local replica object even 
> if the online transition does not happen.
> Possible symptoms of the above bug could be one or more of the following (we 
> saw 2 and 3):
> # Data loss; yes on a hard-kill data loss is expected, but this can actually 
> cause loss of nearly all data if the broker becomes follower, truncates, and 
> soon after happens to become leader.
> # High IO on brokers that lose their high water mark then subsequently (on a 
> successful become follower transition) truncate their log to zero and start 
> catching up from the beginning.
> # If the offsets topic is affected, then offsets can get reset. This is 
> because during an offset load we don't read past the high water mark. So if a 
> water mark is missing then we don't load anything (even if the offsets are 
> there in the log).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts

2014-10-27 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1647:

Attachment: KAFKA-1647_2014-10-27_17:19:07.patch

> Replication offset checkpoints (high water marks) can be lost on hard kills 
> and restarts
> 
>
> Key: KAFKA-1647
> URL: https://issues.apache.org/jira/browse/KAFKA-1647
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2
>Reporter: Joel Koshy
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: newbie++
> Fix For: 0.8.2
>
> Attachments: KAFKA-1647.patch, KAFKA-1647_2014-10-13_16:38:39.patch, 
> KAFKA-1647_2014-10-18_00:26:51.patch, KAFKA-1647_2014-10-21_23:08:43.patch, 
> KAFKA-1647_2014-10-27_17:19:07.patch
>
>
> We ran into this scenario recently in a production environment. This can 
> happen when enough brokers in a cluster are taken down. i.e., a rolling 
> bounce done properly should not cause this issue. It can occur if all 
> replicas for any partition are taken down.
> Here is a sample scenario:
> * Cluster of three brokers: b0, b1, b2
> * Two partitions (of some topic) with replication factor two: p0, p1
> * Initial state:
> p0: leader = b0, ISR = {b0, b1}
> p1: leader = b1, ISR = {b0, b1}
> * Do a parallel hard-kill of all brokers
> * Bring up b2, so it is the new controller
> * b2 initializes its controller context and populates its leader/ISR cache 
> (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last 
> known leaders are b0 (for p0) and b1 (for p1)
> * Bring up b1
> * The controller's onBrokerStartup procedure initiates a replica state change 
> for all replicas on b1 to become online. As part of this replica state change 
> it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 
> (for p0 and p1). This LeaderAndIsr request contains: {p0: leader=b0; p1: 
> leader=b1}; leaders={b1}. b0 is indicated as the leader of p0 but it is not 
> included in the leaders field because b0 is down.
> * On receiving the LeaderAndIsrRequest, b1's replica manager will 
> successfully make itself (b1) the leader for p1 (and create the local replica 
> object corresponding to p1). It will however abort the become follower 
> transition for p0 because the designated leader b0 is offline. So it will not 
> create the local replica object for p0.
> * It will then start the high water mark checkpoint thread. Since only p1 has 
> a local replica object, only p1's high water mark will be checkpointed to 
> disk. p0's previously written checkpoint, if any, will be lost.
> So in summary it seems we should always create the local replica object even 
> if the online transition does not happen.
> Possible symptoms of the above bug could be one or more of the following (we 
> saw 2 and 3):
> # Data loss; yes on a hard-kill data loss is expected, but this can actually 
> cause loss of nearly all data if the broker becomes follower, truncates, and 
> soon after happens to become leader.
> # High IO on brokers that lose their high water mark then subsequently (on a 
> successful become follower transition) truncate their log to zero and start 
> catching up from the beginning.
> # If the offsets topic is affected, then offsets can get reset. This is 
> because during an offset load we don't read past the high water mark. So if a 
> water mark is missing then we don't load anything (even if the offsets are 
> there in the log).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26373: Patch for KAFKA-1647

2014-10-27 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/
---

(Updated Oct. 28, 2014, 12:20 a.m.)


Review request for kafka.


Bugs: KAFKA-1647
https://issues.apache.org/jira/browse/KAFKA-1647


Repository: kafka


Description
---

Addressed Joel's comments.


The version 2 code seems to have been submitted by mistake... This should be the 
code for review that addresses Joel's comments.


Addressed Jun's comments. Will do tests to verify if it works.


Addressed Joel's comments; we do not need to check whether the leader exists 
when adding the fetcher.


Diffs
-

  core/src/main/scala/kafka/server/ReplicaManager.scala 
78b7514cc109547c562e635824684fad581af653 

Diff: https://reviews.apache.org/r/26373/diff/


Testing (updated)
---

Followed Joel's testing steps. I was able to reproduce the problem without the 
patch, and the WARN message goes away after applying the patch.


Thanks,

Jiangjie Qin



Jenkins build is back to normal : Kafka-trunk #319

2014-10-27 Thread Apache Jenkins Server
See 



[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-27 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186124#comment-14186124
 ] 

Jun Rao commented on KAFKA-1481:


Vladimir,

Thanks for the patch. Really appreciate your help. I realized that this is one 
of the biggest pieces of technical debt that we have accumulated over time, so it 
may take some time to sort out. Bear with me. Some more comments.

30. About Taggable, I still have mixed feelings. I can see why you created it. 
However, my reasoning is that for a lot of the case classes (ClientIdTopic, 
ClientIdAndBroker) that we create, it's weird that some of them are taggable 
and some are not, depending on whether they are used for tagging metric 
names. Those classes have no direct relationship with the metrics. 
Similarly, we only need to be aware of tags when creating metrics. Also, 
because of this, we changed the constructor of SimpleConsumer. Since this is an 
API change, we should really try to avoid it. 

My feeling is that it's probably simpler if we just create regular case classes 
as before and generate metric tags explicitly when we create the metric. For 
example, in AbstractFetcherThread, we can do

class FetcherStats(clientIdAndBroker: ClientIdAndBroker) extends KafkaMetricsGroup {
  val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS,
    Map("clientId" -> clientIdAndBroker.clientId,
        "brokerHost" -> clientIdAndBroker.host,
        "brokerPort" -> clientIdAndBroker.port))
}

and just have ClientIdAndBroker be the following case class.

case class ClientIdAndBroker(clientId: String, host: String, port: Int)

This way, the code is a bit cleaner since all the metric tag related stuff are 
isolated to those places when the metrics are created. So, I'd suggest that we 
remove Taggable.

31. AbstractFetcherThread:
31.1 You changed the meaning of clientId. clientId is used in the fetch request 
and we want to leave it as just the clientId string. Since the clientId should 
uniquely represent a particular consumer client, we just need to include 
the clientId in the metric name. We don't need to include the consumer id in 
either the fetch request or the metric name since it's too long and has 
redundant info. 
31.2 FetcherLagStats: This is an existing problem. FetcherLagMetrics shouldn't 
be keyed off ClientIdBrokerTopicPartition. It should be keyed off 
ClientIdTopicPartition. This way, the metric name remains the same independent 
of the current leader of those partitions.

32. ZookeeperConsumerConnector:
32.1 FetchQueueSize: I agree that the metric name just needs to be tagged with 
clientId, topic and threadId. We don't need to include the consumerId since 
it's too long (note that topicThread._2 includes both the consumerId and the 
threadId).

33. KafkaMetricsGroup: Duplicate entries.
// kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo}
explicitMetricName("kafka.consumer", "ConsumerTopicMetrics", "MessagesPerSec"),
explicitMetricName("kafka.consumer", "ConsumerTopicMetrics", "MessagesPerSec"),

// kafka.consumer.ConsumerTopicStats
explicitMetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"),
explicitMetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"),

// kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer
explicitMetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchResponseSize"),
explicitMetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestRateAndTimeMs"),
explicitMetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchResponseSize"),
explicitMetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestRateAndTimeMs"),

/**
 * ProducerRequestStats <-- SyncProducer
 * metric for SyncProducer in fetchTopicMetaData() needs to be removed when consumer is closed.
 */
explicitMetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
explicitMetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize"),
explicitMetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
explicitMetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")

34. AbstractFetcherManager: Could you put the following on two separate lines? 
Similar things happen in a few other files. Perhaps you need to change the 
formatting in your IDE?

    }, metricPrefix.toTags

  private def getFetcherId(topic: String, partitionId: Int): Int = {
    Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers



> Stop using dashes AND underscores as separators in MBean names

[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use

2014-10-27 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186130#comment-14186130
 ] 

Jay Kreps commented on KAFKA-1501:
--

Nice, so statistically it is 93% likely to be fixed, then!

So, since this changes the socket server default, is this the right thing to do? 
Could it have any negative side effects in production? I actually don't really 
understand the effect of this option or why the lack of it was causing the 
failure. Could you explain?

> transient unit tests failures due to port already in use
> 
>
> Key: KAFKA-1501
> URL: https://issues.apache.org/jira/browse/KAFKA-1501
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Jun Rao
>Assignee: Guozhang Wang
>  Labels: newbie
> Attachments: KAFKA-1501.patch, KAFKA-1501.patch
>
>
> Saw the following transient failures.
> kafka.api.ProducerFailureHandlingTest > testTooLargeRecordWithAckOne FAILED
> kafka.common.KafkaException: Socket server failed to bind to 
> localhost:59909: Address already in use.
> at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195)
> at kafka.network.Acceptor.<init>(SocketServer.scala:141)
> at kafka.network.SocketServer.startup(SocketServer.scala:68)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:95)
> at kafka.utils.TestUtils$.createServer(TestUtils.scala:123)
> at 
> kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-10-27 Thread Joe Stein
The Apache Kafka community is pleased to announce the beta release for Apache 
Kafka 0.8.2.

The 0.8.2-beta release introduces many new features, improvements and fixes 
including:
 - A new Java producer for ease of implementation and enhanced performance.
 - Delete topic support.
 - Per topic configuration of preference for consistency over availability.
 - Scala 2.11 support and dropping support for Scala 2.8.
 - LZ4 Compression.

All of the changes in this release can be found: 
https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html

Apache Kafka is a high-throughput, publish-subscribe messaging system rethought 
of as a distributed commit log.

** Fast => A single Kafka broker can handle hundreds of megabytes of reads and 
writes per second from thousands of clients.

** Scalable => Kafka is designed to allow a single cluster to serve as the 
central data backbone 
for a large organization. It can be elastically and transparently expanded 
without downtime. 
Data streams are partitioned and spread over a cluster of machines to allow 
data streams 
larger than the capability of any single machine and to allow clusters of 
co-ordinated consumers.

** Durable => Messages are persisted on disk and replicated within the cluster 
to prevent 
data loss. Each broker can handle terabytes of messages without performance 
impact.

** Distributed by Design => Kafka has a modern cluster-centric design that 
offers 
strong durability and fault-tolerance guarantees.

You can download the release from: http://kafka.apache.org/downloads.html

We welcome your help and feedback. For more information on how to
report problems, and to get involved, visit the project website at 
http://kafka.apache.org/



Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-10-27 Thread Jay Kreps
I actually don't see the beta release on that download page:
http://kafka.apache.org/downloads.html

-Jay

On Mon, Oct 27, 2014 at 5:50 PM, Joe Stein  wrote:

> The Apache Kafka community is pleased to announce the beta release for
> Apache Kafka 0.8.2.
>
> The 0.8.2-beta release introduces many new features, improvements and
> fixes including:
>  - A new Java producer for ease of implementation and enhanced performance.
>  - Delete topic support.
>  - Per topic configuration of preference for consistency over availability.
>  - Scala 2.11 support and dropping support for Scala 2.8.
>  - LZ4 Compression.
>
> All of the changes in this release can be found:
> https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html
>
> Apache Kafka is high-throughput, publish-subscribe messaging system
> rethought of as a distributed commit log.
>
> ** Fast => A single Kafka broker can handle hundreds of megabytes of reads
> and
> writes per second from thousands of clients.
>
> ** Scalable => Kafka is designed to allow a single cluster to serve as the
> central data backbone
> for a large organization. It can be elastically and transparently expanded
> without downtime.
> Data streams are partitioned and spread over a cluster of machines to
> allow data streams
> larger than the capability of any single machine and to allow clusters of
> co-ordinated consumers.
>
> ** Durable => Messages are persisted on disk and replicated within the
> cluster to prevent
> data loss. Each broker can handle terabytes of messages without
> performance impact.
>
> ** Distributed by Design => Kafka has a modern cluster-centric design that
> offers
> strong durability and fault-tolerance guarantees.
>
> You can download the release from: http://kafka.apache.org/downloads.html
>
> We welcome your help and feedback. For more information on how to
> report problems, and to get involved, visit the project website at
> http://kafka.apache.org/
>
>


Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-10-27 Thread Gwen Shapira
Strange. I'm seeing it.

Browser cache?

On Mon, Oct 27, 2014 at 5:59 PM, Jay Kreps  wrote:
> I actually don't see the beta release on that download page:
> http://kafka.apache.org/downloads.html
>
> -Jay
>
> On Mon, Oct 27, 2014 at 5:50 PM, Joe Stein  wrote:
>
>> The Apache Kafka community is pleased to announce the beta release for
>> Apache Kafka 0.8.2.
>>
>> The 0.8.2-beta release introduces many new features, improvements and
>> fixes including:
>>  - A new Java producer for ease of implementation and enhanced performance.
>>  - Delete topic support.
>>  - Per topic configuration of preference for consistency over availability.
>>  - Scala 2.11 support and dropping support for Scala 2.8.
>>  - LZ4 Compression.
>>
>> All of the changes in this release can be found:
>> https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html
>>
>> Apache Kafka is high-throughput, publish-subscribe messaging system
>> rethought of as a distributed commit log.
>>
>> ** Fast => A single Kafka broker can handle hundreds of megabytes of reads
>> and
>> writes per second from thousands of clients.
>>
>> ** Scalable => Kafka is designed to allow a single cluster to serve as the
>> central data backbone
>> for a large organization. It can be elastically and transparently expanded
>> without downtime.
>> Data streams are partitioned and spread over a cluster of machines to
>> allow data streams
>> larger than the capability of any single machine and to allow clusters of
>> co-ordinated consumers.
>>
>> ** Durable => Messages are persisted on disk and replicated within the
>> cluster to prevent
>> data loss. Each broker can handle terabytes of messages without
>> performance impact.
>>
>> ** Distributed by Design => Kafka has a modern cluster-centric design that
>> offers
>> strong durability and fault-tolerance guarantees.
>>
>> You can download the release from: http://kafka.apache.org/downloads.html
>>
>> We welcome your help and feedback. For more information on how to
>> report problems, and to get involved, visit the project website at
>> http://kafka.apache.org/
>>
>>


Re: Review Request 26755: Patch for KAFKA-1706

2014-10-27 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26755/#review58725
---


Another thing I forgot to mention in the earlier review: we definitely should 
have a unit test for this. You will need to allow passing in the Time interface 
and use MockTime in the test.
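
For illustration only, a minimal sketch of what injecting the clock could look 
like (the real kafka.utils.Time/MockTime and the queue's constructor may differ; 
the stub class below is just a stand-in):

// Simplified stand-ins for kafka.utils.Time and kafka.utils.MockTime.
trait Time { def milliseconds: Long }

class MockTime(private var nowMs: Long = 0L) extends Time {
  def milliseconds: Long = nowMs
  def sleep(ms: Long): Unit = nowMs += ms  // advance the fake clock, no real waiting
}

// Hypothetical constructor: the point is only that the clock is injected, not hard-coded.
class ByteBoundedBlockingQueueStub[T](maxBytes: Long, time: Time)

object TimeInjectionSketch extends App {
  val mockTime = new MockTime()
  val queue = new ByteBoundedBlockingQueueStub[String](1024L, mockTime)
  mockTime.sleep(500)             // deterministically advance time in the test
  println(mockTime.milliseconds)  // 500
}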


core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala


Unused



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala


if



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala


No need for the return; you can add on line 63:
else {
  false
}

(and remove the false at the very end)

Equivalent, but a little cleaner to look at



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala


Again, this is obviously stylistic, but in small methods like this there is 
little need to return from the middle.

Can you restructure it to something like:

if (...)
  false
else {
  ...
  success
}
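
To make the suggestion concrete, here is a rough sketch of an offer() with a 
single exit point (hypothetical fields and simplified logic, not the actual patch):

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

class ByteBoundedQueueSketch[T](maxMessages: Int, maxBytes: Long, sizeOf: T => Long) {
  private val queue = new LinkedBlockingQueue[T](maxMessages)
  private val currentBytes = new AtomicLong(0L)

  def offer(item: T): Boolean = {
    val itemBytes = sizeOf(item)
    // Note: the byte check and the enqueue are not atomic here; fine for a sketch.
    if (currentBytes.get + itemBytes > maxBytes)
      false                              // over the byte bound: reject
    else {
      val success = queue.offer(item)    // may still fail on the message-count bound
      if (success) currentBytes.addAndGet(itemBytes)
      success                            // single exit point at the end
    }
  }
}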



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala


Same here


- Joel Koshy


On Oct. 27, 2014, 6:50 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/26755/
> ---
> 
> (Updated Oct. 27, 2014, 6:50 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1706
> https://issues.apache.org/jira/browse/KAFKA-1706
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> changed arguments name
> 
> 
> correct typo.
> 
> 
> Incorporated Joel's comments. Also fixed negative queue size problem.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/26755/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-10-27 Thread Jay Kreps
Yeah it must be a caching thing because others in the same office do see it
(but not all). And ctrl-shift-r doesn't seem to help. Nevermind :-)

-Jay

On Mon, Oct 27, 2014 at 6:00 PM, Gwen Shapira  wrote:

> Strange. I'm seeing it.
>
> Browser cache?
>
> On Mon, Oct 27, 2014 at 5:59 PM, Jay Kreps  wrote:
> > I actually don't see the beta release on that download page:
> > http://kafka.apache.org/downloads.html
> >
> > -Jay
> >
> > On Mon, Oct 27, 2014 at 5:50 PM, Joe Stein  wrote:
> >
> >> The Apache Kafka community is pleased to announce the beta release for
> >> Apache Kafka 0.8.2.
> >>
> >> The 0.8.2-beta release introduces many new features, improvements and
> >> fixes including:
> >>  - A new Java producer for ease of implementation and enhanced
> performance.
> >>  - Delete topic support.
> >>  - Per topic configuration of preference for consistency over
> availability.
> >>  - Scala 2.11 support and dropping support for Scala 2.8.
> >>  - LZ4 Compression.
> >>
> >> All of the changes in this release can be found:
> >> https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html
> >>
> >> Apache Kafka is high-throughput, publish-subscribe messaging system
> >> rethought of as a distributed commit log.
> >>
> >> ** Fast => A single Kafka broker can handle hundreds of megabytes of
> reads
> >> and
> >> writes per second from thousands of clients.
> >>
> >> ** Scalable => Kafka is designed to allow a single cluster to serve as
> the
> >> central data backbone
> >> for a large organization. It can be elastically and transparently
> expanded
> >> without downtime.
> >> Data streams are partitioned and spread over a cluster of machines to
> >> allow data streams
> >> larger than the capability of any single machine and to allow clusters
> of
> >> co-ordinated consumers.
> >>
> >> ** Durable => Messages are persisted on disk and replicated within the
> >> cluster to prevent
> >> data loss. Each broker can handle terabytes of messages without
> >> performance impact.
> >>
> >> ** Distributed by Design => Kafka has a modern cluster-centric design
> that
> >> offers
> >> strong durability and fault-tolerance guarantees.
> >>
> >> You can download the release from:
> http://kafka.apache.org/downloads.html
> >>
> >> We welcome your help and feedback. For more information on how to
> >> report problems, and to get involved, visit the project website at
> >> http://kafka.apache.org/
> >>
> >>
>


Re: Review Request 27256: Fix KAFKA-1735

2014-10-27 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27256/#review58744
---



clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java


Would it be possible to add a unit test for this?


- Neha Narkhede


On Oct. 27, 2014, 11:59 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27256/
> ---
> 
> (Updated Oct. 27, 2014, 11:59 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1735
> https://issues.apache.org/jira/browse/KAFKA-1735
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Handle partial reads from compressed stream
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca 
> 
> Diff: https://reviews.apache.org/r/27256/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
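
Background note on the description above: a single read() from a decompression 
stream (for example GZIPInputStream) may return fewer bytes than requested, so 
correct code has to loop until it has the bytes it asked for or hits end of 
stream. A minimal sketch of such a read-fully loop (illustrative only, not the 
actual MemoryRecords change):

import java.io.InputStream

object ReadFullySketch {
  // Keep reading until `length` bytes have been filled or EOF is reached.
  def readFully(in: InputStream, buf: Array[Byte], offset: Int, length: Int): Int = {
    var total = 0
    var eof = false
    while (total < length && !eof) {
      val n = in.read(buf, offset + total, length - total)
      if (n < 0) eof = true else total += n
    }
    total  // may be less than length only if EOF was reached
  }
}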



[jira] [Commented] (KAFKA-1731) add config/jmx changes in 0.8.2 doc

2014-10-27 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186179#comment-14186179
 ] 

Gwen Shapira commented on KAFKA-1731:
-

Thanks :)

No comments, it looks good. 

> add config/jmx changes in 0.8.2 doc
> ---
>
> Key: KAFKA-1731
> URL: https://issues.apache.org/jira/browse/KAFKA-1731
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jun Rao
>Assignee: Jun Rao
> Fix For: 0.8.2
>
> Attachments: config-jmx_082.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-10-27 Thread Jun Rao
Joe,

Thanks for driving the release.

Jun

On Mon, Oct 27, 2014 at 5:50 PM, Joe Stein  wrote:

> The Apache Kafka community is pleased to announce the beta release for
> Apache Kafka 0.8.2.
>
> The 0.8.2-beta release introduces many new features, improvements and
> fixes including:
>  - A new Java producer for ease of implementation and enhanced performance.
>  - Delete topic support.
>  - Per topic configuration of preference for consistency over availability.
>  - Scala 2.11 support and dropping support for Scala 2.8.
>  - LZ4 Compression.
>
> All of the changes in this release can be found:
> https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html
>
> Apache Kafka is high-throughput, publish-subscribe messaging system
> rethought of as a distributed commit log.
>
> ** Fast => A single Kafka broker can handle hundreds of megabytes of reads
> and
> writes per second from thousands of clients.
>
> ** Scalable => Kafka is designed to allow a single cluster to serve as the
> central data backbone
> for a large organization. It can be elastically and transparently expanded
> without downtime.
> Data streams are partitioned and spread over a cluster of machines to
> allow data streams
> larger than the capability of any single machine and to allow clusters of
> co-ordinated consumers.
>
> ** Durable => Messages are persisted on disk and replicated within the
> cluster to prevent
> data loss. Each broker can handle terabytes of messages without
> performance impact.
>
> ** Distributed by Design => Kafka has a modern cluster-centric design that
> offers
> strong durability and fault-tolerance guarantees.
>
> You can download the release from: http://kafka.apache.org/downloads.html
>
> We welcome your help and feedback. For more information on how to
> report problems, and to get involved, visit the project website at
> http://kafka.apache.org/
>
>


Re: Review Request 26373: Patch for KAFKA-1647

2014-10-27 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/#review58748
---

Ship it!


Looks good to me. Can you make these final edits and upload another RB?


core/src/main/scala/kafka/server/ReplicaManager.scala


The .format needs to be on this line. Can you fix it and upload a new patch?



core/src/main/scala/kafka/server/ReplicaManager.scala


Small edits:

// Create the local replica even if the leader is unavailable. This is 
required to ensure that we include the partition's high watermark in the 
checkpoint file (see KAFKA-1647)

Also, I'm not sure if we need to explicitly reference the jira in comments 
since people can just git annotate.


- Joel Koshy


On Oct. 28, 2014, 12:20 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/26373/
> ---
> 
> (Updated Oct. 28, 2014, 12:20 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1647
> https://issues.apache.org/jira/browse/KAFKA-1647
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Joel's comments.
> 
> 
> the version 2 code seems to be submitted by mistake... This should be the 
> code for review that addressed Joel's comments.
> 
> 
> Addressed Jun's comments. Will do tests to verify if it works.
> 
> 
> Addressed Joel's comments; we do not need to check whether the leader exists 
> or not when adding the fetcher.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 78b7514cc109547c562e635824684fad581af653 
> 
> Diff: https://reviews.apache.org/r/26373/diff/
> 
> 
> Testing
> ---
> 
> Followed Joel's testing steps. I was able to reproduce the problem without the 
> patch, and the WARN message goes away after applying the patch.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



Re: Review Request 26755: Patch for KAFKA-1706

2014-10-27 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26755/
---

(Updated Oct. 28, 2014, 1:34 a.m.)


Review request for kafka.


Bugs: KAFKA-1706
https://issues.apache.org/jira/browse/KAFKA-1706


Repository: kafka


Description (updated)
---

changed arguments name


correct typo.


Incorporated Joel's comments. Also fixed negative queue size problem.


Incorporated Joel's comments.


Diffs (updated)
-

  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/26755/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-1706) Adding a byte bounded blocking queue to util.

2014-10-27 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1706:

Attachment: KAFKA-1706_2014-10-27_18:34:37.patch

> Adding a byte bounded blocking queue to util.
> -
>
> Key: KAFKA-1706
> URL: https://issues.apache.org/jira/browse/KAFKA-1706
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1706.patch, KAFKA-1706_2014-10-15_09:26:26.patch, 
> KAFKA-1706_2014-10-15_09:28:01.patch, KAFKA-1706_2014-10-26_23:47:31.patch, 
> KAFKA-1706_2014-10-26_23:50:07.patch, KAFKA-1706_2014-10-27_18:34:37.patch
>
>
> We saw many out of memory issues in Mirror Maker. To enhance memory 
> management we want to introduce a ByteBoundedBlockingQueue that has limit on 
> both number of messages and number of bytes in it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1706) Adding a byte bounded blocking queue to util.

2014-10-27 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186196#comment-14186196
 ] 

Jiangjie Qin commented on KAFKA-1706:
-

Updated reviewboard https://reviews.apache.org/r/26755/diff/
 against branch origin/trunk

> Adding a byte bounded blocking queue to util.
> -
>
> Key: KAFKA-1706
> URL: https://issues.apache.org/jira/browse/KAFKA-1706
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1706.patch, KAFKA-1706_2014-10-15_09:26:26.patch, 
> KAFKA-1706_2014-10-15_09:28:01.patch, KAFKA-1706_2014-10-26_23:47:31.patch, 
> KAFKA-1706_2014-10-26_23:50:07.patch, KAFKA-1706_2014-10-27_18:34:37.patch
>
>
> We saw many out of memory issues in Mirror Maker. To enhance memory 
> management we want to introduce a ByteBoundedBlockingQueue that has limit on 
> both number of messages and number of bytes in it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


RE: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-10-27 Thread Libo Yu
Congrats! When do you think the final 0.8.2 will be released?

> To: annou...@apache.org; us...@kafka.apache.org; dev@kafka.apache.org
> Subject: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
> Date: Tue, 28 Oct 2014 00:50:35 +
> From: joest...@apache.org
> 
> The Apache Kafka community is pleased to announce the beta release for Apache 
> Kafka 0.8.2.
> 
> The 0.8.2-beta release introduces many new features, improvements and fixes 
> including:
>  - A new Java producer for ease of implementation and enhanced performance.
>  - Delete topic support.
>  - Per topic configuration of preference for consistency over availability.
>  - Scala 2.11 support and dropping support for Scala 2.8.
>  - LZ4 Compression.
> 
> All of the changes in this release can be found: 
> https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html
> 
> Apache Kafka is high-throughput, publish-subscribe messaging system rethought 
> of as a distributed commit log.
> 
> ** Fast => A single Kafka broker can handle hundreds of megabytes of reads 
> and 
> writes per second from thousands of clients.
> 
> ** Scalable => Kafka is designed to allow a single cluster to serve as the 
> central data backbone 
> for a large organization. It can be elastically and transparently expanded 
> without downtime. 
> Data streams are partitioned and spread over a cluster of machines to allow 
> data streams 
> larger than the capability of any single machine and to allow clusters of 
> co-ordinated consumers.
> 
> ** Durable => Messages are persisted on disk and replicated within the 
> cluster to prevent 
> data loss. Each broker can handle terabytes of messages without performance 
> impact.
> 
> ** Distributed by Design => Kafka has a modern cluster-centric design that 
> offers 
> strong durability and fault-tolerance guarantees.
> 
> You can download the release from: http://kafka.apache.org/downloads.html
> 
> We welcome your help and feedback. For more information on how to
> report problems, and to get involved, visit the project website at 
> http://kafka.apache.org/
> 
  

[jira] [Updated] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'

2014-10-27 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1732:
-
Fix Version/s: 0.8.2

> DumpLogSegments tool fails when path has a '.'
> --
>
> Key: KAFKA-1732
> URL: https://issues.apache.org/jira/browse/KAFKA-1732
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
> Fix For: 0.8.2
>
> Attachments: KAFKA-1732.patch
>
>
> Using DumpLogSegments in a directory that has a '.' that isn't part of the 
> file extension causes an exception:
> {code}
> 16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments  --file 
> /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
>  --verify-index-only
> Dumping 
> /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
> Exception in thread "main" java.io.FileNotFoundException: 
> /Users/ewencp/kafka.log (No such file or directory)
>   at java.io.FileInputStream.open(Native Method)
>   at java.io.FileInputStream.<init>(FileInputStream.java:146)
>   at kafka.utils.Utils$.openChannel(Utils.scala:162)
>   at kafka.log.FileMessageSet.<init>(FileMessageSet.scala:74)
>   at 
> kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109)
>   at 
> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80)
>   at 
> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>   at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73)
>   at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
> {code}
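
The FileNotFoundException for /Users/ewencp/kafka.log suggests the path is being 
split at the first '.' in the full path rather than at the file extension; a 
safer derivation might look like this (illustrative sketch only, not the actual 
KAFKA-1732 patch):

object LogFileFromIndexSketch {
  // Given ".../00016895.index", derive the sibling ".../00016895.log".
  // Split on the last '.' so earlier dots in the path (e.g. "kafka.git") are ignored.
  def logFileFor(indexPath: String): String = {
    val dot = indexPath.lastIndexOf('.')
    require(dot >= 0 && indexPath.substring(dot) == ".index", "not an .index file: " + indexPath)
    indexPath.substring(0, dot) + ".log"
  }
}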



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1732) DumpLogSegments tool fails when path has a '.'

2014-10-27 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186334#comment-14186334
 ] 

Neha Narkhede commented on KAFKA-1732:
--

Thanks [~charmalloc]. Missed updating the version myself.

> DumpLogSegments tool fails when path has a '.'
> --
>
> Key: KAFKA-1732
> URL: https://issues.apache.org/jira/browse/KAFKA-1732
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
> Fix For: 0.8.2
>
> Attachments: KAFKA-1732.patch
>
>
> Using DumpLogSegments in a directory that has a '.' that isn't part of the 
> file extension causes an exception:
> {code}
> 16:48 $ time /Users/ewencp/kafka.git/bin/kafka-run-class.sh 
> kafka.tools.DumpLogSegments  --file 
> /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
>  --verify-index-only
> Dumping 
> /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_1/logs/broker-3/kafka_server_3_logs/test_1-1/00016895.index
> Exception in thread "main" java.io.FileNotFoundException: 
> /Users/ewencp/kafka.log (No such file or directory)
>   at java.io.FileInputStream.open(Native Method)
>   at java.io.FileInputStream.<init>(FileInputStream.java:146)
>   at kafka.utils.Utils$.openChannel(Utils.scala:162)
>   at kafka.log.FileMessageSet.<init>(FileMessageSet.scala:74)
>   at 
> kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:109)
>   at 
> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80)
>   at 
> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>   at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73)
>   at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-27 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186350#comment-14186350
 ] 

Bhavesh Mistry commented on KAFKA-1710:
---

[~jkreps],

I understand that the current code base adds bytes to shared memory and does the 
compression on the application thread.  The old producer seems to do all of this 
in a background thread.  So what changed to move this into the foreground?  Also, 
if you had to re-engineer this code, how would you remove the synchronization and 
move everything into the background, so that application threads spend more time 
runnable and the cost of enqueueing becomes very small?

I am really interested in solving this problem for my application, so I just 
wanted to know your suggestions/ideas: how would you solve this?

Thanks for all your help so far !!  

Thanks,

Bhavesh 

> [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
> being sent to single partition
> 
>
> Key: KAFKA-1710
> URL: https://issues.apache.org/jira/browse/KAFKA-1710
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
> Environment: Development
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
>Priority: Critical
>  Labels: performance
> Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
> 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
> TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
> th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
> th6.dump, th7.dump, th8.dump, th9.dump
>
>
> Hi Kafka Dev Team,
> When I run the test to send message to single partition for 3 minutes or so 
> on, I have encounter deadlock (please see the screen attached) and thread 
> contention from YourKit profiling.  
> Use Case:
> 1)  Aggregating messages into same partition for metric counting. 
> 2)  Replicate Old Producer behavior for sticking to partition for 3 minutes.
> Here is output:
> Frozen threads found (potential deadlock)
>  
> It seems that the following threads have not changed their stack for more 
> than 10 seconds.
> These threads are possibly (but not necessarily!) in a deadlock or hung.
>  
> pool-1-thread-128 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-159 <--- Frozen for at least 2m 1 sec
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-55 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> Thanks,
> Bhavesh 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-27 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186350#comment-14186350
 ] 

Bhavesh Mistry edited comment on KAFKA-1710 at 10/28/14 4:40 AM:
-

[~jkreps],

I understand the current code base is adding bytes to shared memory and doing 
compression (on application thread).  The older consumer seems to do all this 
in back-ground thread.  So What changed to have this in fore-ground ?  Also, if 
you had to re-engineer this code, How would you  re-engineer to remove 
Synchronization and move everything in background so more runable state is give 
to Application Thread and cost of enqueue will very less.  (Of Course at cost 
of memory).  

I am really interested in solving this problem for my application.  So I just 
wanted to know your suggestions/ideas, how would you solve this ?

Thanks for all your help so far !!  

Thanks,

Bhavesh 


was (Author: bmis13):
[~jkreps],

I understand the current code base is adding bytes to shared memory and doing 
compression (on application thread).  The older consumer seems to do all this 
in back-ground thread.  So What changed to have this in fore-ground ?  Also, if 
you had to re-engineer this code, How would you  re-engineer to remove 
Synchronization and move everything in background so more runable state is give 
to Application Thread and cost of enqueue will very less.  

I am really interested in solving this problem for my application.  So I just 
wanted to know your suggestions/ideas, how would you solve this ?

Thanks for all your help so far !!  

Thanks,

Bhavesh 

> [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
> being sent to single partition
> 
>
> Key: KAFKA-1710
> URL: https://issues.apache.org/jira/browse/KAFKA-1710
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
> Environment: Development
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
>Priority: Critical
>  Labels: performance
> Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
> 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
> TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
> th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
> th6.dump, th7.dump, th8.dump, th9.dump
>
>
> Hi Kafka Dev Team,
> When I run the test to send message to single partition for 3 minutes or so 
> on, I have encounter deadlock (please see the screen attached) and thread 
> contention from YourKit profiling.  
> Use Case:
> 1)  Aggregating messages into same partition for metric counting. 
> 2)  Replicate Old Producer behavior for sticking to partition for 3 minutes.
> Here is output:
> Frozen threads found (potential deadlock)
>  
> It seems that the following threads have not changed their stack for more 
> than 10 seconds.
> These threads are possibly (but not necessarily!) in a deadlock or hung.
>  
> pool-1-thread-128 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-159 <--- Frozen for at least 2m 1 sec
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-55 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurre

[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-27 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186350#comment-14186350
 ] 

Bhavesh Mistry edited comment on KAFKA-1710 at 10/28/14 4:58 AM:
-

[~jkreps],

I understand that the current code base adds bytes to shared memory and does the 
compression on the application thread.  The old producer seems to do all of this 
in a background thread.  So what changed to move this into the foreground?  Also, 
if you had to re-engineer this code, how would you remove the synchronization and 
move everything into the background, so that application threads spend more time 
runnable and the cost of enqueueing becomes very small (of course at the cost of 
memory)?

I am really interested in solving this problem for my application, so I just 
wanted to know your suggestions/ideas: how would you solve this?

Thanks for all your help so far!!  The only thing I can think of is an 
*AsynKafkaProducer*, as mentioned in previous comments, where [~ewencp] pointed 
out that the problem will then be the threads that enqueue messages, at the cost 
of memory, thread context switching, etc...

Thanks,

Bhavesh 
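
One generic way to do the "move everything to the background" idea mentioned 
above (an illustrative wrapper only, not an API that exists in Kafka): 
application threads enqueue onto a plain bounded queue and a single dedicated 
thread calls the real producer, trading extra memory and a context switch for 
less contention on the shared accumulator:

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical wrapper; `send` would typically delegate to KafkaProducer.send(...).
class AsyncSendWrapper[R <: AnyRef](send: R => Unit, capacity: Int = 10000) {
  private val pending = new ArrayBlockingQueue[R](capacity)
  @volatile private var running = true

  private val drainer = new Thread(new Runnable {
    override def run(): Unit = {
      // Drain until close() is called and the queue is empty.
      while (running || !pending.isEmpty) {
        val r = pending.poll(100, TimeUnit.MILLISECONDS)
        if (r != null) send(r)
      }
    }
  }, "async-send-drainer")
  drainer.setDaemon(true)
  drainer.start()

  // Cheap, non-blocking call for application threads; returns false when the buffer is full.
  def enqueue(record: R): Boolean = pending.offer(record)

  def close(): Unit = { running = false; drainer.join() }
}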


was (Author: bmis13):
[~jkreps],

I understand the current code base is adding bytes to shared memory and doing 
compression (on application thread).  The older consumer seems to do all this 
in back-ground thread.  So What changed to have this in fore-ground ?  Also, if 
you had to re-engineer this code, How would you  re-engineer to remove 
Synchronization and move everything in background so more runable state is give 
to Application Thread and cost of enqueue will very less.  (Of Course at cost 
of memory).  

I am really interested in solving this problem for my application.  So I just 
wanted to know your suggestions/ideas, how would you solve this ?

Thanks for all your help so far !!  

Thanks,

Bhavesh 

> [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
> being sent to single partition
> 
>
> Key: KAFKA-1710
> URL: https://issues.apache.org/jira/browse/KAFKA-1710
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
> Environment: Development
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
>Priority: Critical
>  Labels: performance
> Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
> 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
> TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
> th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
> th6.dump, th7.dump, th8.dump, th9.dump
>
>
> Hi Kafka Dev Team,
> When I run the test to send message to single partition for 3 minutes or so 
> on, I have encounter deadlock (please see the screen attached) and thread 
> contention from YourKit profiling.  
> Use Case:
> 1)  Aggregating messages into same partition for metric counting. 
> 2)  Replicate Old Producer behavior for sticking to partition for 3 minutes.
> Here is output:
> Frozen threads found (potential deadlock)
>  
> It seems that the following threads have not changed their stack for more 
> than 10 seconds.
> These threads are possibly (but not necessarily!) in a deadlock or hung.
>  
> pool-1-thread-128 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-159 <--- Frozen for at least 2m 1 sec
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-55 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionTyp