[jira] [Updated] (KAFKA-1506) Cancel "kafka-reassign-partitions" Job

2014-10-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1506:
-
Labels: newbie++  (was: )

> Cancel "kafka-reassign-partitions" Job
> --
>
> Key: KAFKA-1506
> URL: https://issues.apache.org/jira/browse/KAFKA-1506
> Project: Kafka
>  Issue Type: New Feature
>  Components: replication, tools
>Affects Versions: 0.8.1, 0.8.1.1
>Reporter: Paul Lung
>Assignee: Neha Narkhede
>  Labels: newbie++
>
> I started a reassignment, and for some reason it just takes forever. However, 
> it won't let me start another reassignment job while this one is running. So 
> a tool to cancel a reassignment job is needed. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1506) Cancel "kafka-reassign-partitions" Job

2014-10-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1506:
-
Reviewer: Neha Narkhede
Assignee: (was: Neha Narkhede)

> Cancel "kafka-reassign-partitions" Job
> --
>
> Key: KAFKA-1506
> URL: https://issues.apache.org/jira/browse/KAFKA-1506
> Project: Kafka
>  Issue Type: New Feature
>  Components: replication, tools
>Affects Versions: 0.8.1, 0.8.1.1
>Reporter: Paul Lung
>  Labels: newbie++
>
> I started a reassignment, and for some reason it just takes forever. However, 
> it won't let me start another reassignment job while this one is running. So 
> a tool to cancel a reassignment job is needed. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-10-16 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173792#comment-14173792
 ] 

Jun Rao commented on KAFKA-1481:


4. removeAllMetricsInList() will be called when a producer/consumer instance is 
closed to remove metrics related to a specific client id.
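
For illustration, a minimal sketch of the lifecycle described above, assuming a plain map-like registry keyed by metric name; removeAllMetricsInList() here is a placeholder reimplementation, not the actual Kafka method:

{code}
import scala.collection.mutable

// Hypothetical sketch: when a producer/consumer instance is closed, drop every metric
// whose registered name embeds that instance's client id.
def removeAllMetricsInList(registry: mutable.Map[String, AnyRef], clientId: String): Unit = {
  val toRemove = registry.keys.filter(_.contains(clientId)).toList
  toRemove.foreach(registry.remove)
}
{code}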

> Stop using dashes AND underscores as separators in MBean names
> --
>
> Key: KAFKA-1481
> URL: https://issues.apache.org/jira/browse/KAFKA-1481
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1.1
>Reporter: Otis Gospodnetic
>Priority: Critical
>  Labels: patch
> Fix For: 0.8.2
>
> Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, 
> KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, 
> KAFKA-1481_2014-10-15_10-23-35.patch, 
> KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, 
> KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch
>
>
> MBeans should not use dashes or underscores as separators because these 
> characters are allowed in hostnames, topics, group and consumer IDs, etc., 
> and these are embedded in MBeans names making it impossible to parse out 
> individual bits from MBeans.
> Perhaps a pipe character should be used to avoid the conflict. 
> This looks like a major blocker because it means nobody can write Kafka 0.8.x 
> monitoring tools unless they are doing it for themselves AND do not use 
> dashes AND do not use underscores.
> See: http://search-hadoop.com/m/4TaT4lonIW
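
For illustration, a small sketch of the ambiguity described above; the name below is made up and is not an actual Kafka MBean name:

{code}
// If the client id itself contains dashes, a dash-joined MBean name cannot be split back
// into (clientId, topic, metric) unambiguously.
val mbeanSuffix = "my-client-my-topic-BytesPerSec"   // clientId = "my-client"? or "my-client-my"?
val tokens = mbeanSuffix.split("-")                  // Array(my, client, my, topic, BytesPerSec)
// Nothing in the name says how many tokens belong to the client id, which is why a
// separator that cannot occur in ids (e.g. '|') avoids the conflict.
{code}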



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] Message Metadata

2014-10-16 Thread Joel Koshy
I think the tags are a useful concept to have in that they do for
applications, what the additional metadata does for brokers. i.e.,
avoiding decompression and recompression of an entire message-set. I
agree that we should not place any "core" fields (i.e., those used
internally by Kafka) in tags and those should be first-class fields in
the message header.  E.g., if we intend to support in-built end-to-end
audit in Kafka then fields for auditing (server, timestamps, etc.)
should be first-class fields in the message header.  However, tags are
useful for application-level features that can avoid a full
decompression.

Although Avro has the ability to just deserialize select fields (say a
header) we then limit the optimization to avro-like formats. Also,
that will remain an application-specific thing and not an intrinsic
part of the wire protocol. i.e., brokers will continue to have to
decompress and recompress messages to assign offsets.
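
For illustration only, a rough sketch of the split being discussed, with made-up field names (this is not a wire-format proposal): core fields that brokers need stay first-class, while application metadata rides in an optional tag section that can be read without decompressing the payload.

{code}
// Illustrative header layout: core fields are fixed, application tags are optional
// key -> opaque-bytes pairs readable without touching the compressed message set.
case class MessageHeader(
  offset: Long,                  // core: assigned by the broker
  auditTimestampMs: Long,        // core: e.g. built-in end-to-end audit
  tags: Map[Int, Array[Byte]]    // application-level tags, may be empty
)
{code}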

Joel

On Wed, Oct 15, 2014 at 09:04:55PM +, Todd Palino wrote:
> Let me add my view on #2 in less delicate terms than Guozhang did :)
> 
> When you're trying to run Kafka as a service, having to care about the
> format of the message sucks. I have plenty of users who are just fine
> using the Avro standard and play nice. Then I have a bunch of users who
> don't want to use Avro and want to do something else (json, some plain
> text, whatever). Then I have a bunch of users who use Avro but don't
> properly register their schemas. Then I have a bunch of users who do
> whatever they want and don't tell us.
> 
> What this means is that I can't have standard tooling, like auditing, that
> works on the entire system. I either have to whitelist or blacklist
> topics, and then I run into problems when someone adds something new
> either way. It would be preferable if I could monitor and maintain the
> health of the system without having to worry about the message format.
> 
> -Todd
> 
> 
> On 10/15/14, 10:50 AM, "Guozhang Wang"  wrote:
> 
> >Thanks Joe,
> >
> >I think we now have a few open questions to discuss around this topic:
> >
> >1. Shall we make core Kafka properties as first class fields in message
> >header or put them as tags?
> >
> >The pros of the first approach is more compacted format and hence less
> >message header overhead; the cons are that any future message header
> >change
> >needs protocol bump and possible multi-versioned handling on the server
> >side.
> >
> >Vice versa for the second approach.
> >
> >2. Shall we leave app properties still in message content and enforce
> >schema based topics or make them as extensible tags?
> >
> >The pros of the first approach are again saving message header overhead for
> >app properties; the cons are that it enforces schema usage on the message
> >content so that it can be partially de-serialized just for the app header. At LinkedIn we
> >enforce Avro schemas for auditing purposes, and as a result the Kafka team
> >has to manage the schema registration process / schema repository as well.
> >
> >3. Which properties should be core KAFKA and which should be app
> >properties? For example, shall we make properties that only MM cares about
> >as app properties or Kafka properties?
> >
> >Guozhang
> >
> >On Tue, Oct 14, 2014 at 5:10 AM, Joe Stein  wrote:
> >
> >> I think we could add schemaId(binary) to the MessageAndMetaData
> >>
> >> With the schemaId you can implement different downstream software
> >>pattern
> >> on the messages reliably. I wrote up more thoughts on this use
> >> https://cwiki.apache.org/confluence/display/KAFKA/Schema+based+topics it
> >> should strive to encompass all implementation needs for producer,
> >>broker,
> >> consumer hooks.
> >>
> >> So if the application and tagged fields are important you can package
> >>that
> >> into a specific Kafka topic plug-in and assign it to topic(s).  Kafka
> >> server should be able to validate your expected formats (like
> >> encoders/decoders but in broker by topic regardless of producer) to the
> >> topics that have it enabled. We should have these maintained in the
> >>project
> >> under contrib.
> >>
> >> =- Joestein
> >>
> >> On Mon, Oct 13, 2014 at 11:02 PM, Guozhang Wang 
> >> wrote:
> >>
> >> > Hi Jay,
> >> >
> >> > Thanks for the comments. Replied inline.
> >> >
> >> > Guozhang
> >> >
> >> > On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps 
> >>wrote:
> >> >
> >> > > I need to take more time to think about this. Here are a few
> >> off-the-cuff
> >> > > remarks:
> >> > >
> >> > > - To date we have tried really, really hard to keep the data model
> >>for
> >> > > message simple since after all you can always add whatever you like
> >> > inside
> >> > > the message body.
> >> > >
> >> > > - For system tags, why not just make these fields first class
> >>fields in
> >> > > message? The purpose of a system tag is presumably that Why have a
> >> bunch
> >> > of
> >> > > key-value pairs versus first-class fields?
> >> > >
> >> >
> >> > Yes, we can alternatively make system t

Review Request 26811: Patch for KAFKA-1196

2014-10-16 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26811/
---

Review request for kafka.


Bugs: KAFKA-1196
https://issues.apache.org/jira/browse/KAFKA-1196


Repository: kafka


Description
---

KAFKA-1196 WIP Ensure FetchResponses don't exceed 2GB limit.


Diffs
-

  core/src/main/scala/kafka/api/FetchResponse.scala 
8d085a1f18f803b3cebae4739ad8f58f95a6c600 
  core/src/main/scala/kafka/server/KafkaApis.scala 
85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
a5386a03b62956bc440b40783247c8cdf7432315 

Diff: https://reviews.apache.org/r/26811/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Neha Narkhede
Another JIRA that will be nice to include as part of 0.8.2-beta is
https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean
naming. Looking for people's thoughts on 2 things here -

1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2
final 4-5 weeks later?
2. Do people want to include any JIRAs (other than the ones mentioned
above) in 0.8.2-beta? If so, it will be great to know now so it will allow
us to move forward with the beta release quickly.

Thanks,
Neha

On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede 
wrote:

> Hi,
>
> We have accumulated an impressive list of pretty major features in 0.8.2 -
> Delete topic
> Automated leader rebalancing
> Controlled shutdown
> Offset management
> Parallel recovery
> min.isr and
> clean leader election
>
> In the past, what has worked for major feature releases is a beta release
> prior to a final release. I'm proposing we do the same for 0.8.2. The only
> blockers for 0.8.2-beta, that I know of are -
>
> https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and
> requires some thinking about the new dependency. Since it is not fully
> ready and there are things to think about, I suggest we take it out, think
> it end to end and then include it in 0.8.3.)
> https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner:
> Guozhang Wang)
> https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is
> waiting on a review by Joe Stein)
>
> It seems that 1634 and 1671 can get wrapped up in a week. Do people think
> we should cut 0.8.2-beta by next week?
>
> Thanks,
> Neha
>


[jira] [Updated] (KAFKA-1196) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1196:
-
Attachment: KAFKA-1196.patch

> java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
> ---
>
> Key: KAFKA-1196
> URL: https://issues.apache.org/jira/browse/KAFKA-1196
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.0
> Environment: running java 1.7, linux and kafka compiled against scala 
> 2.9.2
>Reporter: Gerrit Jansen van Vuuren
>Priority: Blocker
>  Labels: newbie
> Fix For: 0.9.0
>
> Attachments: KAFKA-1196.patch
>
>
> I have 6 topics each with 8 partitions spread over 4 kafka servers.
> the servers are 24 core 72 gig ram.
> While consuming from the topics I get an IllegalArgumentException and all 
> consumption stops; the error keeps on throwing.
> I've tracked it down to FetchResponse.scala line 33.
> The error happens when the FetchResponsePartitionData object's readFrom 
> method calls:
> messageSetBuffer.limit(messageSetSize)
> I put in some debug code; the messageSetSize is 671758648, while the 
> buffer.capacity() gives 155733313, for some reason the buffer is smaller than 
> the required message size.
> I don't know the consumer code enough to debug this. It doesn't matter if 
> compression is used or not.
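
For reference, the failure mode described above can be reproduced directly with java.nio: ByteBuffer.limit(n) throws IllegalArgumentException whenever n exceeds the buffer's capacity. The sizes below are taken from the report; allocating the full buffer is only for illustration.

{code}
import java.nio.ByteBuffer

val messageSetSize = 671758648
val buffer = ByteBuffer.allocate(155733313)   // capacity smaller than messageSetSize
buffer.limit(messageSetSize)                  // throws java.lang.IllegalArgumentException
{code}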



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1196) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173941#comment-14173941
 ] 

Ewen Cheslack-Postava commented on KAFKA-1196:
--

Created reviewboard https://reviews.apache.org/r/26811/diff/
 against branch origin/trunk

> java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
> ---
>
> Key: KAFKA-1196
> URL: https://issues.apache.org/jira/browse/KAFKA-1196
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.0
> Environment: running java 1.7, linux and kafka compiled against scala 
> 2.9.2
>Reporter: Gerrit Jansen van Vuuren
>Priority: Blocker
>  Labels: newbie
> Fix For: 0.9.0
>
> Attachments: KAFKA-1196.patch
>
>
> I have 6 topics each with 8 partitions spread over 4 kafka servers.
> the servers are 24 core 72 gig ram.
> While consuming from the topics I get an IllegalArgumentException and all 
> consumption stops; the error keeps on throwing.
> I've tracked it down to FetchResponse.scala line 33.
> The error happens when the FetchResponsePartitionData object's readFrom 
> method calls:
> messageSetBuffer.limit(messageSetSize)
> I put in some debug code; the messageSetSize is 671758648, while the 
> buffer.capacity() gives 155733313, for some reason the buffer is smaller than 
> the required message size.
> I don't know the consumer code enough to debug this. It doesn't matter if 
> compression is used or not.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1196) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1196:
-
Assignee: Ewen Cheslack-Postava
  Status: Patch Available  (was: Open)

> java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
> ---
>
> Key: KAFKA-1196
> URL: https://issues.apache.org/jira/browse/KAFKA-1196
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.0
> Environment: running java 1.7, linux and kafka compiled against scala 
> 2.9.2
>Reporter: Gerrit Jansen van Vuuren
>Assignee: Ewen Cheslack-Postava
>Priority: Blocker
>  Labels: newbie
> Fix For: 0.9.0
>
> Attachments: KAFKA-1196.patch
>
>
> I have 6 topics each with 8 partitions spread over 4 kafka servers.
> the servers are 24 core 72 gig ram.
> While consuming from the topics I get an IllegalArgumentException and all 
> consumption stops; the error keeps on throwing.
> I've tracked it down to FetchResponse.scala line 33.
> The error happens when the FetchResponsePartitionData object's readFrom 
> method calls:
> messageSetBuffer.limit(messageSetSize)
> I put in some debug code; the messageSetSize is 671758648, while the 
> buffer.capacity() gives 155733313, for some reason the buffer is smaller than 
> the required message size.
> I don't know the consumer code enough to debug this. It doesn't matter if 
> compression is used or not.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1196) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173943#comment-14173943
 ] 

Ewen Cheslack-Postava commented on KAFKA-1196:
--

This is a wip patch to fix this issue, which previous discussion suggests was 
due to the FetchResponse exceeding 2GB. My approach to triggering the issue, 
however, doesn't exhibit exactly the same issue but does cause an unrecoverable 
error that causes the consumer connection to terminate. (For reference, it 
causes the server to fail when FetchResponseSend.writeTo calls expectIncomplete 
and sendSize is negative due to overflow. This confuses the server since it 
looks like the message is already done sending and the server forcibly closes 
the consumer's connection.)

The patch addresses the core issue by ensuring the returned message doesn't 
exceed 2GB by dropping parts of it in a way that otherwise shouldn't affect the 
consumer. But there are a lot of points that still need to be addressed:

* I started by building an integration test to trigger the issue, included in 
PrimitiveApiTest. However, since we necessarily need to have > 2GB data to 
trigger the issue, it's probably too expensive to include in this way. Offline 
discussion suggests maybe a system test would be a better place to include 
this. It's still included here for completeness.
* The implementation filters to a subset of the data in FetchResponse. The main 
reason for this is that this process needs to know the exact (or at least 
conservative estimate) size of serialized data, which only FetchResponse knows. 
But it's also a bit weird compared to other message classes, which are case 
classes and don't modify those inputs.
* Algorithm for choosing the subset to return: the initial approach is to remove random 
elements until we get below the limit. This is simple to understand and avoids 
starvation of specific TopicAndPartitions. Any concerns with this basic 
approach? (A small sketch of this loop follows after this list.)
* I'm pretty sure I've managed to keep the < 2GB case to effectively the same 
computational cost (computing the serialized size, grouped data, etc. exactly 
once as before). However, for the > 2GB case I've only ensured correctness. In 
particular, the progressive removal and reevaluation of serialized size could 
potentially be very bad for very large data sets (e.g. starting a mirror maker 
against a large data set with large # of partitions from scratch).
* Note that the algorithm never deals with the actual message data, only 
metadata about what messages are available. This is relevant since this is what 
suggested the approach in the patch could still be performant -- 
ReplicaManager.readMessageSets processes the entire FetchRequest and filters it 
down because the metadata involved is relatively small.
* Based on the previous two points, this really needs some more realistic large 
scale system tests to make sure this approach is not only correct, but provides 
reasonable performance (or indicates we need to revise the algorithm for 
selecting a subset of the data).
* Testing isn't really complete -- I triggered the issue with 4 topics * 600 
MB/topic, which is > 2GB. Another obvious case to check is when some partitions 
contain > 2GB on their own.
* I'd like someone to help sanity check the exact maximum FetchResponse 
serialized size we limit messages to. It's not Int.MaxValue because the 
FetchResponseSend class adds 4 + FetchResponse.sizeInBytes for its own size. 
I'd like a sanity check that the extra 4 bytes is enough -- is there any 
additional wrapping we might need to account for? Getting a test to hit exactly 
that narrow range could be tricky.
* The tests include both immediate-response and purgatory paths, but the 
purgatory version requires a timeout in the test, which could end up being 
flaky + wasting time, but it doesn't look like there's a great way to mock that 
right now. Maybe this doesn't matter if it moves to a system test?
* One case this doesn't handle yet is when the data reaches > 2GB after it's in 
the purgatory. The result is correct, but the response is not sent as soon as 
that condition is satisfied. This is because it looks like evaluating this 
exactly would require calling readMessageSets and evaluating the size of the 
message for every DelayedFetch.isSatisfied call. This sounds like it could end 
up being pretty expensive. Maybe there's a better way, perhaps an approximate 
scheme?
* The test requires some extra bytes in the fetchSize for each partition, 
presumably for overhead in encoding. I haven't tracked down exactly how big 
that should be, but I'm guessing it could end up affecting the results of more 
comprehensive tests.
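
For illustration, a hedged sketch of the random-removal loop referenced in the list above; limitResponseSize, estimatedSize and the map of partition data are stand-ins, not the actual FetchResponse internals. Note that it re-evaluates the estimated size after each removal, which is exactly the potential cost concern raised above for very large fetches.

{code}
import scala.util.Random

// Drop random topic-partitions until the estimated serialized size fits under the limit.
def limitResponseSize[K, V](partitionData: Map[K, V],
                            estimatedSize: Map[K, V] => Long,
                            maxSerializedBytes: Long): Map[K, V] = {
  var remaining = partitionData
  while (remaining.nonEmpty && estimatedSize(remaining) > maxSerializedBytes) {
    // Removing a random element avoids starving any particular TopicAndPartition.
    val victim = remaining.keys.drop(Random.nextInt(remaining.size)).head
    remaining -= victim
  }
  remaining
}
{code}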

> java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
> ---
>
> Key: KAFKA-1196
> URL: https://issues.apac

[jira] [Resolved] (KAFKA-1707) ConsumerOffsetChecker shows none partitions assigned

2014-10-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede resolved KAFKA-1707.
--
Resolution: Won't Fix

This is a known issue but there are a few things to check. Did you run the 
VerifyConsumerRebalance? However, I'd suggest directing such questions to the 
mailing list first. So I'll go ahead and close this ticket.

> ConsumerOffsetChecker shows none partitions assigned
> 
>
> Key: KAFKA-1707
> URL: https://issues.apache.org/jira/browse/KAFKA-1707
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.0
> Environment: HP 40 x Intel(R) Xeon(R) CPU E5-2470 v2 @ 
> 2.40GHz/1.2e+02GB
>Reporter: Hari
>Assignee: Neha Narkhede
>  Labels: patch
>
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker shows some 
> partitions having "none" consumers after a rebalance is triggered by a new 
> consumer joining or disconnecting from the group. The lag keeps piling up until the 
> partitions are assigned again, usually after another rebalance trigger.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1708) Consumers intermittently stop consuming till restart

2014-10-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede resolved KAFKA-1708.
--
Resolution: Won't Fix

Same here. Please direct this to the mailing list where people can help out. If 
we agree that there is a problem, then you can file a JIRA. 

> Consumers intermittently stop consuming till restart
> 
>
> Key: KAFKA-1708
> URL: https://issues.apache.org/jira/browse/KAFKA-1708
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.0
> Environment: HP 40 x Intel(R) Xeon(R) CPU E5-2470 v2 @ 
> 2.40GHz/1.2e+02GB
>Reporter: Hari
>Assignee: Neha Narkhede
>  Labels: patch
>
> Using a simple consumer and reading messages via a stream iterator, we noticed 
> that consumption suddenly stops and the lag starts building up until the 
> consumer is restarted. Below is the code snippet:
> final Map>> streamsByName = 
> consumerConnector.createMessageStreams(topicCountMap);
> ConsumerIterator streamIterator = 
> streamsByName.get(topicName).get(IDX_FIRST_ITEM).iterator();
> if (streamIterator.hasNext()) {
> final MessageAndMetadata item =   
> streamIterator.next();
> ...
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1710:
-
Reviewer: Jun Rao
Assignee: (was: Jun Rao)

> [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
> being sent to single partition
> 
>
> Key: KAFKA-1710
> URL: https://issues.apache.org/jira/browse/KAFKA-1710
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
> Environment: Development
>Reporter: Bhavesh Mistry
>Priority: Critical
>  Labels: performance
> Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
> 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
> TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
> th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
> th6.dump, th7.dump, th8.dump, th9.dump
>
>
> Hi Kafka Dev Team,
> When I run the test to send messages to a single partition for 3 minutes or so, 
> I encounter a deadlock (please see the screen attached) and thread 
> contention from YourKit profiling.  
> Use Case:
> 1)  Aggregating messages into same partition for metric counting. 
> 2)  Replicate Old Producer behavior for sticking to partition for 3 minutes.
> Here is output:
> Frozen threads found (potential deadlock)
>  
> It seems that the following threads have not changed their stack for more 
> than 10 seconds.
> These threads are possibly (but not necessarily!) in a deadlock or hung.
>  
> pool-1-thread-128 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-159 <--- Frozen for at least 2m 1 sec
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-55 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> Thanks,
> Bhavesh 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1492) Getting error when sending producer request at the broker end with a single broker

2014-10-16 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173987#comment-14173987
 ] 

Neha Narkhede commented on KAFKA-1492:
--

bq. This seems more appropriate for the Kafka user mailing list or Stack 
Overflow (apache-kafka tag) rather than JIRA.

+1

> Getting error when sending producer request at the broker end with a single 
> broker
> --
>
> Key: KAFKA-1492
> URL: https://issues.apache.org/jira/browse/KAFKA-1492
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.1.1
>Reporter: sriram
>Assignee: Jun Rao
>
> Tried to run a simple example by sending a message to a single broker . 
> Getting error 
> [2014-06-13 08:35:45,402] INFO Closing socket connection to /127.0.0.1. 
> (kafka.network.Processor)
> [2014-06-13 08:35:45,440] WARN [KafkaApi-1] Produce request with correlation 
> id 2 from client  on partition [samsung,0] failed due to Leader not local for 
> partition [samsung,0] on broker 1 (kafka.server.KafkaApis)
> [2014-06-13 08:35:45,440] INFO [KafkaApi-1] Send the close connection 
> response due to error handling produce request [clientId = , correlationId = 
> 2, topicAndPartition = [samsung,0]] with Ack=0 (kafka.server.KafkaApis)
> OS- Windows 7 , JDK 1.7 , Scala 2.10



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group

2014-10-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1637:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Pushed updated patch to trunk and 0.8.2

> SimpleConsumer.fetchOffset returns wrong error code when no offset exists for 
> topic/partition/consumer group
> 
>
> Key: KAFKA-1637
> URL: https://issues.apache.org/jira/browse/KAFKA-1637
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.8.1, 0.8.1.1
> Environment: Linux
>Reporter: Amir Malekpour
>Assignee: Ewen Cheslack-Postava
>  Labels: newbie
> Attachments: KAFKA-1637.patch, KAFKA-1637_2014-10-15_09:08:12.patch, 
> KAFKA-1637_2014-10-15_14:47:21.patch
>
>
> This concerns Kafka's Offset  Fetch API:
> According to Kafka's current documentation, "if there is no offset associated 
> with a topic-partition under that consumer group the broker does not set an 
> error code (since it is not really an error), but returns empty metadata and 
> sets the offset field to -1."  (Link below)
> However, in Kafka 0.8.1.1 error code '3' is returned, which effectively makes 
> it impossible for the client to decide if there was an error, or if there is 
> no offset associated with a topic-partition under that consumer group.
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26770: Patch for KAFKA-1108

2014-10-16 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26770/#review56956
---



core/src/main/scala/kafka/server/KafkaServer.scala


Should this be WARN instead? ERROR wouldn't be ideal since this operation 
is retried later. Also wondering if this message actually gives much 
information about the reason of the failure? It might just print out 
IOException. I think the reason for failure that people might understand is 
what might cause the IOException. How about improving the error message by 
saying that a possible cause for this error could be that the leader movement 
operation on the controller took longer than the configured 
socket.timeout.ms. 

This will encourage users to inspect if the socket.timeout.ms needs to be 
bumped up or inspect why the controller is taking long for moving the leaders 
away from this broker.
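
For illustration, a hedged sketch of the kind of message suggested above; the wording and the surrounding try/catch are illustrative, not the final patch:

{code}
def warn(msg: String): Unit = println("WARN " + msg)    // stand-in for the server's logger

try {
  // ... send the controlled shutdown request over the blocking channel ...
  throw new java.io.IOException("Connection timed out")  // stand-in for the real failure
} catch {
  case ioe: java.io.IOException =>
    warn(("Error during controlled shutdown, will retry: %s. A possible cause is that leader " +
          "movement on the controller took longer than the configured socket.timeout.ms; " +
          "consider increasing socket.timeout.ms or checking why leader movement is slow.")
         .format(ioe.getMessage))
}
{code}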


- Neha Narkhede


On Oct. 15, 2014, 6:55 p.m., Ewen Cheslack-Postava wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/26770/
> ---
> 
> (Updated Oct. 15, 2014, 6:55 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1108
> https://issues.apache.org/jira/browse/KAFKA-1108
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1108 Log IOException messages during controlled shutdown.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> 07c0a078ffa5142441f687da851472da732c3837 
> 
> Diff: https://reviews.apache.org/r/26770/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>



[jira] [Updated] (KAFKA-1653) Duplicate broker ids allowed in replica assignment

2014-10-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1653:
-
Reviewer: Neha Narkhede

> Duplicate broker ids allowed in replica assignment
> --
>
> Key: KAFKA-1653
> URL: https://issues.apache.org/jira/browse/KAFKA-1653
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ryan Berdeen
>Assignee: Ewen Cheslack-Postava
>  Labels: newbie
> Attachments: KAFKA-1653.patch
>
>
> The reassign partitions command and the controller do not ensure that all 
> replicas for a partition are on different brokers. For example, you could set 
> 1,2,2 as the list of brokers for the replicas.
> kafka-topics.sh --describe --under-replicated will list these partitions as 
> under-replicated, but I can't see a reason why the controller should allow 
> this state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26666: Patch for KAFKA-1653

2014-10-16 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26666/#review56958
---


Since you fixed some other tools as well, can we also fix the preferred replica 
election command where we can de-dup the partitions?


core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala


I think it's worth telling the user which partition's replicas contain 
duplicates (and include all such partitions instead of one) since typically 
partition reassignment operation can contain 100s of partitions.
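
For illustration, a hedged sketch of the reporting suggested above, with made-up partition names; the idea is to collect every partition whose proposed replica list contains duplicates and show them all in one error:

{code}
def findDuplicateAssignments(proposed: Map[String, Seq[Int]]): Map[String, Seq[Int]] =
  proposed.filter { case (_, replicas) => replicas.distinct.size != replicas.size }

val proposed = Map("topicA-0" -> Seq(1, 2, 2), "topicA-1" -> Seq(1, 2, 3))
val bad = findDuplicateAssignments(proposed)
if (bad.nonEmpty)
  throw new IllegalArgumentException(
    "Replica lists contain duplicate broker ids for partitions: " + bad.keys.mkString(", "))
{code}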


- Neha Narkhede


On Oct. 13, 2014, 11:57 p.m., Ewen Cheslack-Postava wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/26666/
> ---
> 
> (Updated Oct. 13, 2014, 11:57 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1653
> https://issues.apache.org/jira/browse/KAFKA-1653
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1653 Disallow duplicate broker IDs in user input for admin commands. 
> This covers a few cases besides the one identified in the bug. Aside from a 
> major refactoring to use Sets for broker/replica lists, sanitizing user input 
> seems to be the best solution here. I chose to generate errors instead of 
> just using toSet since a duplicate entry may indicate that a different broker 
> id was accidentally omitted.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
> 691d69a49a240f38883d2025afaec26fd61281b5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 
> 7672c5aab4fba8c23b1bb5cd4785c332d300a3fa 
>   core/src/main/scala/kafka/tools/StateChangeLogMerger.scala 
> d298e7e81acc7427c6cf4796b445966267ca54eb 
> 
> Diff: https://reviews.apache.org/r/26666/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>



[jira] [Commented] (KAFKA-1476) Get a list of consumer groups

2014-10-16 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174052#comment-14174052
 ] 

Neha Narkhede commented on KAFKA-1476:
--

[~balaji.sesha...@dish.com] Thanks for the patch. Few comments-
1. The "CONSUMER GROUPS
*" format is inconsistent with the other tools. For example, 
kafka-topics --list. Can we please remove it?
2. Currently, your tool only supports the list option. So the topic option is 
not required. 
3. The getConsumerGroups() API is better suited for ZkUtils.

Would you mind addressing the other feature requirements as well? Alternately, 
we can limit this JIRA to list and describe.
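
For illustration, a hedged sketch of what moving getConsumerGroups() into ZkUtils could look like (as referenced in point 3 above); consumer groups are registered as children of the /consumers path in ZooKeeper, though the exact helper names in ZkUtils may differ:

{code}
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient

// List all consumer groups by reading the children of the /consumers path.
def getConsumerGroups(zkClient: ZkClient): Seq[String] =
  ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.ConsumersPath)
{code}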

> Get a list of consumer groups
> -
>
> Key: KAFKA-1476
> URL: https://issues.apache.org/jira/browse/KAFKA-1476
> Project: Kafka
>  Issue Type: Wish
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ryan Williams
>Assignee: BalajiSeshadri
>  Labels: newbie
> Fix For: 0.9.0
>
> Attachments: KAFKA-1476-LIST-GROUPS.patch, KAFKA-1476-RENAME.patch, 
> KAFKA-1476.patch
>
>
> It would be useful to have a way to get a list of consumer groups currently 
> active via some tool/script that ships with kafka. This would be helpful so 
> that the system tools can be explored more easily.
> For example, when running the ConsumerOffsetChecker, it requires a group 
> option
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group 
> ?
> But, when just getting started with kafka, using the console producer and 
> consumer, it is not clear what value to use for the group option.  If a list 
> of consumer groups could be listed, then it would be clear what value to use.
> Background:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-328) Write unit test for kafka server startup and shutdown API

2014-10-16 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174061#comment-14174061
 ] 

Neha Narkhede commented on KAFKA-328:
-

Thanks for the patch. Would you mind using our patch review tool going forward? 
It will make it easier to review.
1. Better to use intercept[IllegalStateException] in the test (see the sketch below). 
2. We should add all relevant test cases mentioned in the description, like 
repeated shutdown.
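
For illustration, a minimal sketch of the intercept[...] style referred to in point 1; the server setup is simplified, and the double-startup expectation is taken from the cases listed in the issue description:

{code}
import org.scalatest.Assertions.intercept
import kafka.server.{KafkaConfig, KafkaServer}

def startupTwiceShouldFail(config: KafkaConfig): Unit = {
  val server = new KafkaServer(config)
  server.startup()
  intercept[IllegalStateException] {
    server.startup()   // a second startup while running is expected to throw
  }
  server.shutdown()
}
{code}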

> Write unit test for kafka server startup and shutdown API 
> --
>
> Key: KAFKA-328
> URL: https://issues.apache.org/jira/browse/KAFKA-328
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neha Narkhede
>Assignee: BalajiSeshadri
>  Labels: newbie
> Attachments: KAFKA-328-FORMATTED.patch, KAFKA-328.patch
>
>
> Background discussion in KAFKA-320
> People often try to embed KafkaServer in an application that ends up calling 
> startup() and shutdown() repeatedly and sometimes in odd ways. To ensure this 
> works correctly we have to be very careful about cleaning up resources. This 
> is a good practice for making unit tests reliable anyway.
> A good first step would be to add some unit tests on startup and shutdown to 
> cover various cases:
> 1. A Kafka server can startup if it is not already starting up, if it is not 
> currently being shutdown, or if it hasn't been already started
> 2. A Kafka server can shutdown if it is not already shutting down, if it is 
> not currently starting up, or if it hasn't been already shutdown. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1654) Provide a way to override server configuration from command line

2014-10-16 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174101#comment-14174101
 ] 

Neha Narkhede commented on KAFKA-1654:
--

[~jarcec] Thanks for the patch. Overall looks good. Few comments
1. The usage command says "USAGE: java [options] %s [kafka options] 
server.properties", but in order for the kafka options to take effect you also 
need to use --set. 
2. If you leave --set out, it doesn't error out saying that --set is required and 
silently does not end up overriding the property value.
3. Can we rename set to override?
4. If you specify multiple properties, it is unclear that you need to use --set 
for each of those. If you don't, it doesn't error out and silently doesn't 
override it.


> Provide a way to override server configuration from command line
> 
>
> Key: KAFKA-1654
> URL: https://issues.apache.org/jira/browse/KAFKA-1654
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8.1.1
>Reporter: Jarek Jarcec Cecho
>Assignee: Jarek Jarcec Cecho
> Fix For: 0.8.3
>
> Attachments: KAFKA-1654.patch
>
>
> I've been recently playing with Kafka and I found the current way of server 
> configuration quite inflexible. All the configuration options have to be 
> inside a properties file and there is no way they can be overridden for a 
> given execution.  In order to temporarily change one property I had to copy the 
> config file and change the property there. Hence, I'm wondering if people 
> would be open to providing a way to specify and override the configs from 
> the command line when starting Kafka?
> Something like:
> {code}
> ./bin/kafka-server-start.sh -Dmy.cool.property=X kafka.properties
> {code}
> or 
> {code}
> ./bin/kafka-server-start.sh --set my.cool.property=X kafka.properties
> {code}
> I'm more than happy to take a stab at it, but I would like to see if there is 
> an interest for such capability?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26373: Patch for KAFKA-1647

2014-10-16 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/#review56989
---


This is a fairly tricky patch and I'm not 100% sure that we haven't introduced 
any kind of regression. I'd feel more comfortable accepting this patch, if we 
repeated the kind of testing that was done to find this bug.


core/src/main/scala/kafka/server/ReplicaManager.scala


if(!partition.makeFollower)



core/src/main/scala/kafka/server/ReplicaManager.scala


Now for partitions that have a leader, we are not adding a follower.


- Neha Narkhede


On Oct. 13, 2014, 11:38 p.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/26373/
> ---
> 
> (Updated Oct. 13, 2014, 11:38 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1647
> https://issues.apache.org/jira/browse/KAFKA-1647
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Joel's comments.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 78b7514cc109547c562e635824684fad581af653 
> 
> Diff: https://reviews.apache.org/r/26373/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174178#comment-14174178
 ] 

Ewen Cheslack-Postava commented on KAFKA-1710:
--

This looks like a red herring due to the structure of the test. The test code 
generates 200 threads which share 4 producers, and each thread round-robins 
through the producers, then sleeps for 10ms.

It looks like all that's happening is that the profiling tool sees the same 
stack trace repeatedly because there's a huge amount of contention for the 4 
producers. If you take a look at the stack traces, they're almost all waiting 
on a lock on a queue that the messages get appended to. The few active threads 
have those queues locked and are working on compressing data before sending it 
out. Given the number of threads and the small number of producers, it's not 
surprising that YourKit sees the same stack traces for a long time -- the 
threads can be making forward progress, but any time the profiler stops to look 
at the stack traces, it's very likely that any given thread will be waiting on 
a lock with the same stack trace. None of the stack traces show any evidence of 
a real deadlock (i.e. I can't find any set of locks where there could be 
ordering issues since almost every thread is just waiting on a one lock in one 
of the producers).

If this did hit deadlock, the process should stop entirely because all the 
worker threads use all 4 producers and the supposedly deadlocked threads are 
all waiting on locks in the producer. I ran the test to completion multiple 
times without any issues. Unless this has actually been observed to hit 
deadlock and stop making progress, I think this should be closed since these 
messages are really just warnings from YourKit.

[~Bmis13] you might try reducing the # of threads and seeing if those charts 
end up looking better. I bet if you actually showed all the threads instead of 
just the couple in the screenshot, the areas marked as runnable across all 
threads would sum to a reasonable total. Also, there are other possible issues 
with getting good performance from this test code, e.g. the round robin 
approach can cause all threads to get blocked on the same producer if the 
producer gets locked for a relatively long time. This can happen when data is 
ready to be sent and is getting compressed. Other approaches to distributing 
work across the producers may provide better throughput.
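
For illustration, a hedged sketch of the alternative work distribution hinted at in the last sentence above: pin each worker thread to one producer instead of round-robining every thread across all four, which reduces contention on any single producer's append lock. createProducer() and sendLoop() are placeholders for the test's own logic.

{code}
import java.util.concurrent.Executors

def runPinned(createProducer: () => AnyRef, sendLoop: AnyRef => Unit): Unit = {
  val producers = Vector.fill(4)(createProducer())
  val executor  = Executors.newFixedThreadPool(200)
  for (i <- 0 until 200) {
    val producer = producers(i % producers.size)   // fixed producer per worker
    executor.submit(new Runnable { override def run(): Unit = sendLoop(producer) })
  }
  executor.shutdown()
}
{code}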


> [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
> being sent to single partition
> 
>
> Key: KAFKA-1710
> URL: https://issues.apache.org/jira/browse/KAFKA-1710
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
> Environment: Development
>Reporter: Bhavesh Mistry
>Priority: Critical
>  Labels: performance
> Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
> 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
> TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
> th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
> th6.dump, th7.dump, th8.dump, th9.dump
>
>
> Hi Kafka Dev Team,
> When I run the test to send messages to a single partition for 3 minutes or so, 
> I encounter a deadlock (please see the screen attached) and thread 
> contention from YourKit profiling.  
> Use Case:
> 1)  Aggregating messages into same partition for metric counting. 
> 2)  Replicate Old Producer behavior for sticking to partition for 3 minutes.
> Here is output:
> Frozen threads found (potential deadlock)
>  
> It seems that the following threads have not changed their stack for more 
> than 10 seconds.
> These threads are possibly (but not necessarily!) in a deadlock or hung.
>  
> pool-1-thread-128 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-159 <--- Frozen for at least 2m 1 sec
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRe

Re: Security JIRAS

2014-10-16 Thread Michael Herstine
Thanks, Jay.

I'm new to the project, and I'm wondering how things proceed from here...
are folks working on these tasks, or do they get assigned, or...?

On 10/7/14, 5:15 PM, "Jay Kreps"  wrote:

>Hey guys,
>
>As promised, I added a tree of JIRAs for the stuff in the security wiki (
>https://cwiki.apache.org/confluence/display/KAFKA/Security):
>
>https://issues.apache.org/jira/browse/KAFKA-1682
>
>I tried to break it into reasonably standalone pieces. I think many of the
>tickets could actually be done in parallel. Since there were many people
>interested in this area this may help parallelize the work a bit.
>
>I added some strawman details on implementation to each ticket. We can
>discuss and refine further on the individual tickets.
>
>Please take a look and let me know if this breakdown seems reasonable.
>
>Cheers,
>
>-Jay



Re: Security JIRAS

2014-10-16 Thread Gwen Shapira
Wondering the same here :)

I think there are some parallel threads here (SSL is independent of
Kerberos, as far as I can see).

Kerberos work is blocked on
https://issues.apache.org/jira/browse/KAFKA-1683 - "Implement a
"session" concept in the socket server". So there's no point in
picking up other tasks before this is assigned (and at least
designed).

I'm looking at Kafka Brokers authentication with ZooKeeper since this
looks independent of other tasks.

Gwen



On Thu, Oct 16, 2014 at 4:23 PM, Michael Herstine
 wrote:
> Thanks, Jay.
>
> I'm new to the project, and I'm wondering how things proceed from here...
> are folks working on these tasks, or do they get assigned, or...?
>
> On 10/7/14, 5:15 PM, "Jay Kreps"  wrote:
>
>>Hey guys,
>>
>>As promised, I added a tree of JIRAs for the stuff in the security wiki (
>>https://cwiki.apache.org/confluence/display/KAFKA/Security):
>>
>>https://issues.apache.org/jira/browse/KAFKA-1682
>>
>>I tried to break it into reasonably standalone pieces. I think many of the
>>tickets could actually be done in parallel. Since there were many people
>>interested in this area this may help parallelize the work a bit.
>>
>>I added some strawman details on implementation to each ticket. We can
>>discuss and refine further on the individual tickets.
>>
>>Please take a look and let me know if this breakdown seems reasonable.
>>
>>Cheers,
>>
>>-Jay
>


Re: Review Request 26658: Patch for KAFKA-1493

2014-10-16 Thread James Oliver

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26658/
---

(Updated Oct. 16, 2014, 8:49 p.m.)


Review request for kafka.


Bugs: KAFKA-1493
https://issues.apache.org/jira/browse/KAFKA-1493


Repository: kafka


Description (updated)
---

KAFKA-1493 Implement LZ4 Frame I/O Streams


KAFKA-1493 Add utils functions, tweak test cases and OutputStream construction


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
bf4ed66791b9a502aae6cb2ec7681f42732d9a43 
  
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
5227b2d7ab803389d1794f48c8232350c05b14fd 
  clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
0323f5f7032dceb49d820c17a41b78c56591ffc4 
  clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
a0827f576e8c38b1bd828cf0d6aefff9fd5ecc22 
  config/producer.properties 39d65d7c6c21f4fccd7af89be6ca12a088d5dd98 
  core/src/main/scala/kafka/message/CompressionCodec.scala 
de0a0fade5387db63299c6b112b3c9a5e41d82ec 
  core/src/main/scala/kafka/message/CompressionFactory.scala 
8420e13d0d8680648df78f22ada4a0d4e3ab8758 
  core/src/main/scala/kafka/tools/ConsoleProducer.scala 
b024a693c23cb21f1efe405ed414bf23f3974f31 
  core/src/main/scala/kafka/tools/PerfConfig.scala 
c72002976d90416559090a665f6494072a6c2dec 
  core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
c95485170fd8b4f5faad740f049e5d09aca8829d 
  core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 
6f0addcea64f1e78a4de50ec8135f4d02cebd305 
  core/src/test/scala/unit/kafka/message/MessageTest.scala 
958c1a60069ad85ae20f5c58e74679cd9fa6f70e 

Diff: https://reviews.apache.org/r/26658/diff/


Testing
---


Thanks,

James Oliver



Re: Review Request 26658: Patch for KAFKA-1493

2014-10-16 Thread James Oliver

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26658/
---

(Updated Oct. 16, 2014, 8:50 p.m.)


Review request for kafka.


Bugs: KAFKA-1493
https://issues.apache.org/jira/browse/KAFKA-1493


Repository: kafka


Description
---

KAFKA-1493 Implement LZ4 Frame I/O Streams


KAFKA-1493 Add utils functions, tweak test cases and OutputStream construction


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
bf4ed66791b9a502aae6cb2ec7681f42732d9a43 
  
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
5227b2d7ab803389d1794f48c8232350c05b14fd 
  clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
0323f5f7032dceb49d820c17a41b78c56591ffc4 
  clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
a0827f576e8c38b1bd828cf0d6aefff9fd5ecc22 
  config/producer.properties 39d65d7c6c21f4fccd7af89be6ca12a088d5dd98 
  core/src/main/scala/kafka/message/CompressionCodec.scala 
de0a0fade5387db63299c6b112b3c9a5e41d82ec 
  core/src/main/scala/kafka/message/CompressionFactory.scala 
8420e13d0d8680648df78f22ada4a0d4e3ab8758 
  core/src/main/scala/kafka/tools/ConsoleProducer.scala 
b024a693c23cb21f1efe405ed414bf23f3974f31 
  core/src/main/scala/kafka/tools/PerfConfig.scala 
c72002976d90416559090a665f6494072a6c2dec 
  core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
c95485170fd8b4f5faad740f049e5d09aca8829d 
  core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 
6f0addcea64f1e78a4de50ec8135f4d02cebd305 
  core/src/test/scala/unit/kafka/message/MessageTest.scala 
958c1a60069ad85ae20f5c58e74679cd9fa6f70e 

Diff: https://reviews.apache.org/r/26658/diff/


Testing (updated)
---

./gradlew test
All tests passed


Thanks,

James Oliver



[jira] [Updated] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Oliver updated KAFKA-1493:

Attachment: KAFKA-1493_2014-10-16_13:49:34.patch

> Use a well-documented LZ4 compression format and remove redundant LZ4HC option
> --
>
> Key: KAFKA-1493
> URL: https://issues.apache.org/jira/browse/KAFKA-1493
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.2
>Reporter: James Oliver
>Assignee: Ivan Lyutov
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
> KAFKA-1493_2014-10-16_13:49:34.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174217#comment-14174217
 ] 

James Oliver commented on KAFKA-1493:
-

Updated reviewboard https://reviews.apache.org/r/26658/diff/
 against branch origin/trunk

> Use a well-documented LZ4 compression format and remove redundant LZ4HC option
> --
>
> Key: KAFKA-1493
> URL: https://issues.apache.org/jira/browse/KAFKA-1493
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.2
>Reporter: James Oliver
>Assignee: Ivan Lyutov
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
> KAFKA-1493_2014-10-16_13:49:34.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Oliver reassigned KAFKA-1493:
---

Assignee: James Oliver  (was: Ivan Lyutov)

> Use a well-documented LZ4 compression format and remove redundant LZ4HC option
> --
>
> Key: KAFKA-1493
> URL: https://issues.apache.org/jira/browse/KAFKA-1493
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.2
>Reporter: James Oliver
>Assignee: James Oliver
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
> KAFKA-1493_2014-10-16_13:49:34.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1108) when controlled shutdown attempt fails, the reason is not always logged

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174222#comment-14174222
 ] 

Ewen Cheslack-Postava commented on KAFKA-1108:
--

Updated reviewboard https://reviews.apache.org/r/26770/diff/
 against branch origin/trunk

> when controlled shutdown attempt fails, the reason is not always logged
> ---
>
> Key: KAFKA-1108
> URL: https://issues.apache.org/jira/browse/KAFKA-1108
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Rosenberg
>Assignee: Ewen Cheslack-Postava
>  Labels: newbie
> Fix For: 0.9.0
>
> Attachments: KAFKA-1108.patch, KAFKA-1108_2014-10-16_13:53:11.patch
>
>
> In KafkaServer.controlledShutdown(), it initiates a controlled shutdown, and 
> then if there's a failure, it will retry the controlledShutdown.
> Looking at the code, there are 2 ways a retry could fail, one with an error 
> response from the controller, and this messaging code:
> {code}
> info("Remaining partitions to move: 
> %s".format(shutdownResponse.partitionsRemaining.mkString(",")))
> info("Error code from controller: %d".format(shutdownResponse.errorCode))
> {code}
> Alternatively, there could be an IOException, with this code executed:
> {code}
> catch {
>   case ioe: java.io.IOException =>
> channel.disconnect()
> channel = null
> // ignore and try again
> }
> {code}
> And then finally, in either case:
> {code}
>   if (!shutdownSuceeded) {
> Thread.sleep(config.controlledShutdownRetryBackoffMs)
> warn("Retrying controlled shutdown after the previous attempt 
> failed...")
>   }
> {code}
> It would be nice if the nature of the IOException were logged in either case 
> (I'd be happy with an ioe.getMessage() instead of a full stack trace, as 
> kafka in general tends to be too willing to dump IOException stack traces!).
> I suspect, in my case, the actual IOException is a socket timeout (as the 
> time between initial "Starting controlled shutdown" and the first 
> "Retrying..." message is usually about 35 seconds (the socket timeout + the 
> controlled shutdown retry backoff).  So, it would seem that really, the issue 
> in this case is that controlled shutdown is taking too long.  It would seem 
> sensible instead to have the controller report back to the server (before the 
> socket timeout) that more time is needed, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26770: Patch for KAFKA-1108

2014-10-16 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26770/
---

(Updated Oct. 16, 2014, 8:53 p.m.)


Review request for kafka.


Bugs: KAFKA-1108
https://issues.apache.org/jira/browse/KAFKA-1108


Repository: kafka


Description (updated)
---

More informative message and increase log level to warn.


Diffs (updated)
-

  core/src/main/scala/kafka/server/KafkaServer.scala 
07c0a078ffa5142441f687da851472da732c3837 

Diff: https://reviews.apache.org/r/26770/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Updated] (KAFKA-1108) when controlled shutdown attempt fails, the reason is not always logged

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1108:
-
Attachment: KAFKA-1108_2014-10-16_13:53:11.patch

> when controlled shutdown attempt fails, the reason is not always logged
> ---
>
> Key: KAFKA-1108
> URL: https://issues.apache.org/jira/browse/KAFKA-1108
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Rosenberg
>Assignee: Ewen Cheslack-Postava
>  Labels: newbie
> Fix For: 0.9.0
>
> Attachments: KAFKA-1108.patch, KAFKA-1108_2014-10-16_13:53:11.patch
>
>
> In KafkaServer.controlledShutdown(), it initiates a controlled shutdown, and 
> then if there's a failure, it will retry the controlledShutdown.
> Looking at the code, there are 2 ways a retry could fail, one with an error 
> response from the controller, and this messaging code:
> {code}
> info("Remaining partitions to move: 
> %s".format(shutdownResponse.partitionsRemaining.mkString(",")))
> info("Error code from controller: %d".format(shutdownResponse.errorCode))
> {code}
> Alternatively, there could be an IOException, with this code executed:
> {code}
> catch {
>   case ioe: java.io.IOException =>
> channel.disconnect()
> channel = null
> // ignore and try again
> }
> {code}
> And then finally, in either case:
> {code}
>   if (!shutdownSuceeded) {
> Thread.sleep(config.controlledShutdownRetryBackoffMs)
> warn("Retrying controlled shutdown after the previous attempt 
> failed...")
>   }
> {code}
> It would be nice if the nature of the IOException were logged in either case 
> (I'd be happy with an ioe.getMessage() instead of a full stack trace, as 
> kafka in general tends to be too willing to dump IOException stack traces!).
> I suspect, in my case, the actual IOException is a socket timeout (as the 
> time between initial "Starting controlled shutdown" and the first 
> "Retrying..." message is usually about 35 seconds (the socket timeout + the 
> controlled shutdown retry backoff).  So, it would seem that really, the issue 
> in this case is that controlled shutdown is taking too long.  It would seem 
> sensible instead to have the controller report back to the server (before the 
> socket timeout) that more time is needed, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26770: Patch for KAFKA-1108

2014-10-16 Thread Ewen Cheslack-Postava


> On Oct. 16, 2014, 5:55 p.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/server/KafkaServer.scala, line 239
> > 
> >
> > Should this be WARN instead? ERROR wouldn't be ideal since this 
> > operation is retried later. Also wondering if this message actually gives 
> > much information about the reason of the failure? It might just print out 
> > IOException. I think the reason for failure that people might understand is 
> > what might cause the IOException. How about improving the error message by 
> > saying that the possible cause for this error could be that the leader 
> > movement operation on the controller took longer than the configured 
> > socket.timeout.ms. 
> > 
> > This will encourage users to inspect if the socket.timeout.ms needs to 
> > be bumped up or inspect why the controller is taking long for moving the 
> > leaders away from this broker.

The INFO level just matched similar messages a few lines above, although this 
is a more significant issue than those. The newest patch updates it to WARN. The 
message is also more detailed, but ideally the IOException message itself would 
contain more than just the class name.


- Ewen
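
To make the suggestion concrete, here is a minimal Java sketch of the logging 
pattern under discussion. It is not the actual change (that lives in the Scala 
file core/src/main/scala/kafka/server/KafkaServer.scala); the class, interface, 
and parameter names below are invented purely for illustration.

{code}
// Hedged sketch, not the real KafkaServer change: surface the IOException message at WARN
// and point the operator at socket.timeout.ms as a likely cause of the failed attempt.
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ControlledShutdownLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(ControlledShutdownLoggingSketch.class);

    /** Minimal collaborator so the sketch is self-contained (hypothetical interface). */
    interface ShutdownChannel {
        boolean sendControlledShutdownRequest() throws IOException;
    }

    /** Runs one controlled-shutdown attempt and reports why it failed instead of staying silent. */
    static boolean attemptControlledShutdown(ShutdownChannel channel, long socketTimeoutMs) {
        try {
            return channel.sendControlledShutdownRequest();
        } catch (IOException ioe) {
            // Surfacing ioe.getMessage() keeps the retry loop from hiding the reason; a timeout
            // here often means leader movement on the controller took longer than socket.timeout.ms.
            log.warn("Controlled shutdown attempt failed: {}. If this is a socket timeout, the "
                    + "controller may need longer than socket.timeout.ms ({} ms) to move leaders.",
                    ioe.getMessage(), socketTimeoutMs);
            return false;
        }
    }
}
{code}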


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26770/#review56956
---


On Oct. 16, 2014, 8:53 p.m., Ewen Cheslack-Postava wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/26770/
> ---
> 
> (Updated Oct. 16, 2014, 8:53 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1108
> https://issues.apache.org/jira/browse/KAFKA-1108
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> More informative message and increase log level to warn.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> 07c0a078ffa5142441f687da851472da732c3837 
> 
> Diff: https://reviews.apache.org/r/26770/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>



Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Joe Stein
+1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.

I agree to the tickets you brought up to have in 0.8.2-beta and also
https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.

/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
/
On Oct 16, 2014 12:55 PM, "Neha Narkhede"  wrote:

> Another JIRA that will be nice to include as part of 0.8.2-beta is
> https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean
> naming. Looking for people's thoughts on 2 things here -
>
> 1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2
> final 4-5 weeks later?
> 2. Do people want to include any JIRAs (other than the ones mentioned
> above) in 0.8.2-beta? If so, it will be great to know now so it will allow
> us to move forward with the beta release quickly.
>
> Thanks,
> Neha
>
> On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede 
> wrote:
>
> > Hi,
> >
> > We have accumulated an impressive list of pretty major features in 0.8.2
> -
> > Delete topic
> > Automated leader rebalancing
> > Controlled shutdown
> > Offset management
> > Parallel recovery
> > min.isr and
> > clean leader election
> >
> > In the past, what has worked for major feature releases is a beta release
> > prior to a final release. I'm proposing we do the same for 0.8.2. The
> only
> > blockers for 0.8.2-beta, that I know of are -
> >
> > https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and
> > requires some thinking about the new dependency. Since it is not fully
> > ready and there are things to think about, I suggest we take it out,
> think
> > it end to end and then include it in 0.8.3.)
> > https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner:
> > Guozhang Wang)
> > https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is
> > waiting on a review by Joe Stein)
> >
> > It seems that 1634 and 1671 can get wrapped up in a week. Do people think
> > we should cut 0.8.2-beta by next week?
> >
> > Thanks,
> > Neha
> >
>


[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-16 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174278#comment-14174278
 ] 

Bhavesh Mistry commented on KAFKA-1710:
---

[~ewencp],

Thanks for looking into this.  If you look at the thread dumps, you will see the 
blocked threads as well; this particular code exposes the thread contention in 
the Kafka producer.  We hit this issue in our aggregation use case.  It would be 
great if you could look into an alternative to the synchronization block below.

{code}
 synchronized (dq) {
..
}
{code}

Thanks,

Bhavesh 
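
For readers following along, the contention Bhavesh describes can be pictured 
with a deliberately simplified sketch. This is not Kafka's actual 
RecordAccumulator and the class and method shapes are invented: each partition's 
batch deque is guarded by a synchronized block, so when every record targets the 
same partition, all producer threads serialize on a single lock.

{code}
// Simplified illustration of the per-partition synchronized(dq) pattern referenced in the
// thread dumps above. With one target partition, every append funnels through one lock.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PerPartitionLockSketch {
    private final Map<String, Deque<byte[]>> batches = new ConcurrentHashMap<>();

    public void append(String topicPartition, byte[] payload) {
        Deque<byte[]> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        synchronized (dq) {
            // When all senders hit the same partition, threads pile up here and show as
            // blocked/frozen in profiler output, matching the attached dumps.
            dq.addLast(payload);
        }
    }
}
{code}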

> [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
> being sent to single partition
> 
>
> Key: KAFKA-1710
> URL: https://issues.apache.org/jira/browse/KAFKA-1710
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
> Environment: Development
>Reporter: Bhavesh Mistry
>Priority: Critical
>  Labels: performance
> Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
> 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
> TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
> th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
> th6.dump, th7.dump, th8.dump, th9.dump
>
>
> Hi Kafka Dev Team,
> When I run the test to send message to single partition for 3 minutes or so 
> on, I have encounter deadlock (please see the screen attached) and thread 
> contention from YourKit profiling.  
> Use Case:
> 1)  Aggregating messages into same partition for metric counting. 
> 2)  Replicate Old Producer behavior for sticking to partition for 3 minutes.
> Here is output:
> Frozen threads found (potential deadlock)
>  
> It seems that the following threads have not changed their stack for more 
> than 10 seconds.
> These threads are possibly (but not necessarily!) in a deadlock or hung.
>  
> pool-1-thread-128 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-159 <--- Frozen for at least 2m 1 sec
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-55 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> Thanks,
> Bhavesh 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174305#comment-14174305
 ] 

Jun Rao commented on KAFKA-1493:


James,

Thanks for the patch. There are a few things marked as todo in the patch. Are 
those required? Do you think you have time to finish the patch for 0.8.2?

> Use a well-documented LZ4 compression format and remove redundant LZ4HC option
> --
>
> Key: KAFKA-1493
> URL: https://issues.apache.org/jira/browse/KAFKA-1493
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.2
>Reporter: James Oliver
>Assignee: James Oliver
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
> KAFKA-1493_2014-10-16_13:49:34.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26666: Patch for KAFKA-1653

2014-10-16 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/2/
---

(Updated Oct. 16, 2014, 9:54 p.m.)


Review request for kafka.


Bugs: KAFKA-1653
https://issues.apache.org/jira/browse/KAFKA-1653


Repository: kafka


Description (updated)
---

Generate error for duplicates in PreferredLeaderElectionCommand instead of just 
swallowing duplicates.


Report which entries are duplicated for ReassignPartitionCommand since they may 
be difficult to find in large reassignments.


Diffs (updated)
-

  core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala 
c7918483c02040a7cc18d6e9edbd20a3025a3a55 
  core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
691d69a49a240f38883d2025afaec26fd61281b5 
  core/src/main/scala/kafka/admin/TopicCommand.scala 
7672c5aab4fba8c23b1bb5cd4785c332d300a3fa 
  core/src/main/scala/kafka/tools/StateChangeLogMerger.scala 
d298e7e81acc7427c6cf4796b445966267ca54eb 

Diff: https://reviews.apache.org/r/2/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Updated] (KAFKA-1653) Duplicate broker ids allowed in replica assignment

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1653:
-
Attachment: KAFKA-1653_2014-10-16_14:54:07.patch

> Duplicate broker ids allowed in replica assignment
> --
>
> Key: KAFKA-1653
> URL: https://issues.apache.org/jira/browse/KAFKA-1653
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ryan Berdeen
>Assignee: Ewen Cheslack-Postava
>  Labels: newbie
> Attachments: KAFKA-1653.patch, KAFKA-1653_2014-10-16_14:54:07.patch
>
>
> The reassign partitions command and the controller do not ensure that all 
> replicas for a partition are on different brokers. For example, you could set 
> 1,2,2 as the list of brokers for the replicas.
> kafka-topics.sh --describe --under-replicated will list these partitions as 
> under-replicated, but I can't see a reason why the controller should allow 
> this state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1653) Duplicate broker ids allowed in replica assignment

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174319#comment-14174319
 ] 

Ewen Cheslack-Postava commented on KAFKA-1653:
--

Updated reviewboard https://reviews.apache.org/r/2/diff/
 against branch origin/trunk

> Duplicate broker ids allowed in replica assignment
> --
>
> Key: KAFKA-1653
> URL: https://issues.apache.org/jira/browse/KAFKA-1653
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ryan Berdeen
>Assignee: Ewen Cheslack-Postava
>  Labels: newbie
> Attachments: KAFKA-1653.patch, KAFKA-1653_2014-10-16_14:54:07.patch
>
>
> The reassign partitions command and the controller do not ensure that all 
> replicas for a partition are on different brokers. For example, you could set 
> 1,2,2 as the list of brokers for the replicas.
> kafka-topics.sh --describe --under-replicated will list these partitions as 
> under-replicated, but I can't see a reason why the controller should allow 
> this state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26666: Patch for KAFKA-1653

2014-10-16 Thread Ewen Cheslack-Postava


> On Oct. 16, 2014, 6:10 p.m., Neha Narkhede wrote:
> > Since you fixed some other tools as well, can we also fix the preferred 
> > replica election command where we can de-dup the partitions?

This was already removing duplicates; I had it generate an exception instead, 
since duplicates may indicate a config error. I'm assuming that's what you 
meant here.


- Ewen
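
To illustrate the behavior change in question, here is a small, hypothetical 
Java sketch, separate from the actual Scala patch, of reporting which broker ids 
are duplicated in a replica assignment such as 1,2,2 instead of silently 
dropping them; the class and method names are illustrative only.

{code}
// Hedged sketch of duplicate detection for a replica assignment list (not the real command code).
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ReplicaAssignmentCheckSketch {
    /** Returns broker ids that appear more than once, in first-seen order. */
    static Set<Integer> duplicateBrokerIds(List<Integer> replicas) {
        Set<Integer> seen = new HashSet<>();
        Set<Integer> duplicates = new LinkedHashSet<>();
        for (Integer brokerId : replicas) {
            if (!seen.add(brokerId)) {
                duplicates.add(brokerId);
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        Set<Integer> dups = duplicateBrokerIds(Arrays.asList(1, 2, 2));
        if (!dups.isEmpty()) {
            // Naming the offending ids makes large reassignment files much easier to debug.
            throw new IllegalArgumentException("Duplicate broker ids in replica assignment: " + dups);
        }
    }
}
{code}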


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/2/#review56958
---


On Oct. 16, 2014, 9:54 p.m., Ewen Cheslack-Postava wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/2/
> ---
> 
> (Updated Oct. 16, 2014, 9:54 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1653
> https://issues.apache.org/jira/browse/KAFKA-1653
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Generate error for duplicates in PreferredLeaderElectionCommand instead of 
> just swallowing duplicates.
> 
> 
> Report which entries are duplicated for ReassignPartitionCommand since they 
> may be difficult to find in large reassignments.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala 
> c7918483c02040a7cc18d6e9edbd20a3025a3a55 
>   core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
> 691d69a49a240f38883d2025afaec26fd61281b5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 
> 7672c5aab4fba8c23b1bb5cd4785c332d300a3fa 
>   core/src/main/scala/kafka/tools/StateChangeLogMerger.scala 
> d298e7e81acc7427c6cf4796b445966267ca54eb 
> 
> Diff: https://reviews.apache.org/r/2/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>



[jira] [Comment Edited] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-16 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174278#comment-14174278
 ] 

Bhavesh Mistry edited comment on KAFKA-1710 at 10/16/14 9:56 PM:
-

[~ewencp],

Thanks for looking into this.  If you look at the thread dumps, you will see the 
blocked threads as well; this particular code exposes the thread contention in 
the Kafka producer.  We hit this issue whenever we aggregate events into the 
same partition, regardless of the number of producers.  It would be great if you 
can look into an alternative implementation of the synchronization block.
The test code amplifies the root cause.

That is the root of the problem:
synchronized (dq) {
  
}

Do you think it would be better to do it the following way?
{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.PartitionInfo;

public class KafkaAsyncProducer implements Producer {

    // TODO configure this queue
    private final LinkedBlockingQueue<ProducerRecord> asyncQueue;
    private final KafkaProducer producer;
    private final List<Thread> threadList;
    private final CountDownLatch latch;

    private final AtomicBoolean close = new AtomicBoolean(false);

    public KafkaAsyncProducer(int capacity, int numberOfDrainThreads,
            Properties configFile) {
        if (configFile == null) {
            throw new NullPointerException("Producer configuration cannot be null");
        }
        // set the capacity for the queue
        asyncQueue = new LinkedBlockingQueue<ProducerRecord>(capacity);
        producer = new KafkaProducer(configFile);
        threadList = new ArrayList<Thread>(numberOfDrainThreads);
        latch = new CountDownLatch(numberOfDrainThreads);
        // start the drain threads...
        for (int i = 0; i < numberOfDrainThreads; i++) {
            Thread th = new Thread(new ConsumerThread(), "Kafka_Drain-" + i);
            th.setDaemon(true);
            threadList.add(th);
            th.start();
        }
    }

    public Future send(ProducerRecord record) {
        try {
            if (record == null) {
                throw new NullPointerException("Null record cannot be sent.");
            }
            if (close.get()) {
                throw new KafkaException("Producer already closed or in the process of closing...");
            }
            asyncQueue.put(record);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return null;
    }

    public Future send(ProducerRecord record, Callback callback) {
        throw new UnsupportedOperationException("Send not supported");
    }

    public List partitionsFor(String topic) {
        // TODO Auto-generated method stub
        return null;
    }

    public Map metrics() {
        return producer.metrics();
    }

    public void close() {
        close.compareAndSet(false, true);
        // wait for drain threads to finish
        try {
            latch.await();
            // now drain the remaining messages
            while (!asyncQueue.isEmpty()) {
                ProducerRecord record = asyncQueue.poll();
                producer.send(record);
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        producer.close();
    }

    private class ConsumerThread implements Runnable {
        public void run() {
            try {
                while (!close.get()) {
                    ProducerRecord record;
                    try {
                        record = asyncQueue.take();

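
As a follow-up to the proposal quoted above, here is a minimal usage sketch of 
the queue-plus-drain-threads wrapper Bhavesh outlines. The constructor and 
method shapes are taken from his snippet, while the topic name, capacity, and 
thread count are placeholder values, not recommendations.

{code}
// Hypothetical usage of the proposed KafkaAsyncProducer wrapper: callers only enqueue,
// so they never block on the per-partition lock; background daemon threads do the real send().
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaAsyncProducerUsageSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address

        // Bounded queue of 10,000 records drained by 4 daemon threads, per the proposal above.
        KafkaAsyncProducer producer = new KafkaAsyncProducer(10_000, 4, props);
        try {
            for (int i = 0; i < 100; i++) {
                byte[] value = ("metric-" + i).getBytes(StandardCharsets.UTF_8);
                producer.send(new ProducerRecord("aggregated-metrics", value));
            }
        } finally {
            // close() waits for the drain threads and then flushes whatever is left in the queue.
            producer.close();
        }
    }
}
{code}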
Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Guozhang Wang
Regarding 1634, I intended to work on that after 1583 since it will
change the commit offset request handling logic a lot. If people think
1583 is only a few days away from check-in, we can leave it in
0.8.2-beta; otherwise we can push it to 0.8.3.

Guozhang

On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein  wrote:

> +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.
>
> I agree to the tickets you brought up to have in 0.8.2-beta and also
> https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.
>
> /***
> Joe Stein
> Founder, Principal Consultant
> Big Data Open Source Security LLC
> http://www.stealth.ly
> Twitter: @allthingshadoop
> /
> On Oct 16, 2014 12:55 PM, "Neha Narkhede"  wrote:
>
> > Another JIRA that will be nice to include as part of 0.8.2-beta is
> > https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean
> > naming. Looking for people's thoughts on 2 things here -
> >
> > 1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2
> > final 4-5 weeks later?
> > 2. Do people want to include any JIRAs (other than the ones mentioned
> > above) in 0.8.2-beta? If so, it will be great to know now so it will
> allow
> > us to move forward with the beta release quickly.
> >
> > Thanks,
> > Neha
> >
> > On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede 
> > wrote:
> >
> > > Hi,
> > >
> > > We have accumulated an impressive list of pretty major features in
> 0.8.2
> > -
> > > Delete topic
> > > Automated leader rebalancing
> > > Controlled shutdown
> > > Offset management
> > > Parallel recovery
> > > min.isr and
> > > clean leader election
> > >
> > > In the past, what has worked for major feature releases is a beta
> release
> > > prior to a final release. I'm proposing we do the same for 0.8.2. The
> > only
> > > blockers for 0.8.2-beta, that I know of are -
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change
> and
> > > requires some thinking about the new dependency. Since it is not fully
> > > ready and there are things to think about, I suggest we take it out,
> > think
> > > it end to end and then include it in 0.8.3.)
> > > https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner:
> > > Guozhang Wang)
> > > https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is
> > > waiting on a review by Joe Stein)
> > >
> > > It seems that 1634 and 1671 can get wrapped up in a week. Do people
> think
> > > we should cut 0.8.2-beta by next week?
> > >
> > > Thanks,
> > > Neha
> > >
> >
>



-- 
-- Guozhang


Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Neha Narkhede
Thanks Guozhang. In that case, I'd vote for pushing 1634 out of 0.8.2-beta.

On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang  wrote:

> Regarding 1634, I was intended to work on that after 1583 since it will
> changes the commit offset request handling logic a lot. If people think
> 1583 is only a few days away before check-in, we can leave in in
> 0.8.2-beta; otherwise we can push to 0.8.3.
>
> Guozhang
>
> On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein  wrote:
>
> > +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.
> >
> > I agree to the tickets you brought up to have in 0.8.2-beta and also
> > https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.
> >
> > /***
> > Joe Stein
> > Founder, Principal Consultant
> > Big Data Open Source Security LLC
> > http://www.stealth.ly
> > Twitter: @allthingshadoop
> > /
> > On Oct 16, 2014 12:55 PM, "Neha Narkhede" 
> wrote:
> >
> > > Another JIRA that will be nice to include as part of 0.8.2-beta is
> > > https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean
> > > naming. Looking for people's thoughts on 2 things here -
> > >
> > > 1. How do folks feel about doing a 0.8.2-beta release right now and
> 0.8.2
> > > final 4-5 weeks later?
> > > 2. Do people want to include any JIRAs (other than the ones mentioned
> > > above) in 0.8.2-beta? If so, it will be great to know now so it will
> > allow
> > > us to move forward with the beta release quickly.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede <
> neha.narkh...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > We have accumulated an impressive list of pretty major features in
> > 0.8.2
> > > -
> > > > Delete topic
> > > > Automated leader rebalancing
> > > > Controlled shutdown
> > > > Offset management
> > > > Parallel recovery
> > > > min.isr and
> > > > clean leader election
> > > >
> > > > In the past, what has worked for major feature releases is a beta
> > release
> > > > prior to a final release. I'm proposing we do the same for 0.8.2. The
> > > only
> > > > blockers for 0.8.2-beta, that I know of are -
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change
> > and
> > > > requires some thinking about the new dependency. Since it is not
> fully
> > > > ready and there are things to think about, I suggest we take it out,
> > > think
> > > > it end to end and then include it in 0.8.3.)
> > > > https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner:
> > > > Guozhang Wang)
> > > > https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is
> > > > waiting on a review by Joe Stein)
> > > >
> > > > It seems that 1634 and 1671 can get wrapped up in a week. Do people
> > think
> > > > we should cut 0.8.2-beta by next week?
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Jun Rao
+1 on doing an 0.8.2 beta.

Guozhang,

kafka-1583 is relatively large. Given that we are getting close to
releasing 0.8.2 beta, my feeling is that we probably shouldn't include it
in 0.8.2 beta even if we can commit it in a few days.

Thanks,

Jun

On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang  wrote:

> Regarding 1634, I was intended to work on that after 1583 since it will
> changes the commit offset request handling logic a lot. If people think
> 1583 is only a few days away before check-in, we can leave in in
> 0.8.2-beta; otherwise we can push to 0.8.3.
>
> Guozhang
>
> On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein  wrote:
>
> > +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.
> >
> > I agree to the tickets you brought up to have in 0.8.2-beta and also
> > https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.
> >
> > /***
> > Joe Stein
> > Founder, Principal Consultant
> > Big Data Open Source Security LLC
> > http://www.stealth.ly
> > Twitter: @allthingshadoop
> > /
> > On Oct 16, 2014 12:55 PM, "Neha Narkhede" 
> wrote:
> >
> > > Another JIRA that will be nice to include as part of 0.8.2-beta is
> > > https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean
> > > naming. Looking for people's thoughts on 2 things here -
> > >
> > > 1. How do folks feel about doing a 0.8.2-beta release right now and
> 0.8.2
> > > final 4-5 weeks later?
> > > 2. Do people want to include any JIRAs (other than the ones mentioned
> > > above) in 0.8.2-beta? If so, it will be great to know now so it will
> > allow
> > > us to move forward with the beta release quickly.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede <
> neha.narkh...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > We have accumulated an impressive list of pretty major features in
> > 0.8.2
> > > -
> > > > Delete topic
> > > > Automated leader rebalancing
> > > > Controlled shutdown
> > > > Offset management
> > > > Parallel recovery
> > > > min.isr and
> > > > clean leader election
> > > >
> > > > In the past, what has worked for major feature releases is a beta
> > release
> > > > prior to a final release. I'm proposing we do the same for 0.8.2. The
> > > only
> > > > blockers for 0.8.2-beta, that I know of are -
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change
> > and
> > > > requires some thinking about the new dependency. Since it is not
> fully
> > > > ready and there are things to think about, I suggest we take it out,
> > > think
> > > > it end to end and then include it in 0.8.3.)
> > > > https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner:
> > > > Guozhang Wang)
> > > > https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is
> > > > waiting on a review by Joe Stein)
> > > >
> > > > It seems that 1634 and 1671 can get wrapped up in a week. Do people
> > think
> > > > we should cut 0.8.2-beta by next week?
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174356#comment-14174356
 ] 

James Oliver commented on KAFKA-1493:
-

Jun,

My pleasure. The TODOs are parts of the specification that are unimplemented, 
but are not required. I left them in there as hints if/when the spec is 
contributed back to lz4-java. The validation routines will disallow the use of 
any portion of the spec that is unimplemented, but it's totally usable.

What the spec can do - compress & decompress messages using 64kb/256kb/1mb/4mb 
blockSize (64kb by default) with optional block checksums (disabled by default)
What the spec cannot do - decompress messages compressed by an implementation 
supporting some of the missing features. If this were to occur, a 
RuntimeException with detailed information will be thrown.

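For reference, a minimal sketch of driving the codec from the new Java
producer. The broker address, topic name, and message size are assumptions;
the batch.size and linger.ms values are simply chosen to push a batch past the
default 64kb block size so that the frame spans multiple blocks:

{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Lz4ProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        // "lz4" already existed as a codec name; this patch switches it to the
        // standard LZ4 frame format and drops the redundant "lz4hc" option.
        props.put("compression.type", "lz4");
        props.put("batch.size", "66000");  // > 64kb default block size
        props.put("linger.ms", "200");     // give the batch time to fill

        KafkaProducer producer = new KafkaProducer(props);
        byte[] value = new byte[70 * 1024]; // > 64kb, exercises multi-block framing
        producer.send(new ProducerRecord("lz4-test", value)).get();
        producer.close();
    }
}
{code}
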
> Use a well-documented LZ4 compression format and remove redundant LZ4HC option
> --
>
> Key: KAFKA-1493
> URL: https://issues.apache.org/jira/browse/KAFKA-1493
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.2
>Reporter: James Oliver
>Assignee: James Oliver
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
> KAFKA-1493_2014-10-16_13:49:34.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174356#comment-14174356
 ] 

James Oliver edited comment on KAFKA-1493 at 10/16/14 10:23 PM:


Jun,

My pleasure. The TODOs are parts of the specification that are unimplemented, 
but are not required. I left them in there as hints if/when the spec is 
contributed back to lz4-java. The validation routines will disallow the use of 
any portion of the spec that is unimplemented, but it's totally usable.

What the spec can do - compress & decompress messages using 64kb/256kb/1mb/4mb 
blockSize (64kb by default) with optional block checksums (disabled by default)
What the spec cannot do - decompress messages compressed by a more advanced 
implementation, using one or more of the missing features. If this were to 
occur, a RuntimeException with detailed information will be thrown.


was (Author: joliver):
Jun,

My pleasure. The TODOs are parts of the specification that are unimplemented, 
but are not required. I left them in there as hints if/when the spec is 
contributed back to lz4-java. The validation routines will disallow the use of 
any portion of the spec that is unimplemented, but it's totally usable.

What the spec can do - compress & decompress messages using 64kb/256kb/1mb/4mb 
blockSize (64kb by default) with optional block checksums (disabled by default)
What the spec cannot do - decompress messages compressed by an implementation 
supporting some of the missing features. If this were to occur, a 
RuntimeException with detailed information will be thrown.

> Use a well-documented LZ4 compression format and remove redundant LZ4HC option
> --
>
> Key: KAFKA-1493
> URL: https://issues.apache.org/jira/browse/KAFKA-1493
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.2
>Reporter: James Oliver
>Assignee: James Oliver
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
> KAFKA-1493_2014-10-16_13:49:34.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Guozhang Wang
Agree.
On Oct 16, 2014 3:16 PM, "Jun Rao"  wrote:

> +1 on doing an 0.8.2 beta.
>
> Guozhang,
>
> kafka-1583 is relatively large. Given that we are getting close to
> releasing 0.8.2 beta, my feeling is that we probably shouldn't include it
> in 0.8.2 beta even if we can commit it in a few days.
>
> Thanks,
>
> Jun
>
> On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang  wrote:
>
> > Regarding 1634, I was intended to work on that after 1583 since it will
> > changes the commit offset request handling logic a lot. If people think
> > 1583 is only a few days away before check-in, we can leave in in
> > 0.8.2-beta; otherwise we can push to 0.8.3.
> >
> > Guozhang
> >
> > On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein  wrote:
> >
> > > +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.
> > >
> > > I agree to the tickets you brought up to have in 0.8.2-beta and also
> > > https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.
> > >
> > > /***
> > > Joe Stein
> > > Founder, Principal Consultant
> > > Big Data Open Source Security LLC
> > > http://www.stealth.ly
> > > Twitter: @allthingshadoop
> > > /
> > > On Oct 16, 2014 12:55 PM, "Neha Narkhede" 
> > wrote:
> > >
> > > > Another JIRA that will be nice to include as part of 0.8.2-beta is
> > > > https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the
> mbean
> > > > naming. Looking for people's thoughts on 2 things here -
> > > >
> > > > 1. How do folks feel about doing a 0.8.2-beta release right now and
> > 0.8.2
> > > > final 4-5 weeks later?
> > > > 2. Do people want to include any JIRAs (other than the ones mentioned
> > > > above) in 0.8.2-beta? If so, it will be great to know now so it will
> > > allow
> > > > us to move forward with the beta release quickly.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > > On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede <
> > neha.narkh...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > We have accumulated an impressive list of pretty major features in
> > > 0.8.2
> > > > -
> > > > > Delete topic
> > > > > Automated leader rebalancing
> > > > > Controlled shutdown
> > > > > Offset management
> > > > > Parallel recovery
> > > > > min.isr and
> > > > > clean leader election
> > > > >
> > > > > In the past, what has worked for major feature releases is a beta
> > > release
> > > > > prior to a final release. I'm proposing we do the same for 0.8.2.
> The
> > > > only
> > > > > blockers for 0.8.2-beta, that I know of are -
> > > > >
> > > > > https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major
> change
> > > and
> > > > > requires some thinking about the new dependency. Since it is not
> > fully
> > > > > ready and there are things to think about, I suggest we take it
> out,
> > > > think
> > > > > it end to end and then include it in 0.8.3.)
> > > > > https://issues.apache.org/jira/browse/KAFKA-1634 (This has an
> owner:
> > > > > Guozhang Wang)
> > > > > https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and
> is
> > > > > waiting on a review by Joe Stein)
> > > > >
> > > > > It seems that 1634 and 1671 can get wrapped up in a week. Do people
> > > think
> > > > > we should cut 0.8.2-beta by next week?
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>


[jira] [Comment Edited] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174356#comment-14174356
 ] 

James Oliver edited comment on KAFKA-1493 at 10/16/14 10:30 PM:


Jun,

My pleasure. The TODOs are parts of the specification that are unimplemented, 
but are not required. I left them in there as hints if/when the spec is 
contributed back to lz4-java. The validation routines will disallow the use of 
any portion of the spec that is unimplemented, but it's totally usable.

What the spec can do - compress & decompress messages using 64kb/256kb/1mb/4mb 
blockSize (64kb by default) with optional block checksums (disabled by default)
What the spec cannot do - decompress messages compressed by a more advanced 
implementation, using one or more of the missing features. If this were to 
occur, a RuntimeException with detailed information will be thrown.

EDIT: I can take out the TODOs if you think it causes confusion


was (Author: joliver):
Jun,

My pleasure. The TODOs are parts of the specification that are unimplemented, 
but are not required. I left them in there as hints if/when the spec is 
contributed back to lz4-java. The validation routines will disallow the use of 
any portion of the spec that is unimplemented, but it's totally usable.

What the spec can do - compress & decompress messages using 64kb/256kb/1mb/4mb 
blockSize (64kb by default) with optional block checksums (disabled by default)
What the spec cannot do - decompress messages compressed by a more advanced 
implementation, using one or more of the missing features. If this were to 
occur, a RuntimeException with detailed information will be thrown.

> Use a well-documented LZ4 compression format and remove redundant LZ4HC option
> --
>
> Key: KAFKA-1493
> URL: https://issues.apache.org/jira/browse/KAFKA-1493
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.2
>Reporter: James Oliver
>Assignee: James Oliver
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
> KAFKA-1493_2014-10-16_13:49:34.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174373#comment-14174373
 ] 

Ewen Cheslack-Postava commented on KAFKA-1710:
--

[~Bmis13] That approach just pushes the problem into KafkaAsyncProducer's 
thread that processes messages -- there won't be lock contention in 
KafkaProducer since KafkaAsyncProducer will be the only user of it, but you may 
not get an improvement in throughput because ultimately you're limited to the 
time a single thread can get. It may even get *slower* because you'll have more 
runnable threads at any given time, which means that the KafkaAsyncProducer 
worker thread will get less CPU time. Even disregarding that, since you used a 
LinkedBlockingQueue that will become your new source of contention (since it 
must be synchronized internally). If you have a very large capacity, that'll 
let the threads continue to make progress and contention will be lower since 
the time spent adding an item is very small, but it will cost a lot of memory 
since you're just adding a layer of buffering. That might be useful if you have 
bursty traffic (the buffer allows you to temporarily buffer more data while the 
KafkaProducer works on getting it sent), but if you have sustained traffic 
you'll just have constantly growing memory usage. If the capacity is small, 
then the threads producing messages will eventually end up getting blocked 
waiting for there to be space in the queue.

Probably the biggest issue here is that this test only writes to a single 
partition in a single topic. You could improve performance by using more 
partitions in that topic. You're already writing to all producers from all 
threads, so you must not need the ordering guarantees of a single partition. If 
you still want a single partition, you can improve performance by using more 
Producers, which will spread the contention across more queues. Since you 
already have 4 that you're running round-robin on, I'd guess adding more 
shouldn't be a problem.

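A rough sketch of that suggestion, for reference: spread the sends across more
partitions and a larger producer pool so that no single RecordAccumulator deque
is hit by every application thread. All names and sizes here are assumptions,
not code from the ticket:

{code}
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class SpreadingSender {
    private final KafkaProducer[] producers; // assumed: a larger pool than the 4 in the test
    private final int numPartitions;         // assumed: topic created with more than 1 partition
    private final AtomicLong counter = new AtomicLong();

    SpreadingSender(KafkaProducer[] producers, int numPartitions) {
        this.producers = producers;
        this.numPartitions = numPartitions;
    }

    void send(String topic, byte[] value) {
        long n = counter.getAndIncrement();
        // Round-robin over both the producer pool and the partitions, so each
        // accumulator deque sees only a fraction of the application threads.
        KafkaProducer p = producers[(int) (n % producers.length)];
        int partition = (int) (n % numPartitions);
        p.send(new ProducerRecord(topic, partition, null, value));
    }
}
{code}
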
In any case, this use case seems a bit odd. Are you really going to have 200 
threads generating messages *as fast as they can* with only 4 producers?

As far as this issue is concerned, the original report said the problem was 
deadlock but that doesn't seem to be the case. If you're just worried about 
performance, it probably makes more sense to move the discussion over to the 
mailing list. It'll probably be seen by more people and there will probably be 
multiple suggestions for improvements to your approach before we have to make 
changes to the Kafka code.

> [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
> being sent to single partition
> 
>
> Key: KAFKA-1710
> URL: https://issues.apache.org/jira/browse/KAFKA-1710
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
> Environment: Development
>Reporter: Bhavesh Mistry
>Priority: Critical
>  Labels: performance
> Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
> 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
> TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
> th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
> th6.dump, th7.dump, th8.dump, th9.dump
>
>
> Hi Kafka Dev Team,
> When I run the test to send message to single partition for 3 minutes or so 
> on, I have encounter deadlock (please see the screen attached) and thread 
> contention from YourKit profiling.  
> Use Case:
> 1)  Aggregating messages into same partition for metric counting. 
> 2)  Replicate Old Producer behavior for sticking to partition for 3 minutes.
> Here is output:
> Frozen threads found (potential deadlock)
>  
> It seems that the following threads have not changed their stack for more 
> than 10 seconds.
> These threads are possibly (but not necessarily!) in a deadlock or hung.
>  
> pool-1-thread-128 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() 
> ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
> pool-1-thread-159 <--- Frozen for at least 2m 1 sec
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPar

Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Joel Koshy
+1 on the beta. I think KAFKA-1583 should only be on trunk, not 0.8.2,
so it will only go out with 0.8.3.

Joel

On Thu, Oct 16, 2014 at 03:25:39PM -0700, Guozhang Wang wrote:
> Agree.
> On Oct 16, 2014 3:16 PM, "Jun Rao"  wrote:
> 
> > +1 on doing an 0.8.2 beta.
> >
> > Guozhang,
> >
> > kafka-1583 is relatively large. Given that we are getting close to
> > releasing 0.8.2 beta, my feeling is that we probably shouldn't include it
> > in 0.8.2 beta even if we can commit it in a few days.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang  wrote:
> >
> > > Regarding 1634, I was intended to work on that after 1583 since it will
> > > changes the commit offset request handling logic a lot. If people think
> > > 1583 is only a few days away before check-in, we can leave in in
> > > 0.8.2-beta; otherwise we can push to 0.8.3.
> > >
> > > Guozhang
> > >
> > > On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein  wrote:
> > >
> > > > +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.
> > > >
> > > > I agree to the tickets you brought up to have in 0.8.2-beta and also
> > > > https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.
> > > >
> > > > /***
> > > > Joe Stein
> > > > Founder, Principal Consultant
> > > > Big Data Open Source Security LLC
> > > > http://www.stealth.ly
> > > > Twitter: @allthingshadoop
> > > > /
> > > > On Oct 16, 2014 12:55 PM, "Neha Narkhede" 
> > > wrote:
> > > >
> > > > > Another JIRA that will be nice to include as part of 0.8.2-beta is
> > > > > https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the
> > mbean
> > > > > naming. Looking for people's thoughts on 2 things here -
> > > > >
> > > > > 1. How do folks feel about doing a 0.8.2-beta release right now and
> > > 0.8.2
> > > > > final 4-5 weeks later?
> > > > > 2. Do people want to include any JIRAs (other than the ones mentioned
> > > > > above) in 0.8.2-beta? If so, it will be great to know now so it will
> > > > allow
> > > > > us to move forward with the beta release quickly.
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > > > On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede <
> > > neha.narkh...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > We have accumulated an impressive list of pretty major features in
> > > > 0.8.2
> > > > > -
> > > > > > Delete topic
> > > > > > Automated leader rebalancing
> > > > > > Controlled shutdown
> > > > > > Offset management
> > > > > > Parallel recovery
> > > > > > min.isr and
> > > > > > clean leader election
> > > > > >
> > > > > > In the past, what has worked for major feature releases is a beta
> > > > release
> > > > > > prior to a final release. I'm proposing we do the same for 0.8.2.
> > The
> > > > > only
> > > > > > blockers for 0.8.2-beta, that I know of are -
> > > > > >
> > > > > > https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major
> > change
> > > > and
> > > > > > requires some thinking about the new dependency. Since it is not
> > > fully
> > > > > > ready and there are things to think about, I suggest we take it
> > out,
> > > > > think
> > > > > > it end to end and then include it in 0.8.3.)
> > > > > > https://issues.apache.org/jira/browse/KAFKA-1634 (This has an
> > owner:
> > > > > > Guozhang Wang)
> > > > > > https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and
> > is
> > > > > > waiting on a review by Joe Stein)
> > > > > >
> > > > > > It seems that 1634 and 1671 can get wrapped up in a week. Do people
> > > > think
> > > > > > we should cut 0.8.2-beta by next week?
> > > > > >
> > > > > > Thanks,
> > > > > > Neha
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >



[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition

2014-10-16 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174403#comment-14174403
 ] 

Bhavesh Mistry commented on KAFKA-1710:
---

[~ewencp],

Thanks for looking into this issue. We consume as fast as we can and
re-publish the messages to another aggregated topic based on some keys in the
message. We saw the thread contention in the profiling tool, and I separated
out the code to amplify the problem. We run with about 75 threads. [~ewencp],
can you please discuss this issue with the Kafka community as well? Whether
the deadlock occurs depends on thread scheduling and on how long the threads
stay blocked. All I am asking is whether there is a better way to enqueue
incoming messages. I just proposed the simple solution above, which does not
impact the application threads; only the drain threads are blocked, and with
the buffer, as you mentioned, we might get better throughput (at the expense,
of course, of buffered memory for the unbounded concurrent queue and of thread
context switching). If you feel this is a known performance issue when sending
to a single partition, then please close this, and you may start a discussion
in the Kafka community about it. Thanks for your help and suggestions!

According to the thread dumps, the blocking happens inside a synchronized block.
{code}
"pool-1-thread-200" prio=5 tid=0x7f92451c2000 nid=0x20103 waiting for 
monitor entry [0x00012d228000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:139)
- waiting to lock <0x000703ce39f0> (a java.util.ArrayDeque)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:238)
at 
org.kafka.test.TestNetworkDownProducer$MyProducer.run(TestNetworkDownProducer.java:85)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

"pool-1-thread-199" prio=5 tid=0x7f92451c1800 nid=0x1ff03 waiting for 
monitor entry [0x00012d0e5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:139)
- waiting to lock <0x000703ce39f0> (a java.util.ArrayDeque)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:238)
at 
org.kafka.test.TestNetworkDownProducer$MyProducer.run(TestNetworkDownProducer.java:85)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
{code}

> [New Java Producer Potential Deadlock] Producer Deadlock when all messages is 
> being sent to single partition
> 
>
> Key: KAFKA-1710
> URL: https://issues.apache.org/jira/browse/KAFKA-1710
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
> Environment: Development
>Reporter: Bhavesh Mistry
>Priority: Critical
>  Labels: performance
> Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot 
> 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, 
> TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, 
> th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, 
> th6.dump, th7.dump, th8.dump, th9.dump
>
>
> Hi Kafka Dev Team,
> When I run the test to send message to single partition for 3 minutes or so 
> on, I have encounter deadlock (please see the screen attached) and thread 
> contention from YourKit profiling.  
> Use Case:
> 1)  Aggregating messages into same partition for metric counting. 
> 2)  Replicate Old Producer behavior for sticking to partition for 3 minutes.
> Here is output:
> Frozen threads found (potential deadlock)
>  
> It seems that the following threads have not changed their stack for more 
> than 10 seconds.
> These threads are possibly (but not necessarily!) in a deadlock or hung.
>  
> pool-1-thread-128 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
>  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
> Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
> TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
> ThreadPoolExecutor.java:1145
> java.util.concurren

[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-16 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174462#comment-14174462
 ] 

Bhavesh Mistry commented on KAFKA-1642:
---

[~jkreps],

Did you get a chance to reproduce the problem? Has someone else reported this
issue or a similar one?

Thanks,

Bhavesh 

> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Jun Rao
>
> I see my CPU spike to 100% when network connection is lost for while.  It 
> seems network  IO thread are very busy logging following error message.  Is 
> this expected behavior ?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26658: Patch for KAFKA-1493

2014-10-16 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26658/#review57005
---


Thanks for the patch. Looks good overall. Some minor comments below.


clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java


Should this be private?



clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java


Should we flush after writing the end mark?



clients/src/main/java/org/apache/kafka/common/record/Compressor.java


Unused import.



core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala


How about we make this 2000 so that we can test compression on more than 
64KB, which is the default block size of lz4?



core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala


How about also adding the following properties to test out compressing more 
than 64KB?
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
props.put(ProducerConfig.LINGER_MS_CONFIG, "200")


- Jun Rao


On Oct. 16, 2014, 8:50 p.m., James Oliver wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/26658/
> ---
> 
> (Updated Oct. 16, 2014, 8:50 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1493
> https://issues.apache.org/jira/browse/KAFKA-1493
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1493 Implement LZ4 Frame I/O Streams
> 
> 
> KAFKA-1493 Add utils functions, tweak test cases and OutputStream construction
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> bf4ed66791b9a502aae6cb2ec7681f42732d9a43 
>   
> clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
> 5227b2d7ab803389d1794f48c8232350c05b14fd 
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
> 0323f5f7032dceb49d820c17a41b78c56591ffc4 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
> a0827f576e8c38b1bd828cf0d6aefff9fd5ecc22 
>   config/producer.properties 39d65d7c6c21f4fccd7af89be6ca12a088d5dd98 
>   core/src/main/scala/kafka/message/CompressionCodec.scala 
> de0a0fade5387db63299c6b112b3c9a5e41d82ec 
>   core/src/main/scala/kafka/message/CompressionFactory.scala 
> 8420e13d0d8680648df78f22ada4a0d4e3ab8758 
>   core/src/main/scala/kafka/tools/ConsoleProducer.scala 
> b024a693c23cb21f1efe405ed414bf23f3974f31 
>   core/src/main/scala/kafka/tools/PerfConfig.scala 
> c72002976d90416559090a665f6494072a6c2dec 
>   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
> c95485170fd8b4f5faad740f049e5d09aca8829d 
>   core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 
> 6f0addcea64f1e78a4de50ec8135f4d02cebd305 
>   core/src/test/scala/unit/kafka/message/MessageTest.scala 
> 958c1a60069ad85ae20f5c58e74679cd9fa6f70e 
> 
> Diff: https://reviews.apache.org/r/26658/diff/
> 
> 
> Testing
> ---
> 
> ./gradlew test
> All tests passed
> 
> 
> Thanks,
> 
> James Oliver
> 
>



[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174473#comment-14174473
 ] 

Jun Rao commented on KAFKA-1493:


James,

Thanks for the answer. We can leave the TODOs there. The patch looks good to 
me. Could you look at the comments in the RB?

> Use a well-documented LZ4 compression format and remove redundant LZ4HC option
> --
>
> Key: KAFKA-1493
> URL: https://issues.apache.org/jira/browse/KAFKA-1493
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.2
>Reporter: James Oliver
>Assignee: James Oliver
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
> KAFKA-1493_2014-10-16_13:49:34.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 24676: Rebase KAFKA-1583

2014-10-16 Thread Guozhang Wang


> On Oct. 16, 2014, 1:29 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 167
> > 
> >
> > Should "replica manager" be "offset manager"?

This is "replica manager" actually, when it tries to write the commit message 
to the local log. I have changed the comment a bit to make it more clear.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review56843
---


On Oct. 14, 2014, 2:42 a.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> ---
> 
> (Updated Oct. 14, 2014, 2:42 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
> https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + 
> minor changes
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 
> 59c09155dd25fad7bed07d3d00039e3dc66db95c 
>   core/src/main/scala/kafka/api/FetchResponse.scala 
> 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
> 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala 
> b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala 
> a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 
> 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
>   core/src/main/scala/kafka/log/Log.scala 
> a123cdc52f341a802b3e4bfeb29a6154332e5f73 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
> a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala 
> e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 
> 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
> ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 67f2833804cb15976680e42b9dc49e275c89d266 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
> d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 78b7514cc109547c562e635824684fad581af653 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 
> 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
>   core/src/main/scala/kafka/utils/DelayedItem.scala 
> d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
>   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
> fb61d552f2320fedec547400fbbe402a0b2f5d87 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
> 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
> cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
> a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
> a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
> 3804a114e97c849cae48308997037786614173fc 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> ---
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



[jira] [Updated] (KAFKA-1583) Kafka API Refactoring

2014-10-16 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1583:
-
Attachment: KAFKA-1583_2014-10-16_21:15:40.patch

> Kafka API Refactoring
> -
>
> Key: KAFKA-1583
> URL: https://issues.apache.org/jira/browse/KAFKA-1583
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.9.0
>
> Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, 
> KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, 
> KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, 
> KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, 
> KAFKA-1583_2014-10-13_19:41:58.patch, KAFKA-1583_2014-10-16_21:15:40.patch
>
>
> This is the next step of KAFKA-1430. Details can be found at this page:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 24676: Fix KAFKA-1583

2014-10-16 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Oct. 17, 2014, 4:15 a.m.)


Review request for kafka.


Summary (updated)
-

Fix KAFKA-1583


Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
---

Incorporate Jun's comments after rebase


Diffs (updated)
-

  core/src/main/scala/kafka/api/FetchRequest.scala 
59c09155dd25fad7bed07d3d00039e3dc66db95c 
  core/src/main/scala/kafka/api/FetchResponse.scala 
8d085a1f18f803b3cebae4739ad8f58f95a6c600 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala 
e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
880ab4a004f078e5d84446ea6e4454ecc06c95f2 
  core/src/main/scala/kafka/log/Log.scala 
157d67369baabd2206a2356b2aa421e848adab17 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 
85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
  core/src/main/scala/kafka/server/OffsetManager.scala 
43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
78b7514cc109547c562e635824684fad581af653 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
9d76234bc2c810ec08621dc92bb4061b8e7cd993 
  core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
fb61d552f2320fedec547400fbbe402a0b2f5d87 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
3804a114e97c849cae48308997037786614173fc 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
---

Unit tests


Thanks,

Guozhang Wang



[jira] [Commented] (KAFKA-1583) Kafka API Refactoring

2014-10-16 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174709#comment-14174709
 ] 

Guozhang Wang commented on KAFKA-1583:
--

Updated reviewboard https://reviews.apache.org/r/24676/diff/
 against branch origin/trunk

> Kafka API Refactoring
> -
>
> Key: KAFKA-1583
> URL: https://issues.apache.org/jira/browse/KAFKA-1583
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.9.0
>
> Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, 
> KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, 
> KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, 
> KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, 
> KAFKA-1583_2014-10-13_19:41:58.patch, KAFKA-1583_2014-10-16_21:15:40.patch
>
>
> This is the next step of KAFKA-1430. Details can be found at this page:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26658: Patch for KAFKA-1493

2014-10-16 Thread James Oliver

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26658/
---

(Updated Oct. 17, 2014, 4:25 a.m.)


Review request for kafka.


Bugs: KAFKA-1493
https://issues.apache.org/jira/browse/KAFKA-1493


Repository: kafka


Description (updated)
---

KAFKA-1493 Implement LZ4 Frame I/O Streams


KAFKA-1493 Add utils functions, tweak test cases and OutputStream construction


KAFKA-1493 Flush stream after writing frame end mark


KAFKA-1493 Remove unused import


KAFKA-1493 Move finish() logic into close()


KAFKA-1493 Modify test cases to compress a >64kb message to test multi-block 
lz4 frame compression/decompression


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
bf4ed66791b9a502aae6cb2ec7681f42732d9a43 
  
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
5227b2d7ab803389d1794f48c8232350c05b14fd 
  clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
0323f5f7032dceb49d820c17a41b78c56591ffc4 
  clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
a0827f576e8c38b1bd828cf0d6aefff9fd5ecc22 
  config/producer.properties 39d65d7c6c21f4fccd7af89be6ca12a088d5dd98 
  core/src/main/scala/kafka/message/CompressionCodec.scala 
de0a0fade5387db63299c6b112b3c9a5e41d82ec 
  core/src/main/scala/kafka/message/CompressionFactory.scala 
8420e13d0d8680648df78f22ada4a0d4e3ab8758 
  core/src/main/scala/kafka/tools/ConsoleProducer.scala 
b024a693c23cb21f1efe405ed414bf23f3974f31 
  core/src/main/scala/kafka/tools/PerfConfig.scala 
c72002976d90416559090a665f6494072a6c2dec 
  core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
c95485170fd8b4f5faad740f049e5d09aca8829d 
  core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 
6f0addcea64f1e78a4de50ec8135f4d02cebd305 
  core/src/test/scala/unit/kafka/message/MessageTest.scala 
958c1a60069ad85ae20f5c58e74679cd9fa6f70e 

Diff: https://reviews.apache.org/r/26658/diff/


Testing
---

./gradlew test
All tests passed


Thanks,

James Oliver



[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174716#comment-14174716
 ] 

James Oliver commented on KAFKA-1493:
-

Updated reviewboard https://reviews.apache.org/r/26658/diff/
 against branch origin/trunk

> Use a well-documented LZ4 compression format and remove redundant LZ4HC option
> --
>
> Key: KAFKA-1493
> URL: https://issues.apache.org/jira/browse/KAFKA-1493
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.2
>Reporter: James Oliver
>Assignee: James Oliver
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
> KAFKA-1493_2014-10-16_13:49:34.patch, KAFKA-1493_2014-10-16_21:25:23.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option

2014-10-16 Thread James Oliver (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Oliver updated KAFKA-1493:

Attachment: KAFKA-1493_2014-10-16_21:25:23.patch

> Use a well-documented LZ4 compression format and remove redundant LZ4HC option
> --
>
> Key: KAFKA-1493
> URL: https://issues.apache.org/jira/browse/KAFKA-1493
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.2
>Reporter: James Oliver
>Assignee: James Oliver
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1493.patch, KAFKA-1493.patch, 
> KAFKA-1493_2014-10-16_13:49:34.patch, KAFKA-1493_2014-10-16_21:25:23.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)