[jira] [Commented] (KAFKA-2091) Expose a Partitioner interface in the new producer

2015-04-23 Thread Gianmarco De Francisci Morales (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14508649#comment-14508649
 ] 

Gianmarco De Francisci Morales commented on KAFKA-2091:
---

Hi,
I think no KIP has been written yet. I can start writing one, but I need 
permissions on the wiki.

> Expose a Partitioner interface in the new producer
> --
>
> Key: KAFKA-2091
> URL: https://issues.apache.org/jira/browse/KAFKA-2091
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Sriharsha Chintalapani
> Attachments: KAFKA-2091.patch
>
>
> In the new producer you can pass in a key or hard code the partition as part 
> of ProducerRecord.
> Internally we are using a class
> {code}
> class Partitioner {
> public int partition(String topic, byte[] key, Integer partition, Cluster 
> cluster) {...}
> }
> {code}
> This class uses the specified partition if there is one; uses a hash of the 
> key if there isn't a partition but there is a key; and simply chooses a 
> partition round robin if there is neither a partition nor a key.
> However there are several partitioning strategies that could be useful that 
> we don't support out of the box. 
> An example would be having each producer periodically choose a random 
> partition. This tends to be the most efficient since all data goes to one 
> server and uses the fewest TCP connections; however, it only produces good 
> load balancing if there are many producers.
> Of course a user can do this now by just setting the partition manually, but 
> that is a bit inconvenient if you need to do that across a bunch of apps 
> since each will need to remember to set the partition every time.
> The idea would be to expose a configuration to set the partitioner 
> implementation like
> {code}
> partitioner.class=org.apache.kafka.producer.DefaultPartitioner
> {code}
> This would default to the existing partitioner implementation.
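A minimal sketch of the default rules described above (illustrative only, not the actual Kafka source; Kafka's real partitioner uses murmur2 rather than Arrays.hashCode for the key hash): use the explicit partition if given, hash the key if present, otherwise round-robin.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class DefaultPartitionerSketch {
    private static final AtomicInteger counter = new AtomicInteger(0);

    public static int partition(byte[] key, Integer partition, int numPartitions) {
        if (partition != null)
            return partition;                                   // caller pinned the partition
        if (key != null)
            return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions; // keyed: stable hash
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions; // round robin
    }

    public static void main(String[] args) {
        System.out.println(partition(null, 2, 4));              // prints 2: explicit wins
        System.out.println(partition("k".getBytes(), null, 4)); // stable for the same key
        System.out.println(partition(null, null, 4));           // first round-robin pick: 0
    }
}
```

A pluggable `partitioner.class` implementation would follow the same precedence, only swapping in its own strategy for the no-key, no-partition case.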



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2134) Producer blocked on metric publish

2015-04-23 Thread Vamsi Subhash Achanta (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vamsi Subhash Achanta updated KAFKA-2134:
-
Priority: Blocker  (was: Major)

> Producer blocked on metric publish
> --
>
> Key: KAFKA-2134
> URL: https://issues.apache.org/jira/browse/KAFKA-2134
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2.1
> Environment: debian7, java8
>Reporter: Vamsi Subhash Achanta
>Assignee: Jun Rao
>Priority: Blocker
>
> Hi,
> We have a REST api to publish to a topic. Yesterday, we started noticing that 
> the producer is not able to produce messages at a good rate and the 
> CLOSE_WAITs of our producer REST app are very high. All the producer REST 
> requests are hence timing out.
> When we took the thread dump and analysed it, we noticed that the threads are 
> getting blocked on JmxReporter metricChange. Here is the attached stack trace.
> "dw-70 - POST /queues/queue_1/messages" #70 prio=5 os_prio=0 
> tid=0x7f043c8bd000 nid=0x54cf waiting for monitor entry 
> [0x7f04363c7000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:76)
> - waiting to lock <0x0005c1823860> (a java.lang.Object)
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:182)
> - locked <0x0007a5e526c8> (a 
> org.apache.kafka.common.metrics.Metrics)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:165)
> - locked <0x0007a5e526e8> (a 
> org.apache.kafka.common.metrics.Sensor)
> When I looked at the code of the metricChange method, it uses a synchronized 
> block on a shared lock object, and it seems that lock is held by another thread.
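The BLOCKED state in the stack trace can be reproduced in miniature: one thread holds a shared monitor (standing in here for JmxReporter's lock object) while another thread, on the registerMetric path, waits for it. This is an illustrative sketch, not Kafka code; names and timings are assumptions.

```java
public class MonitorContentionDemo {
    private static final Object lock = new Object(); // stands in for JmxReporter's lock

    // Returns how long the second thread was blocked waiting for the monitor.
    public static long contendedWaitMs(long holdMs, long headStartMs)
            throws InterruptedException {
        Thread holder = new Thread(() -> {
            synchronized (lock) {
                try { Thread.sleep(holdMs); } catch (InterruptedException ignored) {}
            }
        });
        holder.start();
        Thread.sleep(headStartMs);          // let the holder grab the monitor first
        long start = System.nanoTime();
        synchronized (lock) { }             // the registerMetric path blocks here
        holder.join();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        // The second thread is BLOCKED for roughly holdMs - headStartMs.
        System.out.println("waited ~" + contendedWaitMs(200, 50) + " ms");
    }
}
```

If anything inside the lock-holding path is slow (for example, a JMX MBean registration), every thread that needs to register a metric queues up behind it, which matches the many BLOCKED producer threads in the dump.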





[jira] [Commented] (KAFKA-2134) Producer blocked on metric publish

2015-04-23 Thread Vamsi Subhash Achanta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14509083#comment-14509083
 ] 

Vamsi Subhash Achanta commented on KAFKA-2134:
--

We noticed this intermittently. What could be the problem?

> Producer blocked on metric publish
> --
>
> Key: KAFKA-2134
> URL: https://issues.apache.org/jira/browse/KAFKA-2134
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2.1
> Environment: debian7, java8
>Reporter: Vamsi Subhash Achanta
>Assignee: Jun Rao
>Priority: Blocker
>
> Hi,
> We have a REST api to publish to a topic. Yesterday, we started noticing that 
> the producer is not able to produce messages at a good rate and the 
> CLOSE_WAITs of our producer REST app are very high. All the producer REST 
> requests are hence timing out.
> When we took the thread dump and analysed it, we noticed that the threads are 
> getting blocked on JmxReporter metricChange. Here is the attached stack trace.
> "dw-70 - POST /queues/queue_1/messages" #70 prio=5 os_prio=0 
> tid=0x7f043c8bd000 nid=0x54cf waiting for monitor entry 
> [0x7f04363c7000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:76)
> - waiting to lock <0x0005c1823860> (a java.lang.Object)
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:182)
> - locked <0x0007a5e526c8> (a 
> org.apache.kafka.common.metrics.Metrics)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:165)
> - locked <0x0007a5e526e8> (a 
> org.apache.kafka.common.metrics.Sensor)
> When I looked at the code of the metricChange method, it uses a synchronized 
> block on a shared lock object, and it seems that lock is held by another thread.





[jira] [Commented] (KAFKA-2091) Expose a Partitioner interface in the new producer

2015-04-23 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14509101#comment-14509101
 ] 

Sriharsha Chintalapani commented on KAFKA-2091:
---

[~azaroth] Here is the KIP 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
 . Posted a discussion thread as well.

> Expose a Partitioner interface in the new producer
> --
>
> Key: KAFKA-2091
> URL: https://issues.apache.org/jira/browse/KAFKA-2091
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Sriharsha Chintalapani
> Attachments: KAFKA-2091.patch
>
>
> In the new producer you can pass in a key or hard code the partition as part 
> of ProducerRecord.
> Internally we are using a class
> {code}
> class Partitioner {
> public int partition(String topic, byte[] key, Integer partition, Cluster 
> cluster) {...}
> }
> {code}
> This class uses the specified partition if there is one; uses a hash of the 
> key if there isn't a partition but there is a key; and simply chooses a 
> partition round robin if there is neither a partition nor a key.
> However there are several partitioning strategies that could be useful that 
> we don't support out of the box. 
> An example would be having each producer periodically choose a random 
> partition. This tends to be the most efficient since all data goes to one 
> server and uses the fewest TCP connections; however, it only produces good 
> load balancing if there are many producers.
> Of course a user can do this now by just setting the partition manually, but 
> that is a bit inconvenient if you need to do that across a bunch of apps 
> since each will need to remember to set the partition every time.
> The idea would be to expose a configuration to set the partitioner 
> implementation like
> {code}
> partitioner.class=org.apache.kafka.producer.DefaultPartitioner
> {code}
> This would default to the existing partitioner implementation.





[KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-04-23 Thread Sriharsha Chintalapani
Hi,
Here is the KIP for adding a partitioner interface for producer.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
There is one open question about how the interface should look. Please take a 
look and let me know if you prefer one way or the other.

Thanks,
Harsha



Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-04-23 Thread Jay Kreps
Hey Harsha,

A few comments:

Can you finish up the KIP? There are some unfinished sentences and odd
whitespace issues.

Here are the questions I think we should consider:
1. Do we need this at all given that we have the partition argument in
ProducerRecord which gives full control? I think we do need it because this
is a way to plug in a different partitioning strategy at run time and do it
in a fairly transparent way.
2. We certainly need to add both the serialized and unserialized form for
the key as both are useful.
3. Do we need to add the value? I suspect people will have uses for
computing something off a few fields in the value to choose the partition.
This would be useful in cases where the key was being used for log
compaction purposes and did not contain the full information for computing
the partition.
4. This interface doesn't include either an init() or close() method. It
should implement Closeable and Configurable, right?
5. What happens if the user both sets the partition id in the
ProducerRecord and sets a partitioner? Does the partition id just get
passed in to the partitioner (as sort of implied in this interface?). This
is a bit weird since if you pass in the partition id you kind of expect it
to get used, right? Or is it the case that if you specify a partition the
partitioner isn't used at all (in which case no point in including
partition in the Partitioner api).
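As a purely illustrative sketch of how points 2-4 above could combine (both serialized and unserialized forms of key and value, plus Configurable/Closeable lifecycle hooks), with all names assumed rather than final:

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.Map;

// Stand-in for org.apache.kafka.common.Configurable (assumed here).
interface ConfigurableSketch {
    void configure(Map<String, ?> configs);
}

// Hypothetical partitioner contract: receives both the unserialized objects
// and the serialized bytes for key and value, plus lifecycle hooks.
interface PartitionerSketch extends ConfigurableSketch, Closeable {
    int partition(String topic, Object key, byte[] keyBytes,
                  Object value, byte[] valueBytes, int numPartitions);
    @Override default void close() {}   // default no-op keeps simple partitioners short
}

// Trivial implementation hashing the serialized key.
public class KeyHashPartitioner implements PartitionerSketch {
    @Override public void configure(Map<String, ?> configs) {}

    @Override public int partition(String topic, Object key, byte[] keyBytes,
                                   Object value, byte[] valueBytes, int numPartitions) {
        if (keyBytes == null) return 0;  // no key: a real impl might round-robin here
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionerSketch p = new KeyHashPartitioner();
        System.out.println(p.partition("t", "k", "k".getBytes(), null, null, 4));
    }
}
```

Passing the value through gives partitioners the log-compaction use case from point 3 for free, at the cost of a wider method signature.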

Cheers,

-Jay

On Thu, Apr 23, 2015 at 6:55 AM, Sriharsha Chintalapani 
wrote:

> Hi,
> Here is the KIP for adding a partitioner interface for producer.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
> There is one open question about how the interface should look. Please
> take a look and let me know if you prefer one way or the other.
>
> Thanks,
> Harsha
>
>


[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows

2015-04-23 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14509311#comment-14509311
 ] 

Jay Kreps commented on KAFKA-1646:
--

So [~waldenchen] next steps for this patch:
1. KIP is under discussion
2. This patch still includes no testing at all, right? We definitely need to 
remedy that.
3. It would be good to get perf results for Windows/NTFS + Linux/ext4

Also, are you guys using this patch in production? If so what has been the 
experience?

> Improve consumer read performance for Windows
> -
>
> Key: KAFKA-1646
> URL: https://issues.apache.org/jira/browse/KAFKA-1646
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Affects Versions: 0.8.1.1
> Environment: Windows
>Reporter: xueqiang wang
>Assignee: xueqiang wang
>  Labels: newbie, patch
> Attachments: Improve consumer read performance for Windows.patch, 
> KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
> KAFKA-1646.patch, KAFKA-1646_20141216_163008.patch, 
> KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch, 
> KAFKA-1646_20150414_035415.patch, KAFKA-1646_20150414_184503.patch, 
> KAFKA-1646_20150422.patch
>
>
> This patch is for the Windows platform only. On Windows, if there is more 
> than one replica writing to disk, the segment log files will not be laid out 
> consistently on disk and consumer read performance drops greatly. This fix 
> allocates more disk space when rolling a new segment, which improves consumer 
> read performance on the NTFS file system.
> This patch doesn't affect file allocation on other filesystems, for it only 
> adds statements like 'if(Os.iswindow)' or adds methods used on Windows.





Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-23 Thread Jay Kreps
Yeah if we understand the optimal policy for a config we always want to set
it automatically. In this case I don't think we do yet, but down the road
that could be nice. I think for now we should consider this option
experimental to give people a chance to try it out.

-Jay

On Wed, Apr 22, 2015 at 7:32 PM, Honghai Chen 
wrote:

> Hi Roshan,
> Use the 'auto' value maybe will break the rule and mess up the
> configuration. @Jay, any thoughts?
>
> Thanks, Honghai Chen
>
> -Original Message-
> From: Sriharsha Chintalapani [mailto:harsh...@fastmail.fm]
> Sent: Thursday, April 23, 2015 6:27 AM
> To: dev@kafka.apache.org; Roshan Naik
> Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume
> performance under windows and some old Linux file system
>
> +1 (non-binding).
>
> --
> Harsha
>
>
> On April 22, 2015 at 2:52:12 PM, Roshan Naik (ros...@hortonworks.com)
> wrote:
>
I see that it is safe to keep this off by default due to some concerns.
Eventually, for settings such as this whose 'preferred' value is platform
specific (or based on other criteria), it might be worth considering
having a default value that is not a constant but an 'auto' value. When
Kafka boots up it can automatically use the preferred value. Of course it
would have to be documented as to what 'auto' means for a given platform.
>
> -roshan
>
>
> On 4/22/15 1:21 PM, "Jakob Homan"  wrote:
>
> >+1. This is an important performance fix for Windows-based clusters.
> >
> >-Jakob
> >
> >On 22 April 2015 at 03:25, Honghai Chen 
> >wrote:
> >> Fix the issue Sriram mentioned. Code review and jira/KIP updated.
> >>
> >> Below is a detailed description of the scenarios:
> >> 1. On clean shutdown, the last log file will be truncated to its real
> >> size, since the close() function of FileMessageSet calls trim().
> >> 2. On crash, restart will go through recover() and the last log file
> >> will be truncated to its real size (and the position moved to the end
> >> of the file).
> >> 3. When the service starts and opens an existing file:
> >> a. It runs the LogSegment constructor which has NO "preallocate"
> >> parameter.
> >> b. In FileMessageSet, "end" will be Int.MaxValue, so
> >> "channel.position(math.min(channel.size().toInt, end))" puts the
> >> position at the end of the file.
> >> c. If recovery is needed, recover() truncates the file to the end of
> >> valid data and moves the position there.
> >> 4. When the service is running and needs to create a new log segment
> >> and FileMessageSet:
> >> a. If preallocate = true: "end" will be 0, the file size will be
> >> "initFileSize", and "channel.position(math.min(channel.size().toInt,
> >> end))" puts the position at 0.
> >> b. If preallocate = false (backward compatible): "end" will be
> >> Int.MaxValue, the file size will be 0, and
> >> "channel.position(math.min(channel.size().toInt, end))" puts the
> >> position at 0.
> >>
> >>
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
> >> https://issues.apache.org/jira/browse/KAFKA-1646
> >> https://reviews.apache.org/r/33204/diff/2/
> >>
> >> Thanks, Honghai Chen
> >> http://aka.ms/kafka
> >> http://aka.ms/manifold
> >>
> >> -Original Message-
> >> From: Honghai Chen
> >> Sent: Wednesday, April 22, 2015 11:12 AM
> >> To: dev@kafka.apache.org
> >> Subject: RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume
> >>performance under windows and some old Linux file system
> >>
> >> Hi Sriram,
> >> One sentence of code missed, will update code review board and
> >>KIP soon.
> >> For LogSegment and FileMessageSet, must use different
> >>constructor function for existing file and new file, then the code "
> >>channel.position(math.min(channel.size().toInt, end)) " will make sure
> >>the position at end of existing file.
> >>
> >> Thanks, Honghai Chen
> >>
> >> -Original Message-
> >> From: Jay Kreps [mailto:jay.kr...@gmail.com]
> >> Sent: Wednesday, April 22, 2015 5:22 AM
> >> To: dev@kafka.apache.org
> >> Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume
> >>performance under windows and some old Linux file system
> >>
> >> My understanding of the patch is that clean shutdown truncates the file
> >>back to it's true size (and reallocates it on startup). Hard crash is
> >>handled by the normal recovery which should truncate off the empty
> >>portion of the file.
> >>
> >> On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian <
> >>srsubraman...@linkedin.com.invalid> wrote:
> >>
> >>> Could you describe how recovery works in this mode? Say, we had a 250
> >>> MB preallocated segment and we wrote till 50MB and crashed. Till what
> >>> point do we recover? Also, on startup, how is the append end pointer
> >>> set even on a clea
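Honghai's scenario list above reduces to one invariant: channel.position(math.min(channel.size().toInt, end)) puts a freshly preallocated new segment at position 0 and an existing file at its end. A standalone sketch of that behavior with java.nio; the method names and the preallocate-by-writing-a-byte trick are illustrative, not Kafka's actual code.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class PreallocateSketch {
    // For a new preallocated segment, end = 0; otherwise end = Int.MaxValue,
    // so the min() clamps the position to the current file size.
    static FileChannel open(Path path, boolean preallocate, long initFileSize)
            throws IOException {
        FileChannel ch = FileChannel.open(path, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
        long end;
        if (preallocate) {
            // Reserve initFileSize bytes up front by writing the last byte.
            ch.write(ByteBuffer.allocate(1), initFileSize - 1);
            end = 0;                     // new writes begin at offset 0
        } else {
            end = Integer.MAX_VALUE;     // existing behavior: append at file end
        }
        ch.position(Math.min(ch.size(), end));
        return ch;
    }

    public static void main(String[] args) throws IOException {
        Path p = Files.createTempFile("seg", ".log");
        try (FileChannel ch = open(p, true, 1024)) {
            System.out.println(ch.size());     // 1024: space reserved
            System.out.println(ch.position()); // 0: writes start at the front
        }
        Files.delete(p);
    }
}
```

Reopening the same file with preallocate = false would land the position at the file end (here 1024), which is the "open existing file" case in scenario 3.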

Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-23 Thread Sriram Subramanian
+1 

Some information on how this will be tested would be useful.

On 4/23/15 9:33 AM, "Jay Kreps"  wrote:

>Yeah if we understand the optimal policy for a config we always want to
>set
>it automatically. In this case I don't think we do yet, but down the road
>that could be nice. I think for now we should consider this option
>experimental to give people a chance to try it out.
>
>-Jay
>
>On Wed, Apr 22, 2015 at 7:32 PM, Honghai Chen 
>wrote:
>
>> Hi Roshan,
>> Using the 'auto' value may break the rule and mess up the
>> configuration. @Jay, any thoughts?
>>
>> Thanks, Honghai Chen
>>
>> -Original Message-
>> From: Sriharsha Chintalapani [mailto:harsh...@fastmail.fm]
>> Sent: Thursday, April 23, 2015 6:27 AM
>> To: dev@kafka.apache.org; Roshan Naik
>> Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume
>> performance under windows and some old Linux file system
>>
>> +1 (non-binding).
>>
>> --
>> Harsha
>>
>>
>> On April 22, 2015 at 2:52:12 PM, Roshan Naik (ros...@hortonworks.com)
>> wrote:
>>
>> I see that it is safe to keep this off by default due to some concerns.
>> Eventually, for settings such as this whose 'preferred' value is platform
>> specific (or based on other criteria), it might be worth considering
>> having a default value that is not a constant but an 'auto' value. When
>> Kafka boots up it can automatically use the preferred value. Of course it
>> would have to be documented as to what 'auto' means for a given platform.
>>
>> -roshan
>>
>>
>> On 4/22/15 1:21 PM, "Jakob Homan"  wrote:
>>
>> >+1. This is an important performance fix for Windows-based clusters.
>> >
>> >-Jakob
>> >
>> >On 22 April 2015 at 03:25, Honghai Chen 
>> >wrote:
>> >> Fix the issue Sriram mentioned. Code review and jira/KIP updated.
>> >>
> >> >> Below is a detailed description of the scenarios:
> >> >> 1. On clean shutdown, the last log file will be truncated to its real
> >> >> size, since the close() function of FileMessageSet calls trim().
> >> >> 2. On crash, restart will go through recover() and the last log file
> >> >> will be truncated to its real size (and the position moved to the end
> >> >> of the file).
> >> >> 3. When the service starts and opens an existing file:
> >> >> a. It runs the LogSegment constructor which has NO "preallocate"
> >> >> parameter.
> >> >> b. In FileMessageSet, "end" will be Int.MaxValue, so
> >> >> "channel.position(math.min(channel.size().toInt, end))" puts the
> >> >> position at the end of the file.
> >> >> c. If recovery is needed, recover() truncates the file to the end of
> >> >> valid data and moves the position there.
> >> >> 4. When the service is running and needs to create a new log segment
> >> >> and FileMessageSet:
> >> >> a. If preallocate = true: "end" will be 0, the file size will be
> >> >> "initFileSize", and "channel.position(math.min(channel.size().toInt,
> >> >> end))" puts the position at 0.
> >> >> b. If preallocate = false (backward compatible): "end" will be
> >> >> Int.MaxValue, the file size will be 0, and
> >> >> "channel.position(math.min(channel.size().toInt, end))" puts the
> >> >> position at 0.
>> >>
>> >>
>> >>
>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
>> >> https://issues.apache.org/jira/browse/KAFKA-1646
>> >> https://reviews.apache.org/r/33204/diff/2/
>> >>
>> >> Thanks, Honghai Chen
>> >> http://aka.ms/kafka
>> >> http://aka.ms/manifold
>> >>
>> >> -Original Message-
>> >> From: Honghai Chen
>> >> Sent: Wednesday, April 22, 2015 11:12 AM
>> >> To: dev@kafka.apache.org
>> >> Subject: RE: [DISCUSS] KIP 20 Enable log preallocate to improve
>>consume
>> >>performance under windows and some old Linux file system
>> >>
>> >> Hi Sriram,
>> >> One sentence of code missed, will update code review board and
>> >>KIP soon.
>> >> For LogSegment and FileMessageSet, must use different
>> >>constructor function for existing file and new file, then the code "
>> >>channel.position(math.min(channel.size().toInt, end)) " will make sure
>> >>the position at end of existing file.
>> >>
>> >> Thanks, Honghai Chen
>> >>
>> >> -Original Message-
>> >> From: Jay Kreps [mailto:jay.kr...@gmail.com]
>> >> Sent: Wednesday, April 22, 2015 5:22 AM
>> >> To: dev@kafka.apache.org
>> >> Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve
>>consume
>> >>performance under windows and some old Linux file system
>> >>
>> >> My understanding of the patch is that clean shutdown truncates the
>>file
>> >>back to it's true size (and reallocates it on startup). Hard crash is
>> >>handled by the normal recovery which should truncate off the empty
>> >>portion of the file.
>> >>
>> >> On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian <
>> >>srsubraman...@linkedin.com.invalid> w

[jira] [Created] (KAFKA-2142) Follow-up patch for KAFKA-2138 Refactor the drain message logic in new producer

2015-04-23 Thread Jiangjie Qin (JIRA)
Jiangjie Qin created KAFKA-2142:
---

 Summary: Follow-up patch for KAFKA-2138 Refactor the drain message 
logic in new producer
 Key: KAFKA-2142
 URL: https://issues.apache.org/jira/browse/KAFKA-2142
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin


This is the follow-up patch for KAFKA-2138. Currently the logic for the sender to 
drain messages from the accumulator is a little awkward, and we want to refactor 
it. Copy/pasting Guozhang's suggestion below:
{quote}
1. While handling the metadata response and updating the metadata, check whether 
ANY partition's leader is not known; if so, set metadata.requestUpdate. Then we 
no longer need to do this step at the start of run().
2. Get all the ready nodes based on their connection state only (i.e. no 
peeking into RecordAccumulator), and record the node_backoff as min 
(reconnection_backoff - time_waited) over all nodes; if one of these nodes is 
connected or connecting, this backoff should be 0.
3. For each of the ready nodes, try to drain their corresponding partitions in 
RecordAccumulator while considering all kinds of conditions (full, expired, 
exhausted, etc.), and record the data_backoff as min (retry_backoff - 
time_waited) over all partitions; if one of the partitions is immediately 
sendable, this backoff should be 0.
4. Formulate the produce request and call client.poll() with timeout = 
reconnection_backoff > 0 ? reconnection_backoff : retry_backoff.
5. In NetworkClient.poll(), the logic of "maybeUpdateMetadata" and how it 
updates metadataTimeout can also be simplified.
{quote}
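Steps 2 and 3 above boil down to a min-over-backoffs computation: each node or partition contributes (configured_backoff - time_waited), clamped at 0, and the resulting poll timeout is the minimum over all of them. A sketch under assumed names (not the actual Sender/NetworkClient code):

```java
public class BackoffSketch {
    // Remaining backoff for one node or partition, clamped at 0.
    static long backoff(long configuredBackoffMs, long timeWaitedMs) {
        return Math.max(0, configuredBackoffMs - timeWaitedMs);
    }

    // Poll timeout: the minimum remaining backoff over all nodes and partitions.
    static long pollTimeout(long[] nodeWaitedMs, long reconnectBackoffMs,
                            long[] partWaitedMs, long retryBackoffMs) {
        long min = Long.MAX_VALUE;
        for (long w : nodeWaitedMs) min = Math.min(min, backoff(reconnectBackoffMs, w));
        for (long w : partWaitedMs) min = Math.min(min, backoff(retryBackoffMs, w));
        return min; // 0 when some node is connected or a partition is sendable
    }

    public static void main(String[] args) {
        System.out.println(pollTimeout(new long[]{10, 80}, 100,
                                       new long[]{5}, 50)); // min(90, 20, 45) = 20
    }
}
```

A zero result means poll() should return immediately because some work is already doable, which is the "if one of these is ready, this backoff should be 0" clause in steps 2 and 3.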





[jira] [Commented] (KAFKA-2077) Add ability to specify a TopicPicker class for KafkaLog4jApender

2015-04-23 Thread Benoy Antony (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14509468#comment-14509468
 ] 

Benoy Antony commented on KAFKA-2077:
-

Could I please get a review of this enhancement?

> Add ability to specify a TopicPicker class for KafkaLog4jApender
> 
>
> Key: KAFKA-2077
> URL: https://issues.apache.org/jira/browse/KAFKA-2077
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Benoy Antony
>Assignee: Jun Rao
> Attachments: KAFKA-2077.patch, kafka-2077-001.patch
>
>
> KafkaLog4jAppender is the Log4j Appender to publish messages to Kafka. 

> Currently, a topic name has to be passed as a parameter. In some use cases, 
> it may be required to use different topics for the same appender instance. 

> So it may be beneficial to enable KafkaLog4jAppender to accept a TopicPicker 
> class which will provide a topic for a given message.





[jira] [Created] (KAFKA-2143) Replicas get ahead of leader and fail

2015-04-23 Thread Evan Huus (JIRA)
Evan Huus created KAFKA-2143:


 Summary: Replicas get ahead of leader and fail
 Key: KAFKA-2143
 URL: https://issues.apache.org/jira/browse/KAFKA-2143
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Neha Narkhede


On a cluster of 6 nodes, we recently saw a case where a single under-replicated 
partition suddenly appeared, replication lag spiked, and network IO spiked. The 
cluster appeared to recover eventually on its own.

Looking at the logs, the thing which failed was partition 7 of the topic 
{{background_queue}}. It had an ISR of 1,4,3 and its leader at the time was 3. 
Here are the interesting log lines:

On node 3 (the leader):
{noformat}
[2015-04-23 16:50:05,879] ERROR [Replica Manager on Broker 3]: Error when 
processing fetch request for partition [background_queue,7] offset 3722949957 
from follower with correlation id 148185816. Possible cause: Request for offset 
3722949957 but we only have log segments in the range 3648049863 to 3722949955. 
(kafka.server.ReplicaManager)
[2015-04-23 16:50:05,879] ERROR [Replica Manager on Broker 3]: Error when 
processing fetch request for partition [background_queue,7] offset 3722949957 
from follower with correlation id 156007054. Possible cause: Request for offset 
3722949957 but we only have log segments in the range 3648049863 to 3722949955. 
(kafka.server.ReplicaManager)
[2015-04-23 16:50:13,960] INFO Partition [background_queue,7] on broker 3: 
Shrinking ISR for partition [background_queue,7] from 1,4,3 to 3 
(kafka.cluster.Partition)
{noformat}

And on nodes 1 and 4 (the replicas) many occurrences of the following:
{noformat}
[2015-04-23 16:50:05,935] INFO Scheduling log segment 3648049863 for log 
background_queue-7 for deletion. (kafka.log.Log) (edited)
{noformat}

Based on my reading, this looks like the replicas somehow got *ahead* of the 
leader, asked for an invalid offset, got confused, and re-replicated the entire 
topic from scratch to recover (this matches our network graphs, which show 3 
sending a bunch of data to 1 and 4).

Taking a stab in the dark at the cause, there appears to be a race condition 
where replicas can receive a new offset before the leader has committed it and 
is ready to replicate?





[jira] [Updated] (KAFKA-2143) Replicas get ahead of leader and fail

2015-04-23 Thread Evan Huus (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evan Huus updated KAFKA-2143:
-
Description: 
On a cluster of 6 nodes, we recently saw a case where a single under-replicated 
partition suddenly appeared, replication lag spiked, and network IO spiked. The 
cluster appeared to recover eventually on its own.

Looking at the logs, the thing which failed was partition 7 of the topic 
{{background_queue}}. It had an ISR of 1,4,3 and its leader at the time was 3. 
Here are the interesting log lines:

On node 3 (the leader):
{noformat}
[2015-04-23 16:50:05,879] ERROR [Replica Manager on Broker 3]: Error when 
processing fetch request for partition [background_queue,7] offset 3722949957 
from follower with correlation id 148185816. Possible cause: Request for offset 
3722949957 but we only have log segments in the range 3648049863 to 3722949955. 
(kafka.server.ReplicaManager)
[2015-04-23 16:50:05,879] ERROR [Replica Manager on Broker 3]: Error when 
processing fetch request for partition [background_queue,7] offset 3722949957 
from follower with correlation id 156007054. Possible cause: Request for offset 
3722949957 but we only have log segments in the range 3648049863 to 3722949955. 
(kafka.server.ReplicaManager)
[2015-04-23 16:50:13,960] INFO Partition [background_queue,7] on broker 3: 
Shrinking ISR for partition [background_queue,7] from 1,4,3 to 3 
(kafka.cluster.Partition)
{noformat}

Note that both replicas suddenly asked for an offset *ahead* of the available 
offsets.

And on nodes 1 and 4 (the replicas) many occurrences of the following:
{noformat}
[2015-04-23 16:50:05,935] INFO Scheduling log segment 3648049863 for log 
background_queue-7 for deletion. (kafka.log.Log) (edited)
{noformat}

Based on my reading, this looks like the replicas somehow got *ahead* of the 
leader, asked for an invalid offset, got confused, and re-replicated the entire 
topic from scratch to recover (this matches our network graphs, which show 3 
sending a bunch of data to 1 and 4).

Taking a stab in the dark at the cause, there appears to be a race condition 
where replicas can receive a new offset before the leader has committed it and 
is ready to replicate?

  was:
On a cluster of 6 nodes, we recently saw a case where a single under-replicated 
partition suddenly appeared, replication lag spiked, and network IO spiked. The 
cluster appeared to recover eventually on its own.

Looking at the logs, the thing which failed was partition 7 of the topic 
{{background_queue}}. It had an ISR of 1,4,3 and its leader at the time was 3. 
Here are the interesting log lines:

On node 3 (the leader):
{noformat}
[2015-04-23 16:50:05,879] ERROR [Replica Manager on Broker 3]: Error when 
processing fetch request for partition [background_queue,7] offset 3722949957 
from follower with correlation id 148185816. Possible cause: Request for offset 
3722949957 but we only have log segments in the range 3648049863 to 3722949955. 
(kafka.server.ReplicaManager)
[2015-04-23 16:50:05,879] ERROR [Replica Manager on Broker 3]: Error when 
processing fetch request for partition [background_queue,7] offset 3722949957 
from follower with correlation id 156007054. Possible cause: Request for offset 
3722949957 but we only have log segments in the range 3648049863 to 3722949955. 
(kafka.server.ReplicaManager)
[2015-04-23 16:50:13,960] INFO Partition [background_queue,7] on broker 3: 
Shrinking ISR for partition [background_queue,7] from 1,4,3 to 3 
(kafka.cluster.Partition)
{noformat}

And on nodes 1 and 4 (the replicas) many occurrences of the following:
{noformat}
[2015-04-23 16:50:05,935] INFO Scheduling log segment 3648049863 for log 
background_queue-7 for deletion. (kafka.log.Log)
{noformat}

Based on my reading, this looks like the replicas somehow got *ahead* of the 
leader, asked for an invalid offset, got confused, and re-replicated the entire 
topic from scratch to recover (this matches our network graphs, which show 3 
sending a bunch of data to 1 and 4).

Taking a stab in the dark at the cause, there appears to be a race condition 
where replicas can receive a new offset before the leader has committed it and 
is ready to replicate?


> Replicas get ahead of leader and fail
> -
>
> Key: KAFKA-2143
> URL: https://issues.apache.org/jira/browse/KAFKA-2143
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8.2.1
>Reporter: Evan Huus
>Assignee: Neha Narkhede
>
> On a cluster of 6 nodes, we recently saw a case where a single 
> under-replicated partition suddenly appeared, replication lag spiked, and 
> network IO spiked. The cluster appeared to recover eventually on its own.
> Looking at the logs, the thing which failed was partition 7 of the topic 
> {{background_queue}}. It ha

[jira] [Created] (KAFKA-2144) Discuss benefits of adding a top level Topic class containing all the topic level details.

2015-04-23 Thread Parth Brahmbhatt (JIRA)
Parth Brahmbhatt created KAFKA-2144:
---

 Summary: Discuss benefits of adding a top level Topic class 
containing all the topic level details.
 Key: KAFKA-2144
 URL: https://issues.apache.org/jira/browse/KAFKA-2144
 Project: Kafka
  Issue Type: Improvement
Reporter: Parth Brahmbhatt
Priority: Minor


This is a by-product of the discussion that happened as part of 
https://issues.apache.org/jira/browse/KAFKA-2035. We should discuss whether adding 
a top-level Topic entity containing all the topic-related details will make sense 
in the long run.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2035) Add a topic config cache.

2015-04-23 Thread Parth Brahmbhatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Parth Brahmbhatt resolved KAFKA-2035.
-
Resolution: Invalid

> Add a topic config cache.
> -
>
> Key: KAFKA-2035
> URL: https://issues.apache.org/jira/browse/KAFKA-2035
> Project: Kafka
>  Issue Type: Task
>Reporter: Parth Brahmbhatt
>Assignee: Parth Brahmbhatt
> Attachments: KAFKA-2035_2015-03-31_10:52:12.patch
>
>
> Currently the topic config is all about Log configuration so we have a 
> TopicConfigManager which takes in a Log instance and keeps updating that 
> instance's config instance as and when the topic config is updated. The topic 
> config update notifications are sent using zk watchers by Controller.
> I propose to introduce a TopicConfigCache which will be updated by 
> TopicConfigManager on any config changes. The log instance and any other 
> component (like the authorizer mentioned in KAFKA-1688) will have a reference 
> to TopicConfigCache using which they will access the topic configs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1033) Metadata requests do not always return the complete list of available brokers

2015-04-23 Thread Evan Huus (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evan Huus resolved KAFKA-1033.
--
Resolution: Not A Problem

> Metadata requests do not always return the complete list of available brokers
> -
>
> Key: KAFKA-1033
> URL: https://issues.apache.org/jira/browse/KAFKA-1033
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.0
>Reporter: Evan Huus
>
> I discovered this while writing a Go client 
> (https://github.com/Shopify/sarama) and it is making one of the issues I'm 
> having rather difficult to solve 
> (https://github.com/Shopify/sarama/issues/15).
> In summary: sending a metadata request with an empty list of topics is 
> supposed to return a list of *all* metadata in the cluster. However, the list 
> of brokers is incomplete. I have not been able to pin down precisely which 
> brokers are missing, but I believe it happens when a broker is not currently 
> the leader for any partition of any topic.
> Among other things this can make it very difficult to provide failover in a 
> small cluster of only one master and one replica server - clients requesting 
> metadata sometimes are not told of the replica broker and cannot fail-over to 
> it when the master goes down.
> If it is intentional to only return a subset of brokers (whatever that subset 
> is), please document somewhere what that subset is, and how clients should 
> learn of brokers outside that subset.
> Thanks,
> Evan



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2145) An option to add topic owners.

2015-04-23 Thread Parth Brahmbhatt (JIRA)
Parth Brahmbhatt created KAFKA-2145:
---

 Summary: An option to add topic owners. 
 Key: KAFKA-2145
 URL: https://issues.apache.org/jira/browse/KAFKA-2145
 Project: Kafka
  Issue Type: Improvement
Reporter: Parth Brahmbhatt


We need to expose a way for users to identify the users/groups that share ownership 
of a topic. We discussed adding this as part of 
https://issues.apache.org/jira/browse/KAFKA-2035 and agreed that it will be 
simpler to add the owner as a log config. 

The owner field can be used for auditing, and also by the authorization layer to 
grant access without having to explicitly configure ACLs. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] New consumer offset commit API

2015-04-23 Thread Ewen Cheslack-Postava
Thanks, great feedback everyone.

Jiangjie -- I was worried about interleaving as well. For commits using the
consumer's own current set of offsets, I agree we could easily limit to 1
outstanding request so the older one gets cancelled. For commits that
specify offsets manually, we might not need to do anything special, just
note in the docs that bad things can happen if you submit two separate
offset commit requests (i.e. don't wait for callbacks) and have retries
enabled. Alternatively, we could just serialize them, i.e. always have at
most 1 outstanding and retries always take priority.

Guozhang -- That breakdown of use cases looks right to me. I agree that I
don't think users of this API would be trying to use the futures for
promise pipelining or anything; the only use is to allow them to
block on the operation. That said, I think there are some tradeoffs to
consider between the two options:

Pros of Future/Cons of only having callback:
* Gives control over timeouts. With only callbacks users have to manage
this themselves if they care about timing. I think there's at least one
very common case for this: graceful shutdown where I want to try to commit
offsets, but after some time shutdown whether successful or not.
* Consistency. This matches the Producer interface and Futures are a
standard pattern.

Pros of only callback/cons of Future
* Know up front if it's sync/async. This might simplify the implementation,
as Guozhang points out. (However, it also means the patch needs to add a
timeout mechanism, which doesn't exist yet. That's probably not a huge
patch, but non-trivial. Maybe it needs to be added regardless.)
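The graceful-shutdown case above is the clearest argument for a Future: block on the commit, but only up to a deadline. A minimal sketch, using a CompletableFuture as a stand-in for the commit result (commitAsync and the delays here are simulated, not the real consumer API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class GracefulShutdownCommit {
    // Stand-in for an async offset commit; a real consumer would complete this
    // from its network thread. Here we complete it after a configurable delay.
    static CompletableFuture<Void> commitAsync(long completeAfterMs) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        CompletableFuture.delayedExecutor(completeAfterMs, TimeUnit.MILLISECONDS)
                         .execute(() -> result.complete(null));
        return result;
    }

    /** Try to commit on shutdown, but never block longer than timeoutMs. */
    static boolean commitOnShutdown(CompletableFuture<Void> commit, long timeoutMs) {
        try {
            commit.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;                  // offsets committed before the deadline
        } catch (TimeoutException e) {
            return false;                 // give up and shut down anyway
        } catch (InterruptedException | ExecutionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(commitOnShutdown(commitAsync(10), 1000));  // fast commit
        System.out.println(commitOnShutdown(commitAsync(5000), 50));  // too slow
    }
}
```

With a callback-only API this deadline bookkeeping lands entirely on the user, which is the trade-off described above.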

Regardless of the interface we settle on, I'd argue that we should get rid
of the infinite retry version, at least limiting it to a max # of retries,
each of which are bound by a timeout. It's really problematic having it run
indefinitely long since it locks up the consumer, which means you can't,
e.g., shut down properly. More generally, I think anytime we have an API
where a TimeoutException is *not* a possibility, we're almost definitely
trying to hide the network from the user in a way that makes it difficult
for them to write applications that behave correctly.

On the muting implementation, I'm not sure I'm convinced it's *required* to
mute the others entirely. Couldn't Selector.poll have a mechanism for
prioritizing reads rather than completely muting the other nodes? For
example, what if poll() did one select for just the key we're currently
keeping unmuted with the timeout, then if there are no keys ready to read,
reregister interest and select(0) to only get the ones that are immediately
ready.

Jay -- the polling thing isn't an issue if you just poll from within the
Future. That's how the WIP patch works. My only concern with that is that
it's unintuitive because most future implementations are just waiting for
another thread to finish some operation; the only potentially bad side
effect I could think of is that user callbacks might then run in the thread
calling Future.get(), which might be unexpected if they think they are
doing all the work of polling in a different thread.

-Ewen


On Wed, Apr 22, 2015 at 12:29 PM, Bhavesh Mistry  wrote:

> Hi Ewen,
>
> The only time I can think of where the application needs to know whether the
> offset commit succeeded is during graceful shutdown and/or
> Runtime.addShutdownHook(), so the consumer application does not get duplicated
> records upon restart and does not have to deal with eliminating already-processed
> offsets. The only thing the consumer application will have to handle is a
> failure to commit the offset after XX retries. Or would you prefer the
> application to manage this last offset commit itself when the offset cannot be
> committed due to failure, connection timeout, or any other failure case?
>
> Thanks,
> Bhavesh
>
>
>
> On Wed, Apr 22, 2015 at 11:20 AM, Jay Kreps  wrote:
>
> > I second Guozhang's proposal. I do think we need the callback. The
> current
> > state is that for async commits you actually don't know if it succeeded.
> > However there is a totally valid case where you do need to know if it
> > succeeded but don't need to block, and without the callback you are
> stuck.
> > I think the futures will likely cause problems since blocking on the
> future
> > precludes polling which would allow it to complete.
> >
> > -Jay
> >
> > On Wed, Apr 22, 2015 at 11:12 AM, Guozhang Wang 
> > wrote:
> >
> > > Hi Ewen,
> > >
> > > I share the same concern you have about 2), that with the new API sync
> > > commit implementation is a bit awkward since we have a single-threaded
> > > design in new consumer. The reason that we need to mute other nodes for
> > > doing coordinator sync operations like join-group / offset commits /
> > offset
> > > fetches is to avoid long blocking due to possible "starvation" on
> network
> > > selector, so I think they need to be done still.
> > >
> > > On the other hand, I think users using the commit API will usually fall
> 

Re: Review Request 32650: Patch for KAFKA-2000

2015-04-23 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review81413
---



core/src/main/scala/kafka/server/OffsetManager.scala


A safer fix is to proactively purge as part of UpdateMetadataRequest - 
i.e., removePartitionInfo in metadata cache.

Your fix is nice, but we need to make sure of the following: on a given 
offset manager (broker) the metadata cache must contain topic X before any 
consumer of topic X (and whose group is managed by that broker) commits offsets 
for topic X.

The original scenario I was concerned about should be fine:
- Suppose broker A (offset manager for G) starts up
- It receives UpdateMetadataRequests from the controller for all topics in 
the cluster
- It then receives LeaderAndIsrRequest for partitions of the offset topic 
which make it the offset manager.
- We should be fine _as long as_ the update metadata requests occur first. 
So if we go with your approach we should at the very least add a unit test to 
guarantee this.

There is another scenario. If topic X is a new topic (or has new 
partitions):
- Broker A is the offset manager for consumer group G
- Broker B leads a new partition of X
- Controller C sends become leader to B and update metadata to A (which 
will populate its metadata cache)
- B becomes the leader first
- A consumer starts consuming X and commits offsets to A (before it has 
received the update metadata request)
- Other consumers in the group may rebalance while all this is happening 
(since new partitions for the topic appeared) and may fetch offsets from A
- But A could have deleted the offset by then.
- This is improbable but not impossible.

Onur mentioned another corner case:
https://issues.apache.org/jira/browse/KAFKA-1787 

Both would be solved by having topic generations and incorporating 
generation information when determining which offsets to purge. I don't think 
we have a jira open for that but I will follow up offline with Onur.

Do you see any other issues?

So I think the options are:
- Go with your approach + a unit test to ensure that the controller sends 
update metadata request first.
- Go with the more conservative fix which is to purge on 
metadataCache.removePartitionInfo

Also, we should add a unit test to verify offsets are in fact removed after 
deletion.


- Joel Koshy


On March 30, 2015, 9:47 p.m., Sriharsha Chintalapani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32650/
> ---
> 
> (Updated March 30, 2015, 9:47 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2000
> https://issues.apache.org/jira/browse/KAFKA-2000
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 395b1dbe43a5db47151e72a1b588d72f03cef963 
> 
> Diff: https://reviews.apache.org/r/32650/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Sriharsha Chintalapani
> 
>



Re: Review Request 32650: Patch for KAFKA-2000

2015-04-23 Thread Joel Koshy


> On April 23, 2015, 9:51 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, line 124
> > 
> >
> > A safer fix is to proactively purge as part of UpdateMetadataRequest - 
> > i.e., removePartitionInfo in metadata cache.
> > 
> > Your fix is nice, but we need to make sure of the following: on a given 
> > offset manager (broker) the metadata cache must contain topic X before any 
> > consumer of topic X (and whose group is managed by that broker) commits 
> > offsets for topic X.
> > 
> > The original scenario I was concerned about should be fine:
> > - Suppose broker A (offset manager for G) starts up
> > - It receives UpdateMetadataRequests from the controller for all topics 
> > in the cluster
> > - It then receives LeaderAndIsrRequest for partitions of the offset 
> > topic which make it the offset manager.
> > - We should be fine _as long as_ the update metadata requests occur 
> > first. So if we go with your approach we should at the very least add a 
> > unit test to guarantee this.
> > 
> > There is another scenario. If topic X is a new topic (or has new 
> > partitions):
> > - Broker A is the offset manager for consumer group G
> > - Broker B leads a new partition of X
> > - Controller C sends become leader to B and update metadata to A (which 
> > will populate its metadata cache)
> > - B becomes the leader first
> > - A consumer starts consuming X and commits offsets to A (before it has 
> > received the update metadata request)
> > - Other consumers in the group may rebalance while all this is 
> > happening (since new partitions for the topic appeared) and may fetch 
> > offsets from A
> > - But A could have deleted the offset by then.
> > - This is improbable but not impossible.
> > 
> > Onur mentioned another corner case:
> > https://issues.apache.org/jira/browse/KAFKA-1787 
> > 
> > Both would be solved by having topic generations and incorporating 
> > generation information when determining which offsets to purge. I don't 
> > think we have a jira open for that but I will follow-up offline with Onur.
> > 
> > Do you see any other issues?
> > 
> > So I think the options are:
> > - Go with your approach + a unit test to ensure that the controller 
> > sends update metadata request first.
> > - Go with the more conservative fix which is to purge on 
> > metadataCache.removePartitionInfo
> > 
> > Also, we should add a unit test to verify offsets are in fact removed 
> > after deletion.

Never mind - for the second scenario we are fine. We check in the offset manager 
whether the topic exists before committing offsets.

So your fix should be fine. Can you add a unit test?


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review81413
---


On March 30, 2015, 9:47 p.m., Sriharsha Chintalapani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32650/
> ---
> 
> (Updated March 30, 2015, 9:47 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2000
> https://issues.apache.org/jira/browse/KAFKA-2000
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 395b1dbe43a5db47151e72a1b588d72f03cef963 
> 
> Diff: https://reviews.apache.org/r/32650/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Sriharsha Chintalapani
> 
>



Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-04-23 Thread Sriharsha Chintalapani
Hi Jay,
         Sorry about the KIP formatting. I fixed those in the KIP.

2. We certainly need to add both the serialized and unserialized form for 
the key as both are useful. 
I added those to the interface.

3. Do we need to add the value? I suspect people will have uses for 
computing something off a few fields in the value to choose the partition. 
This would be useful in cases where the key was being used for log 
compaction purposes and did not contain the full information for computing 
the partition. 
added it as well.

4. This interface doesn't include either an init() or close() method. It 
should implement Closable and Configurable, right? 
I am not quite sure about having init() or close() for the partitioner. Are we 
looking at the partitioner using some external resources to initialize and close? 
If that's the case then init() should also take in some config as a param, which 
can add more complexity.



5. What happens if the user both sets the partition id in the 
ProducerRecord and sets a partitioner? Does the partition id just get 
passed in to the partitioner (as sort of implied in this interface?). This 
is a bit weird since if you pass in the partition id you kind of expect it 
to get used, right? Or is it the case that if you specify a partition the 
partitioner isn't used at all (in which case no point in including 
partition in the Partitioner api). 
In the current ProducerRecord the partition id is getting passed to the Partitioner. 
If a custom partitioner is not going to use it, that's up to their implementation, 
right? Similarly, in our interface we have Value as another param which may or may 
not be used. Essentially it's up to the Partitioner to disclose what available 
information it is going to partition against.
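For reference, the interface shape being discussed (raw and serialized key, the value, and cluster metadata) might look roughly like the sketch below. The Configurable/Cluster stubs and the exact parameter list are illustrative assumptions, not the final KIP-22 API.

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.Map;

// Illustrative stand-ins: Cluster is a stub carrying only a partition count.
interface Configurable { void configure(Map<String, ?> configs); }

class Cluster {
    final int numPartitions;
    Cluster(int numPartitions) { this.numPartitions = numPartitions; }
}

// Sketch of the discussed shape: topic, raw + serialized key/value, cluster.
interface Partitioner extends Configurable, Closeable {
    int partition(String topic, Object key, byte[] keyBytes,
                  Object value, byte[] valueBytes, Cluster cluster);
}

// A default-style implementation: hash the serialized key when present.
class HashPartitioner implements Partitioner {
    public void configure(Map<String, ?> configs) { }
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) return 0;  // a real default would round-robin here
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % cluster.numPartitions;
    }
    public void close() { }
}

public class PartitionerSketch {
    public static void main(String[] args) {
        Partitioner p = new HashPartitioner();
        int chosen = p.partition("events", "user-42", "user-42".getBytes(),
                                 null, null, new Cluster(8));
        System.out.println("partition in [0,8): " + chosen);
    }
}
```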


Thanks,
Harsha


On April 23, 2015 at 9:11:33 AM, Jay Kreps (jay.kr...@gmail.com) wrote:

Hey Harsha,  

A few comments:  

Can you finish up the KIP? There are some unfinished sentences and odd
whitespace things going on.

Here are the questions I think we should consider:  
1. Do we need this at all given that we have the partition argument in  
ProducerRecord which gives full control? I think we do need it because this  
is a way to plug in a different partitioning strategy at run time and do it  
in a fairly transparent way.  
2. We certainly need to add both the serialized and unserialized form for  
the key as both are useful.  
3. Do we need to add the value? I suspect people will have uses for  
computing something off a few fields in the value to choose the partition.  
This would be useful in cases where the key was being used for log  
compaction purposes and did not contain the full information for computing  
the partition.  
4. This interface doesn't include either an init() or close() method. It  
should implement Closable and Configurable, right?  
5. What happens if the user both sets the partition id in the  
ProducerRecord and sets a partitioner? Does the partition id just get  
passed in to the partitioner (as sort of implied in this interface?). This  
is a bit weird since if you pass in the partition id you kind of expect it  
to get used, right? Or is it the case that if you specify a partition the  
partitioner isn't used at all (in which case no point in including  
partition in the Partitioner api).  

Cheers,  

-Jay  

On Thu, Apr 23, 2015 at 6:55 AM, Sriharsha Chintalapani   
wrote:  

> Hi,  
> Here is the KIP for adding a partitioner interface for producer.  
>  
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
>   
> There is one open question about how interface should look like. Please  
> take a look and let me know if you prefer one way or the other.  
>  
> Thanks,  
> Harsha  
>  
>  


Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-04-23 Thread Jay Kreps
Hey Sriharsha,

Great, thanks!

For 4:

Yeah the use case for init and close is making use of any kind of metadata.
An example of this would be if you are trying to do range partitioning you
need to map lexicographic ranges to numeric partitions. You might do this
by adding a new property to the config such as
   partitioner.metadata=a:0, b:1, ..., z:26
Or likewise you might have a partitioner built using Java's digest
interface and the config would be the digest algorithm name.

Or you might need to maintain this dynamically and have the partitioner
fetch this list on initialization from some central store (zk or whatever).

The init method we should use is the Configurable interface, that will
automatically pass in the configuration given to the producer at
instantiation time.
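A minimal sketch of the range-partitioner example, assuming the mapping arrives through configure() under Jay's illustrative partitioner.metadata property (not a real producer config):

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch only: "partitioner.metadata" is the illustrative property name from
// the email above, parsed as "rangeStart:partition" pairs.
public class RangePartitioner {
    private final TreeMap<String, Integer> ranges = new TreeMap<>();

    public void configure(Map<String, ?> configs) {
        String spec = (String) configs.get("partitioner.metadata");
        for (String entry : spec.split(",")) {
            String[] kv = entry.trim().split(":");
            ranges.put(kv[0], Integer.parseInt(kv[1]));
        }
    }

    /** Map a key to the partition of the greatest range start <= key. */
    public int partition(String key) {
        Map.Entry<String, Integer> e = ranges.floorEntry(key);
        if (e == null) e = ranges.firstEntry();  // keys sorting before the first range
        return e.getValue();
    }

    public static void main(String[] args) {
        RangePartitioner p = new RangePartitioner();
        p.configure(Map.of("partitioner.metadata", "a:0, h:1, q:2"));
        System.out.println(p.partition("apple"));   // 0
        System.out.println(p.partition("kafka"));   // 1
        System.out.println(p.partition("zk"));      // 2
    }
}
```

In the Configurable approach, configure() would be called automatically with the producer's config map at instantiation time, exactly as described above.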

I agree that these additional methods add a bit of complexity and often
aren't needed, but on the flip side it is often hard to use the interface
without them when you do need them.

5. Yeah that's how the current partitioner works, but that is just because
it is a non-public interface. It's not clear to me if the partitioner
should override the partition or not. We could either say:
a. The partitioner is the default policy and the partition field is a way
to override that on a per-record basis for cases where you need that or
where it is simpler. If this is our description then the partitioner should
only take effect if partition==null
b. The partition the user specifies is just a suggestion and the
partitioner can interpret or override that in whatever way they want.

I think (a) may actually make more sense. The reason is because otherwise
the behavior of the partition field in ProducerRecord will be very hard to
depend on as the effect it has will be totally dependent on the partitioner
that is set. Any correct partitioner will basically have to implement the
case where the partition is set and I think the only sensible thing then is
to use it as the partition (right?).
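Option (a) reduces to a simple dispatch rule, sketched below with a hash-by-key stand-in for the configured partitioner (names are illustrative, not producer internals):

```java
import java.util.Arrays;

// Sketch of option (a): the explicit partition in the record always wins, and
// the pluggable partitioner is consulted only when partition == null.
public class PartitionDispatch {
    public static int choosePartition(Integer recordPartition,
                                      byte[] keyBytes,
                                      int numPartitions) {
        if (recordPartition != null) {
            return recordPartition;  // per-record override always honored
        }
        // Fall back to the configured partitioner (hash-by-key stand-in here).
        if (keyBytes == null) return 0;
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(choosePartition(5, "k".getBytes(), 8));  // 5: override wins
        System.out.println(choosePartition(null, null, 8));         // 0: partitioner used
    }
}
```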

Dunno, what do you think...?

-Jay

On Thu, Apr 23, 2015 at 2:59 PM, Sriharsha Chintalapani <
harsh...@fastmail.fm> wrote:

> Hi Jay,
>  Sorry about the KIP formatting. I fixed those in the KIP.
>
> 2. We certainly need to add both the serialized and unserialized form for
> the key as both are useful.
>
> I added those to the interface.
>
> 3. Do we need to add the value? I suspect people will have uses for
> computing something off a few fields in the value to choose the partition.
> This would be useful in cases where the key was being used for log
> compaction purposes and did not contain the full information for computing
> the partition.
>
> added it as well.
>
> 4. This interface doesn't include either an init() or close() method. It
> should implement Closable and Configurable, right?
>
> I am not quite sure about having init() or close() for the partitioner. Are we
> looking at the partitioner using some external resources to initialize and
> close? If that's the case then init() should also take in some config as a
> param, which can add more complexity.
>
>
> 5. What happens if the user both sets the partition id in the
> ProducerRecord and sets a partitioner? Does the partition id just get
> passed in to the partitioner (as sort of implied in this interface?). This
> is a bit weird since if you pass in the partition id you kind of expect it
> to get used, right? Or is it the case that if you specify a partition the
> partitioner isn't used at all (in which case no point in including
> partition in the Partitioner api).
>
> In the current ProducerRecord the partition id is getting passed to the
> Partitioner. If a custom partitioner is not going to use it, that's up to their
> implementation, right? Similarly, in our interface we have Value as another
> param which may or may not be used. Essentially it's up to the Partitioner to
> disclose what available information it is going to partition against.
>
> Thanks,
> Harsha
>
>
> On April 23, 2015 at 9:11:33 AM, Jay Kreps (jay.kr...@gmail.com) wrote:
>
> Hey Harsha,
>
> A few comments:
>
> Can you finish up the KIP? There are some unfinished sentences and odd
> whitespace things going on.
>
> Here are the questions I think we should consider:
> 1. Do we need this at all given that we have the partition argument in
> ProducerRecord which gives full control? I think we do need it because
> this
> is a way to plug in a different partitioning strategy at run time and do
> it
> in a fairly transparent way.
> 2. We certainly need to add both the serialized and unserialized form for
> the key as both are useful.
> 3. Do we need to add the value? I suspect people will have uses for
> computing something off a few fields in the value to choose the partition.
> This would be useful in cases where the key was being used for log
> compaction purposes and did not contain the full information for computing
> the partition.
> 4. This interface doesn't include either an init() or close() method. It
> should implement Closable a

[jira] [Commented] (KAFKA-2121) prevent potential resource leak in KafkaProducer and KafkaConsumer

2015-04-23 Thread Sean Lydon (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14510117#comment-14510117
 ] 

Sean Lydon commented on KAFKA-2121:
---

While this does resolve https://issues.apache.org/jira/browse/KAFKA-2126, there 
is a slight bug with the 'isKey' parameter, which would be solved by the 
following patch. This is so minor that it seems silly to open a new 
branch+ticket+reviewboard just to change a boolean value.

{noformat}
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -495,7 +495,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 if (keyDeserializer == null) {
 this.keyDeserializer = 
config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 Deserializer.class);
-this.keyDeserializer.configure(config.originals(), false);
+this.keyDeserializer.configure(config.originals(), true);
 } else {
 this.keyDeserializer = keyDeserializer;
 }
{noformat}
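To see why the flag matters: a deserializer may branch on isKey to pick key-side vs. value-side settings, so configuring the key deserializer with isKey=false silently picks up the value-side config. A sketch (the encoding property names here are illustrative, not necessarily Kafka's):

```java
import java.nio.charset.Charset;
import java.util.Map;

// A deserializer that chooses its charset from different config properties
// depending on whether it deserializes keys or values. With the pre-patch bug
// (isKey=false for the key deserializer) it would read the value-side setting.
public class EncodingAwareDeserializer {
    private Charset charset;

    public void configure(Map<String, ?> configs, boolean isKey) {
        String prop = isKey ? "key.deserializer.encoding"
                            : "value.deserializer.encoding";
        Object enc = configs.get(prop);
        charset = Charset.forName(enc == null ? "UTF-8" : (String) enc);
    }

    public String deserialize(byte[] data) {
        return new String(data, charset);
    }

    public Charset charset() { return charset; }

    public static void main(String[] args) {
        EncodingAwareDeserializer d = new EncodingAwareDeserializer();
        d.configure(Map.of("key.deserializer.encoding", "UTF-16",
                           "value.deserializer.encoding", "UTF-8"), true);
        System.out.println(d.charset());  // UTF-16: the key-side setting
    }
}
```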

> prevent potential resource leak in KafkaProducer and KafkaConsumer
> --
>
> Key: KAFKA-2121
> URL: https://issues.apache.org/jira/browse/KAFKA-2121
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2.0
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
> Fix For: 0.8.3
>
> Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, 
> KAFKA-2121_2015-04-16_10:43:55.patch, KAFKA-2121_2015-04-18_20:09:20.patch, 
> KAFKA-2121_2015-04-19_20:08:45.patch, KAFKA-2121_2015-04-19_20:30:18.patch, 
> KAFKA-2121_2015-04-20_09:06:09.patch, KAFKA-2121_2015-04-20_09:51:51.patch, 
> KAFKA-2121_2015-04-20_09:52:46.patch, KAFKA-2121_2015-04-20_09:57:49.patch, 
> KAFKA-2121_2015-04-20_22:48:31.patch
>
>
> On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang  wrote:
> It is a valid problem and we should correct it as soon as possible, I'm
> with Ewen regarding the solution.
> On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava 
> wrote:
> > Steven,
> >
> > Looks like there is even more that could potentially be leaked -- since key
> > and value serializers are created and configured at the end, even the IO
> > thread allocated by the producer could leak. Given that, I think 1 isn't a
> > great option since, as you said, it doesn't really address the underlying
> > issue.
> >
> > 3 strikes me as bad from a user experience perspective. It's true we might
> > want to introduce additional constructors to make testing easier, but the
> > more components I need to allocate myself and inject into the producer's
> > constructor, the worse the default experience is. And since you would have
> > to inject the dependencies to get correct, non-leaking behavior, it will
> > always be more code than previously (and a backwards incompatible change).
> > Additionally, the code creating the producer would have to be more
> > complicated since it would have to deal with the cleanup carefully whereas
> > it previously just had to deal with the exception. Besides, for testing
> > specifically, you can avoid exposing more constructors just for testing by
> > using something like PowerMock that let you mock private methods. That
> > requires a bit of code reorganization, but doesn't affect the public
> > interface at all.
> >
> > So my take is that a variant of 2 is probably best. I'd probably do two
> > things. First, make close() safe to call even if some fields haven't been
> > initialized, which presumably just means checking for null fields. (You
> > might also want to figure out if all the methods close() calls are
> > idempotent and decide whether some fields should be marked non-final and
> > cleared to null when close() is called). Second, add the try/catch as you
> > suggested, but just use close().
> >
> > -Ewen
> >
> >
> > On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu  wrote:
> >
> > > Here is the resource leak problem that we have encountered when 0.8.2
> > java
> > > KafkaProducer failed in constructor. here is the code snippet of
> > > KafkaProducer to illustrate the problem.
> > >
> > > ---
> > > public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
> > > Serializer<V> valueSerializer) {
> > >
> > > // create metrcis reporter via reflection
> > > List<MetricsReporter> reporters =
> > >
> > >
> > config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
> > > MetricsReporter.class);
> > >
> > > // validate bootstrap servers
> > > List<InetSocketAddress> addresses =
> > >
> > >
> > ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
> > 

Remote kafka - Connection refused.

2015-04-23 Thread madhavan kumar
Dear all,
I am trying to connect my python consumer to a remote kafka server, but
in kafka/conn.py#reinit the socket call socket.create_connection throws a
"connection refused" error.
To confirm it is not a firewall issue, I tried connecting to other servers
(on the same machine as the kafka server) from my local machine, and the socket
connection works; connection refused happens only for the kafka server.
I am running kafka-0.8.2. In which log file can I see the stack traces
(or) more useful information for errors related to connection requests?

thanks,
saravana


Should 0.8.3 consumers correctly function with 0.8.2 brokers?

2015-04-23 Thread Sean Lydon
Currently the clients consumer (trunk) sends offset commit requests of
version 2. The 0.8.2 brokers fail to handle this particular request
with a:

java.lang.AssertionError: assertion failed: Version 2 is invalid for
OffsetCommitRequest. Valid versions are 0 or 1.

I was able to make this work via a forceful downgrade of this
particular request, but I would like some feedback on whether an
"enable.commit.downgrade" configuration would be a tolerable way to
allow 0.8.3 consumers to interact with 0.8.2 brokers. I'm also
interested in whether this is even a goal worth pursuing.

Thanks,
Sean


[jira] [Commented] (KAFKA-2128) kafka.Kafka should return non-zero exit code when caught exception.

2015-04-23 Thread Sasaki Toru (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14510155#comment-14510155
 ] 

Sasaki Toru commented on KAFKA-2128:


This is an easy patch, which adds System.exit(1) when a Throwable is
caught in kafka.Kafka.
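The behavior the patch describes might look roughly like this, sketched in Java (kafka.Kafka itself is Scala; the class and method names here are illustrative only): startup failures map to a non-zero exit status so init systems and deploy scripts can detect them.

```java
// Illustrative sketch of the fix described above (names are hypothetical):
// return a non-zero status when startup throws, instead of exiting with 0.
class KafkaMain {
    static int run(String[] args) {
        try {
            if (args.length < 1) {
                // e.g. the server.properties argument is missing
                throw new java.io.FileNotFoundException("server.properties");
            }
            // ... load config and start the server ...
            return 0;
        } catch (Throwable t) {
            System.err.println("Fatal error during startup: " + t);
            return 1;   // non-zero exit code signals failure to callers
        }
    }

    public static void main(String[] args) {
        System.exit(run(args));
    }
}
```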

> kafka.Kafka should return non-zero exit code when caught exception.
> ---
>
> Key: KAFKA-2128
> URL: https://issues.apache.org/jira/browse/KAFKA-2128
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8.3
>Reporter: Sasaki Toru
>Priority: Minor
> Fix For: 0.8.3
>
> Attachments: KAFKA-2128-1.patch
>
>
> The kafka.Kafka object always returns exit code zero.
> I think it should return a non-zero exit code when an exception is caught
> (for example, a FileNotFoundException because server.properties does not
> exist).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2128) kafka.Kafka should return non-zero exit code when caught exception.

2015-04-23 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14510197#comment-14510197
 ] 

Gwen Shapira commented on KAFKA-2128:
-

+1

It will make life a lot less confusing to everyone with automatic deployment.

> kafka.Kafka should return non-zero exit code when caught exception.
> ---
>
> Key: KAFKA-2128
> URL: https://issues.apache.org/jira/browse/KAFKA-2128
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8.3
>Reporter: Sasaki Toru
>Priority: Minor
> Fix For: 0.8.3
>
> Attachments: KAFKA-2128-1.patch
>
>
> The kafka.Kafka object always returns exit code zero.
> I think it should return a non-zero exit code when an exception is caught
> (for example, a FileNotFoundException because server.properties does not
> exist).





[jira] [Commented] (KAFKA-2128) kafka.Kafka should return non-zero exit code when caught exception.

2015-04-23 Thread Aditya Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14510209#comment-14510209
 ] 

Aditya Auradkar commented on KAFKA-2128:


+1

> kafka.Kafka should return non-zero exit code when caught exception.
> ---
>
> Key: KAFKA-2128
> URL: https://issues.apache.org/jira/browse/KAFKA-2128
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8.3
>Reporter: Sasaki Toru
>Priority: Minor
> Fix For: 0.8.3
>
> Attachments: KAFKA-2128-1.patch
>
>
> The kafka.Kafka object always returns exit code zero.
> I think it should return a non-zero exit code when an exception is caught
> (for example, a FileNotFoundException because server.properties does not
> exist).





Re: Should 0.8.3 consumers correctly function with 0.8.2 brokers?

2015-04-23 Thread Gwen Shapira
I didn't think the 0.8.3 consumer would ever be able to talk to a 0.8.2
broker... there are some essential pieces missing in 0.8.2
(Coordinator, Heartbeat, etc.).
Maybe I'm missing something. It would be nice if this worked :)

Mind sharing what / how you tested? Were there no errors in the broker
logs after your fix?

On Thu, Apr 23, 2015 at 5:37 PM, Sean Lydon  wrote:
> Currently the clients consumer (trunk) sends offset commit requests of
> version 2.  The 0.8.2 brokers fail to handle this particular request
> with a:
>
> java.lang.AssertionError: assertion failed: Version 2 is invalid for
> OffsetCommitRequest. Valid versions are 0 or 1.
>
> I was able to make this work via a forceful downgrade of this
> particular request, but I would like some feedback on whether a
> "enable.commit.downgrade" configuration would be a tolerable method to
> allow 0.8.3 consumers to interact with 0.8.2 brokers.  I'm also
> interested in this even being a goal worth pursuing.
>
> Thanks,
> Sean


Leap second coming up!

2015-04-23 Thread Gwen Shapira
Hi,

Looks like we are getting a new leap second on June 30.

Is anyone planning on testing Kafka with leap seconds before it
happens in production?

Gwen


Re: Remote kafka - Connection refused.

2015-04-23 Thread Gwen Shapira
Perhaps this will help:
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whycan'tmyconsumers/producersconnecttothebrokers?

On Thu, Apr 23, 2015 at 3:24 PM, madhavan kumar
 wrote:
> Dear all,
> I am trying to connect my python consumer to a remote kafka server. But
> in kafka/conn.py#reinit, the socket call socket.create_connection throws
> "connection refused" error.
> to confirm, it is not a firewall issue, i tried connecting to other servers
> (in the same machine as kafka server) from my local m/c and the socket
> connection works. connection refused happens, only for kafka server.
> i am running kafka-0.8.2 and in which log file, can i see the stacktraces
> (or) more useful information for errors related to connection requests?
>
> thanks,
> saravana


Re: Leap second coming up!

2015-04-23 Thread Ewen Cheslack-Postava
That sounds like a great candidate for a system test.


On Thu, Apr 23, 2015 at 7:48 PM, Gwen Shapira  wrote:

> Hi,
>
> Looks like we are getting a new leap second on June 30.
>
> Is anyone planning on testing Kafka with leap seconds before it
> happens in production?
>
> Gwen
>



-- 
Thanks,
Ewen


Re: Leap second coming up!

2015-04-23 Thread Gwen Shapira
Any ideas on how to simulate leap seconds on test environments?

On Thu, Apr 23, 2015 at 7:51 PM, Ewen Cheslack-Postava
 wrote:
> That sounds like a great candidate for a system test.
>
>
> On Thu, Apr 23, 2015 at 7:48 PM, Gwen Shapira  wrote:
>
>> Hi,
>>
>> Looks like we are getting a new leap second on June 30.
>>
>> Is anyone planning on testing Kafka with leap seconds before it
>> happens in production?
>>
>> Gwen
>>
>
>
>
> --
> Thanks,
> Ewen


[jira] [Commented] (KAFKA-2145) An option to add topic owners.

2015-04-23 Thread Neelesh Srinivas Salian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14510409#comment-14510409
 ] 

Neelesh Srinivas Salian commented on KAFKA-2145:


I'd like to work on this; please assign the JIRA to me.

Thank you.

> An option to add topic owners. 
> ---
>
> Key: KAFKA-2145
> URL: https://issues.apache.org/jira/browse/KAFKA-2145
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Parth Brahmbhatt
>
> We need to expose a way for users to identify the users/groups that share 
> ownership of a topic. We discussed adding this as part of 
> https://issues.apache.org/jira/browse/KAFKA-2035 and agreed that it would be 
> simpler to add the owner as a log config. 
> The owner field can be used for auditing, and also by the authorization 
> layer to grant access without having to explicitly configure ACLs. 





Re: Leap second coming up!

2015-04-23 Thread Ewen Cheslack-Postava
Not sure of the best solution, but a couple of ideas:

* A lot of code is already set up to use the Time interface. You could use
a modification of SystemTime that allows you to start at a specific
timestamp. There aren't as many uses of System.currentTimeMillis as I
thought there would be, so getting them all to use the Time interface might
not be so bad. The more annoying part is all the places where we have a
default parameter of SystemTime and would want to make sure a non-default
Time object is actually passed in.
* Use ptrace to intercept and modify the relevant syscalls. You can
definitely do this to modify syscall input, I'd assume it's easy to hook in
right after the syscall and modify the output as well.
* If you can, just modify system time :) This isn't unreasonable if you're
willing to test in VMs.

I think either of the first 2 would be better because if you really want to
test this then you probably want to use > 1s jumps to make it more likely
to trigger any issues. Additionally, since both would allow you to make
arbitrary adjustments, you wouldn't even need to adjust the time to when a
leap second was supposed to occur, you could just simulate one by
adding/removing the second anytime you wanted.
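The first idea above might look roughly like this: a hypothetical mock clock (loosely modeled on Kafka's Time interface; the method names here are assumed, not the real API) that can start at any timestamp and apply arbitrary jumps, including the one-second backwards jump of an inserted leap second.

```java
// Hypothetical controllable clock for tests: start at any timestamp and
// apply arbitrary jumps, e.g. the 1s backwards jump of an inserted leap
// second. Loosely modeled on Kafka's Time interface; names are assumed.
class MockTime {
    private long nowMs;

    MockTime(long startMs) { this.nowMs = startMs; }

    long milliseconds() { return nowMs; }

    // Advance the clock instead of actually sleeping.
    void sleep(long ms) { nowMs += ms; }

    // Simulate an inserted leap second: the clock jumps back one second.
    void leapSecondBack() { nowMs -= 1000L; }
}
```

A test could then drive the clock forward, trigger the jump at any point, and assert that log rolling, request timeouts, and metrics behave sanely across it.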

If anyone else is curious, I had to go check the effect of leap seconds on
Unix time: http://en.wikipedia.org/wiki/Unix_time#Leap_seconds The upcoming
leap second is an insertion. The first second of the day effectively occurs
twice, so there's a 1 second jump backwards.

Could similar issues arise with ntp adjustments? Arguably many of the
places we use currentTimeMillis we should be using nanoTime instead since
we only need elapsed time, not an absolute timestamp. I believe that's
backed by clock_gettime's monotonic timer on Linux, which might avoid some
of these issues, but unfortunately that's not specified, so we probably
couldn't rely on it.

-Ewen


On Thu, Apr 23, 2015 at 8:06 PM, Gwen Shapira  wrote:

> Any ideas on how to simulate leap seconds on test environments?
>
> On Thu, Apr 23, 2015 at 7:51 PM, Ewen Cheslack-Postava
>  wrote:
> > That sounds like a great candidate for a system test.
> >
> >
> > On Thu, Apr 23, 2015 at 7:48 PM, Gwen Shapira 
> wrote:
> >
> >> Hi,
> >>
> >> Looks like we are getting a new leap second on June 30.
> >>
> >> Is anyone planning on testing Kafka with leap seconds before it
> >> happens in production?
> >>
> >> Gwen
> >>
> >
> >
> >
> > --
> > Thanks,
> > Ewen
>



-- 
Thanks,
Ewen


Re: Should 0.8.3 consumers correctly function with 0.8.2 brokers?

2015-04-23 Thread Neha Narkhede
My understanding is that ideally the 0.8.3 consumer should work with an
0.8.2 broker if the offset commit config was set to "zookeeper".

The only thing that might not work is offset commit to Kafka, which makes
sense since the 0.8.2 broker does not support Kafka based offset
management.

If we broke all kinds of offset commits, then it seems like a regression,
no?

On Thu, Apr 23, 2015 at 7:26 PM, Gwen Shapira  wrote:

> I didn't think 0.8.3 consumer will ever be able to talk to 0.8.2
> broker... there are some essential pieces that are missing in 0.8.2
> (Coordinator, Heartbeat, etc).
> Maybe I'm missing something. It will be nice if this will work :)
>
> Mind sharing what / how you tested? Were there no errors in broker
> logs after your fix?
>
> On Thu, Apr 23, 2015 at 5:37 PM, Sean Lydon  wrote:
> > Currently the clients consumer (trunk) sends offset commit requests of
> > version 2.  The 0.8.2 brokers fail to handle this particular request
> > with a:
> >
> > java.lang.AssertionError: assertion failed: Version 2 is invalid for
> > OffsetCommitRequest. Valid versions are 0 or 1.
> >
> > I was able to make this work via a forceful downgrade of this
> > particular request, but I would like some feedback on whether a
> > "enable.commit.downgrade" configuration would be a tolerable method to
> > allow 0.8.3 consumers to interact with 0.8.2 brokers.  I'm also
> > interested in this even being a goal worth pursuing.
> >
> > Thanks,
> > Sean
>



-- 
Thanks,
Neha


Re: Should 0.8.3 consumers correctly function with 0.8.2 brokers?

2015-04-23 Thread Ewen Cheslack-Postava
@Neha I think you're mixing up the 0.8.1/0.8.2 updates and the 0.8.2/0.8.3
that's being discussed here?

I think the original question was about using the *new* consumer ("clients
consumer") with 0.8.2. Gwen's right: it will use features not even
implemented in the broker on trunk yet, let alone in 0.8.2.

I don't think the "enable.commit.downgrade" type option, or supporting the
old protocol with the new consumer at all, makes much sense. You'd end up
with some weird hybrid of simple and high-level consumers -- you could use
offset storage, but you'd have to manage rebalancing yourself since none of
the coordinator support would be there.


On Thu, Apr 23, 2015 at 9:22 PM, Neha Narkhede  wrote:

> My understanding is that ideally the 0.8.3 consumer should work with an
> 0.8.2 broker if the offset commit config was set to "zookeeper".
>
> The only thing that might not work is offset commit to Kafka, which makes
> sense since the 0.8.2 broker does not support Kafka based offset
> management.
>
> If we broke all kinds of offset commits, then it seems like a regression,
> no?
>
> On Thu, Apr 23, 2015 at 7:26 PM, Gwen Shapira 
> wrote:
>
> > I didn't think 0.8.3 consumer will ever be able to talk to 0.8.2
> > broker... there are some essential pieces that are missing in 0.8.2
> > (Coordinator, Heartbeat, etc).
> > Maybe I'm missing something. It will be nice if this will work :)
> >
> > Mind sharing what / how you tested? Were there no errors in broker
> > logs after your fix?
> >
> > On Thu, Apr 23, 2015 at 5:37 PM, Sean Lydon 
> wrote:
> > > Currently the clients consumer (trunk) sends offset commit requests of
> > > version 2.  The 0.8.2 brokers fail to handle this particular request
> > > with a:
> > >
> > > java.lang.AssertionError: assertion failed: Version 2 is invalid for
> > > OffsetCommitRequest. Valid versions are 0 or 1.
> > >
> > > I was able to make this work via a forceful downgrade of this
> > > particular request, but I would like some feedback on whether a
> > > "enable.commit.downgrade" configuration would be a tolerable method to
> > > allow 0.8.3 consumers to interact with 0.8.2 brokers.  I'm also
> > > interested in this even being a goal worth pursuing.
> > >
> > > Thanks,
> > > Sean
> >
>
>
>
> --
> Thanks,
> Neha
>



-- 
Thanks,
Ewen