Re: KIP-28 does not allow Processor to specify partition of output message

2015-10-14 Thread Randall Hauch
Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a PR with 
the proposed change.

Thanks!


On October 14, 2015 at 3:12:34 AM, Guozhang Wang (wangg...@gmail.com) wrote:

Thanks!

On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch  wrote:
Ok, cool. I agree we want something simple.  I'll create an issue and create a 
pull request with a proposal. Look for it tomorrow. 

On Oct 13, 2015, at 10:25 PM, Guozhang Wang  wrote:

I see your point. Yeah, I think it is a good idea to add a Partitioner to 
addSink(...), but the Partitioner interface in the producer is a bit overkill:

"partition(String topic, Object key, byte[] keyBytes, Object value, byte[] 
valueBytes, Cluster cluster)"

whereas for us we only want to partition on (K key, V value).

Perhaps we should add a new Partitioner interface in Kafka Streams?
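
For illustration, a minimal sketch of what such a Streams-side interface could 
look like; the name StreamPartitioner and the numPartitions argument are 
assumptions here, not a committed API:

{code}
// Hypothetical Kafka Streams partitioner, typed on the sink's key/value types.
// The sink would supply numPartitions from the output topic's metadata.
public interface StreamPartitioner<K, V> {
    Integer partition(K key, V value, int numPartitions);
}
{code}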

Guozhang

On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch  wrote:
This overrides the partitioning logic for all topics, right? That means I have 
to explicitly call the default partitioning logic for all topics except those 
that my Processor forwards. I'm guessing the best way to do this is by extending 
org.apache.kafka.clients.producer.internals.DefaultPartitioner. Of course, with 
multiple sinks in my topology, I have to put all of the partitioning logic 
inside a single class.

What would you think about adding an overloaded TopologyBuilder.addSink(…) 
method that takes a Partitioner (or, better yet, a smaller functional 
interface)? The resulting SinkProcessor could use that Partitioner instance to 
set the partition number. That'd be super convenient for users, would keep the 
logic where it belongs (where the topology defines the sinks), and best of all 
the implementations won't have to worry about any other topics, such as those 
used by stores, metrics, or other sinks.
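
As a rough sketch of how that might look, assuming the hypothetical 
StreamPartitioner interface sketched above and a TopologyBuilder.addSink(...) 
overload that accepts it (neither is committed API; the package path is an 
assumption based on trunk at the time):

{code}
import org.apache.kafka.streams.processor.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source", "input-topic");
// Hypothetical overload: this partitioner applies to this sink only; topics
// used by stores, metrics, or other sinks keep the default partitioning.
builder.addSink("sink", "output-topic",
        (String key, String value, int numPartitions) ->
                (key.hashCode() & 0x7fffffff) % numPartitions,
        "source");
{code}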

Best regards,

Randall


On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com) wrote:

Hi Randall,

You can try to set the partitioner class as
ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig, its interface
can be found in

org.apache.kafka.clients.producer.Partitioner

Let me know if it works for you.
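
For reference, a minimal sketch of that approach (the class below is 
hypothetical); note that the producer invokes it for every topic it writes to:

{code}
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Registered by putting its class name under
// ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig properties.
public class MyGlobalPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // A real implementation would have to branch on the topic name here,
        // since all of the application's output topics funnel through this
        // one method.
        return keyBytes == null ? 0
                : (java.util.Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {}
}
{code}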

Guozhang

On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch  wrote:

> The new streams API added with KIP-28 is great. I’ve been using it on a
> prototype for a few weeks, and I’m looking forward to it being included in
> 0.9.0. However, at the moment, a Processor implementation is not able to
> specify the partition number when it outputs messages.
>
> I’d be happy to log a JIRA and create a PR to add it to the API, but
> without knowing all of the history I’m wondering if leaving it out of the
> API was intentional.
>
> Thoughts?
>
> Best regards,
>
> Randall Hauch
>



--
-- Guozhang



--
-- Guozhang



--
-- Guozhang


[GitHub] kafka pull request: KAFKA-2650: Change ConfigCommand --deleted-con...

2015-10-14 Thread granthenke
GitHub user granthenke opened a pull request:

https://github.com/apache/kafka/pull/308

KAFKA-2650: Change ConfigCommand --deleted-config option to align wit…

…h TopicCommand

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/granthenke/kafka configcommand

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #308


commit b8fdd0138b8281d132fe63b6ff649857512ba8f1
Author: Grant Henke 
Date:   2015-10-14T15:19:06Z

KAFKA-2650: Change ConfigCommand --deleted-config option to align with 
TopicCommand




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-2649) Add support for custom partitioner in sink nodes

2015-10-14 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-2649:
-
Status: Patch Available  (was: Open)

> Add support for custom partitioner in sink nodes
> 
>
> Key: KAFKA-2649
> URL: https://issues.apache.org/jira/browse/KAFKA-2649
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kafka streams
>Reporter: Randall Hauch
>Assignee: Randall Hauch
> Fix For: 0.9.0.0
>
>
> The only way for Processor implementations to control partitioning of 
> forwarded messages is to set the partitioner class as property 
> {{ProducerConfig.PARTITIONER_CLASS_CONFIG}} in the StreamsConfig, which 
> should be set to the name of a 
> {{org.apache.kafka.clients.producer.Partitioner}} implementation. However, 
> doing this requires that the partitioner know how to properly partition *all* 
> topics, not just the one or few topics used by the Processor.
> Instead, Kafka Streams should make it easy to optionally add a partitioning 
> function for each sink used in a topology. Each sink represents a single 
> output topic, and thus is far simpler to implement. Additionally, the sink is 
> already typed with the key and value types (via serdes for the keys and 
> values), so the partitioner can also be typed with the key and value 
> types. Finally, this also keeps the logic of choosing partitioning strategies 
> where it belongs, as part of building the topology.
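
As a sketch of the kind of typed, per-sink partitioner the description argues 
for (using the hypothetical StreamPartitioner interface sketched earlier in 
this digest; names are illustrative only):

{code}
// Typed for this one sink: all records with the same key land on the same
// partition of this output topic, with no effect on any other topic.
StreamPartitioner<String, String> byKey =
        (key, value, numPartitions) -> (key.hashCode() & 0x7fffffff) % numPartitions;
{code}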



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2651) Remove deprecated config alteration from TopicCommand in 0.9.1.0

2015-10-14 Thread Grant Henke (JIRA)
Grant Henke created KAFKA-2651:
--

 Summary: Remove deprecated config alteration from TopicCommand in 
0.9.1.0
 Key: KAFKA-2651
 URL: https://issues.apache.org/jira/browse/KAFKA-2651
 Project: Kafka
  Issue Type: Task
Affects Versions: 0.9.0.0
Reporter: Grant Henke






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2650) Change ConfigCommand --deleted-config option to align with TopicCommand

2015-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14957081#comment-14957081
 ] 

ASF GitHub Bot commented on KAFKA-2650:
---

GitHub user granthenke opened a pull request:

https://github.com/apache/kafka/pull/308

KAFKA-2650: Change ConfigCommand --deleted-config option to align wit…

…h TopicCommand

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/granthenke/kafka configcommand

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #308


commit b8fdd0138b8281d132fe63b6ff649857512ba8f1
Author: Grant Henke 
Date:   2015-10-14T15:19:06Z

KAFKA-2650: Change ConfigCommand --deleted-config option to align with 
TopicCommand




> Change ConfigCommand --deleted-config option to align with TopicCommand
> ---
>
> Key: KAFKA-2650
> URL: https://issues.apache.org/jira/browse/KAFKA-2650
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Grant Henke
>Assignee: Grant Henke
> Fix For: 0.9.0.0
>
>
> In order to avoid confusion, change ConfigCommand's --deleted-config 
> parameter to --delete-config. 
> At the same time change --added-config to --add-config, to make all options 
> present/future tense instead of past tense. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2650) Change ConfigCommand --deleted-config option to align with TopicCommand

2015-10-14 Thread Grant Henke (JIRA)
Grant Henke created KAFKA-2650:
--

 Summary: Change ConfigCommand --deleted-config option to align 
with TopicCommand
 Key: KAFKA-2650
 URL: https://issues.apache.org/jira/browse/KAFKA-2650
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.9.0.0
Reporter: Grant Henke
Assignee: Grant Henke
 Fix For: 0.9.0.0


In order to avoid confusion, change ConfigCommand's --deleted-config parameter 
to --delete-config. 

At the same time change --added-config to --add-config, to make all options 
present/future tense instead of past tense. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2650) Change ConfigCommand --deleted-config option to align with TopicCommand

2015-10-14 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke updated KAFKA-2650:
---
Status: Patch Available  (was: Open)

> Change ConfigCommand --deleted-config option to align with TopicCommand
> ---
>
> Key: KAFKA-2650
> URL: https://issues.apache.org/jira/browse/KAFKA-2650
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Grant Henke
>Assignee: Grant Henke
> Fix For: 0.9.0.0
>
>
> In order to avoid confusion, change ConfigCommand's --deleted-config 
> parameter to --delete-config. 
> At the same time change --added-config to --add-config, to make all options 
> present/future tense instead of past tense. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2649) Add support for custom partitioner in sink nodes

2015-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14957113#comment-14957113
 ] 

ASF GitHub Bot commented on KAFKA-2649:
---

GitHub user rhauch opened a pull request:

https://github.com/apache/kafka/pull/309

KAFKA-2649 Add support for custom partitioning in topology sinks

Added option to use custom partitioning logic within each topology sink.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rhauch/kafka kafka-2649

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/309.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #309


commit 4a5a2adf6667bcaeccc7b49d63923b1b22e19d16
Author: Randall Hauch 
Date:   2015-10-14T15:19:43Z

KAFKA-2649 Add support for custom partitioning in topology sinks

Added option to use custom partitioning logic within each topology sink.




> Add support for custom partitioner in sink nodes
> 
>
> Key: KAFKA-2649
> URL: https://issues.apache.org/jira/browse/KAFKA-2649
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kafka streams
>Reporter: Randall Hauch
>Assignee: Randall Hauch
> Fix For: 0.9.0.0
>
>
> The only way for Processor implementations to control partitioning of 
> forwarded messages is to set the partitioner class as property 
> {{ProducerConfig.PARTITIONER_CLASS_CONFIG}} in the StreamsConfig, which 
> should be set to the name of a 
> {{org.apache.kafka.clients.producer.Partitioner}} implementation. However, 
> doing this requires that the partitioner know how to properly partition *all* 
> topics, not just the one or few topics used by the Processor.
> Instead, Kafka Streams should make it easy to optionally add a partitioning 
> function for each sink used in a topology. Each sink represents a single 
> output topic, and thus is far simpler to implement. Additionally, the sink is 
> already typed with the key and value types (via serdes for the keys and 
> values), so the partitioner can also be typed with the key and value 
> types. Finally, this also keeps the logic of choosing partitioning strategies 
> where it belongs, as part of building the topology.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-2649 Add support for custom partitioning...

2015-10-14 Thread rhauch
GitHub user rhauch opened a pull request:

https://github.com/apache/kafka/pull/309

KAFKA-2649 Add support for custom partitioning in topology sinks

Added option to use custom partitioning logic within each topology sink.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rhauch/kafka kafka-2649

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/309.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #309


commit 4a5a2adf6667bcaeccc7b49d63923b1b22e19d16
Author: Randall Hauch 
Date:   2015-10-14T15:19:43Z

KAFKA-2649 Add support for custom partitioning in topology sinks

Added option to use custom partitioning logic within each topology sink.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover

2015-10-14 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14957268#comment-14957268
 ] 

Jason Gustafson commented on KAFKA-2017:


I think persistence in Kafka is a promising idea. Reusing the consumer offsets 
topic would be nice if possible. The current schema for the key in the consumer 
offsets topic looks like this: [group, topic, partition]. If we reused 
this topic to store group data, would we use a new key version, or would we use 
a truncated form of the existing version?

> Persist Coordinator State for Coordinator Failover
> --
>
> Key: KAFKA-2017
> URL: https://issues.apache.org/jira/browse/KAFKA-2017
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: Onur Karaman
>Assignee: Guozhang Wang
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, 
> KAFKA-2017_2015-05-21_19:02:47.patch
>
>
> When a coordinator fails, the group membership protocol tries to failover to 
> a new coordinator without forcing all the consumers rejoin their groups. This 
> is possible if the coordinator persists its state so that the state can be 
> transferred during coordinator failover. This state consists of most of the 
> information in GroupRegistry and ConsumerRegistry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2516) Rename o.a.k.client.tools to o.a.k.tools

2015-10-14 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke updated KAFKA-2516:
---
Status: Patch Available  (was: Open)

> Rename o.a.k.client.tools to o.a.k.tools
> 
>
> Key: KAFKA-2516
> URL: https://issues.apache.org/jira/browse/KAFKA-2516
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Grant Henke
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> Currently our new performance tools are in o.a.k.client.tools but packaged in 
> kafka-tools not kafka-clients. This is a bit confusing.
> Since they deserve their own jar (you don't want our client tools packaged in 
> your app), let's give them a separate package and call it o.a.k.tools.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-2516) Rename o.a.k.client.tools to o.a.k.tools

2015-10-14 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke reassigned KAFKA-2516:
--

Assignee: Grant Henke

> Rename o.a.k.client.tools to o.a.k.tools
> 
>
> Key: KAFKA-2516
> URL: https://issues.apache.org/jira/browse/KAFKA-2516
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Grant Henke
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> Currently our new performance tools are in o.a.k.client.tools but packaged in 
> kafka-tools not kafka-clients. This is a bit confusing.
> Since they deserve their own jar (you don't want our client tools packaged in 
> your app), let's give them a separate package and call it o.a.k.tools.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover

2015-10-14 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14957348#comment-14957348
 ] 

Joel Koshy commented on KAFKA-2017:
---

If we reused this topic, then I think we could end up doing something like this:

* Bump up the key format version and add a new field: key-type (which could be 
either _offset_ or _consumer group data_)
* The actual key-bytes would be either the group-topic-partition schema (for 
keys with type _offset_) or just the group (for keys with type _consumer group 
data_)
* If the key is of type _offset_ then the value will be the same as what we 
currently have for offsets
* If the key is of type _consumer group data_ then the value will be a new 
schema (for the consumer group data). Now we may end up needing to support 
different value schemas for different group management use-cases.

Overall, it can get a bit ugly, so I'm wondering if the above indicates that it 
is cleaner to just use a separate topic for group state and refactor the offset 
manager to become a simple state store that can be easily repurposed for these 
use-cases (offset state storage and group state storage).
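
For illustration, a rough model of the heterogeneous key the bullets above 
imply (field names and the version number are assumptions):

{code}
// Hypothetical v2 key for a merged offsets/group-state topic.
enum KeyType { OFFSET, GROUP_DATA }

final class ConsumerStateKey {
    final short version = 2;   // bumped key format version
    final KeyType keyType;     // new discriminator field
    final String group;        // present for both key types
    final String topic;        // OFFSET keys only, else null
    final Integer partition;   // OFFSET keys only, else null

    ConsumerStateKey(KeyType keyType, String group, String topic, Integer partition) {
        this.keyType = keyType;
        this.group = group;
        this.topic = topic;
        this.partition = partition;
    }
}
{code}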


> Persist Coordinator State for Coordinator Failover
> --
>
> Key: KAFKA-2017
> URL: https://issues.apache.org/jira/browse/KAFKA-2017
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: Onur Karaman
>Assignee: Guozhang Wang
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, 
> KAFKA-2017_2015-05-21_19:02:47.patch
>
>
> When a coordinator fails, the group membership protocol tries to failover to 
> a new coordinator without forcing all the consumers rejoin their groups. This 
> is possible if the coordinator persists its state so that the state can be 
> transferred during coordinator failover. This state consists of most of the 
> information in GroupRegistry and ConsumerRegistry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KIP-28 does not allow Processor to specify partition of output message

2015-10-14 Thread Yasuhiro Matsuda
A partitioning scheme should be a cluster-wide thing. Letting each sink
have a different partitioning scheme does not make sense to me. A
partitioning scheme is not specific to a stream job, a task, or a sink. I
think specifying it at the sink level is more error prone.

If a user wants to customize a partitioning scheme, he/she also wants to
manage it in some central place, maybe a code repo or a jar file. All
applications must use the same logic, otherwise data will be messed up.
Thus, a single class representing all partitioning logic is not a bad thing
at all. (Code-organization-wise, all the logic does not necessarily have to
live in a single class, of course.)


On Wed, Oct 14, 2015 at 8:47 AM, Randall Hauch  wrote:

> Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a
> PR with the proposed change.
>
> Thanks!
>
>
> On October 14, 2015 at 3:12:34 AM, Guozhang Wang (wangg...@gmail.com)
> wrote:
>
> Thanks!
>
> On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch  wrote:
> Ok, cool. I agree we want something simple.  I'll create an issue and
> create a pull request with a proposal. Look for it tomorrow.
>
> On Oct 13, 2015, at 10:25 PM, Guozhang Wang  wrote:
>
> I see your point. Yeah, I think it is a good idea to add a Partitioner to
> addSink(...), but the Partitioner interface in the producer is a bit overkill:
>
> "partition(String topic, Object key, byte[] keyBytes, Object value, byte[]
> valueBytes, Cluster cluster)"
>
> whereas for us we only want to partition on (K key, V value).
>
> Perhaps we should add a new Partitioner interface in Kafka Streams?
>
> Guozhang
>
> On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch  wrote:
> This overrides the partitioning logic for all topics, right? That means I
> have to explicitly call the default partitioning logic for all topics
> except those that my Processor forwards. I'm guessing the best way to do this
> is by extending org.apache.kafka.clients.producer.internals.DefaultPartitioner.
> Of course, with multiple sinks in my topology, I have to put all of the
> partitioning logic inside a single class.
>
> What would you think about adding an overloaded TopologyBuilder.addSink(…)
> method that takes a Partitioner (or, better yet, a smaller functional
> interface)? The resulting SinkProcessor could use that Partitioner instance
> to set the partition number. That'd be super convenient for users, would
> keep the logic where it belongs (where the topology defines the sinks), and
> best of all the implementations won't have to worry about any other topics,
> such as those used by stores, metrics, or other sinks.
>
> Best regards,
>
> Randall
>
>
> On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com)
> wrote:
>
> Hi Randall,
>
> You can try to set the partitioner class as
> ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig, its interface
> can be found in
>
> org.apache.kafka.clients.producer.Partitioner
>
> Let me know if it works for you.
>
> Guozhang
>
> On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch  wrote:
>
> > The new streams API added with KIP-28 is great. I’ve been using it on a
> > prototype for a few weeks, and I’m looking forward to it being included
> in
> > 0.9.0. However, at the moment, a Processor implementation is not able to
> > specify the partition number when it outputs messages.
> >
> > I’d be happy to log a JIRA and create a PR to add it to the API, but
> > without knowing all of the history I’m wondering if leaving it out of the
> > API was intentional.
> >
> > Thoughts?
> >
> > Best regards,
> >
> > Randall Hauch
> >
>
>
>
> --
> -- Guozhang
>
>
>
> --
> -- Guozhang
>
>
>
> --
> -- Guozhang
>


[jira] [Commented] (KAFKA-1436) Idempotent Producer / Duplicate Detection

2015-10-14 Thread Manish (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14957497#comment-14957497
 ] 

Manish commented on KAFKA-1436:
---

Hi Neha,

Is there an ETA for getting this issue fixed?

Thanks,
Manish

> Idempotent Producer / Duplicate Detection
> -
>
> Key: KAFKA-1436
> URL: https://issues.apache.org/jira/browse/KAFKA-1436
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, producer 
>Affects Versions: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2.0, 0.10.0.0
>Reporter: James Thornton
>Assignee: Neha Narkhede
>
> Dealing with duplicate messages is one of the major issues for teams using 
> Kafka, and Jay Kreps posted a page on implementing an Idempotent Producer to 
> address this issue:
> https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
> MapDB 1.0 (https://github.com/jankotek/MapDB) was just released, and either 
> it or Java Chronicle (https://github.com/OpenHFT/Java-Chronicle/) could be 
> embedded within each broker to provide a high-performance, random-access, 
> off-heap store for request IDs.
> As Jay points out in his post, global unique request IDs probably aren't 
> needed, but if that need should arise, Twitter's Snowflake service 
> (https://github.com/twitter/snowflake/) might be useful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover

2015-10-14 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14957204#comment-14957204
 ] 

Joel Koshy commented on KAFKA-2017:
---

I agree with those benefits. If we go that route then I would prefer compaction 
over other retention policies. You do lose some history, but _typically_ you 
would have the last couple of state entries available if you overpartition, and 
time-based or size-based retention would anyway keep only a certain amount of 
history. I think you can merge this in with the offsets topic (and rename it to 
something like __consumer_state). If we merge, then we will end up having a 
heterogeneous topic with different keys (group-topic-partition for offsets and 
group for state), but that should be fine.

WRT the implementation complexity that was referenced above: I agree it is more 
complicated to implement than ZK storage if we implement it from scratch, but I 
don't think we need to, right? I.e., all of the fault-tolerance and caching 
logic is already there in the offset manager. So on coordinator failover the new 
coordinator just reads from the state partitions that it now leads and loads 
into memory (as we already do for offsets). Is there any other implementation 
complexity that I'm missing?

> Persist Coordinator State for Coordinator Failover
> --
>
> Key: KAFKA-2017
> URL: https://issues.apache.org/jira/browse/KAFKA-2017
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: Onur Karaman
>Assignee: Guozhang Wang
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, 
> KAFKA-2017_2015-05-21_19:02:47.patch
>
>
> When a coordinator fails, the group membership protocol tries to failover to 
> a new coordinator without forcing all the consumers rejoin their groups. This 
> is possible if the coordinator persists its state so that the state can be 
> transferred during coordinator failover. This state consists of most of the 
> information in GroupRegistry and ConsumerRegistry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2516) Rename o.a.k.client.tools to o.a.k.tools

2015-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14957253#comment-14957253
 ] 

ASF GitHub Bot commented on KAFKA-2516:
---

GitHub user granthenke opened a pull request:

https://github.com/apache/kafka/pull/310

KAFKA-2516: Rename o.a.k.client.tools to o.a.k.tools



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/granthenke/kafka tools-packaging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/310.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #310


commit f1cf0a01fc4ea46a03bc0cbb37cdf763a91825e5
Author: Grant Henke 
Date:   2015-10-14T16:51:08Z

KAFKA-2516: Rename o.a.k.client.tools to o.a.k.tools




> Rename o.a.k.client.tools to o.a.k.tools
> 
>
> Key: KAFKA-2516
> URL: https://issues.apache.org/jira/browse/KAFKA-2516
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> Currently our new performance tools are in o.a.k.client.tools but packaged in 
> kafka-tools not kafka-clients. This is a bit confusing.
> Since they deserve their own jar (you don't want our client tools packaged in 
> your app), let's give them a separate package and call it o.a.k.tools.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-37 - Add namespaces in Kafka

2015-10-14 Thread Jason Gustafson
Hey Ashish, thanks for the write-up. I think having a namespace capability
is a useful feature for Kafka, in particular with the addition of the
authorization layer. I probably prefer Jay's hierarchical approach if we're
going to embed the namespace in the topic name since it seems more general.
That said, one advantage of having a namespace independent of the topic
name is that it simplifies replication between namespaces a bit since you
don't have to parse and rewrite topic names. Assuming that hierarchical
topics will happen eventually anyway, I imagine a common pattern would be
to preserve the same directory structure in multiple namespaces, so having
an easy mechanism for applications to switch between them would be nice.
The namespace is kind of analogous to a chroot in this case. Of course you
can achieve the same thing by having a configurable topic prefix; it's just
that you have to do all the topic rewriting, which I'm guessing will be a
little annoying to implement in all of the clients and tools. However, the
tradeoff (as you mention in the KIP) is that all request schemas have to be
updated, which is also annoying.

-Jason

On Wed, Oct 14, 2015 at 12:03 AM, Ashish Singh  wrote:

> On Mon, Oct 12, 2015 at 7:37 PM, Gwen Shapira  wrote:
>
> > This works really nicely from the consumer side, but what about the
> > producer? If there are no more topics,do we allow producing to a
> directory
> > and have the Partitioner hash-partition messages between all partitions
> in
> > the multiple levels in a directory?
> >
> Good point.
>
> I am personally in favor of maintaining the current behavior for the producer,
> i.e., letting users produce only to a topic. This is different for
> consumers, where the suggested behavior is in line with current behavior. One can
> use regex subscription to achieve the same even today.
>
> >
> > Also, I think we want to preserve the consumer terminology of "subscribe"
> > to topics / directories, but "assign" partitions - since the consumer
> > behavior is different in those cases.
> >
> > On Mon, Oct 12, 2015 at 7:16 PM, Jay Kreps  wrote:
> >
> > > Okay this is similar to what I think we have talked about before. Let
> me
> > > elaborate on the idea that I think has been floating around--it's
> pretty
> > > similar with a few differences.
> > >
> > > I think what you are calling the "default namespace" is basically what
> I
> > > would call the "current working directory" with paths not beginning
> with
> > > '/' being interpreted relative to this directory as in the fs.
> > >
> > > One thing you have to work out is what levels in this hierarchy you can
> > > actually subscribe to. I think you are assuming only what we currently
> > > consider a "topic", i.e. the first level of directories but not the
> > > partitions or parent dirs, would be subscribable. If you think about
> it,
> > > though, that constraint is a bit arbitrary.
> > >
> > > I'd propose instead the semantics that:
> > > - Subscribing to /a/b/c/0 means subscribing to the 0th partition of
> topic
> > > "c" in directory /a/b
> > > - Subscribing to /a/b/c means subscribing to all partitions in
> > > topic/directory "c"
> > > - Subscribing to /a/b means subscribing to all partitions in all
> > > topics/subdirectories under a/b recursively
> > >
> > > Effectively the concept of topics goes away entirely--you just have
> > > partitions/logs and directories. In this respect rather than adding new
> > > concepts this new feature would actually just generalizes what we have
> > > (which I think is a good thing).
> > >
> > > -Jay
> > >
> > > On Mon, Oct 12, 2015 at 6:24 PM, Ashish Singh 
> > wrote:
> > >
> > > > On Mon, Oct 12, 2015 at 5:42 PM, Jay Kreps  wrote:
> > > >
> > > > > Great. I definitely would strongly favor carrying over user's
> > intuition
> > > > > from FS unless we think we need a very different model. The minor
> > > details
> > > > > like the separator and namespace term will help with that.
> > > > >
> > > > > Follow-up question, say I have a layout like
> > > > >/chicago-datacenter/user-events/pageviews
> > > > > Can I subscribe to
> > > > >/chicago-datacenter/user-events
> > > > >
> > > > Yes, however they will need a regex like
> > > > /chicago-datacenter/user-events/*
> > > >
> > > > > to get the full firehose of user events from chicago? Can I
> subscribe
> > > to
> > > > >/*/user-events
> > > > > to get user events originating from all datacenters?
> > > > >
> > > > Yes, however they will need a regex like
> > > > /chicago-datacenter/user-events/*
> > > > Yes
> > > >
> > > > >
> > > > > (Assuming, for now, that these are all in the same cluster...)
> > > > >
> > > > > Also, just to confirm, it sounds from the proposal like config
> > > overrides
> > > > > would become fully hierarchical so you can override config at any
> > > > directory
> > > > > point. This will add complexity in implementation but I think will
> > > > > likely be much more operator friendly.

[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover

2015-10-14 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14957330#comment-14957330
 ] 

Guozhang Wang commented on KAFKA-2017:
--

I am not sure we want to piggyback the membership information onto the offsets 
topic if we go this route, since in practice offset commits are much more 
frequent than membership changes; piggybacking would therefore result in lots 
of duplicate membership data. 

> Persist Coordinator State for Coordinator Failover
> --
>
> Key: KAFKA-2017
> URL: https://issues.apache.org/jira/browse/KAFKA-2017
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: Onur Karaman
>Assignee: Guozhang Wang
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, 
> KAFKA-2017_2015-05-21_19:02:47.patch
>
>
> When a coordinator fails, the group membership protocol tries to failover to 
> a new coordinator without forcing all the consumers rejoin their groups. This 
> is possible if the coordinator persists its state so that the state can be 
> transferred during coordinator failover. This state consists of most of the 
> information in GroupRegistry and ConsumerRegistry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-2516: Rename o.a.k.client.tools to o.a.k...

2015-10-14 Thread granthenke
GitHub user granthenke opened a pull request:

https://github.com/apache/kafka/pull/310

KAFKA-2516: Rename o.a.k.client.tools to o.a.k.tools



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/granthenke/kafka tools-packaging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/310.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #310


commit f1cf0a01fc4ea46a03bc0cbb37cdf763a91825e5
Author: Grant Henke 
Date:   2015-10-14T16:51:08Z

KAFKA-2516: Rename o.a.k.client.tools to o.a.k.tools




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-37 - Add namespaces in Kafka

2015-10-14 Thread Ashish Singh
On Mon, Oct 12, 2015 at 7:37 PM, Gwen Shapira  wrote:

> This works really nicely from the consumer side, but what about the
> producer? If there are no more topics,do we allow producing to a directory
> and have the Partitioner hash-partition messages between all partitions in
> the multiple levels in a directory?
>
Good point.

I am personally in favor of maintaining the current behavior for the producer,
i.e., letting users produce only to a topic. This is different for
consumers, where the suggested behavior is in line with current behavior. One can
use regex subscription to achieve the same even today.
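
For instance, a minimal sketch with the new consumer; the dotted topic layout 
below is a hypothetical stand-in for the proposed hierarchy, since '/' is not 
a legal topic-name character today:

{code}
import java.util.Collection;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class DirectoryStyleSubscribe {
    // Today's equivalent of subscribing to a whole "directory": regex
    // subscription over a dotted topic naming convention.
    public static void subscribeToUserEvents(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Pattern.compile("chicago-datacenter\\.user-events\\..*"),
                new ConsumerRebalanceListener() {
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
                });
    }
}
{code}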

>
> Also, I think we want to preserve the consumer terminology of "subscribe"
> to topics / directories, but "assign" partitions - since the consumer
> behavior is different in those cases.
>
> On Mon, Oct 12, 2015 at 7:16 PM, Jay Kreps  wrote:
>
> > Okay this is similar to what I think we have talked about before. Let me
> > elaborate on the idea that I think has been floating around--it's pretty
> > similar with a few differences.
> >
> > I think what you are calling the "default namespace" is basically what I
> > would call the "current working directory" with paths not beginning with
> > '/' being interpreted relative to this directory as in the fs.
> >
> > One thing you have to work out is what levels in this hierarchy you can
> > actually subscribe to. I think you are assuming only what we currently
> > consider a "topic", i.e. the first level of directories but not the
> > partitions or parent dirs, would be subscribable. If you think about it,
> > though, that constraint is a bit arbitrary.
> >
> > I'd propose instead the semantics that:
> > - Subscribing to /a/b/c/0 means subscribing to the 0th partition of topic
> > "c" in directory /a/b
> > - Subscribing to /a/b/c means subscribing to all partitions in
> > topic/directory "c"
> > - Subscribing to /a/b means subscribing to all partitions in all
> > topics/subdirectories under a/b recursively
> >
> > Effectively the concept of topics goes away entirely--you just have
> > partitions/logs and directories. In this respect rather than adding new
> > concepts this new feature would actually just generalizes what we have
> > (which I think is a good thing).
> >
> > -Jay
> >
> > On Mon, Oct 12, 2015 at 6:24 PM, Ashish Singh 
> wrote:
> >
> > > On Mon, Oct 12, 2015 at 5:42 PM, Jay Kreps  wrote:
> > >
> > > > Great. I definitely would strongly favor carrying over user's
> intuition
> > > > from FS unless we think we need a very different model. The minor
> > details
> > > > like the separator and namespace term will help with that.
> > > >
> > > > Follow-up question, say I have a layout like
> > > >/chicago-datacenter/user-events/pageviews
> > > > Can I subscribe to
> > > >/chicago-datacenter/user-events
> > > >
> > > Yes, however they will need a regex like
> > > /chicago-datacenter/user-events/*
> > >
> > > > to get the full firehose of user events from chicago? Can I subscribe
> > to
> > > >/*/user-events
> > > > to get user events originating from all datacenters?
> > > >
> > > Yes, however they will need a regex like
> > > /chicago-datacenter/user-events/*
> > > Yes
> > >
> > > >
> > > > (Assuming, for now, that these are all in the same cluster...)
> > > >
> > > > Also, just to confirm, it sounds from the proposal like config
> > overrides
> > > > would become fully hierarchical so you can override config at any
> > > directory
> > > > point. This will add complexity in implementation but I think will
> > likely
> > > > be much more operator friendly.
> > > >
> > > Yes, that is the idea.
> > >
> > > >
> > > > There are about a thousand details to discuss in terms of how this
> > would
> > > > impact the metadata request, various zk entries, and various other
> > > aspects,
> > > > but probably it makes sense to first agree on how we would want it to
> > > work
> > > > and then start to dive into how to implement that.
> > > >
> > > Agreed.
> > >
> > > >
> > > > -Jay
> > > >
> > > > On Mon, Oct 12, 2015 at 5:28 PM, Ashish Singh 
> > > wrote:
> > > >
> > > > > Hey Jay, thanks for reviewing the proposal. Answers inline.
> > > > >
> > > > > On Mon, Oct 12, 2015 at 10:53 AM, Jay Kreps 
> > wrote:
> > > > >
> > > > > > Hey guys,
> > > > > >
> > > > > > I think this is an important feature and one we've talked about
> > for a
> > > > > > while. I really think trying to invent a new nomenclature is
> going
> > to
> > > > > make
> > > > > > it hard for people to understand, though. As such I recommend we
> > call
> > > > > > namespaces "directories" and denote them with '/'--this will make
> > the
> > > > > > feature 1000x more understandable to people.
> > > > >
> > > > > Essentially you are suggesting two things here.
> > > > > 1. Use "Directory" instead of "Namespace" as it is more intuitive. I
> > > > > agree.
> > > > > 2. Make '/' the delimiter instead of ':'. Fine with me, and I agree that
> > > > > if we call these directories, '/' is the way to go.

Re: [DISCUSS] KIP-37 - Add namespaces in Kafka

2015-10-14 Thread Ashish Singh
On Mon, Oct 12, 2015 at 7:16 PM, Jay Kreps  wrote:

> Okay this is similar to what I think we have talked about before. Let me
> elaborate on the idea that I think has been floating around--it's pretty
> similar with a few differences.
>
> I think what you are calling the "default namespace" is basically what I
> would call the "current working directory" with paths not beginning with
> '/' being interpreted relative to this directory as in the fs.
>
> One thing you have to work out is what levels in this hierarchy you can
> actually subscribe to. I think you are assuming only what we currently
> consider a "topic", i.e. the first level of directories but not the
> partitions or parent dirs, would be subscribable. If you think about it,
> though, that constraint is a bit arbitrary.
>
> I'd propose instead the semantics that:
> - Subscribing to /a/b/c/0 means subscribing to the 0th partition of topic
> "c" in directory /a/b
> - Subscribing to /a/b/c means subscribing to all partitions in
> topic/directory "c"
> - Subscribing to /a/b means subscribing to all partitions in all
> topics/subdirectories under a/b recursively
>
Seems reasonable.
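
To make those semantics concrete, a hypothetical sketch; none of these calls 
exist today, and the paths are purely illustrative:

{code}
// Hypothetical path-based subscriptions under the proposed semantics:
consumer.subscribe("/a/b/c/0"); // partition 0 of topic "c" in directory /a/b
consumer.subscribe("/a/b/c");   // all partitions of topic/directory "c"
consumer.subscribe("/a/b");     // everything under /a/b, recursively
{code}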

>
> Effectively the concept of topics goes away entirely--you just have
> partitions/logs and directories. In this respect rather than adding new
> concepts this new feature would actually just generalizes what we have
> (which I think is a good thing).
>
> -Jay
>
> On Mon, Oct 12, 2015 at 6:24 PM, Ashish Singh  wrote:
>
> > On Mon, Oct 12, 2015 at 5:42 PM, Jay Kreps  wrote:
> >
> > > Great. I definitely would strongly favor carrying over user's intuition
> > > from FS unless we think we need a very different model. The minor
> details
> > > like the separator and namespace term will help with that.
> > >
> > > Follow-up question, say I have a layout like
> > >/chicago-datacenter/user-events/pageviews
> > > Can I subscribe to
> > >/chicago-datacenter/user-events
> > >
> > Yes, however they will need a regex like
> > /chicago-datacenter/user-events/*
> >
> > > to get the full firehose of user events from chicago? Can I subscribe
> to
> > >/*/user-events
> > > to get user events originating from all datacenters?
> > >
> > Yes, however they will need a regex like
> > /chicago-datacenter/user-events/*
> > Yes
> >
> > >
> > > (Assuming, for now, that these are all in the same cluster...)
> > >
> > > Also, just to confirm, it sounds from the proposal like config
> overrides
> > > would become fully hierarchical so you can override config at any
> > directory
> > > point. This will add complexity in implementation but I think will
> likely
> > > be much more operator friendly.
> > >
> > Yes, that is the idea.
> >
> > >
> > > There are about a thousand details to discuss in terms of how this
> would
> > > impact the metadata request, various zk entries, and various other
> > aspects,
> > > but probably it makes sense to first agree on how we would want it to
> > work
> > > and then start to dive into how to implement that.
> > >
> > Agreed.
> >
> > >
> > > -Jay
> > >
> > > On Mon, Oct 12, 2015 at 5:28 PM, Ashish Singh 
> > wrote:
> > >
> > > > Hey Jay, thanks for reviewing the proposal. Answers inline.
> > > >
> > > > On Mon, Oct 12, 2015 at 10:53 AM, Jay Kreps 
> wrote:
> > > >
> > > > > Hey guys,
> > > > >
> > > > > I think this is an important feature and one we've talked about
> for a
> > > > > while. I really think trying to invent a new nomenclature is going
> to
> > > > make
> > > > > it hard for people to understand, though. As such I recommend we
> call
> > > > > namespaces "directories" and denote them with '/'--this will make
> the
> > > > > feature 1000x more understandable to people.
> > > >
> > > > Essentially you are suggesting two things here.
> > > > 1. Use "Directory" instead of "Namespace" as it is more intuitive. I
> > > agree.
> > > > 2. Make '/' the delimiter instead of ':'. Fine with me, and I agree that
> > > > if we call these directories, '/' is the way to go.
> > > >
> > > > I think we should inheret the
> > > > > semantics of normal unix fs in so far as it makes sense.
> > > > >
> > > > > In this approach we get rid of topics entirely, instead we really
> > just
> > > > have
> > > > > partitions which are the equivalent of a file and retain their
> > numeric
> > > > > names, and the existing topic concept is just the first directory
> > level
> > > > but
> > > > > we generalize to allow arbitrarily many more levels of nesting.
> This
> > > > allows
> > > > > categorization of data, such as
> /datacenter1/user-events/page-views/3
> > > and
> > > > > you can subscribe, apply configs or permissions at any level of the
> > > > > hierarchy.
> > > > >
> > > > +1. This actually requires just a minor change to existing proposal,
> > > i.e.,
> > > > "some:namespace:topic" becomes "some/namespace/topic".
> > > >
> > > > >
> > > > > I'm actually 

Re: Kafka Monitoring Framework

2015-10-14 Thread Dong Lin
Hey Lin,

Thanks for your interest. As Kartik mentioned in the blog post, LinkedIn
Kafka team is developing a Kafka monitoring framework to help detect
problem that may only be found, for example, when you run server/client
with different versions for a long time using production traffic. We hope
it can be used in complement to existing unit test and Ducktape test to
make Kafka more robust.

But we don't know when it can be released -- it is still in its early stage
of development.


Thanks,
Dong



On Tue, Oct 13, 2015 at 10:47 PM, Lin Ma  wrote:

> Hi, I read on the website that effort has recently been put into developing a
> Kafka monitoring framework (detailed below).
> Does anyone know when this framework will be released in a major Kafka version?
>
> Thanks.
>
>
> https://engineering.linkedin.com/apache-kafka/how-we_re-improving-and-advancing-kafka-linkedin
> Kafka Monitoring Framework
> We have recently started working on this effort to have a standardized way
> to monitor our Kafka clusters. The idea here is to run a set of test
> applications which are producing and consuming data into kafka topics and
> validating the basic guarantees (order, guaranteed delivery, data integrity
> etc.) in addition to the end to end latencies for publishing and consuming
> data. This effort is independent of the monitoring that we already have
> where we monitor metrics emitted by the Kafka brokers.
>
> In addition to monitoring production systems, it will also be used for
> validating a new Kafka build that we might pick from the open source trunk
> to deploy to our production clusters. This will also help us ensure that
> new versions of the kafka broker don't break existing older clients.
>
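
As a toy illustration of the produce/consume validation idea described above 
(this is not the actual framework; the topic name, group id, and configs are 
made up):

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Publish a timestamped message, consume it back, report end-to-end latency.
public class EndToEndLatencyProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "latency-probe");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("monitor-topic"));
            producer.send(new ProducerRecord<>("monitor-topic",
                    Long.toString(System.currentTimeMillis())));
            producer.flush();
            for (ConsumerRecord<String, String> record : consumer.poll(5000)) {
                long latencyMs = System.currentTimeMillis() - Long.parseLong(record.value());
                System.out.println("end-to-end latency: " + latencyMs + " ms");
            }
        }
    }
}
{code}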


Re: KIP-28 does not allow Processor to specify partition of output message

2015-10-14 Thread Guozhang Wang
Thanks!

On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch  wrote:

> Ok, cool. I agree we want something simple.  I'll create an issue and
> create a pull request with a proposal. Look for it tomorrow.
>
> On Oct 13, 2015, at 10:25 PM, Guozhang Wang  wrote:
>
> I see your point. Yeah, I think it is a good idea to add a Partitioner to
> addSink(...), but the Partitioner interface in the producer is a bit overkill:
>
> "partition(String topic, Object key, byte[] keyBytes, Object value, byte[]
> valueBytes, Cluster cluster)"
>
> whereas for us we only want to partition on (K key, V value).
>
> Perhaps we should add a new Partitioner interface in Kafka Streams?
>
> Guozhang
>
> On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch  wrote:
>
>> This overrides the partitioning logic for all topics, right? That means I
>> have to explicitly call the default partitioning logic for all topics
>> except those that my Processor forwards. I'm guessing the best way to do this
>> is by extending org.apache.kafka.clients.producer.internals.DefaultPartitioner.
>> Of course, with multiple sinks in my topology, I have to put all of the
>> partitioning logic inside a single class.
>>
>> What would you think about adding an overloaded
>> TopologyBuilder.addSink(…) method that takes a Partitioner (or, better yet, a
>> smaller functional interface)? The resulting SinkProcessor could use that
>> Partitioner instance to set the partition number. That'd be super
>> convenient for users, would keep the logic where it belongs (where the
>> topology defines the sinks), and best of all the implementations won't have
>> to worry about any other topics, such as those used by stores, metrics, or
>> other sinks.
>>
>> Best regards,
>>
>> Randall
>>
>>
>> On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com)
>> wrote:
>>
>> Hi Randall,
>>
>> You can try to set the partitioner class as
>> ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig, its
>> interface
>> can be found in
>>
>> org.apache.kafka.clients.producer.Partitioner
>>
>> Let me know if it works for you.
>>
>> Guozhang
>>
>> On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch 
>> wrote:
>>
>> > The new streams API added with KIP-28 is great. I’ve been using it on a
>> > prototype for a few weeks, and I’m looking forward to it being included
>> in
>> > 0.9.0. However, at the moment, a Processor implementation is not able
>> to
>> > specify the partition number when it outputs messages.
>> >
>> > I’d be happy to log a JIRA and create a PR to add it to the API, but
>> > without knowing all of the history I’m wondering if leaving it out of
>> the
>> > API was intentional.
>> >
>> > Thoughts?
>> >
>> > Best regards,
>> >
>> > Randall Hauch
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>>
>
>
> --
> -- Guozhang
>
>


-- 
-- Guozhang


[jira] [Updated] (KAFKA-2641) Upgrade path for ZK authentication

2015-10-14 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-2641:

Description: 
If a cluster isn't secure and wants to migrate to secure, then it will need to 
execute the following steps with respect to ZK:
# Perform a rolling restart passing the appropriate JAAS file to enable the ZK 
client to authenticate against the server 
# Run a ZkSecurityMigrationTool that will set the appropriate ACLs 
# Perform a second rolling restart to set `zookeeper.enable.secure.acls`

This recipe assumes that the ZK ensemble has security on. 

  was:Add necessary configuration and scripting to make sure that existing 
clusters can turn security on with minimal disruption.


> Upgrade path for ZK authentication
> --
>
> Key: KAFKA-2641
> URL: https://issues.apache.org/jira/browse/KAFKA-2641
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
>
> If a cluster isn't secure and wants to migrate to secure, then it will need 
> to execute the following steps with respect to ZK:
> # Perform a rolling restart passing the appropriate JAAS file to enable the 
> ZK client to authenticate against the server 
> # Run a ZkSecurityMigrationTool that will set the appropriate ACLs 
> # Perform a second rolling restart to set `zookeeper.enable.secure.acls`
> This recipe assumes that the ZK ensemble has security on. 
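
For illustration, roughly what the JAAS file in step 1 might contain; the path, 
keytab, and principal below are made-up examples, and step 3 then sets 
zookeeper.enable.secure.acls=true in the broker properties:

{code}
// Passed to each broker via
//   -Djava.security.auth.login.config=/etc/kafka/zk_jaas.conf
// The "Client" section is the login context the ZK client library uses.
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/etc/security/keytabs/kafka.keytab"
  principal="kafka/broker1.example.com@EXAMPLE.COM";
};
{code}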



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2644) Run relevant ducktape tests with SASL_PLAINTEXT and SASL_SSL

2015-10-14 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-2644:
---
Summary: Run relevant ducktape tests with SASL_PLAINTEXT and SASL_SSL  
(was: Run relevant ducktape tests with SASL_PLAIN and SASL_SSL)

> Run relevant ducktape tests with SASL_PLAINTEXT and SASL_SSL
> 
>
> Key: KAFKA-2644
> URL: https://issues.apache.org/jira/browse/KAFKA-2644
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Ismael Juma
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 0.9.0.0
>
>
> We need to define which of the existing ducktape tests are relevant. cc 
> [~rsivaram]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2643) Run mirror maker tests in ducktape with SSL

2015-10-14 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-2643:
---
Issue Type: Sub-task  (was: Test)
Parent: KAFKA-1682

> Run mirror maker tests in ducktape with SSL
> ---
>
> Key: KAFKA-2643
> URL: https://issues.apache.org/jira/browse/KAFKA-2643
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
> Fix For: 0.9.0.0
>
>
> Mirror maker tests are currently run only with PLAINTEXT. They should be run 
> with SSL as well. This requires a console consumer timeout in the new 
> consumer, which is being added in KAFKA-2603.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2642) Run replication tests in ducktape with SSL for clients

2015-10-14 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-2642:
---
Issue Type: Sub-task  (was: Test)
Parent: KAFKA-1682

> Run replication tests in ducktape with SSL for clients
> --
>
> Key: KAFKA-2642
> URL: https://issues.apache.org/jira/browse/KAFKA-2642
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
> Fix For: 0.9.0.0
>
>
> Under KAFKA-2581, replication tests were parametrized to run with SSL for 
> interbroker communication, but not for clients. When KAFKA-2603 is committed, 
> the tests should be able to use SSL for clients as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1554) Corrupt index found on clean startup

2015-10-14 Thread Bo Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14956579#comment-14956579
 ] 

Bo Wang commented on KAFKA-1554:


@Jun Rao, @Mayuresh Gharat, I don't think this patch works.
The method given in the patch appears to be ineffective: the for loop calling 
sanityCheck() runs too early, when logSegments is still empty, so the code does 
nothing.


> Corrupt index found on clean startup
> 
>
> Key: KAFKA-1554
> URL: https://issues.apache.org/jira/browse/KAFKA-1554
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1
> Environment: ubuntu 12.04, oracle jdk 1.7
>Reporter: Alexis Midon
>Assignee: Mayuresh Gharat
>Priority: Critical
> Fix For: 0.10.0.0
>
> Attachments: KAFKA-1554.patch
>
>
> On a clean start up, corrupted index files are found.
> After investigations, it appears that some pre-allocated index files are not 
> "compacted" correctly and the end of the file is full of zeroes.
> As a result, on start up, the last relative offset is zero which yields an 
> offset equal to the base offset.
> The workaround is to delete all index files of size 10MB (the size of the 
> pre-allocated files), and restart. Index files will be re-created.
> {code}
> find $your_data_directory -size 10485760c -name *.index #-delete
> {code}
> This issue might be related/similar to 
> https://issues.apache.org/jira/browse/KAFKA-1112
> {code}
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,696 
> INFO main kafka.server.KafkaServer.info - [Kafka Server 847605514], starting
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,698 
> INFO main kafka.server.KafkaServer.info - [Kafka Server 847605514], 
> Connecting to zookeeper on 
> zk-main0.XXX:2181,zk-main1.XXX:2181,zk-main2.:2181/production/kafka/main
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,708 
> INFO 
> ZkClient-EventThread-14-zk-main0.XXX.com:2181,zk-main1.XXX.com:2181,zk-main2.XXX.com:2181,zk-main3.XXX.com:2181,zk-main4.XXX.com:2181/production/kafka/main
>  org.I0Itec.zkclient.ZkEventThread.run - Starting ZkClient event thread.
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,714 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,714 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:host.name=i-6b948138.inst.aws.airbnb.com
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,714 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:java.version=1.7.0_55
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,715 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:java.vendor=Oracle Corporation
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,715 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:java.home=/usr/lib/jvm/jre-7-oracle-x64/jre
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,715 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:java.class.path=libs/snappy-java-1.0.5.jar:libs/scala-library-2.10.1.jar:libs/slf4j-api-1.7.2.jar:libs/jopt-simple-3.2.jar:libs/metrics-annotation-2.2.0.jar:libs/log4j-1.2.15.jar:libs/kafka_2.10-0.8.1.jar:libs/zkclient-0.3.jar:libs/zookeeper-3.3.4.jar:libs/metrics-core-2.2.0.jar
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,715 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,716 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:java.io.tmpdir=/tmp
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,716 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:java.compiler=
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,716 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:os.name=Linux
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,716 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:os.arch=amd64
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,717 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:os.version=3.2.0-61-virtual
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,717 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> 

[jira] [Commented] (KAFKA-2629) Enable getting SSL password from an executable rather than passing plaintext password

2015-10-14 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957626#comment-14957626
 ] 

Ashish K Singh commented on KAFKA-2629:
---

[~junrao] [~gwenshap] what do you guys think? As [~yoderme] mentioned, this 
should enable users to have a more secure solution. I can provide a patch if 
there isn't a strong no.

> Enable getting SSL password from an executable rather than passing plaintext 
> password
> -
>
> Key: KAFKA-2629
> URL: https://issues.apache.org/jira/browse/KAFKA-2629
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.9.0.0
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
>
> Currently there are a couple of options to pass SSL passwords to Kafka, i.e., 
> via a properties file or via a command line argument. Neither of these is a 
> recommended security practice.
> * A password on a command line is a no-no: it's trivial to see that password 
> just by using the 'ps' utility.
> * Putting a password into a file, and then passing the location to that file, 
> is the next best option. The access to the file will be governed by unix 
> access permissions which we all know and love. The downside is that the 
> password is still just sitting there in a file, and those who have access can 
> still see it trivially.
> * The most general, secure solution is to provide a layer of abstraction: 
> provide functionality to get the password from "somewhere else".  The most 
> flexible and generic way to do this is to simply call an executable which 
> returns the desired password. 
> ** The executable is again protected with normal file system privileges
> ** The simplest form, a script that looks like "echo 'my-password'", devolves 
> back to putting the password in a file
> ** A more interesting implementation could open up a local encrypted password 
> store and extract the password from it
> ** A maximally secure implementation could contact an external secret manager 
> with centralized control and audit functionality.
> ** In short: getting the password as the output of a script/executable is 
> maximally generic and enables both simple and complex use cases.
> This JIRA intends to add a config param to enable passing an executable to 
> Kafka for SSL passwords.
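
For illustration, a minimal sketch of the proposed mechanism, assuming a 
hypothetical helper that shells out to the configured executable and reads the 
password from its stdout; the config key and class names here are invented, not 
committed API:

{code}
import java.io.BufferedReader;
import java.io.InputStreamReader;

// Hypothetical helper: obtain an SSL password by invoking a user-supplied
// executable (e.g. the value of a proposed ssl.keystore.password.command
// config) instead of reading a plaintext password from the config file.
public class ExecutablePasswordLoader {

    static String loadPassword(String command) throws Exception {
        Process process = new ProcessBuilder(command).start();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream()))) {
            String password = reader.readLine(); // first line of stdout
            if (process.waitFor() != 0 || password == null) {
                throw new IllegalStateException("Password command failed: " + command);
            }
            return password.trim();
        }
    }
}
{code}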



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover

2015-10-14 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957570#comment-14957570
 ] 

Jason Gustafson commented on KAFKA-2017:


One additional point worth considering is how administrative tools will be able 
to query group metadata. We have discussed previously adding a DescribeGroup 
request, which is useful when the groupId is known, but not as useful when you 
want to get a list of groups in the cluster. This latter use case seems more 
straightforward with Zookeeper. With Kafka persistence, the admin client would 
either have to query each coordinator separately or consume the group metadata 
topic.

> Persist Coordinator State for Coordinator Failover
> --
>
> Key: KAFKA-2017
> URL: https://issues.apache.org/jira/browse/KAFKA-2017
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: Onur Karaman
>Assignee: Guozhang Wang
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, 
> KAFKA-2017_2015-05-21_19:02:47.patch
>
>
> When a coordinator fails, the group membership protocol tries to failover to 
> a new coordinator without forcing all the consumers rejoin their groups. This 
> is possible if the coordinator persists its state so that the state can be 
> transferred during coordinator failover. This state consists of most of the 
> information in GroupRegistry and ConsumerRegistry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-37 - Add namespaces in Kafka

2015-10-14 Thread Jay Kreps
Hey Jason,

I actually think this is one of the advantages. The problem we have today
is that you can't really do bidirectional replication between clusters
because it would actually be a feedback loop.

So the intended use would be that you would have a structure where the
top-level directory was DIFFERENT but the topic names were the same, so if
you maintain
  /chicago-datacenter/actual-topics
  /oregon-datacenter/actual-topics
  etc.
Then you replicate
  /chicago-datacenter/* => /oregon-datacenter
and
  /oregon-datacenter/* => /chicago-datacenter

People who want the aggregate feed subscribe to /*/my-event.

The nice thing about this is it gives a unified namespace across all
locations.

Basically exactly what we do now but you no longer need to add new clusters
to get the namespacing.

-Jay
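
For concreteness, a minimal sketch of consuming such an aggregate feed with the
regex subscription that exists today, assuming the directory levels were encoded
in flat topic names like chicago-datacenter.my-event (broker address, group id,
and topic names are illustrative):

{code}
import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AggregateFeedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "aggregate-feed");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        // The flat-topic-name equivalent of subscribing to /*/my-event:
        consumer.subscribe(Pattern.compile(".*\\.my-event"),
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
            });
        // poll loop omitted
    }
}
{code}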


On Wed, Oct 14, 2015 at 11:24 AM, Jason Gustafson 
wrote:

> Hey Ashish, thanks for the write-up. I think having a namespace capability
> is a useful feature for Kafka, in particular with the addition of the
> authorization layer. I probably prefer Jay's hierarchical approach if we're
> going to embed the namespace in the topic name since it seems more general.
> That said, one advantage of having a namespace independent of the topic
> name is that it simplifies replication between namespaces a bit since you
> don't have to parse and rewrite topic names. Assuming that hierarchical
> topics will happen eventually anyway, I imagine a common pattern would be
> to preserve the same directory structure in multiple namespaces, so having
> an easy mechanism for applications to switch between them would be nice.
> The namespace is kind of analogous to a chroot in this case. Of course you
> can achieve the same thing by having a configurable topic prefix, just you
> have to do all the topic rewriting, which I'm guessing will be a little
> annoying to implement in all of the clients and tools. However, the
> tradeoff (as you mention in the KIP) is that all request schemas have to be
> updated, which is also annoying.
>
> -Jason
>
> On Wed, Oct 14, 2015 at 12:03 AM, Ashish Singh 
> wrote:
>
> > On Mon, Oct 12, 2015 at 7:37 PM, Gwen Shapira  wrote:
> >
> > > This works really nicely from the consumer side, but what about the
> > > producer? If there are no more topics,do we allow producing to a
> > directory
> > > and have the Partitioner hash-partition messages between all partitions
> > in
> > > the multiple levels in a directory?
> > >
> > Good point.
> >
> > I am personally in favor of maintaining current behavior for producer,
> > i.e., letting users to only produce to a topic. This is different for
> > consumers, the suggested behavior is inline with current behavior. One
> can
> > use regex subscription to achieve the same even today.
> >
> > >
> > > Also, I think we want to preserve the consumer terminology of
> "subscribe"
> > > to topics / directories, but "assign" partitions - since the consumer
> > > behavior is different in those cases.
> > >
> > > On Mon, Oct 12, 2015 at 7:16 PM, Jay Kreps  wrote:
> > >
> > > > Okay this is similar to what I think we have talked about before. Let
> > me
> > > > elaborate on the idea that I think has been floating around--it's
> > pretty
> > > > similar with a few differences.
> > > >
> > > > I think what you are calling the "default namespace" is basically
> what
> > I
> > > > would call the "current working directory" with paths not beginning
> > with
> > > > '/' being interpreted relative to this directory as in the fs.
> > > >
> > > > One thing you have to work out is what levels in this hierarchy you
> can
> > > > actually subscribe to. I think you are assuming only what we
> currently
> > > > consider a "topic", i.e. the first level of directories but not the
> > > > partitions or parent dirs, would be subscribable. If you think about
> > it,
> > > > though, that constraint is a bit arbitrary.
> > > >
> > > > I'd propose instead the semantics that:
> > > > - Subscribing to /a/b/c/0 means subscribing to the 0th partition of
> > topic
> > > > "c" in directory /a/b
> > > > - Subscribing to /a/b/c means subscribing to all partitions in
> > > > topic/directory "c"
> > > > - Subscribing to /a/b means subscribing to all partitions in all
> > > > topics/subdirectories under a/b recursively
> > > >
> > > > Effectively the concept of topics goes away entirely--you just have
> > > > partitions/logs and directories. In this respect rather than adding
> new
> > > > concepts this new feature would actually just generalizes what we
> have
> > > > (which I think is a good thing).
> > > >
> > > > -Jay
> > > >
> > > > On Mon, Oct 12, 2015 at 6:24 PM, Ashish Singh 
> > > wrote:
> > > >
> > > > > On Mon, Oct 12, 2015 at 5:42 PM, Jay Kreps 
> wrote:
> > > > >
> > > > > > Great. I definitely would strongly favor carrying over user's
> > > intuition
> > > > > > from FS unless we 

[jira] [Commented] (KAFKA-2477) Replicas spuriously deleting all segments in partition

2015-10-14 Thread Chinmay Soman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957645#comment-14957645
 ] 

Chinmay Soman commented on KAFKA-2477:
--

[~ewencp] Totally agree. It's just that the current 0.8.2.0 version we're using 
has been "tampered" with: I see a lot of commits from our previous team here, 
and I need to be careful not to break anything. At the moment, I've applied the 
patch on top of our current version and tested it in staging. I'll be rolling it 
out on our biggest cluster soon to validate whether it works.

> Replicas spuriously deleting all segments in partition
> --
>
> Key: KAFKA-2477
> URL: https://issues.apache.org/jira/browse/KAFKA-2477
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Håkon Hitland
>Assignee: Jiangjie Qin
> Fix For: 0.9.0.0
>
> Attachments: Screen Shot 2015-10-10 at 6.54.44 PM.png, kafka_log.txt, 
> kafka_log_trace.txt
>
>
> We're seeing some strange behaviour in brokers: a replica will sometimes 
> schedule all segments in a partition for deletion, and then immediately start 
> replicating them back, triggering our check for under-replicating topics.
> This happens on average a couple of times a week, for different brokers and 
> topics.
> We have per-topic retention.ms and retention.bytes configuration, the topics 
> where we've seen this happen are hitting the size limit.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-2603: Add timeout arg to ConsoleConsumer...

2015-10-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/274


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: KIP-28 does not allow Processor to specify partition of output message

2015-10-14 Thread Randall Hauch
It absolutely is important that the partitioning logic for a single topic be 
the same across an entire cluster. IOW, if a topology has a single sink, then 
no matter where that topology is run in the cluster, it had better use the same 
partitioning logic. I would argue that when the partitioning logic varies from 
the default logic, it’s far better to encapsulate it within the topology’s 
definition, and adding it to the sink is a very easy way to do this (and very 
natural for the developer using Kafka Streams).

However, centralizing the partitioning logic for all streams is certainly not 
ideal, primarily because different topics will likely need to be partitioned in 
different ways. This is especially true for stateful stream processing, which 
depends on messages with the same key going to the same processor instance that 
owns that keyed data. IOW, the partitioning logic used by a producer is 
strongly informed by how the *downstream stateful consumers* are 
organized/clustered. It gets far more complicated when considering built-in 
topics used by offset management, state storage, and metrics. 

The bottom line is that *different* topics will likely need to be partitioned 
differently.

On October 14, 2015 at 12:57:37 PM, Yasuhiro Matsuda 
(yasuhiro.mats...@gmail.com) wrote:

A partitioning scheme should be a cluster-wide thing. Letting each sink
have a different partitioning scheme does not make sense to me. A
partitioning scheme is not specific to a stream job, each task, or a sink. I
think specifying it at the sink level is more error prone.

If a user wants to customize a partitioning scheme, he/she also wants to
manage it in some central place, maybe a code repo, or a jar file. All
applications must use the same logic, otherwise data will be messed up.
Thus, a single class representing all partitioning logic is not a bad thing
at all. (Code organization wise, all the logic does not necessarily live in
a single class, of course.)


On Wed, Oct 14, 2015 at 8:47 AM, Randall Hauch  wrote:  

> Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a  
> PR with the proposed change.  
>  
> Thanks!  
>  
>  
> On October 14, 2015 at 3:12:34 AM, Guozhang Wang (wangg...@gmail.com)  
> wrote:  
>  
> Thanks!  
>  
> On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch  wrote:  
> Ok, cool. I agree we want something simple. I'll create an issue and  
> create a pull request with a proposal. Look for it tomorrow.  
>  
> On Oct 13, 2015, at 10:25 PM, Guozhang Wang  wrote:  
>  
> I see your point. Yeah I think it is a good way to add a Partitioner into  
> addSink(...) but the Partitioner interface in producer is a bit overkill:  
>  
> "partition(String topic, Object key, byte[] keyBytes, Object value, byte[]  
> valueBytes, Cluster cluster)"  
>  
> whereas for us we only want to partition on (K key, V value).  
>  
> Perhaps we should add a new Partitioner interface in Kafka Streams?  
>  
> Guozhang  
>  
> On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch  wrote:  
> This overrides the partitioning logic for all topics, right? That means I  
> have to explicitly call the default partitioning logic for all topics  
> except those that my Producer forwards. I’m guess the best way to do by  
> extending org.apache.kafka.clients.producer.DefaultProducer. Of course,  
> with multiple sinks in my topology, I have to put all of the partitioning  
> logic inside a single class.  
>  
> What would you think about adding an overloaded TopologyBuilder.addSink(…)  
> method that takes a Partitioner (or better yet a smaller functional  
> interface). The resulting SinkProcessor could use that Partitioner instance  
> to set the partition number? That’d be super convenient for users, would  
> keep the logic where it belongs (where the topology defines the sinks), and  
> best of all the implementations won't have to worry about any other topics,  
> such as those used by stores, metrics, or other sinks.  
>  
> Best regards,  
>  
> Randall  
>  
>  
> On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com)  
> wrote:  
>  
> Hi Randall,  
>  
> You can try to set the partitioner class as  
> ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig, its interface  
> can be found in  
>  
> org.apache.kafka.clients.producer.Partitioner  
>  
> Let me know if it works for you.  
>  
> Guozhang  
>  
> On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch  wrote:  
>  
> > The new streams API added with KIP-28 is great. I’ve been using it on a  
> > prototype for a few weeks, and I’m looking forward to it being included  
> in  
> > 0.9.0. However, at the moment, a Processor implementation is not able to  
> > specify the partition number when it outputs messages.  
> >  
> > I’d be happy to log a JIRA and create a PR to add it to the API, but  
> > without knowing all of the history I’m 

Re: KIP-28 does not allow Processor to specify partition of output message

2015-10-14 Thread Guozhang Wang
I think part of Yasu's motivation for cross-cluster partitioning is that,
for example, there could be multiple stream jobs reading / writing to some
shared topics but controlled by different teams or services inside an
organization, and if one team mistakenly specifying the partitioning in a
wrong way it will interfere with other teams, hence a global management of
partitioning scheme may be required just like a global schema registry
service for Kafka.

With the producer's Partitioner interface, we can still make different
partitioning schemes for different topics in a single class: just
switch-branch on the topic name and cast the key-value types, but that
would be a bit awkward. So I am preferring to a customizable partitioner in
the sink spec for better user programmability.

Guozhang
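
For concreteness, a minimal sketch of that single-class, switch-on-topic style
against the existing producer Partitioner interface; the topic name and key
type are invented for illustration:

{code}
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// One class holding the partitioning logic for every topic, branching on
// the topic name and casting keys as needed.
public class PerTopicPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if ("user-events".equals(topic)) {      // hypothetical topic
            String userId = (String) key;       // assumed key type
            return (userId.hashCode() & 0x7fffffff) % numPartitions;
        }
        // Everything else: murmur2 on the serialized key, mirroring the
        // default behavior (null keys simplified to partition 0 here).
        return keyBytes == null
            ? 0
            : (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
{code}

Such a class would be registered through ProducerConfig.PARTITIONER_CLASS_CONFIG
as mentioned earlier in the thread.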

On Wed, Oct 14, 2015 at 1:03 PM, Randall Hauch  wrote:

> It absolutely is important that the partitioning logic for a single topic
> be the same across an entire cluster. IOW, if a topology has a single sink,
> then no matter where that topology is run in the cluster, it had better use
> the same partitioning logic. I would argue that when the partitioning logic
> varies from the default logic, it’s far better to encapsulate it within the
> topology’s definition, and adding it to the sink is a very easy way to do
> this (and very natural for the developer using Kafka Streams).
>
> However, centralizing the partitioning logic for all streams is certainly
> not ideal, primarily because different topics will likely need to be
> partitioned in different ways. This is especially true for stateful stream
> processing, which depends on messages with the same key going to the same
> processor instance that owns that keyed data. IOW, the partitioning logic
> used by a producer is strongly informed by how the *downstream stateful
> consumers* are organized/clustered. It gets far more complicated when
> considering built-in topics used by offset management, state storage, and
> metrics.
>
> The bottom line is that *different* topics will likely need to be
> partitioned differently.
>
> On October 14, 2015 at 12:57:37 PM, Yasuhiro Matsuda (
> yasuhiro.mats...@gmail.com) wrote:
>
> A partitioning scheme should be a cluster-wide thing. Letting each sink
> have a different partitioning scheme does not make sense to me. A
> partitioning scheme is not specific to a stream job, each task, or a sink. I
> think specifying it at the sink level is more error prone.
>
> If a user wants to customize a partitioning scheme, he/she also wants to
> manage it in some central place, maybe a code repo, or a jar file. All
> applications must use the same logic, otherwise data will be messed up.
> Thus, a single class representing all partitioning logic is not a bad thing
> at all. (Code organization wise, all the logic does not necessarily live in
> a single class, of course.)
>
>
> On Wed, Oct 14, 2015 at 8:47 AM, Randall Hauch  wrote:
>
> > Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a
> > PR with the proposed change.
> >
> > Thanks!
> >
> >
> > On October 14, 2015 at 3:12:34 AM, Guozhang Wang (wangg...@gmail.com)
> > wrote:
> >
> > Thanks!
> >
> > On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch 
> wrote:
> > Ok, cool. I agree we want something simple. I'll create an issue and
> > create a pull request with a proposal. Look for it tomorrow.
> >
> > On Oct 13, 2015, at 10:25 PM, Guozhang Wang  wrote:
> >
> > I see your point. Yeah I think it is a good way to add a Partitioner
> into
> > addSink(...) but the Partitioner interface in producer is a bit
> overkill:
> >
> > "partition(String topic, Object key, byte[] keyBytes, Object value,
> byte[]
> > valueBytes, Cluster cluster)"
> >
> > whereas for us we only want to partition on (K key, V value).
> >
> > Perhaps we should add a new Partitioner interface in Kafka Streams?
> >
> > Guozhang
> >
> > On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch 
> wrote:
> > This overrides the partitioning logic for all topics, right? That means
> I
> > have to explicitly call the default partitioning logic for all topics
> > except those that my Producer forwards. I’m guess the best way to do by
> > extending org.apache.kafka.clients.producer.DefaultProducer. Of course,
> > with multiple sinks in my topology, I have to put all of the
> partitioning
> > logic inside a single class.
> >
> > What would you think about adding an overloaded
> TopologyBuilder.addSink(…)
> > method that takes a Partitioner (or better yet a smaller functional
> > interface). The resulting SinkProcessor could use that Partitioner
> instance
> > to set the partition number? That’d be super convenient for users, would
> > keep the logic where it belongs (where the topology defines the sinks),
> and
> > best of all the implementations won't have to worry about any other
> topics,
> > such as those used by stores, metrics, or other sinks.

Re: KIP-28 does not allow Processor to specify partition of output message

2015-10-14 Thread Yasuhiro Matsuda
> The bottom line is that *different* topics will likely need to be
partitioned differently.

You can do it with the existing Partitioner interface. Centralizing the
logic doesn't mean all topics must use the same partitioning scheme.


On Wed, Oct 14, 2015 at 1:03 PM, Randall Hauch  wrote:

> It absolutely is important that the partitioning logic for a single topic
> be the same across an entire cluster. IOW, if a topology has a single sink,
> then no matter where that topology is run in the cluster, it had better use
> the same partitioning logic. I would argue that when the partitioning logic
> varies from the default logic, it’s far better to encapsulate it within the
> topology’s definition, and adding it to the sink is a very easy way to do
> this (and very natural for the developer using Kafka Streams).
>
> However, centralizing the partitioning logic for all streams is certainly
> not ideal, primarily because different topics will likely need to be
> partitioned in different ways. This is especially true for stateful stream
> processing, which depends on messages with the same key going to the same
> processor instance that owns that keyed data. IOW, the partitioning logic
> used by a producer is strongly informed by how the *downstream stateful
> consumers* are organized/clustered. It gets far more complicated when
> considering built-in topics used by offset management, state storage, and
> metrics.
>
> The bottom line is that *different* topics will likely need to be
> partitioned differently.
>
> On October 14, 2015 at 12:57:37 PM, Yasuhiro Matsuda (
> yasuhiro.mats...@gmail.com) wrote:
>
> A partitioning scheme should be a cluster-wide thing. Letting each sink
> have a different partitioning scheme does not make sense to me. A
> partitioning scheme is not specific to a stream job, each task, or a sink. I
> think specifying it at the sink level is more error prone.
>
> If a user wants to customize a partitioning scheme, he/she also wants to
> manage it in some central place, maybe a code repo, or a jar file. All
> applications must use the same logic, otherwise data will be messed up.
> Thus, a single class representing all partitioning logic is not a bad thing
> at all. (Code organization wise, all the logic does not necessarily live in
> a single class, of course.)
>
>
> On Wed, Oct 14, 2015 at 8:47 AM, Randall Hauch  wrote:
>
> > Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a
> > PR with the proposed change.
> >
> > Thanks!
> >
> >
> > On October 14, 2015 at 3:12:34 AM, Guozhang Wang (wangg...@gmail.com)
> > wrote:
> >
> > Thanks!
> >
> > On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch 
> wrote:
> > Ok, cool. I agree we want something simple. I'll create an issue and
> > create a pull request with a proposal. Look for it tomorrow.
> >
> > On Oct 13, 2015, at 10:25 PM, Guozhang Wang  wrote:
> >
> > I see your point. Yeah I think it is a good way to add a Partitioner
> into
> > addSink(...) but the Partitioner interface in producer is a bit
> overkill:
> >
> > "partition(String topic, Object key, byte[] keyBytes, Object value,
> byte[]
> > valueBytes, Cluster cluster)"
> >
> > whereas for us we only want to partition on (K key, V value).
> >
> > Perhaps we should add a new Partitioner interface in Kafka Streams?
> >
> > Guozhang
> >
> > On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch 
> wrote:
> > This overrides the partitioning logic for all topics, right? That means
> I
> > have to explicitly call the default partitioning logic for all topics
> > except those that my Producer forwards. I’m guess the best way to do by
> > extending org.apache.kafka.clients.producer.DefaultProducer. Of course,
> > with multiple sinks in my topology, I have to put all of the
> partitioning
> > logic inside a single class.
> >
> > What would you think about adding an overloaded
> TopologyBuilder.addSink(…)
> > method that takes a Partitioner (or better yet a smaller functional
> > interface). The resulting SinkProcessor could use that Partitioner
> instance
> > to set the partition number? That’d be super convenient for users, would
> > keep the logic where it belongs (where the topology defines the sinks),
> and
> > best of all the implementations won't have to worry about any other
> topics,
> > such as those used by stores, metrics, or other sinks.
> >
> > Best regards,
> >
> > Randall
> >
> >
> > On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com)
> > wrote:
> >
> > Hi Randall,
> >
> > You can try to set the partitioner class as
> > ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig, its
> interface
> > can be found in
> >
> > org.apache.kafka.clients.producer.Partitioner
> >
> > Let me know if it works for you.
> >
> > Guozhang
> >
> > On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch 
> wrote:
> >
> > > The new streams API added with KIP-28 is great. I’ve 

[jira] [Commented] (KAFKA-2397) leave group request

2015-10-14 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957718#comment-14957718
 ] 

Guozhang Wang commented on KAFKA-2397:
--

For consumer shutdown, in the ZK-based consumer we immediately delete its 
ephemeral node so that other members are notified; hence in the new consumer, 
without this fix, we are effectively introducing a regression.

For consumer hard failure cases, though, with the ZK-based old consumer it also 
takes a session timeout period for the failure to be detected; so I feel 
modifying the socket server to propagate client-id information to the 
coordinator in order to improve on this case may be overkill.

> leave group request
> ---
>
> Key: KAFKA-2397
> URL: https://issues.apache.org/jira/browse/KAFKA-2397
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>Priority: Minor
> Fix For: 0.9.0.0
>
>
> Let's say every consumer in a group has session timeout s. Currently, if a 
> consumer leaves the group, the worst case time to stabilize the group is 2s 
> (s to detect the consumer failure + s for the rebalance window). If a 
> consumer instead can declare they are leaving the group, the worst case time 
> to stabilize the group would just be the s associated with the rebalance 
> window.
> This is a low priority optimization!



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


RE: Kafka Monitoring Framework

2015-10-14 Thread Lin Ma
Thanks Dong. Then I bet it will take a while before its release. 

Lin


-Original Message-
From: Dong Lin [mailto:lindon...@gmail.com] 
Sent: Tuesday, October 13, 2015 11:21 PM
To: dev@kafka.apache.org
Subject: Re: Kafka Monitoring Framework

Hey Lin,

Thanks for your interest. As Kartik mentioned in the blog post, LinkedIn
Kafka team is developing a Kafka monitoring framework to help detect
problems that may only be found, for example, when you run server/client
with different versions for a long time using production traffic. We hope
it can be used to complement the existing unit tests and ducktape tests to
make Kafka more robust.

But we don't know when it can be released -- it is still in its early stage
of development.


Thanks,
Dong



On Tue, Oct 13, 2015 at 10:47 PM, Lin Ma  wrote:

> Hi, I read from website that effort has been put on developing Kafka
> monitoring framework recently (detailed below).
> Does anyone know when this framework can be released into a Kafka major version?
>
> Thanks.
>
>
> https://engineering.linkedin.com/apache-kafka/how-we_re-improving-and-advancing-kafka-linkedin
> Kafka Monitoring Framework
> We have recently started working on this effort to have a standardized way
> to monitor our Kafka clusters. The idea here is to run a set of test
> applications which are producing and consuming data into kafka topics and
> validating the basic guarantees (order, guaranteed delivery, data integrity
> etc.) in addition to the end to end latencies for publishing and consuming
> data. This effort is independent of the monitoring that we already have
> where we monitor metrics emitted by the Kafka brokers.
>
> In addition to monitoring production systems, it will also be used for
> validating a new Kafka build that we might pick from the open source trunk
> to deploy to our production clusters. This will also help us ensure that
> new versions of the kafka broker don't break existing older clients.
>


[jira] [Commented] (KAFKA-2397) leave group request

2015-10-14 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957550#comment-14957550
 ] 

Jay Kreps commented on KAFKA-2397:
--

Dunno if we closed the loop on the approach.

[~onurkaraman] Yeah the pro of the TCP approach is that all clients get it 
automatically even in hard app failure cases.

The downside is that the implementation on the server side is more involved and 
there is some risk of unnecessary rebalances if there are situations that cause 
the connection to be lost.

Another aspect is the ability to implement a shutdown without rebalance for 
quick restarts. This is particularly useful for stream processing where there 
is associated state that takes work to rebuild. I don't think this can be 
implemented easily with the TCP connection approach.

I think I'm on board with doing it explicitly.

I think the other question is whether it is cleaner to add a field to the ack 
or to make a custom request. I don't have a strong opinion on that, though in 
general I think fewer requests are better.

> leave group request
> ---
>
> Key: KAFKA-2397
> URL: https://issues.apache.org/jira/browse/KAFKA-2397
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>Priority: Minor
> Fix For: 0.9.0.0
>
>
> Let's say every consumer in a group has session timeout s. Currently, if a 
> consumer leaves the group, the worst case time to stabilize the group is 2s 
> (s to detect the consumer failure + s for the rebalance window). If a 
> consumer instead can declare they are leaving the group, the worst case time 
> to stabilize the group would just be the s associated with the rebalance 
> window.
> This is a low priority optimization!



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KIP-28 does not allow Processor to specify partition of output message

2015-10-14 Thread Guozhang Wang
I agree that cluster-wide partitioning would be preferable in cases where
multiple producers from different services sharing the same topics, and
this may well be resolved by the same manner like the schema registry
service. On the other hand, I think this is not a problem that would be
solved at the KStream level which only considers per-job configs.

I think a KStream-layer partitioner in addition to the Producer partitioner
may not be a bad thing in that:

1) Like Randall mentioned, with the producer Partitioner we basically need
to switch on ALL possible topics, cast the Object key / value into the
proper types, and then apply the partitioning logic; this is awkward,
especially for cases where users only want customized partitioning for a
small subset of topics while being OK with leaving the others to the default
behavior (i.e. murmur hash on the key). Extending the DefaultPartitioner may
partially solve this problem, but not entirely.

2) A KStream-layer partitioner basically allows us to "override" the
partitioning for a subset of the topics while leaving the others to the
producer's default partitioner. This partitioner would be exposed to
users working on the lower-level processor API layer, while on the KStream
DSL layer we may well "infer" the partitioning scheme for most cases based
on join / aggregation specs, hence it would not incur much burden on users.

Guozhang
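
For concreteness, a minimal sketch of what such a streams-level partitioner
could look like; the interface name and signature are hypothetical, mirroring
the proposal in KAFKA-2649 rather than any committed API:

{code}
// Hypothetical streams-level partitioner, typed on the sink's key/value
// types rather than on the producer's raw bytes.
public interface StreamPartitioner<K, V> {

    // Return the partition for this record, or null to fall back to the
    // producer's default partitioning for the sink's topic.
    Integer partition(K key, V value, int numPartitions);
}
{code}

A hypothetical addSink(...) overload taking this partitioner would keep the
custom logic next to the topology definition, while every other topic keeps the
producer's default behavior.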

On Wed, Oct 14, 2015 at 10:57 AM, Yasuhiro Matsuda <
yasuhiro.mats...@gmail.com> wrote:

> A partitioning scheme should be a cluster-wide thing. Letting each sink
> have a different partitioning scheme does not make sense to me. A
> partitioning scheme is not specific to a stream job, each task, or a sink. I
> think specifying it at the sink level is more error prone.
>
> If a user wants to customize a partitioning scheme, he/she also wants to
> manage it in some central place, maybe a code repo, or a jar file. All
> applications must use the same logic, otherwise data will be messed up.
> Thus, a single class representing all partitioning logic is not a bad thing
> at all. (Code organization wise, all the logic does not necessarily live in
> a single class, of course.)
>
>
> On Wed, Oct 14, 2015 at 8:47 AM, Randall Hauch  wrote:
>
>> Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a
>> PR with the proposed change.
>>
>> Thanks!
>>
>>
>> On October 14, 2015 at 3:12:34 AM, Guozhang Wang (wangg...@gmail.com)
>> wrote:
>>
>> Thanks!
>>
>> On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch  wrote:
>> Ok, cool. I agree we want something simple.  I'll create an issue and
>> create a pull request with a proposal. Look for it tomorrow.
>>
>> On Oct 13, 2015, at 10:25 PM, Guozhang Wang  wrote:
>>
>> I see your point. Yeah I think it is a good way to add a Partitioner into
>> addSink(...) but the Partitioner interface in producer is a bit overkill:
>>
>> "partition(String topic, Object key, byte[] keyBytes, Object value,
>> byte[] valueBytes, Cluster cluster)"
>>
>> whereas for us we only want to partition on (K key, V value).
>>
>> Perhaps we should add a new Partitioner interface in Kafka Streams?
>>
>> Guozhang
>>
>> On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch  wrote:
>> This overrides the partitioning logic for all topics, right? That means I
>> have to explicitly call the default partitioning logic for all topics
>> except those that my Producer forwards. I’m guess the best way to do by
>> extending org.apache.kafka.clients.producer.DefaultProducer. Of course,
>> with multiple sinks in my topology, I have to put all of the partitioning
>> logic inside a single class.
>>
>> What would you think about adding an overloaded
>> TopologyBuilder.addSink(…) method that takes a Partitioner (or better yet a
>> smaller functional interface). The resulting SinkProcessor could use that
>> Partitioner instance to set the partition number? That’d be super
>> convenient for users, would keep the logic where it belongs (where the
>> topology defines the sinks), and best of all the implementations won't have
>> to worry about any other topics, such as those used by stores, metrics, or
>> other sinks.
>>
>> Best regards,
>>
>> Randall
>>
>>
>> On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com)
>> wrote:
>>
>> Hi Randall,
>>
>> You can try to set the partitioner class as
>> ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig, its
>> interface
>> can be found in
>>
>> org.apache.kafka.clients.producer.Partitioner
>>
>> Let me know if it works for you.
>>
>> Guozhang
>>
>> On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch  wrote:
>>
>> > The new streams API added with KIP-28 is great. I’ve been using it on a
>> > prototype for a few weeks, and I’m looking forward to it being included
>> in
>> > 0.9.0. However, at the moment, a Processor implementation is not able to
>> > specify the partition number when it outputs messages.

[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover

2015-10-14 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957665#comment-14957665
 ] 

Guozhang Wang commented on KAFKA-2017:
--

Here is my two cents comparing those two approaches:

1. ZK-based approach:

Pros: simple implementation, and easy tooling for querying consumer group 
states.
Cons: ZK-writes (and perhaps also ZK-reads, which may be optimized further in 
the loading process).

2. Kafka-based approach:

Pros: reuse offset topic, no ZK burden.
Cons: not-so-simple implementation; admin tools may need to send the 
consumer-group-metadata request only to the coordinator.

Now about the tradeoffs: I personally think the conceptual cleanness of 
"putting broker metadata in ZK while consumer metadata in Kafka" should not carry 
much weight in the design, since what really matters is just the read / write 
workloads. For example, we decided to move consumer offsets from ZK to Kafka not 
primarily because we wanted to separate them from the broker registry, but 
because their write frequency is too high for ZK (of course, later there were 
other motivations like security / multi-tenancy that made us want the consumer 
to be ZK-free), while broker registry changes are relatively infrequent and so 
can live in ZK. Consumer group changes are somewhere between these two 
workloads, but I assume they would still be closer to broker registry changes on 
the spectrum compared with consumer offset changes.

In addition, I feel it is not the best solution in general to persist all data in 
a logging format: it gives you better write performance at the cost of worse 
read performance. In our case, we could unnecessarily increase the loading 
time upon coordinator migration, whether we piggy-back it on the offsets topic 
or on another topic (BTW I agree with [~jjkoshy] that piggy-backing on the 
offsets topic is a bit tricky). If we agree that the consumer membership change 
workload is write-light rather than write-heavy, then this trade may not be 
worthwhile.

I also second [~hachikuji]'s point about the ops benefits of storing membership 
in ZK: it allows all brokers to handle consumer group metadata requests, and it 
also lets the ops team get around admin requests (KIP-4) and directly query ZK.

> Persist Coordinator State for Coordinator Failover
> --
>
> Key: KAFKA-2017
> URL: https://issues.apache.org/jira/browse/KAFKA-2017
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: Onur Karaman
>Assignee: Guozhang Wang
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, 
> KAFKA-2017_2015-05-21_19:02:47.patch
>
>
> When a coordinator fails, the group membership protocol tries to failover to 
> a new coordinator without forcing all the consumers rejoin their groups. This 
> is possible if the coordinator persists its state so that the state can be 
> transferred during coordinator failover. This state consists of most of the 
> information in GroupRegistry and ConsumerRegistry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2603) Add timeout to ConsoleConsumer running with new consumer

2015-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957693#comment-14957693
 ] 

ASF GitHub Bot commented on KAFKA-2603:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/274


> Add timeout to ConsoleConsumer running with new consumer
> 
>
> Key: KAFKA-2603
> URL: https://issues.apache.org/jira/browse/KAFKA-2603
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.9.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
> Fix For: 0.9.0.0
>
>
> ConsoleConsumer exits when no messages are received for a timeout period when 
> run with the old consumer since the old consumer had a timeout parameter. 
> This behaviour is not available with the new consumer and hence ducktape 
> tests which rely on the timeout cannot be run with the new consumer. 
> [~granders] has suggested a solution in KAFKA-2581.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2603) Add timeout to ConsoleConsumer running with new consumer

2015-10-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2603:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Issue resolved by pull request 274
[https://github.com/apache/kafka/pull/274]

> Add timeout to ConsoleConsumer running with new consumer
> 
>
> Key: KAFKA-2603
> URL: https://issues.apache.org/jira/browse/KAFKA-2603
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.9.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
> Fix For: 0.9.0.0
>
>
> ConsoleConsumer exits when no messages are received for a timeout period when 
> run with the old consumer since the old consumer had a timeout parameter. 
> This behaviour is not available with the new consumer and hence ducktape 
> tests which rely on the timeout cannot be run with the new consumer. 
> [~granders] has suggested a solution in KAFKA-2581.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2629) Enable getting SSL password from an executable rather than passing plaintext password

2015-10-14 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957726#comment-14957726
 ] 

Gwen Shapira commented on KAFKA-2629:
-

[~singhashish], I see the need for this type of integration - I also ran into 
customers with these requirements, whether or not they make sense :)

Due to the tight timeline for 0.9.0.0 (two weeks until branching!) and our need 
to stabilize the security interfaces for now and start testing them, would you 
be OK with tabling the discussion on this feature for now and planning it for 
0.9.1.0 instead?

This will allow us to stabilize the security features and make sure they are 
usable without adding more interfaces and complexity.





> Enable getting SSL password from an executable rather than passing plaintext 
> password
> -
>
> Key: KAFKA-2629
> URL: https://issues.apache.org/jira/browse/KAFKA-2629
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.9.0.0
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
>
> Currently there are a couple of options to pass SSL passwords to Kafka, i.e., 
> via a properties file or via a command line argument. Neither of these is a 
> recommended security practice.
> * A password on a command line is a no-no: it's trivial to see that password 
> just by using the 'ps' utility.
> * Putting a password into a file, and then passing the location to that file, 
> is the next best option. The access to the file will be governed by unix 
> access permissions which we all know and love. The downside is that the 
> password is still just sitting there in a file, and those who have access can 
> still see it trivially.
> * The most general, secure solution is to provide a layer of abstraction: 
> provide functionality to get the password from "somewhere else".  The most 
> flexible and generic way to do this is to simply call an executable which 
> returns the desired password. 
> ** The executable is again protected with normal file system privileges
> ** The simplest form, a script that looks like "echo 'my-password'", devolves 
> back to putting the password in a file
> ** A more interesting implementation could open up a local encrypted password 
> store and extract the password from it
> ** A maximally secure implementation could contact an external secret manager 
> with centralized control and audit functionality.
> ** In short: getting the password as the output of a script/executable is 
> maximally generic and enables both simple and complex use cases.
> This JIRA intends to add a config param to enable passing an executable to 
> Kafka for SSL passwords.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Kafka Monitoring Framework

2015-10-14 Thread Dong Lin
Lin: Yes, I think so.


On Wed, Oct 14, 2015 at 1:48 PM, Lin Ma  wrote:

> Thanks Dong. Then I bet it will take a while before its release.
>
> Lin
>
>
> -Original Message-
> From: Dong Lin [mailto:lindon...@gmail.com]
> Sent: Tuesday, October 13, 2015 11:21 PM
> To: dev@kafka.apache.org
> Subject: Re: Kafka Monitoring Framework
>
> Hey Lin,
>
> Thanks for your interest. As Kartik mentioned in the blog post, LinkedIn
> Kafka team is developing a Kafka monitoring framework to help detect
> problems that may only be found, for example, when you run server/client
> with different versions for a long time using production traffic. We hope
> it can be used to complement the existing unit tests and ducktape tests to
> make Kafka more robust.
>
> But we don't know when it can be released -- it is still in its early stage
> of development.
>
>
> Thanks,
> Dong
>
>
>
> On Tue, Oct 13, 2015 at 10:47 PM, Lin Ma  wrote:
>
> > Hi, I read from website that effort has been put on developing Kafka
> > monitoring framework recently (detailed below).
> > Does anyone know when this framework can be released into a Kafka major
> > version?
> >
> > Thanks.
> >
> >
> >
> https://engineering.linkedin.com/apache-kafka/how-we_re-improving-and-advancing-kafka-linkedin
> > Kafka Monitoring Framework
> > We have recently started working on this effort to have a standardized
> way
> > to monitor our Kafka clusters. The idea here is to run a set of test
> > applications which are producing and consuming data into kafka topics and
> > validating the basic guarantees (order, guaranteed delivery, data
> integrity
> > etc.) in addition to the end to end latencies for publishing and
> consuming
> > data. This effort is independent of the monitoring that we already have
> > where we monitor metrics emitted by the Kafka brokers.
> >
> > In addition to monitoring production systems, it will also be used for
> > validating a new Kafka build that we might pick from the open source
> trunk
> > to deploy to our production clusters. This will also help us ensure that
> > new versions of the kafka broker don't break existing older clients.
> >
>


[jira] [Updated] (KAFKA-2593) KeyValueStores should not require use of the context's default serializers and deserializers

2015-10-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2593:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Issue resolved by pull request 255
[https://github.com/apache/kafka/pull/255]

> KeyValueStores should not require use of the context's default serializers 
> and deserializers
> 
>
> Key: KAFKA-2593
> URL: https://issues.apache.org/jira/browse/KAFKA-2593
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kafka streams
>Reporter: Randall Hauch
>Assignee: Randall Hauch
> Fix For: 0.9.0.0
>
>
> Currently the {{InMemoryKeyValueStore}} is only able to use the key and value 
> serializers and deserializers (aka, "serdes") from the {{ProcessingContext}}. 
> This means that a {{Processor}} implementation that wants to use the 
> {{InMemoryKeyValueStore}} can only do this if the key and value types match 
> those set up as the default serdes in the topology's configuration.
> Additionally, the {{RocksDBKeyValueStore}} is only capable of {{byte[]}} keys 
> and values.
> Both of these key-value stores should allow the component using them to 
> specify the serdes for both the keys and values. As a convenience, the 
> current behavior should still be supported, as should a way to infer the 
> serdes for the "built-in" serializers and deserializers (e.g., strings, 
> integers, longs, and byte arrays).
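
A minimal sketch of the requested shape; every name below is invented for
illustration and is not the committed API:

{code}
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

// Invented illustration: callers supply serdes explicitly instead of the
// store always pulling the defaults from the ProcessingContext.
public interface KeyValueStoreFactory {

    <K, V> KeyValueStore<K, V> inMemory(String name,
                                        Serializer<K> keySerializer,
                                        Deserializer<K> keyDeserializer,
                                        Serializer<V> valueSerializer,
                                        Deserializer<V> valueDeserializer);

    // Convenience: infer built-in serdes (String, Integer, Long, byte[])
    // from the key and value classes.
    <K, V> KeyValueStore<K, V> inMemory(String name,
                                        Class<K> keyClass, Class<V> valueClass);
}

// Minimal store interface assumed for this sketch.
interface KeyValueStore<K, V> {
    V get(K key);
    void put(K key, V value);
}
{code}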



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2653) Stateful operations in the KStream DSL layer

2015-10-14 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-2653:


 Summary: Stateful operations in the KStream DSL layer
 Key: KAFKA-2653
 URL: https://issues.apache.org/jira/browse/KAFKA-2653
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang


This includes the interface design and the implementation for stateful 
operations, including:

0. table representation in KStream.
1. stream-stream join.
2. stream-table join.
3. table-table join.
4. stream / table aggregations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2397) leave group request

2015-10-14 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957784#comment-14957784
 ] 

Onur Karaman commented on KAFKA-2397:
-

Cool. It sounds like we all generally agree on the explicit request. Does a 
committer want to review the pull request?

> leave group request
> ---
>
> Key: KAFKA-2397
> URL: https://issues.apache.org/jira/browse/KAFKA-2397
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>Priority: Minor
> Fix For: 0.9.0.0
>
>
> Let's say every consumer in a group has session timeout s. Currently, if a 
> consumer leaves the group, the worst case time to stabilize the group is 2s 
> (s to detect the consumer failure + s for the rebalance window). If a 
> consumer instead can declare they are leaving the group, the worst case time 
> to stabilize the group would just be the s associated with the rebalance 
> window.
> This is a low priority optimization!



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2655) Consumer.poll(0)'s overhead too large

2015-10-14 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-2655:


 Summary: Consumer.poll(0)'s overhead too large
 Key: KAFKA-2655
 URL: https://issues.apache.org/jira/browse/KAFKA-2655
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
Assignee: Jason Gustafson
 Fix For: 0.9.0.1


Currently, with a single partition, even if it is paused, calling poll(0) could 
still cost as much as 1 ms since it triggers a few system calls. Some of 
those can possibly be optimized away.
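
A rough sketch for measuring the overhead described above, assuming the 0.9-era
consumer API (pause() is shown with its varargs signature; broker address and
topic are placeholders):

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PausedPollOverhead {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "poll-overhead-test");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("some-topic", 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.pause(tp); // paused: no records will be returned
            long start = System.nanoTime();
            consumer.poll(0);   // still performs system calls on every invocation
            System.out.printf("poll(0) took %.3f ms%n",
                (System.nanoTime() - start) / 1e6);
        }
    }
}
{code}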



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-2536: topics tool should allow users to ...

2015-10-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/305


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request: TRIVIAL: add @throws ConsumerWakeupException i...

2015-10-14 Thread guozhangwang
GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/311

TRIVIAL: add @throws ConsumerWakeupException in KafkaConsumer



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka wakeupComments

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/311.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #311






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-2656) Default SSL keystore and truststore config are unusable

2015-10-14 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-2656:
--
Status: Patch Available  (was: Open)

Removed the default keystore and truststore for the Kafka server and clients. 
Tested in our environment both with the default JVM cacerts truststore and with 
JVM options to set the defaults. Since the code was already handling null values 
correctly, the only changes required were the removal of the defaults in the 
Kafka configs.

> Default SSL keystore and truststore config are unusable
> ---
>
> Key: KAFKA-2656
> URL: https://issues.apache.org/jira/browse/KAFKA-2656
> Project: Kafka
>  Issue Type: Bug
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 0.9.0.0
>
>
> Default truststore for clients and default key and truststore for Kafka 
> server are set to files in /tmp along with simplistic passwords. Since no 
> sample stores are packaged with Kafka anyway, there is no value in hardcoded 
> paths and passwords as defaults. 
> Moreover these defaults prevent the use of standard javax.net.ssl properties. 
> And they force truststores to be set in Kafka configuration even when 
> certificates are signed by a trusted authority included in the Java cacerts.
> Default keystores and truststores should be replaced with JVM defaults.
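
For reference, a minimal sketch of the standard JVM-wide javax.net.ssl
properties that the hardcoded Kafka defaults were overriding; the path and
password are placeholders:

{code}
public class JvmSslDefaults {
    public static void main(String[] args) {
        // Standard JVM-wide SSL defaults, honored by all Java SSL clients.
        System.setProperty("javax.net.ssl.trustStore",
            "/etc/kafka/client.truststore.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "changeit");
        // Equivalent command-line form:
        //   java -Djavax.net.ssl.trustStore=/etc/kafka/client.truststore.jks \
        //        -Djavax.net.ssl.trustStorePassword=changeit ...
    }
}
{code}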



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2590) Kafka Streams Checklist

2015-10-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2590:
-
Issue Type: New Feature  (was: Task)

> Kafka Streams Checklist
> ---
>
> Key: KAFKA-2590
> URL: https://issues.apache.org/jira/browse/KAFKA-2590
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Guozhang Wang
>
> This is an umbrella story for the processor client and Kafka Streams feature 
> implementation.





Build failed in Jenkins: kafka-trunk-jdk7 #686

2015-10-14 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-2603: Add timeout arg to ConsoleConsumer for new consumer

--
[...truncated 1825 lines...]

kafka.coordinator.ConsumerGroupMetadataTest > 
testPreparingRebalanceToPreparingRebalanceIllegalTransition PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[0] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[1] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[2] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[3] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[4] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[5] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[6] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[7] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[8] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[9] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[10] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[11] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[12] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[13] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[14] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[15] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[16] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[17] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[18] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[19] PASSED

kafka.log.OffsetMapTest > testClear PASSED

kafka.log.OffsetMapTest > testBasicValidation PASSED

kafka.log.LogManagerTest > testCleanupSegmentsToMaintainSize PASSED

kafka.log.LogManagerTest > testRecoveryDirectoryMappingWithRelativeDirectory 
PASSED

kafka.log.LogManagerTest > testGetNonExistentLog PASSED

kafka.log.LogManagerTest > testTwoLogManagersUsingSameDirFails PASSED

kafka.log.LogManagerTest > testLeastLoadedAssignment PASSED

kafka.log.LogManagerTest > testCleanupExpiredSegments PASSED

kafka.log.LogManagerTest > testCheckpointRecoveryPoints PASSED

kafka.log.LogManagerTest > testTimeBasedFlush PASSED

kafka.log.LogManagerTest > testCreateLog PASSED

kafka.log.LogManagerTest > testRecoveryDirectoryMappingWithTrailingSlash PASSED

kafka.log.LogSegmentTest > testRecoveryWithCorruptMessage PASSED

kafka.log.LogSegmentTest > testRecoveryFixesCorruptIndex PASSED

kafka.log.LogSegmentTest > testReadFromGap PASSED

kafka.log.LogSegmentTest > testTruncate PASSED

kafka.log.LogSegmentTest > testReadBeforeFirstOffset PASSED

kafka.log.LogSegmentTest > testCreateWithInitFileSizeAppendMessage PASSED

kafka.log.LogSegmentTest > testChangeFileSuffixes PASSED

kafka.log.LogSegmentTest > testMaxOffset PASSED

kafka.log.LogSegmentTest > testNextOffsetCalculation PASSED

kafka.log.LogSegmentTest > testReadOnEmptySegment PASSED

kafka.log.LogSegmentTest > testReadAfterLast PASSED

kafka.log.LogSegmentTest > testCreateWithInitFileSizeClearShutdown PASSED

kafka.log.LogSegmentTest > testTruncateFull PASSED

kafka.log.FileMessageSetTest > testTruncate PASSED

kafka.log.FileMessageSetTest > testIterationOverPartialAndTruncation PASSED

kafka.log.FileMessageSetTest > testRead PASSED

kafka.log.FileMessageSetTest > testFileSize PASSED

kafka.log.FileMessageSetTest > testIteratorWithLimits PASSED

kafka.log.FileMessageSetTest > testPreallocateTrue PASSED

kafka.log.FileMessageSetTest > testIteratorIsConsistent PASSED

kafka.log.FileMessageSetTest > testIterationDoesntChangePosition PASSED

kafka.log.FileMessageSetTest > testWrittenEqualsRead PASSED

kafka.log.FileMessageSetTest > testWriteTo PASSED

kafka.log.FileMessageSetTest > testPreallocateFalse PASSED

kafka.log.FileMessageSetTest > testPreallocateClearShutdown PASSED

kafka.log.FileMessageSetTest > testSearch PASSED

kafka.log.FileMessageSetTest > testSizeInBytes PASSED

kafka.log.LogCleanerIntegrationTest > cleanerTest[0] PASSED

kafka.log.LogCleanerIntegrationTest > cleanerTest[1] PASSED

kafka.log.LogCleanerIntegrationTest > cleanerTest[2] PASSED

kafka.log.LogCleanerIntegrationTest > cleanerTest[3] PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingTopic PASSED

kafka.log.LogTest > testIndexRebuild PASSED

kafka.log.LogTest > testLogRolls PASSED

kafka.log.LogTest > testMessageSizeCheck PASSED

kafka.log.LogTest > testAsyncDelete PASSED

kafka.log.LogTest > testReadOutOfRange PASSED

kafka.log.LogTest > testReadAtLogGap PASSED

kafka.log.LogTest > testTimeBasedLogRoll PASSED

kafka.log.LogTest > testLoadEmptyLog PASSED

kafka.log.LogTest > testMessageSetSizeCheck PASSED

kafka.log.LogTest > testIndexResizingAtTruncation PASSED

kafka.log.LogTest > testCompactedTopicConstraints PASSED

kafka.log.LogTest > 

[jira] [Updated] (KAFKA-2650) Change ConfigCommand --deleted-config option to align with TopicCommand

2015-10-14 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2650:

Resolution: Fixed
Status: Resolved  (was: Patch Available)

Issue resolved by pull request 308
[https://github.com/apache/kafka/pull/308]

> Change ConfigCommand --deleted-config option to align with TopicCommand
> ---
>
> Key: KAFKA-2650
> URL: https://issues.apache.org/jira/browse/KAFKA-2650
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Grant Henke
>Assignee: Grant Henke
> Fix For: 0.9.0.0
>
>
> In order to avoid confusion, change ConfigCommand's --deleted-config 
> parameter to --delete-config. 
> At the same time change --added-config to --add-config, to make all options 
> present/future tense instead of past tense. 
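
After the rename, an invocation looks like this (topic name and settings below 
are illustrative):

    $ bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
        --entity-type topics --entity-name my-topic \
        --add-config retention.ms=86400000 \
        --delete-config max.message.bytes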





[jira] [Commented] (KAFKA-2650) Change ConfigCommand --deleted-config option to align with TopicCommand

2015-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957758#comment-14957758
 ] 

ASF GitHub Bot commented on KAFKA-2650:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/308


> Change ConfigCommand --deleted-config option to align with TopicCommand
> ---
>
> Key: KAFKA-2650
> URL: https://issues.apache.org/jira/browse/KAFKA-2650
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Grant Henke
>Assignee: Grant Henke
> Fix For: 0.9.0.0
>
>
> In order to avoid confusion, change ConfigCommand's --deleted-config 
> parameter to --delete-config. 
> At the same time change --added-config to --add-config, to make all options 
> present/future tense instead of past tense. 





[GitHub] kafka pull request: KAFKA-2650: Change ConfigCommand --deleted-con...

2015-10-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/308




[jira] [Created] (KAFKA-2654) Avoid calling Consumer.poll(0) in each iteration

2015-10-14 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-2654:


 Summary: Avoid calling Consumer.poll(0) in each iteration
 Key: KAFKA-2654
 URL: https://issues.apache.org/jira/browse/KAFKA-2654
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
Assignee: Yasuhiro Matsuda
 Fix For: 0.9.1


Currently we are calling consumer.poll() in each iteration, though most of the 
time it is poll(0).

poll(0) itself is not completely free since it involves a number of system 
calls, so we should avoid calling it in each iteration.
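
As a minimal illustration of the pattern in question (the class below and its 
buffering scheme are hypothetical, not the actual Kafka Streams internals; the 
point is simply to gate poll(0) on an empty local buffer rather than issue it 
on every pass of the loop):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    class GatedPollLoop {
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final Deque<ConsumerRecord<byte[], byte[]>> buffer = new ArrayDeque<>();
        private volatile boolean running = true;

        GatedPollLoop(KafkaConsumer<byte[], byte[]> consumer) {
            this.consumer = consumer;
        }

        void run() {
            while (running) {
                // Only touch the consumer (and its system calls) when the
                // local buffer has been drained.
                if (buffer.isEmpty()) {
                    for (ConsumerRecord<byte[], byte[]> r : consumer.poll(0))
                        buffer.add(r);
                }
                ConsumerRecord<byte[], byte[]> next = buffer.poll();
                if (next != null)
                    process(next);
            }
        }

        private void process(ConsumerRecord<byte[], byte[]> record) {
            // application logic
        }
    }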





[jira] [Commented] (KAFKA-2629) Enable getting SSL password from an executable rather than passing plaintext password

2015-10-14 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957801#comment-14957801
 ] 

Sriharsha Chintalapani commented on KAFKA-2629:
---

[~singhashish] 
I still disagree with this:
"That being said, running an executable is a little different than ordinary 
file access. However I don't believe that it adds risk to the overall security 
of the application."
It does add risk to the overall security of the application.

If others want this as a feature I am OK with it, as long as it is an optional 
config rather than the default behavior.

> Enable getting SSL password from an executable rather than passing plaintext 
> password
> -
>
> Key: KAFKA-2629
> URL: https://issues.apache.org/jira/browse/KAFKA-2629
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.9.0.0
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
>
> Currently there are a couple of options to pass SSL passwords to Kafka, i.e., 
> via properties file or via command line argument. Both of these are not 
> recommended security practices.
> * A password on a command line is a no-no: it's trivial to see that password 
> just by using the 'ps' utility.
> * Putting a password into a file, and then passing the location to that file, 
> is the next best option. The access to the file will be governed by unix 
> access permissions which we all know and love. The downside is that the 
> password is still just sitting there in a file, and those who have access can 
> still see it trivially.
> * The most general, secure solution is to provide a layer of abstraction: 
> provide functionality to get the password from "somewhere else".  The most 
> flexible and generic way to do this is to simply call an executable which 
> returns the desired password. 
> ** The executable is again protected with normal file system privileges
> ** The simplest form, a script that looks like "echo 'my-password'", devolves 
> back to putting the password in a file
> ** A more interesting implementation could open up a local encrypted password 
> store and extract the password from it
> ** A maximally secure implementation could contact an external secret manager 
> with centralized control and audit functionality.
> ** In short: getting the password as the output of a script/executable is 
> maximally generic and enables both simple and complex use cases.
> This JIRA intends to add a config param to enable passing an executable to 
> Kafka for SSL passwords.
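
A sketch of how such a lookup could work (plain Java; nothing here is a 
committed interface, and the helper below is invented purely for 
illustration): run the configured executable and read the password from its 
stdout.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    class PasswordCommand {
        // Runs a user-supplied executable and treats the first line of its
        // stdout as the SSL password.
        static String fetchPassword(String command) throws IOException, InterruptedException {
            Process p = new ProcessBuilder(command.split("\\s+")).start();
            try (BufferedReader out =
                     new BufferedReader(new InputStreamReader(p.getInputStream()))) {
                String password = out.readLine();
                if (p.waitFor() != 0 || password == null)
                    throw new IOException("password command failed: " + command);
                return password.trim();
            }
        }
    }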





[jira] [Created] (KAFKA-2656) Default SSL keystore and truststore config are unusable

2015-10-14 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-2656:
-

 Summary: Default SSL keystore and truststore config are unusable
 Key: KAFKA-2656
 URL: https://issues.apache.org/jira/browse/KAFKA-2656
 Project: Kafka
  Issue Type: Bug
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Priority: Critical
 Fix For: 0.9.0.0


Default truststore for clients and default key and truststore for Kafka server 
are set to files in /tmp along with simplistic passwords. Since no sample 
stores are packaged with Kafka anyway, there is no value in hardcoded paths and 
passwords as defaults. 

Moreover these defaults prevent the use of standard javax.net.ssl properties. 
And they force truststores to be set in Kafka configuration even when 
certificates are signed by a trusted authority included in the Java cacerts.

Default keystores and truststores should be replaced with JVM defaults.
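
For illustration, with the hardcoded defaults removed, a client whose broker 
certificate is signed by a CA already present in the JVM's default cacerts 
needs no truststore settings at all, and a custom default can still be 
supplied through the standard JVM properties (paths and passwords below are 
placeholders):

    # client.properties: no ssl.truststore.* entries needed when the CA is
    # already trusted by the JVM default truststore
    security.protocol=SSL

    # or point the whole JVM at a different default truststore:
    $ java -Djavax.net.ssl.trustStore=/path/to/truststore.jks \
           -Djavax.net.ssl.trustStorePassword=changeit ...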





Build failed in Jenkins: kafka-trunk-jdk8 #29

2015-10-14 Thread Apache Jenkins Server
See 

Changes:

[cshapi] KAFKA-2536: topics tool should allow users to alter topic configuration

[cshapi] TRIVIAL: add @throws ConsumerWakeupException in KafkaConsumer

--
[...truncated 362 lines...]
:kafka-trunk-jdk8:core:compileScala UP-TO-DATE
:kafka-trunk-jdk8:core:processResources UP-TO-DATE
:kafka-trunk-jdk8:core:classes UP-TO-DATE
:kafka-trunk-jdk8:log4j-appender:javadoc
:kafka-trunk-jdk8:core:javadoc
cache taskArtifacts.bin 
(
 is corrupt. Discarding.
:kafka-trunk-jdk8:core:javadocJar
:kafka-trunk-jdk8:core:scaladoc
[ant:scaladoc] Element 
' 
does not exist.
[ant:scaladoc] 
:277:
 warning: a pure expression does nothing in statement position; you may be 
omitting necessary parentheses
[ant:scaladoc] ControllerStats.uncleanLeaderElectionRate
[ant:scaladoc] ^
[ant:scaladoc] 
:278:
 warning: a pure expression does nothing in statement position; you may be 
omitting necessary parentheses
[ant:scaladoc] ControllerStats.leaderElectionTimer
[ant:scaladoc] ^
[ant:scaladoc] warning: there were 14 feature warning(s); re-run with -feature 
for details
[ant:scaladoc] 
:72:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#offer".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 
:32:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#offer".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 
:137:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#poll".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 
:120:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#poll".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 
:97:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#put".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 
:152:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#take".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 9 warnings found
:kafka-trunk-jdk8:core:scaladocJar
:kafka-trunk-jdk8:core:docsJar
:docsJar_2_11_7
Building project 'core' with Scala version 2.11.7
:kafka-trunk-jdk8:clients:compileJavawarning: [options] bootstrap class path 
not set in conjunction with -source 1.7
Note: 

 uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
1 warning

:kafka-trunk-jdk8:clients:processResources UP-TO-DATE
:kafka-trunk-jdk8:clients:classes
:kafka-trunk-jdk8:clients:determineCommitId UP-TO-DATE
:kafka-trunk-jdk8:clients:createVersionFile
:kafka-trunk-jdk8:clients:jar
:kafka-trunk-jdk8:clients:javadoc
:kafka-trunk-jdk8:log4j-appender:compileJavawarning: [options] bootstrap class 
path not set in conjunction with -source 1.7
1 warning

:kafka-trunk-jdk8:log4j-appender:processResources UP-TO-DATE
:kafka-trunk-jdk8:log4j-appender:classes
:kafka-trunk-jdk8:log4j-appender:jar
:kafka-trunk-jdk8:core:compileJava UP-TO-DATE
:kafka-trunk-jdk8:core:compileScalaJava HotSpot(TM) 64-Bit Server VM warning: 
ignoring option MaxPermSize=512m; support was removed in 8.0

:78:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.

org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
 ^

[jira] [Created] (KAFKA-2652) Incorporate the new consumer protocol with partition-group interface

2015-10-14 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-2652:


 Summary: Incorporate the new consumer protocol with 
partition-group interface
 Key: KAFKA-2652
 URL: https://issues.apache.org/jira/browse/KAFKA-2652
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
Assignee: Yasuhiro Matsuda
 Fix For: 0.9.0.1


After KAFKA-2464 is checked in, we need to incorporate the new protocol along 
with a partition-group interface.

The first step may be a couple of pre-defined partitioning schemes that can be 
chosen by the user via configs.





[GitHub] kafka pull request: KAFKA-2516: Rename o.a.k.client.tools to o.a.k...

2015-10-14 Thread granthenke
Github user granthenke closed the pull request at:

https://github.com/apache/kafka/pull/310




[jira] [Commented] (KAFKA-2516) Rename o.a.k.client.tools to o.a.k.tools

2015-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957774#comment-14957774
 ] 

ASF GitHub Bot commented on KAFKA-2516:
---

GitHub user granthenke reopened a pull request:

https://github.com/apache/kafka/pull/310

KAFKA-2516: Rename o.a.k.client.tools to o.a.k.tools



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/granthenke/kafka tools-packaging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/310.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #310


commit f1cf0a01fc4ea46a03bc0cbb37cdf763a91825e5
Author: Grant Henke 
Date:   2015-10-14T16:51:08Z

KAFKA-2516: Rename o.a.k.client.tools to o.a.k.tools




> Rename o.a.k.client.tools to o.a.k.tools
> 
>
> Key: KAFKA-2516
> URL: https://issues.apache.org/jira/browse/KAFKA-2516
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Grant Henke
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> Currently our new performance tools are in o.a.k.client.tools but packaged in 
> kafka-tools not kafka-clients. This is a bit confusing.
> Since they deserve their own jar (you don't want our client tools packaged in 
> your app), let's give them a separate package and call it o.a.k.tools.





[jira] [Commented] (KAFKA-2516) Rename o.a.k.client.tools to o.a.k.tools

2015-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957773#comment-14957773
 ] 

ASF GitHub Bot commented on KAFKA-2516:
---

Github user granthenke closed the pull request at:

https://github.com/apache/kafka/pull/310


> Rename o.a.k.client.tools to o.a.k.tools
> 
>
> Key: KAFKA-2516
> URL: https://issues.apache.org/jira/browse/KAFKA-2516
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Grant Henke
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> Currently our new performance tools are in o.a.k.client.tools but packaged in 
> kafka-tools not kafka-clients. This is a bit confusing.
> Since they deserve their own jar (you don't want our client tools packaged in 
> your app), let's give them a separate package and call it o.a.k.tools.





[GitHub] kafka pull request: KAFKA-2516: Rename o.a.k.client.tools to o.a.k...

2015-10-14 Thread granthenke
GitHub user granthenke reopened a pull request:

https://github.com/apache/kafka/pull/310

KAFKA-2516: Rename o.a.k.client.tools to o.a.k.tools



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/granthenke/kafka tools-packaging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/310.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #310


commit f1cf0a01fc4ea46a03bc0cbb37cdf763a91825e5
Author: Grant Henke 
Date:   2015-10-14T16:51:08Z

KAFKA-2516: Rename o.a.k.client.tools to o.a.k.tools






[jira] [Commented] (KAFKA-2536) topics tool should allow users to alter topic configuration

2015-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957819#comment-14957819
 ] 

ASF GitHub Bot commented on KAFKA-2536:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/305


> topics tool should allow users to alter topic configuration
> ---
>
> Key: KAFKA-2536
> URL: https://issues.apache.org/jira/browse/KAFKA-2536
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Gwen Shapira
>Assignee: Grant Henke
> Fix For: 0.9.0.0
>
>
> When we added dynamic config, we added a kafka-config tool (which can be used 
> to maintain configs for non-topic entities) and removed the capability from 
> the kafka-topic tool.
> Removing the capability from kafka-topic:
> 1. Breaks backward compatibility in our most essential tools. This has a 
> significant impact on usability.
> 2. Is kinda confusing: --create --config works but --alter --config does 
> not. 
> I suggest fixing this.





[jira] [Updated] (KAFKA-2536) topics tool should allow users to alter topic configuration

2015-10-14 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2536:

Resolution: Fixed
Status: Resolved  (was: Patch Available)

Issue resolved by pull request 305
[https://github.com/apache/kafka/pull/305]

> topics tool should allow users to alter topic configuration
> ---
>
> Key: KAFKA-2536
> URL: https://issues.apache.org/jira/browse/KAFKA-2536
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Gwen Shapira
>Assignee: Grant Henke
> Fix For: 0.9.0.0
>
>
> When we added dynamic config, we added a kafka-config tool (which can be used 
> to maintain configs for non-topic entities) and removed the capability from 
> the kafka-topic tool.
> Removing the capability from kafka-topic:
> 1. Breaks backward compatibility in our most essential tools. This has a 
> significant impact on usability.
> 2. Is kinda confusing: --create --config works but --alter --config does 
> not. 
> I suggest fixing this.
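
With the capability restored, the familiar form works again alongside 
kafka-configs (topic name and setting below are illustrative):

    $ bin/kafka-topics.sh --zookeeper localhost:2181 --alter \
        --topic my-topic --config retention.ms=86400000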





Build failed in Jenkins: kafka-trunk-jdk8 #28

2015-10-14 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-2603: Add timeout arg to ConsoleConsumer for new consumer

[wangguoz] KAFKA-2593: Key value stores can use specified serializers and

[cshapi] KAFKA-2650: Change ConfigCommand --deleted-config option to align wit…

--
[...truncated 6316 lines...]

org.apache.kafka.copycat.file.FileStreamSourceConnectorTest > 
testSourceTasksStdin PASSED

org.apache.kafka.copycat.file.FileStreamSourceConnectorTest > testTaskClass 
PASSED

org.apache.kafka.copycat.file.FileStreamSourceConnectorTest > 
testMultipleSourcesInvalid PASSED

org.apache.kafka.copycat.file.FileStreamSinkConnectorTest > testSinkTasks PASSED

org.apache.kafka.copycat.file.FileStreamSinkConnectorTest > testTaskClass PASSED

org.apache.kafka.copycat.file.FileStreamSinkTaskTest > testPutFlush PASSED
:copycat:json:checkstyleMain
:copycat:json:compileTestJavawarning: [options] bootstrap class path not set in 
conjunction with -source 1.7
Note: 

 uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
1 warning

:copycat:json:processTestResources UP-TO-DATE
:copycat:json:testClasses
:copycat:json:checkstyleTest
:copycat:json:test

org.apache.kafka.copycat.json.JsonConverterTest > longToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > 
testCacheSchemaToJsonConversion PASSED

org.apache.kafka.copycat.json.JsonConverterTest > 
nullSchemaAndMapNonStringKeysToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > floatToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > booleanToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > nullSchemaAndMapToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > stringToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > timestampToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > 
testCopycatSchemaMetadataTranslation PASSED

org.apache.kafka.copycat.json.JsonConverterTest > timestampToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > decimalToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > mapToCopycatStringKeys PASSED

org.apache.kafka.copycat.json.JsonConverterTest > mapToJsonNonStringKeys PASSED

org.apache.kafka.copycat.json.JsonConverterTest > longToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > mismatchSchemaJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > 
testCacheSchemaToCopycatConversion PASSED

org.apache.kafka.copycat.json.JsonConverterTest > 
testJsonSchemaMetadataTranslation PASSED

org.apache.kafka.copycat.json.JsonConverterTest > bytesToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > shortToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > intToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > structToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > stringToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > nullSchemaAndArrayToJson 
PASSED

org.apache.kafka.copycat.json.JsonConverterTest > byteToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > nullSchemaPrimitiveToCopycat 
PASSED

org.apache.kafka.copycat.json.JsonConverterTest > byteToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > intToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > dateToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > noSchemaToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > noSchemaToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > nullSchemaAndPrimitiveToJson 
PASSED

org.apache.kafka.copycat.json.JsonConverterTest > mapToJsonStringKeys PASSED

org.apache.kafka.copycat.json.JsonConverterTest > arrayToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > nullToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > timeToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > structToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > shortToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > dateToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > doubleToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > timeToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > floatToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > decimalToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > arrayToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > booleanToJson PASSED

org.apache.kafka.copycat.json.JsonConverterTest > mapToCopycatNonStringKeys 
PASSED

org.apache.kafka.copycat.json.JsonConverterTest > bytesToCopycat PASSED

org.apache.kafka.copycat.json.JsonConverterTest > doubleToCopycat PASSED

[GitHub] kafka pull request: KAFKA-2656: Remove hardcoded default key and t...

2015-10-14 Thread rajinisivaram
GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/312

KAFKA-2656: Remove hardcoded default key and truststores

Removed default hardcoded keystore and truststore in /tmp so that default 
JVM keystore/truststore may be used when keystore/truststore is not specified 
in Kafka server or client properties

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-2656

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/312.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #312


commit 0544c4dd5d504ec1e8f6dcb31004493912c2c587
Author: Rajini Sivaram 
Date:   2015-10-14T22:16:57Z

KAFKA-2656: Remove hardcoded default key and truststores to enable JVM 
defaults to be used






[jira] [Commented] (KAFKA-2656) Default SSL keystore and truststore config are unusable

2015-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957933#comment-14957933
 ] 

ASF GitHub Bot commented on KAFKA-2656:
---

GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/312

KAFKA-2656: Remove hardcoded default key and truststores

Removed default hardcoded keystore and truststore in /tmp so that default 
JVM keystore/truststore may be used when keystore/truststore is not specified 
in Kafka server or client properties

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-2656

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/312.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #312


commit 0544c4dd5d504ec1e8f6dcb31004493912c2c587
Author: Rajini Sivaram 
Date:   2015-10-14T22:16:57Z

KAFKA-2656: Remove hardcoded default key and truststores to enable JVM 
defaults to be used




> Default SSL keystore and truststore config are unusable
> ---
>
> Key: KAFKA-2656
> URL: https://issues.apache.org/jira/browse/KAFKA-2656
> Project: Kafka
>  Issue Type: Bug
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 0.9.0.0
>
>
> Default truststore for clients and default key and truststore for Kafka 
> server are set to files in /tmp along with simplistic passwords. Since no 
> sample stores are packaged with Kafka anyway, there is no value in hardcoded 
> paths and passwords as defaults. 
> Moreover these defaults prevent the use of standard javax.net.ssl properties. 
> And they force truststores to be set in Kafka configuration even when 
> certificates are signed by a trusted authority included in the Java cacerts.
> Default keystores and truststores should be replaced with JVM defaults.





Slow request log in Kafka

2015-10-14 Thread Aditya Auradkar
Hey everyone,

We were recently discussing a small logging improvement for Kafka.
Basically, add a request log for requests that took longer than a certain
configurable time to execute. This can be quite useful for debugging
purposes; in fact, it would have proven handy while investigating a recent
issue during one of our deployments at LinkedIn.

This is also supported in several other projects. For example, MySQL and
Postgres both have slow query logs.
https://dev.mysql.com/doc/refman/5.0/en/slow-query-log.html
https://wiki.postgresql.org/wiki/Logging_Difficult_Queries

Thoughts?

Thanks,
Aditya
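
(A minimal sketch of the idea, with invented names, in the spirit of the slow 
query logs linked above: time each request and write to a dedicated logger 
only when the handling time exceeds a configurable threshold.)

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class SlowRequestLog {
        private static final Logger SLOW =
            LoggerFactory.getLogger("kafka.request.slow");
        private final long thresholdMs;

        SlowRequestLog(long thresholdMs) { this.thresholdMs = thresholdMs; }

        void timed(String requestDesc, Runnable handler) {
            long start = System.nanoTime();
            handler.run();
            long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
            // Only requests slower than the threshold reach the log.
            if (elapsedMs > thresholdMs)
                SLOW.warn("Slow request {} took {} ms", requestDesc, elapsedMs);
        }
    }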


[GitHub] kafka pull request: KAFKA-2593 Key value stores can use specified ...

2015-10-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/255




[jira] [Commented] (KAFKA-2593) KeyValueStores should not require use of the context's default serializers and deserializers

2015-10-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957739#comment-14957739
 ] 

ASF GitHub Bot commented on KAFKA-2593:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/255


> KeyValueStores should not require use of the context's default serializers 
> and deserializers
> 
>
> Key: KAFKA-2593
> URL: https://issues.apache.org/jira/browse/KAFKA-2593
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kafka streams
>Reporter: Randall Hauch
>Assignee: Randall Hauch
> Fix For: 0.9.0.0
>
>
> Currently the {{InMemoryKeyValueStore}} is only able to use the key and value 
> serializers and deserializers (aka, "serdes") from the {{ProcessingContext}}. 
> This means that a {{Processor}} implementation that wants to use the 
> {{InMemoryKeyValueStore}} can only do this if the key and value types match 
> those set up as the default serdes in the topology's configuration.
> Additionally, the {{RocksDBKeyValueStore}} is only capable of {{byte[]}} keys 
> and values.
> Both of these key-value stores should allow the component using them to 
> specify the serdes for both the keys and values. As a convenience, the 
> current behavior should still be supported, as should a way to infer the 
> serdes for the "built-in" serializers and deserializers (e.g., strings, 
> integers, longs, and byte arrays).
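
As an illustration of the kind of API this asks for (the constructor shown 
below is invented; the actual interface was settled in the pull request), a 
store would accept explicit serdes instead of inheriting the context defaults:

    // Hypothetical usage sketch: supply key/value serializers and
    // deserializers explicitly rather than relying on the context's
    // defaults. The serializer classes are the real ones from
    // org.apache.kafka.common.serialization; the store constructor is not.
    KeyValueStore<String, Integer> store =
        new InMemoryKeyValueStore<>("counts",
            new StringSerializer(), new StringDeserializer(),
            new IntegerSerializer(), new IntegerDeserializer());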





[jira] [Commented] (KAFKA-2655) Consumer.poll(0)'s overhead too large

2015-10-14 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957824#comment-14957824
 ] 

Ismael Juma commented on KAFKA-2655:


It would be good to get some actual numbers before we try to optimise. JMH is 
recommended.

> Consumer.poll(0)'s overhead too large
> -
>
> Key: KAFKA-2655
> URL: https://issues.apache.org/jira/browse/KAFKA-2655
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Guozhang Wang
>Assignee: Jason Gustafson
> Fix For: 0.9.0.1
>
>
> Currently with a single partition, even if it is paused, calling poll(0) 
> could still be costing as much as 1ms since it triggers a few system calls. 
> Some of those can possibly be optimized away.





Build failed in Jenkins: kafka-trunk-jdk7 #687

2015-10-14 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-2593: Key value stores can use specified serializers and

[cshapi] KAFKA-2650: Change ConfigCommand --deleted-config option to align 
wit…

[cshapi] KAFKA-2536: topics tool should allow users to alter topic configuration

--
[...truncated 322 lines...]
:kafka-trunk-jdk7:clients:jar UP-TO-DATE
:kafka-trunk-jdk7:clients:javadoc UP-TO-DATE
:kafka-trunk-jdk7:log4j-appender:compileJava UP-TO-DATE
:kafka-trunk-jdk7:log4j-appender:processResources UP-TO-DATE
:kafka-trunk-jdk7:log4j-appender:classes UP-TO-DATE
:kafka-trunk-jdk7:log4j-appender:jar UP-TO-DATE
:kafka-trunk-jdk7:core:compileJava UP-TO-DATE
:kafka-trunk-jdk7:core:compileScala UP-TO-DATE
:kafka-trunk-jdk7:core:processResources UP-TO-DATE
:kafka-trunk-jdk7:core:classes UP-TO-DATE
:kafka-trunk-jdk7:log4j-appender:javadoc
:kafka-trunk-jdk7:core:javadoc
cache taskArtifacts.bin 
(
 is corrupt. Discarding.
:kafka-trunk-jdk7:core:javadocJar
:kafka-trunk-jdk7:core:scaladoc
[ant:scaladoc] Element 
' 
does not exist.
[ant:scaladoc] 
:277:
 warning: a pure expression does nothing in statement position; you may be 
omitting necessary parentheses
[ant:scaladoc] ControllerStats.uncleanLeaderElectionRate
[ant:scaladoc] ^
[ant:scaladoc] 
:278:
 warning: a pure expression does nothing in statement position; you may be 
omitting necessary parentheses
[ant:scaladoc] ControllerStats.leaderElectionTimer
[ant:scaladoc] ^
[ant:scaladoc] warning: there were 14 feature warning(s); re-run with -feature 
for details
[ant:scaladoc] 
:72:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#offer".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 
:32:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#offer".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 
:137:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#poll".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 
:120:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#poll".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 
:97:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#put".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 
:152:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#take".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 9 warnings found
:kafka-trunk-jdk7:core:scaladocJar
:kafka-trunk-jdk7:core:docsJar
:docsJar_2_11_7
Building project 'core' with Scala version 2.11.7
:kafka-trunk-jdk7:clients:compileJavaNote: 

 uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

:kafka-trunk-jdk7:clients:processResources UP-TO-DATE
:kafka-trunk-jdk7:clients:classes
:kafka-trunk-jdk7:clients:determineCommitId UP-TO-DATE
:kafka-trunk-jdk7:clients:createVersionFile
:kafka-trunk-jdk7:clients:jar
:kafka-trunk-jdk7:clients:javadoc
:kafka-trunk-jdk7:log4j-appender:compileJava
:kafka-trunk-jdk7:log4j-appender:processResources UP-TO-DATE
:kafka-trunk-jdk7:log4j-appender:classes
:kafka-trunk-jdk7:log4j-appender:jar
:kafka-trunk-jdk7:core:compileJava UP-TO-DATE
:kafka-trunk-jdk7:core:compileScala
:78:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.


Re: [DISCUSS] KIP-36 - Rack aware replica assignment

2015-10-14 Thread Gwen Shapira
Can you clarify the workflow for the following scenarios:

1. I currently have 6 brokers and want to add rack information for each
2. I'm adding a new broker and I want to specify which rack it belongs on
while adding it.

Thanks!

On Tue, Oct 13, 2015 at 2:21 PM, Allen Wang  wrote:

> We discussed the KIP in the hangout today. The recommendation is to make
> rack as a broker property in ZooKeeper. For users with existing rack
> information stored somewhere, they would need to retrieve the information
> at broker start up and dynamically set the rack property, which can be
> implemented as a wrapper to bootstrap broker. There will be no interface or
> pluggable implementation to retrieve the rack information.
>
> The assumption is that you always need to restart the broker to make a
> change to the rack.
>
> Once the rack becomes a broker property, it will be possible to make rack
> part of the meta data to help the consumer choose which in sync replica to
> consume from as part of the future consumer enhancement.
>
> I will update the KIP.
>
> Thanks,
> Allen
>
>
> On Thu, Oct 8, 2015 at 9:23 AM, Allen Wang  wrote:
>
> > I attended Tuesday's KIP hangout but this KIP was not discussed due to
> > time constraint.
> >
> > However, after hearing the discussion of KIP-35, I have the feeling that
> > incompatibility (caused by the new broker property) between brokers with
> > different versions will be solved there. In addition, having rack in the
> > broker properties as metadata may also help consumers in the future. So I
> > am open to adding a rack property to the broker.
> >
> > Hopefully we can discuss this in the next KIP hangout.
> >
> > On Wed, Sep 30, 2015 at 2:46 PM, Allen Wang 
> wrote:
> >
> >> Can you send me the information on the next KIP hangout?
> >>
> >> Currently the broker-rack mapping is not cached. In KafkaApis,
> >> RackLocator.getRackInfo() is called each time the mapping is needed for
> >> auto topic creation. This will ensure latest mapping is used at any
> time.
> >>
> >> The ability to get the complete mapping makes it simple to reuse the
> same
> >> interface in command line tools.
> >>
> >>
> >> On Wed, Sep 30, 2015 at 11:01 AM, Aditya Auradkar <
> >> aaurad...@linkedin.com.invalid> wrote:
> >>
> >>> Perhaps we discuss this during the next KIP hangout?
> >>>
> >>> I do see that a pluggable rack locator can be useful but I do see a few
> >>> concerns:
> >>>
> >>> - The RackLocator (as described in the document) implies that it can
> >>> discover rack information for any node in the cluster. How does it deal
> >>> with rack location changes? For example, if I moved broker id (1) from
> >>> rack
> >>> X to Y, I only have to start that broker with a newer rack config. If
> >>> RackLocator discovers broker -> rack information at start up time, any
> >>> change to a broker will require bouncing the entire cluster since
> >>> createTopic requests can be sent to any node in the cluster.
> >>> For this reason it may be simpler to have each node be aware of its own
> >>> rack and persist it in ZK during start up time.
> >>>
> >>> - A pluggable RackLocator relies on an external service being available
> >>> to
> >>> serve rack information.
> >>>
> >>> Out of curiosity, I looked up how a couple of other systems deal with
> >>> zone/rack awareness.
> >>> For Cassandra some interesting modes are:
> >>> (Property File configuration)
> >>>
> >>>
> http://docs.datastax.com/en/cassandra/2.0/cassandra/architecture/architectureSnitchPFSnitch_t.html
> >>> (Dynamic inference)
> >>>
> >>>
> http://docs.datastax.com/en/cassandra/2.0/cassandra/architecture/architectureSnitchRackInf_c.html
> >>>
> >>> Voldemort does a static node -> zone assignment based on configuration.
> >>>
> >>> Aditya
> >>>
> >>> On Wed, Sep 30, 2015 at 10:05 AM, Allen Wang 
> >>> wrote:
> >>>
> >>> > I would like to see if we can do both:
> >>> >
> >>> > - Make RackLocator pluggable to facilitate migration with existing
> >>> > broker-rack mapping
> >>> >
> >>> > - Make rack an optional property for the broker. If rack is available
> >>> > from the broker, treat it as the source of truth. For users with an
> >>> > existing broker-rack mapping somewhere else, they can use the pluggable
> >>> > way or they can transfer the mapping to the broker rack property.
> >>> >
> >>> > One thing I am not sure about is what happens during a rolling upgrade
> >>> > when we have rack as a broker property. For brokers with an older
> >>> > version of Kafka, will it cause problems for them? If so, is there any
> >>> > workaround? I also think it would be better not to have rack in the
> >>> > controller wire protocol, but I am not sure if that is achievable.
> >>> >
> >>> > Thanks,
> >>> > Allen
> >>> >
> >>> >
> >>> >
> >>> >
> >>> >
> >>> > On Mon, Sep 28, 2015 at 4:55 PM, Todd Palino 
> >>> wrote:
> >>> >
> >>> > > I tend to like the idea of a pluggable locator. 
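
(For concreteness: rack as a broker property, as discussed above, amounts to a 
single line in each broker's server.properties, read at startup. The property 
name was still open in this thread; KIP-36 ultimately settled on broker.rack.)

    # per-broker rack assignment, e.g. an AWS availability zone
    broker.rack=us-east-1a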

Build failed in Jenkins: kafka-trunk-jdk7 #688

2015-10-14 Thread Apache Jenkins Server
See 

Changes:

[cshapi] TRIVIAL: add @throws ConsumerWakeupException in KafkaConsumer

--
[...truncated 322 lines...]
:kafka-trunk-jdk7:clients:jar UP-TO-DATE
:kafka-trunk-jdk7:clients:javadoc UP-TO-DATE
:kafka-trunk-jdk7:log4j-appender:compileJava UP-TO-DATE
:kafka-trunk-jdk7:log4j-appender:processResources UP-TO-DATE
:kafka-trunk-jdk7:log4j-appender:classes UP-TO-DATE
:kafka-trunk-jdk7:log4j-appender:jar UP-TO-DATE
:kafka-trunk-jdk7:core:compileJava UP-TO-DATE
:kafka-trunk-jdk7:core:compileScala UP-TO-DATE
:kafka-trunk-jdk7:core:processResources UP-TO-DATE
:kafka-trunk-jdk7:core:classes UP-TO-DATE
:kafka-trunk-jdk7:log4j-appender:javadoc
:kafka-trunk-jdk7:core:javadoc
cache taskArtifacts.bin 
(
 is corrupt. Discarding.
:kafka-trunk-jdk7:core:javadocJar
:kafka-trunk-jdk7:core:scaladoc
[ant:scaladoc] Element 
' 
does not exist.
[ant:scaladoc] 
:277:
 warning: a pure expression does nothing in statement position; you may be 
omitting necessary parentheses
[ant:scaladoc] ControllerStats.uncleanLeaderElectionRate
[ant:scaladoc] ^
[ant:scaladoc] 
:278:
 warning: a pure expression does nothing in statement position; you may be 
omitting necessary parentheses
[ant:scaladoc] ControllerStats.leaderElectionTimer
[ant:scaladoc] ^
[ant:scaladoc] warning: there were 14 feature warning(s); re-run with -feature 
for details
[ant:scaladoc] 
:72:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#offer".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 
:32:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#offer".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 
:137:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#poll".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 
:120:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#poll".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 
:97:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#put".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 
:152:
 warning: Could not find any member to link for 
"java.util.concurrent.BlockingQueue#take".
[ant:scaladoc]   /**
[ant:scaladoc]   ^
[ant:scaladoc] 9 warnings found
:kafka-trunk-jdk7:core:scaladocJar
:kafka-trunk-jdk7:core:docsJar
:docsJar_2_11_7
Building project 'core' with Scala version 2.11.7
:kafka-trunk-jdk7:clients:compileJavaNote: 

 uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

:kafka-trunk-jdk7:clients:processResources UP-TO-DATE
:kafka-trunk-jdk7:clients:classes
:kafka-trunk-jdk7:clients:determineCommitId UP-TO-DATE
:kafka-trunk-jdk7:clients:createVersionFile
:kafka-trunk-jdk7:clients:jar
:kafka-trunk-jdk7:clients:javadoc
:kafka-trunk-jdk7:log4j-appender:compileJava
:kafka-trunk-jdk7:log4j-appender:processResources UP-TO-DATE
:kafka-trunk-jdk7:log4j-appender:classes
:kafka-trunk-jdk7:log4j-appender:jar
:kafka-trunk-jdk7:core:compileJava UP-TO-DATE
:kafka-trunk-jdk7:core:compileScala
:78:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.

org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
 ^

[jira] [Updated] (KAFKA-2490) support new consumer in ConsumerGroupCommand

2015-10-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2490:
-
Reviewer: Guozhang Wang

> support new consumer in ConsumerGroupCommand
> 
>
> Key: KAFKA-2490
> URL: https://issues.apache.org/jira/browse/KAFKA-2490
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Ashish K Singh
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> We need to add an option to enable the new consumer in ConsumerGroupCommand. 
> Since the new consumer no longer stores the owner of each partition in ZK, we 
> need to patch ConsumerGroupCommand accordingly.





[jira] [Commented] (KAFKA-2500) Make logEndOffset available in the 0.8.3 Consumer

2015-10-14 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14958045#comment-14958045
 ] 

James Cheng commented on KAFKA-2500:


Is there any chance this will make it into 0.9.0?

> Make logEndOffset available in the 0.8.3 Consumer
> -
>
> Key: KAFKA-2500
> URL: https://issues.apache.org/jira/browse/KAFKA-2500
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: Will Funnell
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 0.9.0.0
>
>
> Originally created in the old consumer here: 
> https://issues.apache.org/jira/browse/KAFKA-1977
> The requirement is to create a snapshot from the Kafka topic but NOT do 
> continual reads after that point. For example you might be creating a backup 
> of the data to a file.
> This ticket covers the addition of the functionality to the new consumer.
> In order to achieve that, a recommended solution by Joel Koshy and Jay Kreps 
> was to expose the high watermark, as maxEndOffset, from the FetchResponse 
> object through to each MessageAndMetadata object in order to be aware when 
> the consumer has reached the end of each partition.
> The submitted patch achieves this by adding the maxEndOffset to the 
> PartitionTopicInfo, which is updated when a new message arrives in the 
> ConsumerFetcherThread and then exposed in MessageAndMetadata.
> See here for discussion:
> http://search-hadoop.com/m/4TaT4TpJy71
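
A sketch of the snapshot loop this would enable (hedged: maxEndOffset() below 
is the *proposed* accessor described above, not an existing API; everything 
else is illustrative):

    // Consume a single partition until the offset reaches the high
    // watermark observed at fetch time, then stop instead of blocking
    // for new messages.
    boolean done = false;
    while (!done) {
        for (ConsumerRecord<String, String> record : consumer.poll(100)) {
            snapshot.put(record.key(), record.value());
            if (record.offset() >= record.maxEndOffset() - 1)
                done = true;   // reached the end of the partition
        }
    }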





[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover

2015-10-14 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14958100#comment-14958100
 ] 

Ashish K Singh commented on KAFKA-2017:
---

I agree with [~guozhang] that if we are not expecting group membership changes 
to be write-heavy, ZK is the way to go for persisting coordinator state. One of 
the reasons for moving offsets from ZK to a Kafka topic, other than the ones 
already mentioned, was that consumers needed to be aware of ZooKeeper. However, 
that is not a concern if we choose to go with ZK here; correct me if I am wrong.

> Persist Coordinator State for Coordinator Failover
> --
>
> Key: KAFKA-2017
> URL: https://issues.apache.org/jira/browse/KAFKA-2017
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: Onur Karaman
>Assignee: Guozhang Wang
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch, 
> KAFKA-2017_2015-05-21_19:02:47.patch
>
>
> When a coordinator fails, the group membership protocol tries to failover to 
> a new coordinator without forcing all the consumers rejoin their groups. This 
> is possible if the coordinator persists its state so that the state can be 
> transferred during coordinator failover. This state consists of most of the 
> information in GroupRegistry and ConsumerRegistry.





[jira] [Commented] (KAFKA-2515) handle oversized messages properly in new consumer

2015-10-14 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14958099#comment-14958099
 ] 

Guozhang Wang commented on KAFKA-2515:
--

[~junrao] Just to clarify: for step 3) above you have --new-consumer enabled in 
the console consumer, right?

> handle oversized messages properly in new consumer
> --
>
> Key: KAFKA-2515
> URL: https://issues.apache.org/jira/browse/KAFKA-2515
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients
>Reporter: Jun Rao
>Assignee: Onur Karaman
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> When there is an oversized message in the broker, it seems that the new 
> consumer just silently gets stuck. We should at least log an error when this 
> happens.





[jira] [Commented] (KAFKA-2295) Dynamically loaded classes (encoders, etc.) may not be found by Kafka Producer

2015-10-14 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14958017#comment-14958017
 ] 

Guozhang Wang commented on KAFKA-2295:
--

[~omkreddy] Could you rebase the patch again?

> Dynamically loaded classes (encoders, etc.) may not be found by Kafka 
> Producer 
> ---
>
> Key: KAFKA-2295
> URL: https://issues.apache.org/jira/browse/KAFKA-2295
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Tathagata Das
>Assignee: Manikumar Reddy
>Priority: Blocker
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2295.patch, KAFKA-2295_2015-07-06_11:32:58.patch, 
> KAFKA-2295_2015-08-20_17:44:56.patch
>
>
> Kafka Producer (via CoreUtils.createObject) effectively uses Class.forName to 
> load encoder classes. Class.forName, by design, finds classes only in the 
> defining classloader of the enclosing class (which is often the bootstrap 
> class loader). It does not use the current thread context class loader. This 
> can lead to problems in environments where classes are dynamically loaded and 
> therefore may not be present in the bootstrap classloader.
> This leads to ClassNotFoundExceptions in environments like Spark where 
> classes are loaded dynamically using custom classloaders. Issues like this 
> have been reported, e.g. 
> https://www.mail-archive.com/user@spark.apache.org/msg30951.html
> Other references regarding this issue with Class.forName 
> http://stackoverflow.com/questions/21749741/though-my-class-was-loaded-class-forname-throws-classnotfoundexception
> This is a problem we have faced repeatedly in Apache Spark and we solved it 
> by explicitly specifying the class loader to use. See 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala#L178
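
The standard fix, as the description says and as Spark's Utils does, is to 
prefer the thread context class loader and fall back to the defining one (a 
generic Java sketch, not the eventual Kafka patch):

    // Try the thread context class loader first; fall back to the class
    // loader that defined this utility class.
    static Class<?> loadClass(String className) throws ClassNotFoundException {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        if (contextLoader != null) {
            try {
                return Class.forName(className, true, contextLoader);
            } catch (ClassNotFoundException ignored) {
                // fall through to the defining class loader
            }
        }
        return Class.forName(className);
    }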





[jira] [Comment Edited] (KAFKA-2295) Dynamically loaded classes (encoders, etc.) may not be found by Kafka Producer

2015-10-14 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14958017#comment-14958017
 ] 

Guozhang Wang edited comment on KAFKA-2295 at 10/14/15 11:40 PM:
-

[~omkreddy] Could you rebase the patch again, and probably use a PR for the 
rebased patch?


was (Author: guozhang):
[~omkreddy] Could you rebase the patch again?

> Dynamically loaded classes (encoders, etc.) may not be found by Kafka 
> Producer 
> ---
>
> Key: KAFKA-2295
> URL: https://issues.apache.org/jira/browse/KAFKA-2295
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Tathagata Das
>Assignee: Manikumar Reddy
>Priority: Blocker
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2295.patch, KAFKA-2295_2015-07-06_11:32:58.patch, 
> KAFKA-2295_2015-08-20_17:44:56.patch
>
>
> Kafka Producer (via CoreUtils.createObject) effectively uses Class.forName to 
> load encoder classes. Class.forName, by design, finds classes only in the 
> defining classloader of the enclosing class (which is often the bootstrap 
> class loader). It does not use the current thread context class loader. This 
> can lead to problems in environments where classes are dynamically loaded and 
> therefore may not be present in the bootstrap classloader.
> This leads to ClassNotFoundExceptions in environments like Spark where 
> classes are loaded dynamically using custom classloaders. Issues like this 
> have been reported, e.g. 
> https://www.mail-archive.com/user@spark.apache.org/msg30951.html
> Other references regarding this issue with Class.forName 
> http://stackoverflow.com/questions/21749741/though-my-class-was-loaded-class-forname-throws-classnotfoundexception
> This is a problem we have faced repeatedly in Apache Spark and we solved it 
> by explicitly specifying the class loader to use. See 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala#L178





[jira] [Updated] (KAFKA-2449) Update mirror maker (MirrorMaker) docs

2015-10-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2449:
-
Reviewer: Ismael Juma

> Update mirror maker (MirrorMaker) docs
> --
>
> Key: KAFKA-2449
> URL: https://issues.apache.org/jira/browse/KAFKA-2449
> Project: Kafka
>  Issue Type: Bug
>Reporter: Geoff Anderson
>Assignee: Geoff Anderson
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> The Kafka docs on Mirror Maker state that it mirrors from N source clusters 
> to 1 destination, but this is no longer the case. Docs should be updated to 
> reflect that it mirrors from a single source cluster to a single target cluster.
> Docs I've found where this should be updated:
> http://kafka.apache.org/documentation.html#basic_ops_mirror_maker
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2412) Documentation bug: Add information for key.serializer and value.serializer to New Producer Config sections

2015-10-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2412:
-
Reviewer: Gwen Shapira

> Documentation bug: Add information for key.serializer and value.serializer to 
> New Producer Config sections
> --
>
> Key: KAFKA-2412
> URL: https://issues.apache.org/jira/browse/KAFKA-2412
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jeremy Fields
>Assignee: Grayson Chao
>Priority: Blocker
>  Labels: newbie
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2412-r1.diff, KAFKA-2412.diff
>
>
> As key.serializer and value.serializer are required options when using the 
> new producer, they should be mentioned in the documentation (here and in svn: 
> http://kafka.apache.org/documentation.html#newproducerconfigs ).
> Appropriate values for these options exist in the javadoc and producer.java 
> examples; however, not everyone reads those, as is the case for anyone 
> setting up a producer.config file for MirrorMaker.
> A sensible default should be suggested, such as 
> org.apache.kafka.common.serialization.StringSerializer, or at least a mention 
> of the key.serializer and value.serializer options along with a link to the 
> javadoc.
> Thanks
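For reference, a minimal sketch of what the documented configuration could look 
like in code; the broker address and topic name are placeholders, not values 
from this ticket:

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        // The two required serializer options this ticket asks to document:
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value")); // placeholder topic
        }
    }
}
{code}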



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2464) Client-side assignment and group generalization

2015-10-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2464:
-
Reviewer: Guozhang Wang

> Client-side assignment and group generalization
> ---
>
> Key: KAFKA-2464
> URL: https://issues.apache.org/jira/browse/KAFKA-2464
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> Add support for client-side assignment and generalization of join group 
> protocol as documented here: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2417) Ducktape tests for SSL/TLS

2015-10-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2417:
-
Reviewer: Jun Rao

> Ducktape tests for SSL/TLS
> --
>
> Key: KAFKA-2417
> URL: https://issues.apache.org/jira/browse/KAFKA-2417
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Ismael Juma
>Assignee: Geoff Anderson
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> The tests should be complementary to the unit/integration tests written as 
> part of KAFKA-1685.
> Things to consider:
> * Upgrade/downgrade to turning on/off SSL
> * Failure testing
> * Expired/revoked certificates
> * Renegotiation
> Some changes to ducktape may be required for upgrade scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2391) Blocking call such as position(), partitionsFor(), committed() and listTopics() should have a timeout

2015-10-14 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-2391:

Priority: Blocker  (was: Major)

> Blocking call such as position(), partitionsFor(), committed() and 
> listTopics() should have a timeout
> -
>
> Key: KAFKA-2391
> URL: https://issues.apache.org/jira/browse/KAFKA-2391
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jiangjie Qin
>Assignee: Onur Karaman
>Priority: Blocker
>
> The blocking calls should have a timeout taken from either a configuration 
> setting or a method parameter. So far we have position(), partitionsFor(), 
> committed() and listTopics().
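For illustration only, a generic Java sketch (not the actual consumer API) of 
bounding a blocking call with a caller-supplied timeout, in the spirit of this 
ticket; the class name is invented:

{code}
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class BoundedCall {
    private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();

    // Run a blocking call, giving up after timeoutMs milliseconds.
    public static <T> T callWithTimeout(Callable<T> blockingCall, long timeoutMs) throws Exception {
        Future<T> future = EXECUTOR.submit(blockingCall);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // best-effort interrupt of the still-blocked call
            throw e;
        }
    }
}
{code}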



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2629) Enable getting SSL password from an executable rather than passing plaintext password

2015-10-14 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14958020#comment-14958020
 ] 

Ashish K Singh commented on KAFKA-2629:
---

[~gwenshap] sure. We can pick this up once 0.9.0.0 has been branched.

> Enable getting SSL password from an executable rather than passing plaintext 
> password
> -
>
> Key: KAFKA-2629
> URL: https://issues.apache.org/jira/browse/KAFKA-2629
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.9.0.0
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
>
> Currently there are a couple of options to pass SSL passwords to Kafka, i.e., 
> via a properties file or via a command line argument. Neither of these is a 
> recommended security practice.
> * A password on a command line is a no-no: it's trivial to see that password 
> just by using the 'ps' utility.
> * Putting a password into a file, and then passing the location to that file, 
> is the next best option. The access to the file will be governed by unix 
> access permissions which we all know and love. The downside is that the 
> password is still just sitting there in a file, and those who have access can 
> still see it trivially.
> * The most general, secure solution is to provide a layer of abstraction: 
> provide functionality to get the password from "somewhere else".  The most 
> flexible and generic way to do this is to simply call an executable which 
> returns the desired password. 
> ** The executable is again protected with normal file system privileges
> ** The simplest form, a script that looks like "echo 'my-password'", devolves 
> back to putting the password in a file
> ** A more interesting implementation could open up a local encrypted password 
> store and extract the password from it
> ** A maximally secure implementation could contact an external secret manager 
> with centralized control and audit functionality.
> ** In short: getting the password as the output of a script/executable is 
> maximally generic and enables both simple and complex use cases.
> This JIRA intends to add a config param to enable passing an executable to 
> Kafka for SSL passwords.
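For illustration, a minimal Java sketch of the executable-based lookup, 
assuming a hypothetical config key such as "ssl.keystore.password.command"; the 
class and method names are invented, not part of Kafka:

{code}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public final class PasswordCommand {
    // Run the configured executable and treat the first line of its stdout
    // as the password; fail if the process exits non-zero or prints nothing.
    public static String fetchPassword(String command) throws Exception {
        Process process = new ProcessBuilder(command.split("\\s+")).start();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String password = reader.readLine();
            if (process.waitFor() != 0 || password == null) {
                throw new IllegalStateException("password command failed: " + command);
            }
            return password;
        }
    }
}
{code}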



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2629) Enable getting SSL password from an executable rather than passing plaintext password

2015-10-14 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14958025#comment-14958025
 ] 

Ashish K Singh commented on KAFKA-2629:
---

[~sriharsha] I am proposing this as an optional config only. The default 
behavior can remain unchanged. As far as the relevance of the proposed way to 
make applications more secure is concerned, I think there has been quite a bit 
of discussion above, and one can argue in favor of or against it. The idea is 
just to enable users, and there are quite a few such users, to plug in their 
preferred way of passing SSL passwords. I hope that is a reasonable ask. 

> Enable getting SSL password from an executable rather than passing plaintext 
> password
> -
>
> Key: KAFKA-2629
> URL: https://issues.apache.org/jira/browse/KAFKA-2629
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.9.0.0
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
>
> Currently there are a couple of options to pass SSL passwords to Kafka, i.e., 
> via a properties file or via a command line argument. Neither of these is a 
> recommended security practice.
> * A password on a command line is a no-no: it's trivial to see that password 
> just by using the 'ps' utility.
> * Putting a password into a file, and then passing the location to that file, 
> is the next best option. The access to the file will be governed by unix 
> access permissions which we all know and love. The downside is that the 
> password is still just sitting there in a file, and those who have access can 
> still see it trivially.
> * The most general, secure solution is to provide a layer of abstraction: 
> provide functionality to get the password from "somewhere else".  The most 
> flexible and generic way to do this is to simply call an executable which 
> returns the desired password. 
> ** The executable is again protected with normal file system privileges
> ** The simplest form, a script that looks like "echo 'my-password'", devolves 
> back to putting the password in a file
> ** A more interesting implementation could open up a local encrypted password 
> store and extract the password from it
> ** A maximally secure implementation could contact an external secret manager 
> with centralized control and audit functionality.
> ** In short: getting the password as the output of a script/executable is 
> maximally generic and enables both simple and complex use cases.
> This JIRA intends to add a config param to enable passing an executable to 
> Kafka for SSL passwords.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2516) Rename o.a.k.client.tools to o.a.k.tools

2015-10-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2516:
-
Reviewer: Gwen Shapira

> Rename o.a.k.client.tools to o.a.k.tools
> 
>
> Key: KAFKA-2516
> URL: https://issues.apache.org/jira/browse/KAFKA-2516
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Grant Henke
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> Currently our new performance tools are in o.a.k.client.tools but packaged in 
> kafka-tools, not kafka-clients. This is a bit confusing.
> Since they deserve their own jar (you don't want our client tools packaged in 
> your app), let's give them a separate package and call it o.a.k.tools.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2441) SSL/TLS in official docs

2015-10-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2441:
-
Reviewer: Jun Rao

> SSL/TLS in official docs
> 
>
> Key: KAFKA-2441
> URL: https://issues.apache.org/jira/browse/KAFKA-2441
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Ismael Juma
>Assignee: Sriharsha Chintalapani
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> We need to add a section in the official documentation regarding SSL/TLS:
> http://kafka.apache.org/documentation.html
> There is already a wiki page where some of the information is already present:
> https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2562) check Kafka scripts for 0.9.0.0

2015-10-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2562:
-
Reviewer: Ismael Juma

> check Kafka scripts for 0.9.0.0
> ---
>
> Key: KAFKA-2562
> URL: https://issues.apache.org/jira/browse/KAFKA-2562
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Manikumar Reddy
>Priority: Blocker
> Fix For: 0.9.0.0
>
>
> We need to make a pass to make sure all scripts in bin/ are up to date for 
> 0.9.0.0. For example, bin/kafka-producer-perf-test.sh currently still points 
> to kafka.tools.ProducerPerformance and it should be changed to 
> org.apache.kafka.clients.tools.ProducerPerformance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2106) Partition balance tool between brokers

2015-10-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2106:
-
Assignee: chenshangan

> Partition balance tool between brokers
> --
>
> Key: KAFKA-2106
> URL: https://issues.apache.org/jira/browse/KAFKA-2106
> Project: Kafka
>  Issue Type: New Feature
>  Components: admin
>Reporter: chenshangan
>Assignee: chenshangan
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2106.3, KAFKA-2106.patch, KAFKA-2106.patch.2
>
>
> The default partition assignment algorithm works well in a static Kafka 
> cluster (where the number of brokers seldom changes). In production 
> environments, however, the number of brokers keeps growing with the business 
> data. When new brokers are added to the cluster, it's better to provide a 
> tool that can help move existing data to the new brokers. Currently, users 
> need to choose topics or partitions manually and use the Reassign Partitions 
> Tool (kafka-reassign-partitions.sh) to achieve the goal. That is a 
> time-consuming task when there are a lot of topics in the cluster.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2146) adding partition did not find the correct startIndex

2015-10-14 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14958050#comment-14958050
 ] 

Guozhang Wang commented on KAFKA-2146:
--

[~chenshangan...@163.com] your updated patch does not seem to apply cleanly on 
trunk; could you rebase?

> adding partition did not find the correct startIndex 
> -
>
> Key: KAFKA-2146
> URL: https://issues.apache.org/jira/browse/KAFKA-2146
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.8.2.0
>Reporter: chenshangan
>Priority: Minor
> Fix For: 0.9.0.0
>
> Attachments: KAFKA-2146.2.patch, KAFKA-2146.patch
>
>
> TopicCommand provides a tool to add partitions to existing topics. It tries 
> to find the startIndex from the existing partitions. There's a minor flaw in 
> this process: it uses the first partition fetched from ZooKeeper as the start 
> partition, and the first replica id in that partition as the startIndex.
> First, the partition fetched first from ZooKeeper is not necessarily the 
> start partition. As partition ids begin from zero, we should use the 
> partition with id zero as the start partition.
> Second, broker ids do not necessarily begin from 0, so the startIndex is not 
> necessarily the first replica id in the start partition.
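A minimal Java sketch of the fix described above (the names and data shapes are 
illustrative, not the actual patch): use partition 0 as the start partition and 
translate its first replica's broker id into a position within the sorted 
broker list rather than assuming broker ids start at 0.

{code}
import java.util.List;
import java.util.Map;

public final class StartIndexUtil {
    // replicasByPartition maps partition id -> replica broker ids;
    // sortedBrokerIds is the cluster's broker id list in sorted order.
    public static int startIndex(Map<Integer, List<Integer>> replicasByPartition,
                                 List<Integer> sortedBrokerIds) {
        List<Integer> startReplicas = replicasByPartition.get(0); // partition id 0
        int firstReplicaBrokerId = startReplicas.get(0);
        int index = sortedBrokerIds.indexOf(firstReplicaBrokerId);
        if (index < 0) {
            throw new IllegalArgumentException("broker " + firstReplicaBrokerId + " not in broker list");
        }
        return index;
    }
}
{code}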



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2649) Add support for custom partitioner in sink nodes

2015-10-14 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-2649:


 Summary: Add support for custom partitioner in sink nodes
 Key: KAFKA-2649
 URL: https://issues.apache.org/jira/browse/KAFKA-2649
 Project: Kafka
  Issue Type: Sub-task
  Components: kafka streams
Reporter: Randall Hauch
Assignee: Randall Hauch
 Fix For: 0.9.0.0


The only way for Processor implementations to control partitioning of forwarded 
messages is to set the partitioner class as property 
{{ProducerConfig.PARTITIONER_CLASS_CONFIG}} in the StreamsConfig, which should 
be set to the name of a {{org.apache.kafka.clients.producer.Partitioner}} 
implementation. However, doing this requires the partitioner knowing how to 
properly partition *all* topics, not just the one or few topics used by the 
Processor.

Instead, Kafka Streams should make it easy to optionally add a partitioning 
function for each sink used in a topology. Each sink represents a single output 
topic, and thus is far simpler to implement. Additionally, the sink is already 
typed with the key and value types (via serdes for the keys and values), so the 
partitioner can also be typed with the key and value types. Finally, this 
also keeps the logic of choosing partitioning strategies where it belongs, as 
part of building the topology.
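A minimal sketch of what such a typed, per-sink partitioner could look like; 
the interface name and signature are illustrative, not a committed API:

{code}
public interface SinkPartitioner<K, V> {
    // Choose the partition of the sink topic for the given key and value,
    // or return null to fall back to the producer's default partitioning.
    Integer partition(K key, V value, int numPartitions);
}
{code}

A matching TopologyBuilder.addSink(...) overload accepting such a partitioner 
could then set the partition on each record forwarded to the sink topic.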




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2649) Add support for custom partitioner in sink nodes

2015-10-14 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-2649:
-
Description: 
The only way for Processor implementations to control partitioning of forwarded 
messages is to set the partitioner class as property 
{{ProducerConfig.PARTITIONER_CLASS_CONFIG}} in the StreamsConfig, which should 
be set to the name of a {{org.apache.kafka.clients.producer.Partitioner}} 
implementation. However, doing this requires the partitioner knowing how to 
properly partition *all* topics, not just the one or few topics used by the 
Processor.

Instead, Kafka Streams should make it easy to optionally add a partitioning 
function for each sink used in a topology. Each sink represents a single output 
topic, and thus is far simpler to implement. Additionally, the sink is already 
typed with the key and value types (via serdes for the keys and values), so the 
partitioner can also be typed with the key and value types. Finally, this 
also keeps the logic of choosing partitioning strategies where it belongs, as 
part of building the topology.


  was:
The only way for Processor implementations to control partitioning of forwarded 
messages is to set the partitioner class as property "{{
ProducerConfig.PARTITIONER_CLASS_CONFIG}}" in the StreamsConfig, which should 
be set to the name of a {{org.apache.kafka.clients.producer.Partitioner}} 
implementation. However, doing this requires the partitioner knowing how to 
properly partition *all* topics, not just the one or few topics used by the 
Processor.

Instead, Kafka Streams should make it easy to optionally add a partitioning 
function for each sink used in a topology. Each sink represents a single output 
topic, and thus is far simpler to implement. Additionally, the sink is already 
typed with the key and value types (via serdes for the keys and values), so the 
partitioner can be also be typed with the key and value types. Finally, this 
also keeps the logic of choosing partitioning strategies where it belongs, as 
part of building the topology.



> Add support for custom partitioner in sink nodes
> 
>
> Key: KAFKA-2649
> URL: https://issues.apache.org/jira/browse/KAFKA-2649
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kafka streams
>Reporter: Randall Hauch
>Assignee: Randall Hauch
> Fix For: 0.9.0.0
>
>
> The only way for Processor implementations to control partitioning of 
> forwarded messages is to set the partitioner class as property 
> {{ProducerConfig.PARTITIONER_CLASS_CONFIG}} in the StreamsConfig, which 
> should be set to the name of a 
> {{org.apache.kafka.clients.producer.Partitioner}} implementation. However, 
> doing this requires the partitioner knowing how to properly partition *all* 
> topics, not just the one or few topics used by the Processor.
> Instead, Kafka Streams should make it easy to optionally add a partitioning 
> function for each sink used in a topology. Each sink represents a single 
> output topic, and thus is far simpler to implement. Additionally, the sink is 
> already typed with the key and value types (via serdes for the keys and 
> values), so the partitioner can also be typed with the key and value 
> types. Finally, this also keeps the logic of choosing partitioning strategies 
> where it belongs, as part of building the topology.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1554) Corrupt index found on clean startup

2015-10-14 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14958225#comment-14958225
 ] 

Mayuresh Gharat commented on KAFKA-1554:


[~wangbo23] thanks for the review.
We did not iterate on this patch. This was resolved in 
https://issues.apache.org/jira/browse/KAFKA-2012. 


> Corrupt index found on clean startup
> 
>
> Key: KAFKA-1554
> URL: https://issues.apache.org/jira/browse/KAFKA-1554
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1
> Environment: ubuntu 12.04, oracle jdk 1.7
>Reporter: Alexis Midon
>Assignee: Mayuresh Gharat
>Priority: Critical
> Fix For: 0.10.0.0
>
> Attachments: KAFKA-1554.patch
>
>
> On a clean startup, corrupted index files are found.
> After investigation, it appears that some pre-allocated index files are not 
> "compacted" correctly and the end of the file is full of zeroes.
> As a result, on startup, the last relative offset is zero, which yields an 
> offset equal to the base offset.
> The workaround is to delete all index files of size 10MB (the size of the 
> pre-allocated files) and restart. The index files will be re-created.
> {code}
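> # lists the 10MB pre-allocated index files; drop the '#' before -delete to actually remove them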
> find $your_data_directory -size 10485760c -name '*.index' #-delete
> {code}
> This issue might be related/similar to 
> https://issues.apache.org/jira/browse/KAFKA-1112
> {code}
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,696 
> INFO main kafka.server.KafkaServer.info - [Kafka Server 847605514], starting
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,698 
> INFO main kafka.server.KafkaServer.info - [Kafka Server 847605514], 
> Connecting to zookeeper on 
> zk-main0.XXX:2181,zk-main1.XXX:2181,zk-main2.:2181/production/kafka/main
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,708 
> INFO 
> ZkClient-EventThread-14-zk-main0.XXX.com:2181,zk-main1.XXX.com:2181,zk-main2.XXX.com:2181,zk-main3.XXX.com:2181,zk-main4.XXX.com:2181/production/kafka/main
>  org.I0Itec.zkclient.ZkEventThread.run - Starting ZkClient event thread.
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,714 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,714 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:host.name=i-6b948138.inst.aws.airbnb.com
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,714 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:java.version=1.7.0_55
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,715 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:java.vendor=Oracle Corporation
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,715 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:java.home=/usr/lib/jvm/jre-7-oracle-x64/jre
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,715 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:java.class.path=libs/snappy-java-1.0.5.jar:libs/scala-library-2.10.1.jar:libs/slf4j-api-1.7.2.jar:libs/jopt-simple-3.2.jar:libs/metrics-annotation-2.2.0.jar:libs/log4j-1.2.15.jar:libs/kafka_2.10-0.8.1.jar:libs/zkclient-0.3.jar:libs/zookeeper-3.3.4.jar:libs/metrics-core-2.2.0.jar
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,715 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,716 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:java.io.tmpdir=/tmp
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,716 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:java.compiler=
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,716 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:os.name=Linux
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,716 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:os.arch=amd64
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,717 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:os.version=3.2.0-61-virtual
> 2014-07-11T00:53:17+00:00 i-6b948138 local3.info 2014-07-11 - 00:53:17,717 
> INFO main org.apache.zookeeper.ZooKeeper.logEnv - Client 
> environment:user.name=kafka
> 2014-07-11T00:53:17+00:00 i-6b948138 
