[jira] [Created] (KAFKA-4865) 2X8BF

2017-03-07 Thread Vamsi Jakkula (JIRA)
Vamsi Jakkula created KAFKA-4865:


 Summary: 2X8BF
 Key: KAFKA-4865
 URL: https://issues.apache.org/jira/browse/KAFKA-4865
 Project: Kafka
  Issue Type: Bug
Reporter: Vamsi Jakkula


Creating an issue using project keys and issue type names via the REST API



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-128: Add ByteArrayConverter for Kafka Connect

2017-03-07 Thread Sriram Subramanian
+1. Looks good

On Tue, Mar 7, 2017 at 10:33 PM, Ewen Cheslack-Postava 
wrote:

> Hah, I forgot to do it in the original email, but I suppose I should make
> it explicit: +1 (binding)
>
> -Ewen
>
> On Mon, Mar 6, 2017 at 9:26 PM, Gwen Shapira  wrote:
>
> > +1 (binding)
> >
> > On Mon, Mar 6, 2017 at 7:53 PM, Ewen Cheslack-Postava  >
> > wrote:
> >
> > > Hi all,
> > >
> > > I'd like to kick off voting for KIP-128: Add ByteArrayConverter for
> Kafka
> > > Connect:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 128%3A+Add+ByteArrayConverter+for+Kafka+Connect
> > >
> > > There was a small amount of discussion, see the original thread here:
> > > https://lists.apache.org/thread.html/62fc2245285ac5d15ebb9b2ebed672
> > > b51e391c8dfe9a51be85f685f3@%3Cdev.kafka.apache.org%3E
> > >
> > > The vote will stay open for at least 72 hours.
> > >
> > > -Ewen
> > >
> >
> >
> >
> > --
> > *Gwen Shapira*
> > Product Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter  | blog
> > 
> >
>


Re: [VOTE] KIP-128: Add ByteArrayConverter for Kafka Connect

2017-03-07 Thread Ewen Cheslack-Postava
Hah, I forgot to do it in the original email, but I suppose I should make
it explicit: +1 (binding)

-Ewen

On Mon, Mar 6, 2017 at 9:26 PM, Gwen Shapira  wrote:

> +1 (binding)
>
> On Mon, Mar 6, 2017 at 7:53 PM, Ewen Cheslack-Postava 
> wrote:
>
> > Hi all,
> >
> > I'd like to kick off voting for KIP-128: Add ByteArrayConverter for Kafka
> > Connect:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 128%3A+Add+ByteArrayConverter+for+Kafka+Connect
> >
> > There was a small amount of discussion, see the original thread here:
> > https://lists.apache.org/thread.html/62fc2245285ac5d15ebb9b2ebed672
> > b51e391c8dfe9a51be85f685f3@%3Cdev.kafka.apache.org%3E
> >
> > The vote will stay open for at least 72 hours.
> >
> > -Ewen
> >
>
>
>
> --
> *Gwen Shapira*
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter  | blog
> 
>


[GitHub] kafka pull request #2655: KAFKA-4864 added correct zookeeper nodes for secur...

2017-03-07 Thread simplesteph
GitHub user simplesteph opened a pull request:

https://github.com/apache/kafka/pull/2655

KAFKA-4864 added correct zookeeper nodes for security migrator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/simplesteph/kafka fix-security-migrator-tool

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2655.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2655


commit d8c48067e4a0b93a48c8c83f830268199af0c8c9
Author: simplesteph 
Date:   2017-03-08T06:19:21Z

[KAFKA-4864] added correct zookeeper nodes for security migrator




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4864) Kafka Secure Migrator tool doesn't secure all the nodes

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900780#comment-15900780
 ] 

ASF GitHub Bot commented on KAFKA-4864:
---

GitHub user simplesteph opened a pull request:

https://github.com/apache/kafka/pull/2655

KAFKA-4864 added correct zookeeper nodes for security migrator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/simplesteph/kafka fix-security-migrator-tool

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2655.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2655


commit d8c48067e4a0b93a48c8c83f830268199af0c8c9
Author: simplesteph 
Date:   2017-03-08T06:19:21Z

[KAFKA-4864] added correct zookeeper nodes for security migrator




> Kafka Secure Migrator tool doesn't secure all the nodes
> ---
>
> Key: KAFKA-4864
> URL: https://issues.apache.org/jira/browse/KAFKA-4864
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Stephane Maarek
>Priority: Critical
>
> It seems that the secure nodes as referred by ZkUtils.scala are the following:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala#L201
> A couple things:
> - the list is highly outdated, and for example the most important nodes such 
> as kafka-acls don't get secured. That's a huge security risk. Would it be 
> better to just secure all the nodes recursively from the given root?
> - the root of some nodes aren't secured. Ex: /brokers (but many others).
> The result is the following after running the tool:
> zookeeper-security-migration --zookeeper.acl secure --zookeeper.connect 
> zoo1:2181/kafka-test
> [zk: localhost:2181(CONNECTED) 9] getAcl /kafka-test/brokers
> 'world,'anyone
> : cdrwa
> [zk: localhost:2181(CONNECTED) 11] getAcl /kafka-test/brokers/ids
> 'world,'anyone
> : r
> 'sasl,'myzkcli...@example.com
> : cdrwa
> [zk: localhost:2181(CONNECTED) 16] getAcl /kafka-test/kafka-acl
> 'world,'anyone
> : cdrwa
> That seems pretty bad to be honest... A fast enough ZkClient could delete 
> some root nodes, and create the nodes they like before the Acls get set. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4864) Kafka Secure Migrator tool doesn't secure all the nodes

2017-03-07 Thread Stephane Maarek (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephane Maarek updated KAFKA-4864:
---
Description: 
It seems that the secure nodes, as referred to by ZkUtils.scala, are the following:

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala#L201

A couple of things:
- the list is highly outdated, and for example the most important nodes such as 
kafka-acls don't get secured. That's a huge security risk. Would it be better 
to just secure all the nodes recursively from the given root?
- the roots of some nodes aren't secured, e.g. /brokers (and many others).

The result is the following after running the tool:
zookeeper-security-migration --zookeeper.acl secure --zookeeper.connect 
zoo1:2181/kafka-test

[zk: localhost:2181(CONNECTED) 9] getAcl /kafka-test/brokers
'world,'anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 11] getAcl /kafka-test/brokers/ids
'world,'anyone
: r
'sasl,'myzkcli...@example.com
: cdrwa
[zk: localhost:2181(CONNECTED) 16] getAcl /kafka-test/kafka-acl
'world,'anyone
: cdrwa

That seems pretty bad, to be honest... A fast enough ZkClient could delete some 
root nodes and create whatever nodes it likes before the ACLs get set.

  was:
It seems that the secure nodes as referred by ZkUtils.scala are the following:

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala#L201

A couple things:
- the list is highly outdated, and for example the most important nodes such as 
kafka-acls don't get secured. That's a huge security risk. Would it be better 
to just secure all the nodes from the given root?
- the root of some nodes aren't secured. Ex: /brokers (but many others).

The result is the following after running the tool:
zookeeper-security-migration --zookeeper.acl secure --zookeeper.connect 
zoo1:2181/kafka-test

[zk: localhost:2181(CONNECTED) 9] getAcl /kafka-test/brokers
'world,'anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 11] getAcl /kafka-test/brokers/ids
'world,'anyone
: r
'sasl,'myzkcli...@example.com
: cdrwa
[zk: localhost:2181(CONNECTED) 16] getAcl /kafka-test/kafka-acl
'world,'anyone
: cdrwa

That seems pretty bad to be honest... A fast enough ZkClient could delete some 
root nodes, and create the nodes they like before the Acls get set. 


> Kafka Secure Migrator tool doesn't secure all the nodes
> ---
>
> Key: KAFKA-4864
> URL: https://issues.apache.org/jira/browse/KAFKA-4864
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Stephane Maarek
>Priority: Critical
>
> It seems that the secure nodes as referred by ZkUtils.scala are the following:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala#L201
> A couple things:
> - the list is highly outdated, and for example the most important nodes such 
> as kafka-acls don't get secured. That's a huge security risk. Would it be 
> better to just secure all the nodes recursively from the given root?
> - the root of some nodes aren't secured. Ex: /brokers (but many others).
> The result is the following after running the tool:
> zookeeper-security-migration --zookeeper.acl secure --zookeeper.connect 
> zoo1:2181/kafka-test
> [zk: localhost:2181(CONNECTED) 9] getAcl /kafka-test/brokers
> 'world,'anyone
> : cdrwa
> [zk: localhost:2181(CONNECTED) 11] getAcl /kafka-test/brokers/ids
> 'world,'anyone
> : r
> 'sasl,'myzkcli...@example.com
> : cdrwa
> [zk: localhost:2181(CONNECTED) 16] getAcl /kafka-test/kafka-acl
> 'world,'anyone
> : cdrwa
> That seems pretty bad to be honest... A fast enough ZkClient could delete 
> some root nodes, and create the nodes they like before the Acls get set. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4864) Kafka Secure Migrator tool doesn't secure all the nodes

2017-03-07 Thread Stephane Maarek (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephane Maarek updated KAFKA-4864:
---
Priority: Critical  (was: Major)

> Kafka Secure Migrator tool doesn't secure all the nodes
> ---
>
> Key: KAFKA-4864
> URL: https://issues.apache.org/jira/browse/KAFKA-4864
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Stephane Maarek
>Priority: Critical
>
> It seems that the secure nodes as referred by ZkUtils.scala are the following:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala#L201
> A couple things:
> - the list is highly outdated, and for example the most important nodes such 
> as kafka-acls don't get secured. That's a huge security risk. Would it be 
> better to just secure all the nodes from the given root?
> - the root of some nodes aren't secured. Ex: /brokers (but many others).
> The result is the following after running the tool:
> zookeeper-security-migration --zookeeper.acl secure --zookeeper.connect 
> zoo1:2181/kafka-test
> [zk: localhost:2181(CONNECTED) 9] getAcl /kafka-test/brokers
> 'world,'anyone
> : cdrwa
> [zk: localhost:2181(CONNECTED) 11] getAcl /kafka-test/brokers/ids
> 'world,'anyone
> : r
> 'sasl,'myzkcli...@example.com
> : cdrwa
> [zk: localhost:2181(CONNECTED) 16] getAcl /kafka-test/kafka-acl
> 'world,'anyone
> : cdrwa
> That seems pretty bad to be honest... A fast enough ZkClient could delete 
> some root nodes, and create the nodes they like before the Acls get set. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4864) Kafka Secure Migrator tool doesn't secure all the nodes

2017-03-07 Thread Stephane Maarek (JIRA)
Stephane Maarek created KAFKA-4864:
--

 Summary: Kafka Secure Migrator tool doesn't secure all the nodes
 Key: KAFKA-4864
 URL: https://issues.apache.org/jira/browse/KAFKA-4864
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.2.0, 0.10.1.1, 0.10.1.0
Reporter: Stephane Maarek


It seems that the secure nodes, as referred to by ZkUtils.scala, are the following:

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala#L201

A couple of things:
- the list is highly outdated, and for example the most important nodes such as 
kafka-acls don't get secured. That's a huge security risk. Would it be better 
to just secure all the nodes from the given root?
- the roots of some nodes aren't secured, e.g. /brokers (and many others).

The result is the following after running the tool:
zookeeper-security-migration --zookeeper.acl secure --zookeeper.connect 
zoo1:2181/kafka-test

[zk: localhost:2181(CONNECTED) 9] getAcl /kafka-test/brokers
'world,'anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 11] getAcl /kafka-test/brokers/ids
'world,'anyone
: r
'sasl,'myzkcli...@example.com
: cdrwa
[zk: localhost:2181(CONNECTED) 16] getAcl /kafka-test/kafka-acl
'world,'anyone
: cdrwa

That seems pretty bad, to be honest... A fast enough ZkClient could delete some 
root nodes and create whatever nodes it likes before the ACLs get set.
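
A minimal sketch of the "secure everything recursively from the given root" idea, 
using the plain ZooKeeper Java client (the class and method names here are purely 
illustrative and not the migrator tool's actual code):

{code:java}
import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;

public class RecursiveAclSetter {
    // Apply the given ACLs to `root` and then to every znode below it.
    static void secureRecursively(ZooKeeper zk, String root, List<ACL> acls)
            throws KeeperException, InterruptedException {
        zk.setACL(root, acls, -1);  // -1 skips the ACL version check
        for (String child : zk.getChildren(root, false)) {
            String childPath = root.equals("/") ? "/" + child : root + "/" + child;
            secureRecursively(zk, childPath, acls);
        }
    }
}
{code}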



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4863) Querying window store may return unwanted keys

2017-03-07 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-4863:


Assignee: Eno Thereska

> Querying window store may return unwanted keys
> --
>
> Key: KAFKA-4863
> URL: https://issues.apache.org/jira/browse/KAFKA-4863
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Xavier Léauté
>Assignee: Eno Thereska
>Priority: Critical
>
> Using variable length keys in a window store may cause unwanted results to be 
> returned when querying certain ranges.
> Below is a test case for {{RocksDBWindowStoreTest}} that shows the problem. 
> It fails, returning {{\[0001, 0003, 0002, 0004, 0005\]}} instead of {{\[0001, 
> 0003, 0005\]}}.
> {code:java}
> @Test
> public void testPutAndFetchSanity() throws IOException {
> final RocksDBWindowStoreSupplier supplier =
> new RocksDBWindowStoreSupplier<>(
> "window", 60 * 1000L * 2, 3,
> true, Serdes.String(), Serdes.String(),
> windowSize, true, Collections.emptyMap(), 
> false
> );
> final WindowStore store = supplier.get();
> store.init(context, store);
> try {
> store.put("a", "0001", 0);
> store.put("aa", "0002", 0);
> store.put("a", "0003", 1);
> store.put("aa", "0004", 1);
> store.put("a", "0005", 6);
> assertEquals(Utils.mkList("0001", "0003", "0005"), 
> toList(store.fetch("a", 0, Long.MAX_VALUE)));
> } finally {
> store.close();
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka-site pull request #50: Typo in kafka 0.10.2 topics operation doc

2017-03-07 Thread jlisam13
GitHub user jlisam13 opened a pull request:

https://github.com/apache/kafka-site/pull/50

Typo in kafka 0.10.2 topics operation doc

I believe it should be "not" and not "noi"

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jlisam13/kafka-site asf-site

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka-site/pull/50.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #50


commit 0d1303ffaa9f425635fa9dff0f22d591004aab92
Author: Javier 
Date:   2017-03-08T04:53:52Z

Typo in kafka 0.10.2 topics operation doc




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4863) Querying window store may return unwanted keys

2017-03-07 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-4863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xavier Léauté updated KAFKA-4863:
-
Affects Version/s: 0.10.2.0

> Querying window store may return unwanted keys
> --
>
> Key: KAFKA-4863
> URL: https://issues.apache.org/jira/browse/KAFKA-4863
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Xavier Léauté
>Priority: Critical
>
> Using variable length keys in a window store may cause unwanted results to be 
> returned when querying certain ranges.
> Below is a test case for {{RocksDBWindowStoreTest}} that shows the problem. 
> It fails, returning {{\[0001, 0003, 0002, 0004, 0005\]}} instead of {{\[0001, 
> 0003, 0005\]}}.
> {code:java}
> @Test
> public void testPutAndFetchSanity() throws IOException {
> final RocksDBWindowStoreSupplier supplier =
> new RocksDBWindowStoreSupplier<>(
> "window", 60 * 1000L * 2, 3,
> true, Serdes.String(), Serdes.String(),
> windowSize, true, Collections.emptyMap(), 
> false
> );
> final WindowStore store = supplier.get();
> store.init(context, store);
> try {
> store.put("a", "0001", 0);
> store.put("aa", "0002", 0);
> store.put("a", "0003", 1);
> store.put("aa", "0004", 1);
> store.put("a", "0005", 6);
> assertEquals(Utils.mkList("0001", "0003", "0005"), 
> toList(store.fetch("a", 0, Long.MAX_VALUE)));
> } finally {
> store.close();
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4863) Querying window store may return unwanted keys

2017-03-07 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-4863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xavier Léauté updated KAFKA-4863:
-
Description: 
Using variable length keys in a window store may cause unwanted results to be 
returned when querying certain ranges.

Below is a test case for {{RocksDBWindowStoreTest}} that shows the problem. It 
fails, returning {{\[0001, 0003, 0002, 0004, 0005\]}} instead of {{\[0001, 
0003, 0005\]}}.

{code:java}
@Test
public void testPutAndFetchSanity() throws IOException {
final RocksDBWindowStoreSupplier supplier =
new RocksDBWindowStoreSupplier<>(
"window", 60 * 1000L * 2, 3,
true, Serdes.String(), Serdes.String(),
windowSize, true, Collections.emptyMap(), 
false
);
final WindowStore store = supplier.get();
store.init(context, store);

try {
store.put("a", "0001", 0);
store.put("aa", "0002", 0);
store.put("a", "0003", 1);
store.put("aa", "0004", 1);
store.put("a", "0005", 6);

assertEquals(Utils.mkList("0001", "0003", "0005"), 
toList(store.fetch("a", 0, Long.MAX_VALUE)));
} finally {
store.close();
}
}
{code}

  was:
Using variable length keys in a window store may cause unwanted results to be 
returned when querying certain ranges.

Below is a test case for {{RocksDBWindowStoreTest}} that shows the problem. It 
fails, returning {{\["0001", "0003", "0002"\]}} instead of {{\["0001", 
"0003"\]}}.

{code:java}
@Test
public void testPutAndFetchSanity() throws IOException {
final RocksDBWindowStoreSupplier supplier =
new RocksDBWindowStoreSupplier<>(
"window", 60 * 1000L * 2, 3,
true, Serdes.String(), Serdes.String(),
windowSize, true, Collections.emptyMap(), false
);
final WindowStore store = supplier.get();
store.init(context, store);

try {
store.put("a", "0001", 0);
store.put("aa", "0002", 0);
store.put("a", "0003", 1);

assertEquals(Utils.mkList("0001", "0003"), toList(store.fetch("a", 
0, Long.MAX_VALUE)));
} finally {
store.close();
}
}
{code}


> Querying window store may return unwanted keys
> --
>
> Key: KAFKA-4863
> URL: https://issues.apache.org/jira/browse/KAFKA-4863
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Xavier Léauté
>Priority: Critical
>
> Using variable length keys in a window store may cause unwanted results to be 
> returned when querying certain ranges.
> Below is a test case for {{RocksDBWindowStoreTest}} that shows the problem. 
> It fails, returning {{\[0001, 0003, 0002, 0004, 0005\]}} instead of {{\[0001, 
> 0003, 0005\]}}.
> {code:java}
> @Test
> public void testPutAndFetchSanity() throws IOException {
> final RocksDBWindowStoreSupplier supplier =
> new RocksDBWindowStoreSupplier<>(
> "window", 60 * 1000L * 2, 3,
> true, Serdes.String(), Serdes.String(),
> windowSize, true, Collections.emptyMap(), 
> false
> );
> final WindowStore store = supplier.get();
> store.init(context, store);
> try {
> store.put("a", "0001", 0);
> store.put("aa", "0002", 0);
> store.put("a", "0003", 1);
> store.put("aa", "0004", 1);
> store.put("a", "0005", 6);
> assertEquals(Utils.mkList("0001", "0003", "0005"), 
> toList(store.fetch("a", 0, Long.MAX_VALUE)));
> } finally {
> store.close();
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4863) Querying window store may return unwanted keys

2017-03-07 Thread JIRA
Xavier Léauté created KAFKA-4863:


 Summary: Querying window store may return unwanted keys
 Key: KAFKA-4863
 URL: https://issues.apache.org/jira/browse/KAFKA-4863
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Xavier Léauté
Priority: Critical


Using variable length keys in a window store may cause unwanted results to be 
returned when querying certain ranges.

Below is a test case for {{RocksDBWindowStoreTest}} that shows the problem. It 
fails, returning {{\["0001", "0003", "0002"\]}} instead of {{\["0001", 
"0003"\]}}.

{code:java}
@Test
public void testPutAndFetchSanity() throws IOException {
final RocksDBWindowStoreSupplier supplier =
new RocksDBWindowStoreSupplier<>(
"window", 60 * 1000L * 2, 3,
true, Serdes.String(), Serdes.String(),
windowSize, true, Collections.emptyMap(), false
);
final WindowStore store = supplier.get();
store.init(context, store);

try {
store.put("a", "0001", 0);
store.put("aa", "0002", 0);
store.put("a", "0003", 1);

assertEquals(Utils.mkList("0001", "0003"), toList(store.fetch("a", 
0, Long.MAX_VALUE)));
} finally {
store.close();
}
}
{code}
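
One self-contained toy illustration of how this can happen, assuming a composite 
<key bytes><timestamp> layout for the store key (an assumption for illustration, 
not the store's actual serialization): when fetch("a", 0, MAX) is turned into a 
lexicographic range scan, entries for the longer key "aa" sort inside the scanned 
range.

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CompositeKeyRange {
    // Assumed layout for illustration: raw key bytes followed by an 8-byte timestamp.
    static byte[] composite(String key, long ts) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(k.length + 8).put(k).putLong(ts).array();
    }

    // Unsigned lexicographic comparison, as a byte-oriented store would use.
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        byte[] from = composite("a", 0L);
        byte[] to = composite("a", Long.MAX_VALUE);
        byte[] other = composite("aa", 0L);
        // Prints true: the "aa" entry sorts inside the range scanned for key "a",
        // which matches the unwanted "0002" result in the test above.
        System.out.println(compare(from, other) <= 0 && compare(other, to) <= 0);
    }
}
{code}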



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Jenkins build is back to normal : kafka-trunk-jdk8 #1335

2017-03-07 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk7 #1998

2017-03-07 Thread Apache Jenkins Server
See 


Changes:

[ismael] KAFKA-4841; NetworkClient should only consider a connection to have

--
[...truncated 918.89 KB...]
org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testRewindOnRebalanceDuringPoll PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitSuccessFollowedByFailure STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testCommitSuccessFollowedByFailure PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignmentSingleTaskConnectors STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignmentSingleTaskConnectors PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupFollower STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupFollower PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testMetadata STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testMetadata PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment1 STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment1 PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment2 STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment2 PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testJoinLeaderCannotAssign STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testJoinLeaderCannotAssign PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testRejoinGroup STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testRejoinGroup PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupLeader STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupLeader PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testPutConnectorConfig STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testPutConnectorConfig PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedCustomValidation STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedCustomValidation PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorAlreadyExists STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorAlreadyExists PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testJoinAssignment STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testJoinAssignment PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalance STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalance PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalanceFailedConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalanceFailedConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testHaltCleansUpWorker STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testHaltCleansUpWorker PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testConnectorNameConflictsWithWorkerGroupId STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testConnectorNameConflictsWithWorkerGroupId PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartUnknownConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartUnknownConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnectorRedirectToLeader STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnectorRedirectToLeader PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnectorRedirectToOwner STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnectorRedirectToOwner PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartUnknownTask STARTED


[jira] [Created] (KAFKA-4862) Kafka client connect to a shutdown node will block for a long time

2017-03-07 Thread Pengwei (JIRA)
Pengwei created KAFKA-4862:
--

 Summary: Kafka client connect to a shutdown node will block for a 
long time
 Key: KAFKA-4862
 URL: https://issues.apache.org/jira/browse/KAFKA-4862
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.2.0, 0.9.0.0
Reporter: Pengwei
Assignee: Pengwei
 Fix For: 0.11.0.0


Currently in our test env, we found that after one of the broker nodes crashes 
(reboot or OS crash), the client may still be trying to connect to the crashed 
node to send metadata or other requests, and it takes several minutes for it to 
realize the connection has timed out and try another node. As a result, the 
client may still be unaware of the metadata change after several minutes.

We don't have a connection timeout for the network client; we should add one.
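
A generic illustration of the kind of bounded connect being asked for, using plain 
java.net rather than any Kafka API (the host name and timeout below are made up 
for the example):

{code:java}
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class BoundedConnect {
    public static void main(String[] args) throws IOException {
        try (Socket socket = new Socket()) {
            // Fail after 5 seconds instead of blocking for minutes on a dead node.
            socket.connect(new InetSocketAddress("crashed-broker.example.com", 9092), 5_000);
        } catch (SocketTimeoutException | ConnectException e) {
            // Give up on this node quickly and pick another broker from the metadata.
            System.out.println("Node unreachable, trying another: " + e.getMessage());
        }
    }
}
{code}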



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2654: KAFKA-3989_follow_up PR: update script to run from...

2017-03-07 Thread bbejeck
GitHub user bbejeck opened a pull request:

https://github.com/apache/kafka/pull/2654

KAFKA-3989_follow_up PR: update script to run from kafka root eg ./jmh-benchmarks/jmh.sh

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbejeck/kafka KAFKA-3989_follow_up

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2654.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2654


commit 0693378b36c3f36c2e3a6b9bcdbd06dc77c06031
Author: bbejeck 
Date:   2017-03-08T01:57:51Z

KAFKA-3989_follow_up PR: update script to run from kafka root eg 
./jmh-benchmarks/jmh.sh




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-03-07 Thread radai
just to clarify - ListIterator is a nice API, and doesn't constrain the
implementation a lot more than Iterator (especially if we implement
previous() very inefficiently :-) ), but changing
Iterable<Header> headers(String key)
into
ListIterator<Header> headers(String key)
would lose us the ability to easily write what I think is the most common
case - a for-each loop:

for (Header stop : headers("lineage")) {
    // examine stop
}
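
whereas with a ListIterator return type the caller has to fall back to an
explicit loop, something like this (illustrative only, not proposed API code):

ListIterator<Header> stops = headers("lineage");
while (stops.hasNext()) {
    Header stop = stops.next();
    // examine stop
}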

On Tue, Mar 7, 2017 at 12:31 PM, radai  wrote:

> ing, as you call it, would probably be implemente


Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

2017-03-07 Thread Matthias J. Sax
Damian, Jun,

Thanks for your input.


About Performance test:

I can follow up with more performance tests using more partitions and
also collecting broker metrics.

However, I want to highlight again that even if 1000+ partitions were
problematic, one can simply implement the PartitionGrouper interface and
reduce the number of tasks to 250 or 100... So I am not sure we
should block this KIP, even if there might be some performance penalty
for currently single-partitioned tasks.

About memory usage: JMX max-heap and max-off-heap reported 256MB and
133MB for all experiments (thus I did not put it in the spreadsheet).
Thus, using 100 producers (each using a max of 32MB of memory) was not
an issue with regard to memory consumption. I did not track "current
heap/off-heap" memory as this would require a more advanced test setup to
monitor it over time. If you think this would be required, we can do
some tests though.

However, as 256 MB was enough memory, and there are other components
next to the producers using memory, I don't expect severely increased
memory usage. Producers allocate memory on demand, and if load is shared
over multiple producers, overall memory usage should stay about the same,
since each producer should allocate less memory.


About Batching:

As you can see from the benchmarks (in the detailed view -- I also added
some graphs to the summary now), the average batch size decreases slightly
as the number of partitions increases. However, there is no big difference
between the "producer per thread" and "producer per task" scenarios.


About acks:

This is covered by KIP-98 already. If the idempotent producer is used, it's
required to set max.in.flight.requests.per.connection=1 and retries > 0
-- otherwise a config exception will be thrown. For transactions, it's
further required that acks=-1 to avoid a config exception.
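
Expressed as producer properties, that amounts to roughly the following (a
minimal sketch for illustration, not text from the KIP):

Properties props = new Properties();
props.put("acks", "all");                                // i.e. acks=-1
props.put("retries", 3);                                 // must be > 0
props.put("max.in.flight.requests.per.connection", 1);   // must be exactly 1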

Other bits, like min.isr, replication.factor, etc. (i.e., all broker/topic
configs) are out of scope, and it's the user's responsibility to set those
values correctly to ensure transactionality and idempotency.



-Matthias


On 3/7/17 9:32 AM, Jun Rao wrote:
> Hi, Guozhang,
> 
> Thanks for the KIP. A couple of comments.
> 
> 1. About the impact on producer batching. My understanding is that
> typically different sub-topologies in the same task are publishing to
> different topics. Since the producer batching happens at the
> topic/partition level, using a producer per task may not impact batching
> much.
> 
> 2. When processing.guarantee is set to exactly_once, do we want to enforce
> acks to all in the producer? The default acks is 1 and may cause acked data
> to be lost later when the leader changes.
> 
> Thanks,
> 
> Jun
> 
> On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy  wrote:
> 
>> Hi Matthias,
>>
>> Thanks. The perf test is a good start but I don't think it goes far enough.
>> 100 partitions is not a lot. What happens when there are thousands of
>> partitions? What is the load on the brokers? How much more memory is used
>> by the Streams App etc?
>>
>> Thanks,
>> Damian
>>
>> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax  wrote:
>>
>>> Hi,
>>>
>>> I want to give a first respond:
>>>
>>>
>>>
>>> 1. Producer per task:
>>>
>>> First, we did some performance tests, indicating that the performance
>>> penalty is small. Please have a look here:
>>>
>>> https://docs.google.com/spreadsheets/d/18aGOB13-
>> ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
>>>
>>> For the test, we ran with a trunk version and a modified version that
>>> uses a producer per task (of course, no transactions, but at-least-once
>>> semantics). The scaling factor indicates the number of brokers and
>>> (single threaded) Streams instances. We used SimpleBenchmark that is
>>> part of AK code base.
>>>
>>>
>>> Second, as the design is "producer per task" (and not "producer per
>>> partition") it is possible to specify a custom PartitionGrouper that
>>> assigns multiple partitions to a single task. Thus, it allows to reduce
>>> the number of tasks for scenarios with many partitions. Right now, this
>>> interface must be implemented solely by the user, but we could also add
>>> a new config parameter that specifies the max.number.of.tasks or
>>> partitions.per.task so that the user can configure this instead of
>>> implementing the interface.
>>>
>>> Third, there is the idea of a "Producer Pool" that would allow to share
>>> resources (network connections, memory, etc) over multiple producers.
>>> This would allow to separate multiple transaction on the producer level,
>>> while resources are shared. There is no detailed design document yet and
>>> there would be a KIP for this feature.
>>>
>>> Thus, if there should be any performance problems for high scale
>>> scenarios, there are multiple ways to tackle them while keeping the
>>> "producer per task" design.
>>>
>>> Additionally, a "producer per thread" design would be way more complex
>>> and I summarized the issues in 

[jira] [Updated] (KAFKA-4841) NetworkClient should only consider a connection to be fail after attempt to connect

2017-03-07 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4841:
---
Fix Version/s: 0.11.0.0

> NetworkClient should only consider a connection to be fail after attempt to 
> connect
> ---
>
> Key: KAFKA-4841
> URL: https://issues.apache.org/jira/browse/KAFKA-4841
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
> Fix For: 0.11.0.0
>
>
> KAFKA-4820 allows a new request to be enqueued to unsent by a user thread while 
> some other thread does poll(...). This causes a problem in the following 
> scenario:
> - Thread A calls poll(...) and is blocked on select(...)
> - Thread B enqueues a request into unsent of ConsumerNetworkClient for node N
> - Thread A calls checkDisconnects(now) -> client.connectionFailed(N)
> Because no attempt has been made to connect to node N yet, there is no 
> state for node N and connectionFailed(N) would throw an exception. Note that 
> this problem only occurs when one thread is able to enqueue requests while 
> another thread is in the process of `poll(...)`.
> The solution is to consider a connection to have failed only if attempts have 
> been made to connect to this node AND the connection state is DISCONNECTED.
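
A self-contained sketch of the guarded check described above (the class, enum and
map below are stand-ins for illustration, not the actual NetworkClient code):

{code:java}
import java.util.HashMap;
import java.util.Map;

class ConnectionStateSketch {
    enum State { CONNECTING, CONNECTED, DISCONNECTED }

    // Hypothetical stand-in for the client's per-node connection state table;
    // a node we never attempted to connect to simply has no entry.
    private final Map<String, State> states = new HashMap<>();

    // Report a failure only if a connection attempt was made (an entry exists)
    // AND the recorded state is DISCONNECTED.
    boolean connectionFailed(String nodeId) {
        State state = states.get(nodeId);
        return state != null && state == State.DISCONNECTED;
    }
}
{code}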



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4841) NetworkClient should only consider a connection to be fail after attempt to connect

2017-03-07 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-4841.

Resolution: Fixed

> NetworkClient should only consider a connection to be fail after attempt to 
> connect
> ---
>
> Key: KAFKA-4841
> URL: https://issues.apache.org/jira/browse/KAFKA-4841
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
> Fix For: 0.11.0.0
>
>
> KAFKA-4820 allows new request to be enqueued to unsent by user thread while 
> some other thread does poll(...). This causes problem in the following 
> scenario:
> - Thread A calls poll(...) and is blocked on select(...)
> - Thread B enqueues a request into unsent of ConsumerNetworkClient for node N
> - Thread A calls checkDisconnects(now) -> client.connectionFailed(N)
> Because no attempts have been made to connection to node N yet, there is no 
> state for node N and connectionFailed(N) would throw exception. Note that 
> this problem only occurs when one thread is able to enqueue requests while 
> another thread is in the process of `poll(...)`
> The solution is to only consider a connection has failed if attempts have 
> been made to connect to this node AND the connection state is DISCONNECTED.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Jenkins build is back to normal : kafka-0.10.2-jdk7 #98

2017-03-07 Thread Apache Jenkins Server
See 




[jira] [Commented] (KAFKA-4841) NetworkClient should only consider a connection to be fail after attempt to connect

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900516#comment-15900516
 ] 

ASF GitHub Bot commented on KAFKA-4841:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2641


> NetworkClient should only consider a connection to be fail after attempt to 
> connect
> ---
>
> Key: KAFKA-4841
> URL: https://issues.apache.org/jira/browse/KAFKA-4841
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>
> KAFKA-4820 allows new request to be enqueued to unsent by user thread while 
> some other thread does poll(...). This causes problem in the following 
> scenario:
> - Thread A calls poll(...) and is blocked on select(...)
> - Thread B enqueues a request into unsent of ConsumerNetworkClient for node N
> - Thread A calls checkDisconnects(now) -> client.connectionFailed(N)
> Because no attempts have been made to connection to node N yet, there is no 
> state for node N and connectionFailed(N) would throw exception. Note that 
> this problem only occurs when one thread is able to enqueue requests while 
> another thread is in the process of `poll(...)`
> The solution is to only consider a connection has failed if attempts have 
> been made to connect to this node AND the connection state is DISCONNECTED.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Build failed in Jenkins: kafka-trunk-jdk8 #1334

2017-03-07 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-4851: only search available segments during

--
[...truncated 156.93 KB...]

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED


Re: [DISCUSS] KIP-124: Request rate quotas

2017-03-07 Thread Todd Palino
I’ve been following this one on and off, and overall it sounds good to me.

- The SSL question is a good one. However, that type of overhead should be
proportional to the bytes rate, so I think that a bytes rate quota would
still be a suitable way to address it.

- I think it’s better to make the quota percentage of total thread pool
capacity, and not percentage of an individual thread. That way you don’t
have to adjust it when you adjust thread counts (tuning, hardware changes,
etc.)
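
As a toy sketch of the "percentage of total thread pool capacity" idea (the
numbers mirror the 10 I/O + 5 network thread example discussed below; the
variable names are illustrative only):

int ioThreads = 10, networkThreads = 5;
double totalCapacityPercent = (ioThreads + networkThreads) * 100.0;  // 1500%
double quotaShare = 0.50;                                            // the user's limit
double budgetPercent = quotaShare * totalCapacityPercent;            // 750%
// Throttle once the user's measured request-handling time, summed across both
// pools and expressed as "percent of one thread" (as in top), exceeds budgetPercent.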


-Todd



On Tue, Mar 7, 2017 at 2:38 PM, Becket Qin  wrote:

> I see. Good point about SSL.
>
> I just asked Todd to take a look.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Mar 7, 2017 at 2:17 PM, Jun Rao  wrote:
>
> > Hi, Jiangjie,
> >
> > Yes, I agree that byte rate already protects the network threads
> > indirectly. I am not sure if byte rate fully captures the CPU overhead in
> > network due to SSL. So, at the high level, we can use request time limit
> to
> > protect CPU and use byte rate to protect storage and network.
> >
> > Also, do you think you can get Todd to comment on this KIP?
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Mar 7, 2017 at 11:21 AM, Becket Qin 
> wrote:
> >
> > > Hi Rajini/Jun,
> > >
> > > The percentage based reasoning sounds good.
> > > One thing I am wondering is that if we assume the network thread are
> just
> > > doing the network IO, can we say bytes rate quota is already sort of
> > > network threads quota?
> > > If we take network threads into the consideration here, would that be
> > > somewhat overlapping with the bytes rate quota?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Tue, Mar 7, 2017 at 11:04 AM, Rajini Sivaram <
> rajinisiva...@gmail.com
> > >
> > > wrote:
> > >
> > > > Jun,
> > > >
> > > > Thank you for the explanation, I hadn't realized you meant percentage
> > of
> > > > the total thread pool. If everyone is OK with Jun's suggestion, I
> will
> > > > update the KIP.
> > > >
> > > > Thanks,
> > > >
> > > > Rajini
> > > >
> > > > On Tue, Mar 7, 2017 at 5:08 PM, Jun Rao  wrote:
> > > >
> > > > > Hi, Rajini,
> > > > >
> > > > > Let's take your example. Let's say a user sets the limit to 50%. I
> am
> > > not
> > > > > sure if it's better to apply the same percentage separately to
> > network
> > > > and
> > > > > io thread pool. For example, for produce requests, most of the time
> > > will
> > > > be
> > > > > spent in the io threads whereas for fetch requests, most of the
> time
> > > will
> > > > > be in the network threads. So, using the same percentage in both
> > thread
> > > > > pools means one of the pools' resource will be over allocated.
> > > > >
> > > > > An alternative way is to simply model network and io thread pool
> > > > together.
> > > > > If you get 10 io threads and 5 network threads, you get 1500%
> request
> > > > > processing power. A 50% limit means a total of 750% processing
> power.
> > > We
> > > > > just add up the time a user request spent in either network or io
> > > thread.
> > > > > If that total exceeds 750% (doesn't matter whether it's spent more
> in
> > > > > network or io thread), the request will be throttled. This seems
> more
> > > > > general and is not sensitive to the current implementation detail
> of
> > > > having
> > > > > a separate network and io thread pool. In the future, if the
> > threading
> > > > > model changes, the same concept of quota can still be applied. For
> > now,
> > > > > since it's a bit tricky to add the delay logic in the network
> thread
> > > > pool,
> > > > > we could probably just do the delaying only in the io threads as
> you
> > > > > suggested earlier.
> > > > >
> > > > > There is still the orthogonal question of whether a quota of 50% is
> > out
> > > > of
> > > > > 100% or 100% * #total processing threads. My feeling is that the
> > latter
> > > > is
> > > > > slightly better based on my explanation earlier. The way to
> describe
> > > this
> > > > > quota to the users can be "share of elapsed request processing time
> > on
> > > a
> > > > > single CPU" (similar to top).
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Fri, Mar 3, 2017 at 4:22 AM, Rajini Sivaram <
> > > rajinisiva...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Jun,
> > > > > >
> > > > > > Agree about the two scenarios.
> > > > > >
> > > > > > But still not sure about a single quota covering both network
> > threads
> > > > and
> > > > > > I/O threads with per-thread quota. If there are 10 I/O threads
> and
> > 5
> > > > > > network threads and I want to assign half the quota to userA, the
> > > quota
> > > > > > would be 750%. I imagine, internally, we would convert this to
> 500%
> > > for
> > > > > I/O
> > > > > > and 250% for network threads to allocate 50% of each pool.
> > > > > >
> > > > > > A couple of scenarios:
> > > > > >
> > > > > > 1. Admin adds 1 extra network 

[jira] [Comment Edited] (KAFKA-3795) Transient system test failure upgrade_test.TestUpgrade

2017-03-07 Thread Apurva Mehta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900450#comment-15900450
 ] 

Apurva Mehta edited comment on KAFKA-3795 at 3/8/17 12:28 AM:
--

Another instance: 
http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-03-07--001.144479--apache--trunk--81f9e13/report.html


was (Author: apurva):
Another instance: https://testbreak.confluent.io/kiosk/test_result?id=30370

> Transient system test failure upgrade_test.TestUpgrade
> --
>
> Key: KAFKA-3795
> URL: https://issues.apache.org/jira/browse/KAFKA-3795
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Reporter: Jason Gustafson
>  Labels: reliability
>
> From a recent build running on the 0.10.0 branch:
> {code}
> test_id:
> 2016-06-06--001.kafkatest.tests.core.upgrade_test.TestUpgrade.test_upgrade.from_kafka_version=0.9.0.1.to_message_format_version=0.9.0.1.compression_types=.snappy.new_consumer=True
> status: FAIL
> run time:   3 minutes 29.166 seconds
> 3522 acked message did not make it to the Consumer. They are: 476524, 
> 476525, 476527, 476528, 476530, 476531, 476533, 476534, 476536, 476537, 
> 476539, 476540, 476542, 476543, 476545, 476546, 476548, 476549, 476551, 
> 476552, ...plus 3482 more. Total Acked: 110437, Total Consumed: 127470. The 
> first 1000 missing messages were validated to ensure they are in Kafka's data 
> files. 1000 were missing. This suggests data loss. Here are some of the 
> messages not found in the data files: [477184, 477185, 477187, 477188, 
> 477190, 477191, 477193, 477194, 477196, 477197]
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
>  line 106, in run_all_tests
> data = self.run_single_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
>  line 162, in run_single_test
> return self.current_test_context.function(self.current_test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/mark/_mark.py",
>  line 331, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/core/upgrade_test.py",
>  line 113, in test_upgrade
> self.run_produce_consume_validate(core_test_action=lambda: 
> self.perform_upgrade(from_kafka_version,
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 79, in run_produce_consume_validate
> raise e
> AssertionError: 3522 acked message did not make it to the Consumer. They are: 
> 476524, 476525, 476527, 476528, 476530, 476531, 476533, 476534, 476536, 
> 476537, 476539, 476540, 476542, 476543, 476545, 476546, 476548, 476549, 
> 476551, 476552, ...plus 3482 more. Total Acked: 110437, Total Consumed: 
> 127470. The first 1000 missing messages were validated to ensure they are in 
> Kafka's data files. 1000 were missing. This suggests data loss. Here are some 
> of the messages not found in the data files: [477184, 477185, 477187, 477188, 
> 477190, 477191, 477193, 477194, 477196, 477197]
> {code}
> Here's a link to the test data: 
> http://testing.confluent.io/confluent-kafka-0-10-0-system-test-results/?prefix=2016-06-06--001.1465234069--apache--0.10.0--6500b53/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-119: Drop Support for Scala 2.10 in Kafka 0.11

2017-03-07 Thread Ismael Juma
Thanks to everyone who voted and/or provided feedback. +1 from me too.

The vote passed with 6 binding +1s (Guozhang, Ewen, Gwen, Grant, Becket,
Ismael) and 9 non-binding +1s (Dongjin, Molnar, Tom, Eno, Bill, Jozef,
Apurva, Dong, Mickael).

I will update the relevant wiki pages.

Ismael

On Tue, Feb 28, 2017 at 10:40 AM, Ismael Juma  wrote:

> Hi everyone,
>
> Since the few who responded in the discuss thread were in favour and there
> were no objections, I would like to initiate the voting process for
> KIP-119: Drop Support for Scala 2.10 in Kafka 0.11:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 119%3A+Drop+Support+for+Scala+2.10+in+Kafka+0.11
>
> The vote will run for a minimum of 72 hours.
>
> Thanks,
> Ismael
>


[jira] [Assigned] (KAFKA-4858) Long topic names created using old kafka-topics.sh can prevent newer brokers from joining any ISRs

2017-03-07 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-4858:
--

Assignee: Vahid Hashemian

> Long topic names created using old kafka-topics.sh can prevent newer brokers 
> from joining any ISRs
> --
>
> Key: KAFKA-4858
> URL: https://issues.apache.org/jira/browse/KAFKA-4858
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>
> I ran into a variant of KAFKA-3219 that resulted in a broker being unable to 
> join any ISRs in the cluster.
> Prior to 0.10.0.0, the maximum topic length was 255.
> With 0.10.0.0 and beyond, the maximum topic length is 249.
> The check on topic name length is done by kafka-topics.sh prior to topic 
> creation. Thus, it is possible to use a 0.9.0.1 kafka-topics.sh script to 
> create a 255 character topic on a 0.10.1.1 broker.
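> The length check involved is simple; an illustrative version (not the actual
> kafka-topics.sh or broker code) looks like this:
> {code:java}
> static void validateTopicNameLength(String topic) {
>     final int maxLength = 249;  // the limit since 0.10.0.0; older tools allowed 255
>     if (topic.length() > maxLength) {
>         throw new IllegalArgumentException(
>             "Topic name can't be longer than " + maxLength + " characters");
>     }
> }
> {code}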
> When this happens, you will get the following stack trace (the same one seen 
> in KAFKA-3219)
> {code}
> $ TOPIC=$(printf 'd%.0s' {1..255} ) ; bin/kafka-topics.sh --zookeeper 
> 127.0.0.1 --create --topic $TOPIC --partitions 1 --replication-factor 2
> Created topic 
> "ddd".
> {code}
> {code}
> [2017-03-06 22:01:19,011] ERROR [KafkaApi-2] Error when handling request 
> {controller_id=1,controller_epoch=1,partition_states=[{topic=ddd,partition=0,controller_epoch=1,leader=2,leader_epoch=0,isr=[2,1],zk_version=0,replicas=[2,1]}],live_leaders=[{id=2,host=jchengmbpro15,port=9093}]}
>  (kafka.server.KafkaApis)
> java.lang.NullPointerException
>   at 
> scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:192)
>   at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:192)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>   at kafka.log.Log.loadSegments(Log.scala:155)
>   at kafka.log.Log.(Log.scala:108)
>   at kafka.log.LogManager.createLog(LogManager.scala:362)
>   at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at 
> kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
>   at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
>   at kafka.cluster.Partition.makeLeader(Partition.scala:168)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:758)
>   at 
> kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:757)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>   at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:757)
>   at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:703)
>   at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:82)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic does not get created on disk, but the broker thinks the topic is 
> ready. The broker seems functional for other topics; I can produce/consume 
> to them.
> {code}
> $ ./bin/kafka-topics.sh --zookeeper 127.0.0.1 --describe
> 
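Since the length check described above lives in the tooling rather than in the broker, any client-side guard has to mirror the broker's 249-character limit itself. A minimal illustrative check (not part of any Kafka API) could look like this:

{code}
// Illustrative client-side guard mirroring the 249-character topic name limit
// described above (the limit since 0.10.0.0). Not part of the Kafka API.
public final class TopicNameGuard {
    private static final int MAX_TOPIC_NAME_LENGTH = 249;

    public static void validate(String topic) {
        if (topic == null || topic.isEmpty())
            throw new IllegalArgumentException("Topic name must be non-empty");
        if (topic.length() > MAX_TOPIC_NAME_LENGTH)
            throw new IllegalArgumentException("Topic name length " + topic.length()
                + " exceeds the maximum of " + MAX_TOPIC_NAME_LENGTH + " characters");
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 255; i++)
            sb.append('d'); // the 255-character name from the report above
        validate(sb.toString()); // throws IllegalArgumentException
    }
}
{code}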

[GitHub] kafka pull request #2653: MINOR: Update possible errors in OffsetFetchRespon...

2017-03-07 Thread vahidhashemian
GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/2653

MINOR: Update possible errors in OffsetFetchResponse

Note: None of the use cases for offset fetch would lead to a 
`TOPIC_AUTHORIZATION_FAILED` error (fetching the offset of an unauthorized 
partition would return an `UNKNOWN_TOPIC_OR_PARTITION` error). That is why 
it is being removed from the `PARTITION_ERRORS` list.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka 
minor/update_possible_errors_in_offset_fetch_response

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2653.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2653


commit 6e6406302211b31cfabbee660b23b31f018ddefa
Author: Vahid Hashemian 
Date:   2017-03-07T23:16:33Z

MINOR: Update possible errors in OffsetFetchResponse

Note: None of the use cases for offset fetch would lead to a 
`TOPIC_AUTHORIZATION_FAILED` error (fetching the offset of an unauthorized 
partition would return an `UNKNOWN_TOPIC_OR_PARTITION` error). That is why 
it is being removed from the `PARTITION_ERRORS` list.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (KAFKA-4851) SessionStore.fetch(key) is a performance bottleneck

2017-03-07 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4851.
--
   Resolution: Fixed
Fix Version/s: 0.10.2.1

Issue resolved by pull request 2645
[https://github.com/apache/kafka/pull/2645]

> SessionStore.fetch(key) is a performance bottleneck
> ---
>
> Key: KAFKA-4851
> URL: https://issues.apache.org/jira/browse/KAFKA-4851
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.11.0.0, 0.10.2.1
>
>
> When flushing the {{CachingSessionStore}} we need to search for the previous 
> value for a session. This involves searching each open RocksDB segment. The 
> code ends up calling {{Segments.segments(0, Long.MAX_VALUE)}}, which 
> results in approximately 3 million gets on a {{ConcurrentHashMap}}, of which 
> all but 3 will be misses. 
> Change this code to restrict the segmentIds searched to just the available 
> set.
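A rough sketch of the fix described above, using simplified stand-in types rather than the actual Kafka Streams Segments internals: clamp the requested segment-id range to the ids that actually exist before probing the map.

{code}
// Simplified stand-in for the segment lookup, not the real Kafka Streams code.
// The point is to clamp the id range to existing segments instead of probing
// every id between 0 and Long.MAX_VALUE.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class SegmentIndex<S> {
    private final ConcurrentMap<Long, S> segmentsById = new ConcurrentHashMap<>();

    void put(long segmentId, S segment) {
        segmentsById.put(segmentId, segment);
    }

    List<S> segmentsInRange(long fromId, long toId,
                            long earliestExistingId, long latestExistingId) {
        long from = Math.max(fromId, earliestExistingId);
        long to = Math.min(toId, latestExistingId);
        List<S> result = new ArrayList<>();
        for (long id = from; id <= to; id++) {   // only a handful of ids now
            S segment = segmentsById.get(id);
            if (segment != null)
                result.add(segment);
        }
        return result;
    }
}
{code}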



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4851) SessionStore.fetch(key) is a performance bottleneck

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900385#comment-15900385
 ] 

ASF GitHub Bot commented on KAFKA-4851:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2645


> SessionStore.fetch(key) is a performance bottleneck
> ---
>
> Key: KAFKA-4851
> URL: https://issues.apache.org/jira/browse/KAFKA-4851
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.11.0.0
>
>
> When flushing the {{CachingSessionStore}} we need to search for the previous 
> value for a session. This involves searching each open RocksDB segment. The 
> code ends up calling {{Segments.segments(0, Long.MAX_VALUE)}}, which 
> results in approximately 3 million gets on a {{ConcurrentHashMap}}, of which 
> all but 3 will be misses. 
> Change this code to restrict the segmentIds searched to just the available 
> set.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2645: KAFKA-4851: only search available segments during ...

2017-03-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2645


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-124: Request rate quotas

2017-03-07 Thread Becket Qin
I see. Good point about SSL.

I just asked Todd to take a look.

Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 7, 2017 at 2:17 PM, Jun Rao  wrote:

> Hi, Jiangjie,
>
> Yes, I agree that byte rate already protects the network threads
> indirectly. I am not sure if byte rate fully captures the CPU overhead in
> network due to SSL. So, at the high level, we can use request time limit to
> protect CPU and use byte rate to protect storage and network.
>
> Also, do you think you can get Todd to comment on this KIP?
>
> Thanks,
>
> Jun
>
> On Tue, Mar 7, 2017 at 11:21 AM, Becket Qin  wrote:
>
> > Hi Rajini/Jun,
> >
> > The percentage based reasoning sounds good.
> > One thing I am wondering is: if we assume the network threads are just
> > doing the network IO, can we say the bytes rate quota is already a sort of
> > network threads quota?
> > If we take network threads into consideration here, would that be
> > somewhat overlapping with the bytes rate quota?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Mar 7, 2017 at 11:04 AM, Rajini Sivaram  >
> > wrote:
> >
> > > Jun,
> > >
> > > Thank you for the explanation, I hadn't realized you meant percentage
> of
> > > the total thread pool. If everyone is OK with Jun's suggestion, I will
> > > update the KIP.
> > >
> > > Thanks,
> > >
> > > Rajini
> > >
> > > On Tue, Mar 7, 2017 at 5:08 PM, Jun Rao  wrote:
> > >
> > > > Hi, Rajini,
> > > >
> > > > Let's take your example. Let's say a user sets the limit to 50%. I am
> > not
> > > > sure if it's better to apply the same percentage separately to
> network
> > > and
> > > > io thread pool. For example, for produce requests, most of the time
> > will
> > > be
> > > > spent in the io threads whereas for fetch requests, most of the time
> > will
> > > > be in the network threads. So, using the same percentage in both
> thread
> > > > pools means one of the pools' resource will be over allocated.
> > > >
> > > > An alternative way is to simply model network and io thread pool
> > > together.
> > > > If you get 10 io threads and 5 network threads, you get 1500% request
> > > > processing power. A 50% limit means a total of 750% processing power.
> > We
> > > > just add up the time a user request spent in either network or io
> > thread.
> > > > If that total exceeds 750% (doesn't matter whether it's spent more in
> > > > network or io thread), the request will be throttled. This seems more
> > > > general and is not sensitive to the current implementation detail of
> > > having
> > > > a separate network and io thread pool. In the future, if the
> threading
> > > > model changes, the same concept of quota can still be applied. For
> now,
> > > > since it's a bit tricky to add the delay logic in the network thread
> > > pool,
> > > > we could probably just do the delaying only in the io threads as you
> > > > suggested earlier.
> > > >
> > > > There is still the orthogonal question of whether a quota of 50% is
> out
> > > of
> > > > 100% or 100% * #total processing threads. My feeling is that the
> latter
> > > is
> > > > slightly better based on my explanation earlier. The way to describe
> > this
> > > > quota to the users can be "share of elapsed request processing time
> on
> > a
> > > > single CPU" (similar to top).
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Fri, Mar 3, 2017 at 4:22 AM, Rajini Sivaram <
> > rajinisiva...@gmail.com>
> > > > wrote:
> > > >
> > > > > Jun,
> > > > >
> > > > > Agree about the two scenarios.
> > > > >
> > > > > But still not sure about a single quota covering both network
> threads
> > > and
> > > > > I/O threads with per-thread quota. If there are 10 I/O threads and
> 5
> > > > > network threads and I want to assign half the quota to userA, the
> > quota
> > > > > would be 750%. I imagine, internally, we would convert this to 500%
> > for
> > > > I/O
> > > > > and 250% for network threads to allocate 50% of each pool.
> > > > >
> > > > > A couple of scenarios:
> > > > >
> > > > > 1. Admin adds 1 extra network thread. To retain 50%, admin needs to
> > now
> > > > > allocate 800% for each user. Or increase the quota for a few users.
> > To
> > > > me,
> > > > > it feels like admin needs to convert 50% to 800% and Kafka
> internally
> > > > needs
> > > > > to convert 800% to (500%, 300%). Everyone using just 50% feels a
> lot
> > > > > simpler.
> > > > >
> > > > > 2. We decide to add some other thread to this list. Admin needs to
> > know
> > > > > exactly how many threads form the maximum quota. And we can be
> > changing
> > > > > this between broker versions as we add more to the list. Again a
> > single
> > > > > overall percent would be a lot simpler.
> > > > >
> > > > > There were others who were unconvinced by a single percent from the
> > > > initial
> > > > > proposal and were happier with thread units similar to CPU units,
> so
> > I
> > > am
> > > > > ok 
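To make the arithmetic quoted above concrete (10 I/O threads plus 5 network threads gives 1500% of "top"-style capacity, so a 50% quota is 750%), here is a toy model of the bookkeeping only; it is not the KIP-124 implementation, which was still under discussion at this point.

{code}
// Toy model of the thread-time quota arithmetic discussed in this thread.
// This is only the bookkeeping, not Kafka code; KIP-124 was still being designed.
public class RequestTimeQuota {
    public static void main(String[] args) {
        int ioThreads = 10;
        int networkThreads = 5;
        double quotaFraction = 0.50; // the "50%" in the discussion

        // Capacity expressed like 'top': 100% per thread.
        double totalCapacityPercent = (ioThreads + networkThreads) * 100.0; // 1500%
        double userQuotaPercent = quotaFraction * totalCapacityPercent;     // 750%

        // Thread time this user's requests consumed in the last window,
        // summed across network and I/O threads, per second of wall-clock time.
        double usedIoPercent = 600.0;
        double usedNetworkPercent = 200.0;
        double usedTotalPercent = usedIoPercent + usedNetworkPercent;       // 800%

        boolean throttle = usedTotalPercent > userQuotaPercent;
        System.out.printf("capacity=%.0f%% quota=%.0f%% used=%.0f%% throttle=%b%n",
                totalCapacityPercent, userQuotaPercent, usedTotalPercent, throttle);
    }
}
{code}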

Re: [DISCUSS] KIP-124: Request rate quotas

2017-03-07 Thread Jun Rao
Hi, Jiangjie,

Yes, I agree that byte rate already protects the network threads
indirectly. I am not sure if byte rate fully captures the CPU overhead in
network due to SSL. So, at the high level, we can use request time limit to
protect CPU and use byte rate to protect storage and network.

Also, do you think you can get Todd to comment on this KIP?

Thanks,

Jun

On Tue, Mar 7, 2017 at 11:21 AM, Becket Qin  wrote:

> Hi Rajini/Jun,
>
> The percentage based reasoning sounds good.
> One thing I am wondering is: if we assume the network threads are just
> doing the network IO, can we say the bytes rate quota is already a sort of
> network threads quota?
> If we take network threads into consideration here, would that be
> somewhat overlapping with the bytes rate quota?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Mar 7, 2017 at 11:04 AM, Rajini Sivaram 
> wrote:
>
> > Jun,
> >
> > Thank you for the explanation, I hadn't realized you meant percentage of
> > the total thread pool. If everyone is OK with Jun's suggestion, I will
> > update the KIP.
> >
> > Thanks,
> >
> > Rajini
> >
> > On Tue, Mar 7, 2017 at 5:08 PM, Jun Rao  wrote:
> >
> > > Hi, Rajini,
> > >
> > > Let's take your example. Let's say a user sets the limit to 50%. I am
> not
> > > sure if it's better to apply the same percentage separately to network
> > and
> > > io thread pool. For example, for produce requests, most of the time
> will
> > be
> > > spent in the io threads whereas for fetch requests, most of the time
> will
> > > be in the network threads. So, using the same percentage in both thread
> > > pools means one of the pools' resource will be over allocated.
> > >
> > > An alternative way is to simply model network and io thread pool
> > together.
> > > If you get 10 io threads and 5 network threads, you get 1500% request
> > > processing power. A 50% limit means a total of 750% processing power.
> We
> > > just add up the time a user request spent in either network or io
> thread.
> > > If that total exceeds 750% (doesn't matter whether it's spent more in
> > > network or io thread), the request will be throttled. This seems more
> > > general and is not sensitive to the current implementation detail of
> > having
> > > a separate network and io thread pool. In the future, if the threading
> > > model changes, the same concept of quota can still be applied. For now,
> > > since it's a bit tricky to add the delay logic in the network thread
> > pool,
> > > we could probably just do the delaying only in the io threads as you
> > > suggested earlier.
> > >
> > > There is still the orthogonal question of whether a quota of 50% is out
> > of
> > > 100% or 100% * #total processing threads. My feeling is that the latter
> > is
> > > slightly better based on my explanation earlier. The way to describe
> this
> > > quota to the users can be "share of elapsed request processing time on
> a
> > > single CPU" (similar to top).
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Fri, Mar 3, 2017 at 4:22 AM, Rajini Sivaram <
> rajinisiva...@gmail.com>
> > > wrote:
> > >
> > > > Jun,
> > > >
> > > > Agree about the two scenarios.
> > > >
> > > > But still not sure about a single quota covering both network threads
> > and
> > > > I/O threads with per-thread quota. If there are 10 I/O threads and 5
> > > > network threads and I want to assign half the quota to userA, the
> quota
> > > > would be 750%. I imagine, internally, we would convert this to 500%
> for
> > > I/O
> > > > and 250% for network threads to allocate 50% of each pool.
> > > >
> > > > A couple of scenarios:
> > > >
> > > > 1. Admin adds 1 extra network thread. To retain 50%, admin needs to
> now
> > > > allocate 800% for each user. Or increase the quota for a few users.
> To
> > > me,
> > > > it feels like admin needs to convert 50% to 800% and Kafka internally
> > > needs
> > > > to convert 800% to (500%, 300%). Everyone using just 50% feels a lot
> > > > simpler.
> > > >
> > > > 2. We decide to add some other thread to this list. Admin needs to
> know
> > > > exactly how many threads form the maximum quota. And we can be
> changing
> > > > this between broker versions as we add more to the list. Again a
> single
> > > > overall percent would be a lot simpler.
> > > >
> > > > There were others who were unconvinced by a single percent from the
> > > initial
> > > > proposal and were happier with thread units similar to CPU units, so
> I
> > am
> > > > ok with going with per-thread quotas (as units or percent). Just not
> > sure
> > > > it makes it easier for admin in all cases.
> > > >
> > > > Regards,
> > > >
> > > > Rajini
> > > >
> > > >
> > > > On Fri, Mar 3, 2017 at 6:03 AM, Jun Rao  wrote:
> > > >
> > > > > Hi, Rajini,
> > > > >
> > > > > Consider modeling as n * 100% unit. For 2), the question is what's
> > > > causing
> > > > > the I/O threads to be saturated. It's unlikely that 

[jira] [Commented] (KAFKA-4795) Confusion around topic deletion

2017-03-07 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900271#comment-15900271
 ] 

Vahid Hashemian commented on KAFKA-4795:


[~ijuma] [~hachikuji] I'd appreciate your clarification on this JIRA.

> Confusion around topic deletion
> ---
>
> Key: KAFKA-4795
> URL: https://issues.apache.org/jira/browse/KAFKA-4795
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> The topic deletion works like this in 0.10.2.0:
> # {{bin/zookeeper-server-start.sh config/zookeeper.properties}}
> # {{bin/kafka-server-start.sh config/server.properties}} (uses default 
> {{server.properties}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --replication-factor 1 --partitions 1}} (creates the topic {{test}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
> (reports {{Topic test is marked for deletion. Note: This will have no impact 
> if delete.topic.enable is not set to true.}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
> Previously, the last command above returned {{test - marked for deletion}}, 
> which matched the output statement of the {{--delete}} topic command.
> Continuing with the above scenario,
> # stop the broker
> # add the broker config {{delete.topic.enable=true}} in the config file
> # {{bin/kafka-server-start.sh config/server.properties}} (this does not 
> remove the topic {{test}}, as if the topic was never marked for deletion).
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
> (reports {{Topic test is marked for deletion. Note: This will have no impact 
> if delete.topic.enable is not set to true.}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns no 
> topics).
> It seems that the "marked for deletion" state for topics no longer exists.
> I opened this JIRA so I can get a confirmation on the expected topic deletion 
> behavior, because in any case, I think the user experience could be improved 
> (either there is a bug in the code, or the command's output statement is 
> misleading).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-03-07 Thread Dong Lin
Hey Becket,

Thanks for the review.

1. I have thought about this before. I think it is fine to delete the node
after the controller reads it. On controller failover, the new controller will
always send LeaderAndIsrRequest for all partitions to each broker in order
to learn about offline replicas.

2. This field is not necessary now because we currently only use this znode
for the LogDirFailure event. But I envision that it may be useful in the future,
e.g. for fixing a log directory without having to reboot the broker. I have
updated the KIP to specify that we use 1 to indicate the LogDirFailure event. I
think this field makes the znode more general and extensible. But I am OK with
removing this field from the znode for now and adding it in the future.

Thanks,
Dong





On Tue, Mar 7, 2017 at 1:26 PM, Becket Qin  wrote:

> Hi Dong,
>
> Thanks for the KIP, a few more comments:
>
> 1. In the KIP wiki section "A log directory stops working on a broker
> during runtime", the controller deletes the notification node right after
> it reads the znode. It seems safer to do this last, so that even if the
> controller fails, the new controller will still see the notification.
> 2. In the notification znode we have an Event field as an integer. Can we
> document what the value of LogDirFailure is? And also, are there any other
> possible values?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Mar 7, 2017 at 11:30 AM, Dong Lin  wrote:
>
> > Hey Ismael,
> >
> > Thanks much for taking time to review the KIP and read through all the
> > discussion!
> >
> > Please see my reply inline.
> >
> > On Tue, Mar 7, 2017 at 9:47 AM, Ismael Juma  wrote:
> >
> > > Hi Dong,
> > >
> > > It took me a while, but I finally went through the whole thread. I
> have a
> > > few minor comments:
> > >
> > > 1. Regarding the metrics, can we include the full name (e.g.
> > > kafka.cluster:type=Partition,name=InSyncReplicasCount,
> > > topic={topic},partition={partition} was defined in KIP-96)?
> > >
> > > Certainly. I have updated the KIP to specify the full name.
> >
> >
> > > 2. We talk about changes in operational procedures for people switching
> > > from RAID to JBOD, but what about people who are already using JBOD?
> > Since
> > > disk failures won't necessarily cause broker failures, some adjustments
> > may
> > > be needed.
> > >
> >
> > Good point. I indeed missed one operational procedure for both the
> existing
> > RAID/JBOD user. I have updated the KIP to specify the following:
> >
> > Administrator will need to detect log directory failure by looking at
> > OfflineLogDirectoriesCount. After log directory failure is detected,
> > administrator needs to fix disks and reboot broker.
> >
> >
> > >
> > > 3. Another point regarding operational procedures, with a large enough
> > > cluster, disk failures may not be that uncommon. It may be worth
> > explaining
> > > the recommended procedure if someone needs to do a rolling bounce of a
> > > cluster with some bad disks. One option is to simply do the bounce and
> > hope
> > > that the bad disks are detected during restart, but we know that this
> is
> > > not guaranteed to happen immediately. A better option may be to remove
> > the
> > > bad log dirs from the broker config until the disk is replaced.
> > >
> >
> > I am not sure if I understand your suggestion here. I think user doesn't
> > need to differentiate between log directory failure during rolling bounce
> > and log directory failure during runtime. All they need to do is to
> detect
> > and handle log directory failure specified above. And they don't have to
> > remove the bad log directory immediately from broker config. The only
> > drawback of keeping log directory there is that a new replica may not be
> > created on the broker. But the chance of that happening is really low,
> > since the controller has to fail in a small window after user initiated
> the
> > topic creation but before it sends LeaderAndIsrRequest with
> > is_new_replica=true to the broker. In practice this shouldn't matter.
> >
> >
> > >
> > > 4. The test plan doesn't mention the number of log directories per
> > broker.
> > > It could be good to specify this. Also, we seem to create one topic
> with
> > > one partition, which means that only one log directory will be
> populated.
> > > It seems like we should have partitions in more than one log directory
> to
> > > verify that the failed log directory doesn't affect the ones that are
> > still
> > > good.
> > >
> >
> > Sure. I have updated the test description to specify that each broker
> will
> > have two log directories.
> >
> > The existing test case will actually create 2 topics to validate that
> > failed log directory won't affect the good ones. You can find them after
> > "Now validate that the previous leader can still serve replicas on the
> good
> > log directories" and "Now validate that the follower can still serve
> > replicas on the good log directories".
> 

[DISCUSS] KIP-131 : Add access to OffsetStorageReader from SourceConnector

2017-03-07 Thread Florian Hussonnois
Hi all,

I've created a new KIP to add access to OffsetStorageReader from
SourceConnector

https://cwiki.apache.org/confluence/display/KAFKA/KIP-131+-+Add+access+to+OffsetStorageReader+from+SourceConnector

Thanks.

-- 
Florian HUSSONNOIS
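For background, an OffsetStorageReader is currently reachable only from a task, via its SourceTaskContext; the sketch below shows that existing task-level access (the "file"/"position" keys are made up for the example). The connector-level access proposed in KIP-131 is not shown, since its exact API was still under discussion.

{code}
// Existing task-level access to committed source offsets in Kafka Connect.
// The partition/offset key names are illustrative; KIP-131 proposes exposing
// the same reader at the connector level, whose API was still being discussed.
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class ExampleSourceTask extends SourceTask {
    private Map<String, String> sourcePartition;

    @Override
    public String version() {
        return "0.0.1";
    }

    @Override
    public void start(Map<String, String> props) {
        sourcePartition = Collections.singletonMap("file", props.get("file"));
        // Resume from the last committed offset for this source partition, if any.
        Map<String, Object> lastOffset =
                context.offsetStorageReader().offset(sourcePartition);
        if (lastOffset != null) {
            Long position = (Long) lastOffset.get("position");
            // seek the underlying source to 'position' here
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        return Collections.emptyList(); // a real task would read from its source
    }

    @Override
    public void stop() {
    }
}
{code}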


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-03-07 Thread Becket Qin
Hi Dong,

Thanks for the KIP, a few more comments:

1. In the KIP wiki section "A log directory stops working on a broker
during runtime", the controller deletes the notification node right after
it reads the znode. It seems safer to do this last, so that even if the
controller fails, the new controller will still see the notification.
2. In the notification znode we have an Event field as an integer. Can we
document what the value of LogDirFailure is? And also, are there any other
possible values?

Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 7, 2017 at 11:30 AM, Dong Lin  wrote:

> Hey Ismael,
>
> Thanks much for taking time to review the KIP and read through all the
> discussion!
>
> Please see my reply inline.
>
> On Tue, Mar 7, 2017 at 9:47 AM, Ismael Juma  wrote:
>
> > Hi Dong,
> >
> > It took me a while, but I finally went through the whole thread. I have a
> > few minor comments:
> >
> > 1. Regarding the metrics, can we include the full name (e.g.
> > kafka.cluster:type=Partition,name=InSyncReplicasCount,
> > topic={topic},partition={partition} was defined in KIP-96)?
> >
> > Certainly. I have updated the KIP to specify the full name.
>
>
> > 2. We talk about changes in operational procedures for people switching
> > from RAID to JBOD, but what about people who are already using JBOD?
> Since
> > disk failures won't necessarily cause broker failures, some adjustments
> may
> > be needed.
> >
>
> Good point. I indeed missed one operational procedure for both the existing
> RAID/JBOD user. I have updated the KIP to specify the following:
>
> Administrator will need to detect log directory failure by looking at
> OfflineLogDirectoriesCount. After log directory failure is detected,
> administrator needs to fix disks and reboot broker.
>
>
> >
> > 3. Another point regarding operational procedures, with a large enough
> > cluster, disk failures may not be that uncommon. It may be worth
> explaining
> > the recommended procedure if someone needs to do a rolling bounce of a
> > cluster with some bad disks. One option is to simply do the bounce and
> hope
> > that the bad disks are detected during restart, but we know that this is
> > not guaranteed to happen immediately. A better option may be to remove
> the
> > bad log dirs from the broker config until the disk is replaced.
> >
>
> I am not sure if I understand your suggestion here. I think user doesn't
> need to differentiate between log directory failure during rolling bounce
> and log directory failure during runtime. All they need to do is to detect
> and handle log directory failure specified above. And they don't have to
> remove the bad log directory immediately from broker config. The only
> drawback of keeping log directory there is that a new replica may not be
> created on the broker. But the chance of that happening is really low,
> since the controller has to fail in a small window after user initiated the
> topic creation but before it sends LeaderAndIsrRequest with
> is_new_replica=true to the broker. In practice this shouldn't matter.
>
>
> >
> > 4. The test plan doesn't mention the number of log directories per
> broker.
> > It could be good to specify this. Also, we seem to create one topic with
> > one partition, which means that only one log directory will be populated.
> > It seems like we should have partitions in more than one log directory to
> > verify that the failed log directory doesn't affect the ones that are
> still
> > good.
> >
>
> Sure. I have updated the test description to specify that each broker will
> have two log directories.
>
> The existing test case will actually create 2 topics to validate that
> failed log directory won't affect the good ones. You can find them after
> "Now validate that the previous leader can still serve replicas on the good
> log directories" and "Now validate that the follower can still serve
> replicas on the good log directories".
>
>
> >
> > 5. In the protocol definition, we have isNewReplica, but it should
> probably
> > be is_new_replica.
> >
>
> Good point. My bad. It is fixed now.
>
>
> >
> > Thanks,
> > Ismael
> >
> >
> > On Thu, Jan 12, 2017 at 6:46 PM, Dong Lin  wrote:
> >
> > > Hi all,
> > >
> > > We created KIP-112: Handle disk failure for JBOD. Please find the KIP
> > wiki
> > > in the link https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 112%3A+Handle+disk+failure+for+JBOD.
> > >
> > > This KIP is related to KIP-113
> > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>:
> > > Support replicas movement between log directories. They are needed in
> > order
> > > to support JBOD in Kafka. Please help review the KIP. You feedback is
> > > appreciated!
> > >
> > > Thanks,
> > > Dong
> > >
> >
>
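The operational guidance quoted above amounts to watching the proposed offline-log-directories gauge. A minimal JMX polling sketch follows; the ObjectName is a placeholder modelled on other LogManager gauges, since the final metric name was still being settled in the KIP.

{code}
// Minimal JMX polling sketch for the log-directory-failure gauge discussed
// above. The ObjectName is a placeholder; check the final KIP-112 text or the
// broker's MBean tree for the exact name.
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class OfflineLogDirMonitor {
    public static void main(String[] args) throws Exception {
        String url = "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi";
        try (JMXConnector connector =
                     JMXConnectorFactory.connect(new JMXServiceURL(url))) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            ObjectName gauge = new ObjectName(        // placeholder name
                    "kafka.log:type=LogManager,name=OfflineLogDirectoryCount");
            Number offline = (Number) conn.getAttribute(gauge, "Value");
            if (offline.intValue() > 0) {
                System.out.println("Broker reports " + offline
                        + " offline log directories; fix the disks and restart.");
            }
        }
    }
}
{code}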


[jira] [Updated] (KAFKA-4861) log.message.timestamp.type=LogAppendTime breaks Kafka based consumers

2017-03-07 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-4861:
---
Fix Version/s: 0.10.2.1

> log.message.timestamp.type=LogAppendTime breaks Kafka based consumers
> -
>
> Key: KAFKA-4861
> URL: https://issues.apache.org/jira/browse/KAFKA-4861
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0
>Reporter: Dustin Cote
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 0.10.2.1
>
>
> Using 0.10.2 brokers with the property 
> `log.message.timestamp.type=LogAppendTime` breaks all Kafka-based consumers 
> for the cluster. The consumer will return:
> {code}
> [2017-03-07 15:25:10,215] ERROR Unknown error when running consumer:  
> (kafka.tools.ConsoleConsumer$)
> org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The 
> timestamp of the message is out of acceptable range.
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:535)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:508)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:745)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:334)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:55)
>   at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:69)
>   at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50)
>   at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}
> On the broker side you see:
> {code}
> [2017-03-07 15:25:20,216] INFO [GroupCoordinator 0]: Group 
> console-consumer-73205 with generation 2 is now empty 
> (kafka.coordinator.GroupCoordinator)
> [2017-03-07 15:25:20,217] ERROR [Group Metadata Manager on Broker 0]: 
> Appending metadata message for group console-consumer-73205 generation 2 
> failed due to unexpected error: 
> org.apache.kafka.common.errors.InvalidTimestampException 
> (kafka.coordinator.GroupMetadataManager)
> [2017-03-07 15:25:20,218] WARN [GroupCoordinator 0]: Failed to write empty 
> metadata for group console-consumer-73205: The timestamp of the message is 
> out of acceptable range. (kafka.coordinator.GroupCoordinator)
> {code}
> Marking as a blocker since this appears to be a regression in that it doesn't 
> happen on 0.10.1.1



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-03-07 Thread radai
where do you see insert-in-the-middle/replace being commonly used?
lineage tracing, as you call it, would probably be implemented by way of:

1. every "stop" along the way appending itself (at the end)
2. some replication technologies, instead of just doing #1, may clear out
everything when they replicate (starting from a clean slate)



On Mon, Mar 6, 2017 at 11:00 AM, Colin McCabe  wrote:

> As others have mentioned, it seems clear that we want to preserve the
> ordering of message headers, so that we can implement things like
> lineage tracing.  (For example, each stage could add a "lineage:"
> header.)  I also think that we want the ability to add and remove
> headers as needed.  It would be really unfortunate if the only way to
> remove a message header or add a header at a certain position was to
> duplicate the whole message and re-create everything.
>
> So how about implementing ListIterator?
> https://docs.oracle.com/javase/7/docs/api/java/util/ListIterator.html
> It supports adding and removing things at arbitrary positions.  For
> people who want to use it as a simple Iterator, it is one (and you can
> use all the fancy syntax such as Java's foreach with it).  For people
> who want to add and remove things at arbitrary locations, they can.  And
> it doesn't expose the implementation, so that can be changed later.  We
> can materialize things in memory lazily if we want to, and so forth.  I
> think using the standard interface is better than rolling our own
> nonstandard collection or iterator interfaces.
>
> regards,
> Colin
>
>
> On Wed, Mar 1, 2017, at 12:59, Becket Qin wrote:
> > Hi Ismael,
> >
> > Thanks for the reply. Please see the comments inline.
> >
> > On Wed, Mar 1, 2017 at 6:47 AM, Ismael Juma  wrote:
> >
> > > Hi Becket,
> > >
> > > Thanks for sharing your thoughts. More inline.
> > >
> > > On Wed, Mar 1, 2017 at 2:54 AM, Becket Qin 
> wrote:
> > >
> > > > As you can imagine if the ProducerRecord has a value as a List and
> the
> > > > Interceptor.onSend() can actually add an element to the List. If the
> > > > producer.send() is called on the same ProducerRecord again, the value
> > > list
> > > > would have one more element than the previously sent ProducerRecord
> even
> > > > though the ProducerRecord itself is not mutable, right? Same thing
> can
> > > > apply to any modifiable class type.
> > > >
> > >
> > > The difference is that the user chooses the value type. They are free
> to
> > > choose a mutable or immutable type. A generic interceptor cannot
> mutate the
> > > value because it doesn't know the type (and it could be immutable). One
> > > could write an interceptor that checked the type of the value at
> runtime
> > > and did things based on that. But again, the user who creates the
> record is
> > > in control.
> > >
> > But there is no generic interceptor, right? The interceptor always takes
> > specific K, V type.
> >
> >
> > > From this standpoint allowing headers to be mutable doesn't really
> weaken
> > > > the mutability we already have. Admittedly a mutable header is kind
> of
> > > > guiding user towards to change the headers in the existing object
> instead
> > > > of creating a new one.
> > >
> > >
> > > Yes, with headers, we are providing a model for the user (the user
> doesn't
> > > get to choose it like with keys and values) and for the interceptors.
> So, I
> > > think it's not the same.
> >
> >
> > >
> > > > But I think reusing an object while it is possible
> > > > to be modified by user code is a risk that users themselves are
> willing
> > > to
> > > > take. And we do not really protect against that.
> > >
> > >
> > > If users want to take that risk, it's fine. But we give them the
> option to
> > > protect themselves. With mutable headers, there is no option.
> >
> > If we want to let the users control the mutability, users can always call
> > headers.close() before calling producer.send() and that will force the
> > interceptor to create a new ProducerRecord object.
> >
> > Because the headers are mostly useful for interceptors, unless the users
> > do
> > not want the interceptors to change their records, it seems reasonable to
> > say that by default modification of headers is allowed for the
> > interceptors.
> >
> > >
> > >
> > > > But there still seems
> > > > value to allow the users to not pay the overhead of creating tons of
> > > > objects if they do not reuse an object to send it twice, which is
> > > probably
> > > > a more common case.
> > > >
> > >
> > > I think the assumption that there would be tons of objects created is
> > > incorrect (I suggested a solution that would only involve one
> additional
> > > reference in the `Header` instance). The usability of the immutable API
> > > seemed to be more of an issue.
> > >
> > If we do not allow the users to add headers on existing ProducerRecord
> > objects, each interceptor who wants to add headers will have to create a
> > 
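To make the ListIterator suggestion earlier in this thread concrete, here is a rough sketch of ordered, positionally editable headers. The Header and Headers types are invented for the example; they are not the API that KIP-82 ultimately defines.

{code}
// Rough sketch of ordered headers exposed through a ListIterator, as suggested
// earlier in this thread. Header/Headers here are invented for the example and
// are not the KIP-82 API.
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

public class HeaderListIteratorExample {

    static final class Header {
        final String key;
        final byte[] value;
        Header(String key, byte[] value) { this.key = key; this.value = value; }
        @Override public String toString() { return key + "=" + new String(value); }
    }

    static final class Headers {
        private final List<Header> headers = new ArrayList<>();
        void append(String key, byte[] value) { headers.add(new Header(key, value)); }
        ListIterator<Header> listIterator() { return headers.listIterator(); }
    }

    public static void main(String[] args) {
        Headers headers = new Headers();
        headers.append("lineage", "stage-1".getBytes());
        headers.append("trace-id", "abc123".getBytes());

        // An interceptor can walk the headers in order, insert at the current
        // position, or remove one, without copying the whole record.
        ListIterator<Header> it = headers.listIterator();
        while (it.hasNext()) {
            Header h = it.next();
            if (h.key.equals("lineage")) {
                it.add(new Header("lineage", "stage-2".getBytes())); // insert after it
            } else if (h.key.equals("trace-id")) {
                it.remove(); // drop this header in place
            }
        }
        headers.listIterator().forEachRemaining(System.out::println);
    }
}
{code}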

[jira] [Created] (KAFKA-4861) log.message.timestamp.type=LogAppendTime breaks Kafka based consumers

2017-03-07 Thread Dustin Cote (JIRA)
Dustin Cote created KAFKA-4861:
--

 Summary: log.message.timestamp.type=LogAppendTime breaks Kafka 
based consumers
 Key: KAFKA-4861
 URL: https://issues.apache.org/jira/browse/KAFKA-4861
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.10.2.0
Reporter: Dustin Cote
Assignee: Jason Gustafson
Priority: Blocker


Using 0.10.2 brokers with the property 
`log.message.timestamp.type=LogAppendTime` breaks all Kafka-based consumers for 
the cluster. The consumer will return:
{code}
[2017-03-07 15:25:10,215] ERROR Unknown error when running consumer:  
(kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The 
timestamp of the message is out of acceptable range.
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:535)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:508)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:745)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:334)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:55)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:69)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
{code}

On the broker side you see:
{code}
[2017-03-07 15:25:20,216] INFO [GroupCoordinator 0]: Group 
console-consumer-73205 with generation 2 is now empty 
(kafka.coordinator.GroupCoordinator)
[2017-03-07 15:25:20,217] ERROR [Group Metadata Manager on Broker 0]: Appending 
metadata message for group console-consumer-73205 generation 2 failed due to 
unexpected error: org.apache.kafka.common.errors.InvalidTimestampException 
(kafka.coordinator.GroupMetadataManager)
[2017-03-07 15:25:20,218] WARN [GroupCoordinator 0]: Failed to write empty 
metadata for group console-consumer-73205: The timestamp of the message is out 
of acceptable range. (kafka.coordinator.GroupCoordinator)
{code}

Marking as a blocker since this appears to be a regression in that it doesn't 
happen on 0.10.1.1



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-03-07 Thread Dong Lin
Hey Ismael,

Thanks much for taking time to review the KIP and read through all the
discussion!

Please see my reply inline.

On Tue, Mar 7, 2017 at 9:47 AM, Ismael Juma  wrote:

> Hi Dong,
>
> It took me a while, but I finally went through the whole thread. I have a
> few minor comments:
>
> 1. Regarding the metrics, can we include the full name (e.g.
> kafka.cluster:type=Partition,name=InSyncReplicasCount,
> topic={topic},partition={partition} was defined in KIP-96)?
>
> Certainly. I have updated the KIP to specify the full name.


> 2. We talk about changes in operational procedures for people switching
> from RAID to JBOD, but what about people who are already using JBOD? Since
> disk failures won't necessarily cause broker failures, some adjustments may
> be needed.
>

Good point. I indeed missed one operational procedure for both existing
RAID and JBOD users. I have updated the KIP to specify the following:

The administrator will need to detect log directory failure by looking at
OfflineLogDirectoriesCount. After a log directory failure is detected, the
administrator needs to fix the disks and reboot the broker.


>
> 3. Another point regarding operational procedures, with a large enough
> cluster, disk failures may not be that uncommon. It may be worth explaining
> the recommended procedure if someone needs to do a rolling bounce of a
> cluster with some bad disks. One option is to simply do the bounce and hope
> that the bad disks are detected during restart, but we know that this is
> not guaranteed to happen immediately. A better option may be to remove the
> bad log dirs from the broker config until the disk is replaced.
>

I am not sure if I understand your suggestion here. I think the user doesn't
need to differentiate between a log directory failure during a rolling bounce
and one during runtime. All they need to do is detect and handle the log
directory failure as specified above. And they don't have to remove the bad
log directory immediately from the broker config. The only drawback of keeping
the log directory there is that a new replica may not be created on the broker.
But the chance of that happening is really low, since the controller has to
fail in the small window after the user initiates the topic creation but before
it sends LeaderAndIsrRequest with is_new_replica=true to the broker. In
practice this shouldn't matter.


>
> 4. The test plan doesn't mention the number of log directories per broker.
> It could be good to specify this. Also, we seem to create one topic with
> one partition, which means that only one log directory will be populated.
> It seems like we should have partitions in more than one log directory to
> verify that the failed log directory doesn't affect the ones that are still
> good.
>

Sure. I have updated the test description to specify that each broker will
have two log directories.

The existing test case will actually create 2 topics to validate that
failed log directory won't affect the good ones. You can find them after
"Now validate that the previous leader can still serve replicas on the good
log directories" and "Now validate that the follower can still serve
replicas on the good log directories".


>
> 5. In the protocol definition, we have isNewReplica, but it should probably
> be is_new_replica.
>

Good point. My bad. It is fixed now.


>
> Thanks,
> Ismael
>
>
> On Thu, Jan 12, 2017 at 6:46 PM, Dong Lin  wrote:
>
> > Hi all,
> >
> > We created KIP-112: Handle disk failure for JBOD. Please find the KIP
> wiki
> > in the link https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 112%3A+Handle+disk+failure+for+JBOD.
> >
> > This KIP is related to KIP-113
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories>:
> > Support replicas movement between log directories. They are needed in
> order
> > to support JBOD in Kafka. Please help review the KIP. You feedback is
> > appreciated!
> >
> > Thanks,
> > Dong
> >
>


Re: [VOTE] KIP-107: Add purgeDataBefore() API in AdminClient

2017-03-07 Thread Dong Lin
Hi Jun, Ismael,

I think making the API similar to a future KIP is desirable but not
required. Implementation is easy but discussion of the API may take a lot
of time given that we haven't yet reached agreement on KIP-117. Thus I
prefer to just mark the API in Scala as unstable.

I am OK with either delete or purge in the name.

Thanks,
Dong


On Tue, Mar 7, 2017 at 9:59 AM, Jun Rao  wrote:

> Hi, Dong, Ismael,
>
> 1. I just meant that it would be useful to distinguish between removing the
> whole log vs removing a portion of the log. The exact naming is less
> important.
>
> 4. When we move the purgeBefore() api to the Java AdminClient, it would be
> great if the api looks comparable to what's in KIP-117. For now, perhaps we
> can mark the api in Scala as unstable so that people are aware that it's
> subject to change?
>
> Thanks,
>
> Jun
>
> On Fri, Mar 3, 2017 at 11:25 AM, Dong Lin  wrote:
>
> > Hey Ismael,
> >
> > Thanks for the detailed explanation. Here are my thoughts:
> >
> > 1. purge vs. delete
> >
> > We have originally considered purge, delete, truncate and remove. I don't
> > have a strong preference among them and would be OK with any choice here.
> > That is why I didn't provide specific reasoning for selecting purge and
> > instead asked you and Jun for reason to choose between purge/delete.
> >
> > Can you be more specific where do we use "delete" in AdminClient.scala? I
> > couldn't find any usage of "delete" there.
> >
> > "delete" seems to be the only one that is exposed in the wire protocol
> and
> > script to the user. For example, "delete" as an option for
> kafka-topics.sh.
> > And it is used in the name of "DeleteTopicRequest" and a field name in
> the
> > StopReplicaRequest. That is why I slightly prefer "delete" over "purge".
> >
> > But all these names have been used in the Java API that is not exposed
> > directly to the user. For example, We have Log.truncateTo(),
> > DelayedOperation.purgeCompleted(), and MemoryNavigableLRUCache.remove().
> > Also, we haven't yet exposed any Java API to user that uses any of these
> > choices. Thus there is no unanimous choice here and it should be OK to
> > choose any of the "delete", "purge", "truncate" or "remove" and at this
> > stage. I personally don't have any obvious difference among them and am
> OK
> > with any of them.
> >
> > 2. Message vs. Record vs. data in the Java API name.
> >
> > Both "message" and "record"  are used in the Kafka, e.g. MemoryRecords,
> > ProducerRecord, ConsumerRecords, ReplicaManager.appendRecords(),
> > ReplicaManager.fetchMessages(). I remember there was a patch that
> changed
> > method name from using "message" to "record". Since Record is used more
> > widely, I think we should use Record instead of Message going forward.
> >
> > I agree that data is not used anyway and I prefer to change it to record,
> > e.g. purgeRecordBefore(). Does anyone have comment on this?
> >
> >
> > 3. PurgeRecordRequest vs. PurgeRequest
> >
> > As you said, PurgeRequest is consistent with FetchRequest and
> > ProduceRequest and it makes sense if we reserve the word
> > "Purge" for dealing with records/messages. I am not aware of anything
> other
> > than "record/message" that we may want to purge in the future. Even if
> > there is, I am not sure this would be an issue. For example, we can just
> > create PurgeXXXRequest similar to DeleteTopicsRequest. If we name the new
> > request as PurgeRecordsRequest, it will be different from FetchRequest
> and
> > ProduceRequest which is probably more confusing to user. Thus I prefer to
> > keep the request name as PurgeRequest.
> >
> >
> > 4. Change method signature to encapsulate the parameters and result as
> does
> > in KIP-117.
> >
> > I don't think we should do it in KIP-107. First, KIP-117 is still under
> > discussion while KIP-107 has been reviewed for a few rounds and is almost
> > ready for commit. Changing the API at this moment will require more
> > discussion and delay progress. We should try to avoid that. Second, I
> think
> > it is OK for KIP-107 to have different API from KIP-117. The later KIP is
> > free to do what it wants and the earlier KIP should not depend on the
> later
> > KIP. User will need to change API anyway when they switch from Scala
> > AdminClient to Java AdminClient.
> >
> > Dong
> >
> >
> > On Fri, Mar 3, 2017 at 6:34 AM, Ismael Juma  wrote:
> >
> > > First of all, sorry to arrive late on this.
> > >
> > > Jun, do you have a reference that states that "purge" means to remove a
> > > portion? If I do "define: purge" on Google, one of the definitions is
> > > "physically remove (something) completely."
> > >
> > > In the PR, I was asking about the reasoning more than suggesting a
> > change.
> > > But let me clarify my thoughts. There are 2 separate things to think
> > about:
> > >
> > > 1. The protocol change.
> > >
> > > It's currently called Purge with no mention of what it's purging. This
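Whatever name the thread above settles on, the intended usage shape is small. The sketch below is purely illustrative: the interface, method name, and result type are made up, since the API being discussed was Scala-side and explicitly unstable at this point.

{code}
// Purely illustrative usage shape for the KIP-107 purge/delete-before-offset
// API. The interface, method name and result type are invented; the real API
// (Scala AdminClient at the time, marked unstable) may differ in every detail.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public class PurgeBeforeExample {

    // Stand-in for whatever the admin client ends up exposing.
    interface RecordPurger {
        Map<TopicPartition, Long> purgeRecordsBefore(Map<TopicPartition, Long> offsets);
    }

    static void purgeConsumedData(RecordPurger purger) {
        Map<TopicPartition, Long> purgeBefore = new HashMap<>();
        // Ask the broker to drop everything below offset 42000 for partition 0.
        purgeBefore.put(new TopicPartition("my-topic", 0), 42000L);
        Map<TopicPartition, Long> lowWatermarks = purger.purgeRecordsBefore(purgeBefore);
        lowWatermarks.forEach((tp, lowWatermark) ->
                System.out.println(tp + " new log start offset: " + lowWatermark));
    }
}
{code}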

Re: [DISCUSS] KIP-124: Request rate quotas

2017-03-07 Thread Becket Qin
Hi Rajini/Jun,

The percentage based reasoning sounds good.
One thing I am wondering is: if we assume the network threads are just
doing the network IO, can we say the bytes rate quota is already a sort of
network threads quota?
If we take network threads into consideration here, would that be
somewhat overlapping with the bytes rate quota?

Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 7, 2017 at 11:04 AM, Rajini Sivaram 
wrote:

> Jun,
>
> Thank you for the explanation, I hadn't realized you meant percentage of
> the total thread pool. If everyone is OK with Jun's suggestion, I will
> update the KIP.
>
> Thanks,
>
> Rajini
>
> On Tue, Mar 7, 2017 at 5:08 PM, Jun Rao  wrote:
>
> > Hi, Rajini,
> >
> > Let's take your example. Let's say a user sets the limit to 50%. I am not
> > sure if it's better to apply the same percentage separately to network
> and
> > io thread pool. For example, for produce requests, most of the time will
> be
> > spent in the io threads whereas for fetch requests, most of the time will
> > be in the network threads. So, using the same percentage in both thread
> > pools means one of the pools' resource will be over allocated.
> >
> > An alternative way is to simply model network and io thread pool
> together.
> > If you get 10 io threads and 5 network threads, you get 1500% request
> > processing power. A 50% limit means a total of 750% processing power. We
> > just add up the time a user request spent in either network or io thread.
> > If that total exceeds 750% (doesn't matter whether it's spent more in
> > network or io thread), the request will be throttled. This seems more
> > general and is not sensitive to the current implementation detail of
> having
> > a separate network and io thread pool. In the future, if the threading
> > model changes, the same concept of quota can still be applied. For now,
> > since it's a bit tricky to add the delay logic in the network thread
> pool,
> > we could probably just do the delaying only in the io threads as you
> > suggested earlier.
> >
> > There is still the orthogonal question of whether a quota of 50% is out
> of
> > 100% or 100% * #total processing threads. My feeling is that the latter
> is
> > slightly better based on my explanation earlier. The way to describe this
> > quota to the users can be "share of elapsed request processing time on a
> > single CPU" (similar to top).
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Mar 3, 2017 at 4:22 AM, Rajini Sivaram 
> > wrote:
> >
> > > Jun,
> > >
> > > Agree about the two scenarios.
> > >
> > > But still not sure about a single quota covering both network threads
> and
> > > I/O threads with per-thread quota. If there are 10 I/O threads and 5
> > > network threads and I want to assign half the quota to userA, the quota
> > > would be 750%. I imagine, internally, we would convert this to 500% for
> > I/O
> > > and 250% for network threads to allocate 50% of each pool.
> > >
> > > A couple of scenarios:
> > >
> > > 1. Admin adds 1 extra network thread. To retain 50%, admin needs to now
> > > allocate 800% for each user. Or increase the quota for a few users. To
> > me,
> > > it feels like admin needs to convert 50% to 800% and Kafka internally
> > needs
> > > to convert 800% to (500%, 300%). Everyone using just 50% feels a lot
> > > simpler.
> > >
> > > 2. We decide to add some other thread to this list. Admin needs to know
> > > exactly how many threads form the maximum quota. And we can be changing
> > > this between broker versions as we add more to the list. Again a single
> > > overall percent would be a lot simpler.
> > >
> > > There were others who were unconvinced by a single percent from the
> > initial
> > > proposal and were happier with thread units similar to CPU units, so I
> am
> > > ok with going with per-thread quotas (as units or percent). Just not
> sure
> > > it makes it easier for admin in all cases.
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > >
> > > On Fri, Mar 3, 2017 at 6:03 AM, Jun Rao  wrote:
> > >
> > > > Hi, Rajini,
> > > >
> > > > Consider modeling as n * 100% unit. For 2), the question is what's
> > > causing
> > > > the I/O threads to be saturated. It's unlikely that all users'
> > > utilization
> > > > has increased at the same time. A more likely case is that a few isolated
> > > > users' utilization have increased. If so, after increasing the number
> > of
> > > > threads, the admin just needs to adjust the quota for a few isolated
> > > users,
> > > > which is expected and is less work.
> > > >
> > > > Consider modeling as 1 * 100% unit. For 1), all users' quota need to
> be
> > > > adjusted, which is unexpected and is more work.
> > > >
> > > > So, to me, the n * 100% model seems more convenient.
> > > >
> > > > As for future extension to cover network thread utilization, I was
> > > thinking
> > > > that one way is to simply model the capacity as (n + m) * 

[jira] [Updated] (KAFKA-4859) Transient test failure: org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion (again)

2017-03-07 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4859:
---
Component/s: streams

> Transient test failure: 
> org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion
>  (again)
> ---
>
> Key: KAFKA-4859
> URL: https://issues.apache.org/jira/browse/KAFKA-4859
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Armin Braun
>Assignee: Armin Braun
>
> Slightly different than KAFKA-3874 in terms of the way it fails.
> Now we have:
> {code}
> Error Message
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:257)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:206)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:175)
>   at 
> org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion(KStreamKTableJoinIntegrationTest.java:297)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> {code}
> e.g. here https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2032/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-124: Request rate quotas

2017-03-07 Thread Rajini Sivaram
Jun,

Thank you for the explanation; I hadn't realized you meant a percentage of
the total thread pool. If everyone is OK with Jun's suggestion, I will
update the KIP.

Thanks,

Rajini

On Tue, Mar 7, 2017 at 5:08 PM, Jun Rao  wrote:

> Hi, Rajini,
>
> Let's take your example. Let's say a user sets the limit to 50%. I am not
> sure if it's better to apply the same percentage separately to network and
> io thread pool. For example, for produce requests, most of the time will be
> spent in the io threads whereas for fetch requests, most of the time will
> be in the network threads. So, using the same percentage in both thread
> pools means one of the pools' resource will be over allocated.
>
> An alternative way is to simply model network and io thread pool together.
> If you get 10 io threads and 5 network threads, you get 1500% request
> processing power. A 50% limit means a total of 750% processing power. We
> just add up the time a user request spent in either network or io thread.
> If that total exceeds 750% (doesn't matter whether it's spent more in
> network or io thread), the request will be throttled. This seems more
> general and is not sensitive to the current implementation detail of having
> a separate network and io thread pool. In the future, if the threading
> model changes, the same concept of quota can still be applied. For now,
> since it's a bit tricky to add the delay logic in the network thread pool,
> we could probably just do the delaying only in the io threads as you
> suggested earlier.
>
> There is still the orthogonal question of whether a quota of 50% is out of
> 100% or 100% * #total processing threads. My feeling is that the latter is
> slightly better based on my explanation earlier. The way to describe this
> quota to the users can be "share of elapsed request processing time on a
> single CPU" (similar to top).
>
> Thanks,
>
> Jun
>
>
> On Fri, Mar 3, 2017 at 4:22 AM, Rajini Sivaram 
> wrote:
>
> > Jun,
> >
> > Agree about the two scenarios.
> >
> > But still not sure about a single quota covering both network threads and
> > I/O threads with per-thread quota. If there are 10 I/O threads and 5
> > network threads and I want to assign half the quota to userA, the quota
> > would be 750%. I imagine, internally, we would convert this to 500% for
> I/O
> > and 250% for network threads to allocate 50% of each pool.
> >
> > A couple of scenarios:
> >
> > 1. Admin adds 1 extra network thread. To retain 50%, admin needs to now
> > allocate 800% for each user. Or increase the quota for a few users. To
> me,
> > it feels like admin needs to convert 50% to 800% and Kafka internally
> needs
> > to convert 800% to (500%, 300%). Everyone using just 50% feels a lot
> > simpler.
> >
> > 2. We decide to add some other thread to this list. Admin needs to know
> > exactly how many threads form the maximum quota. And we can be changing
> > this between broker versions as we add more to the list. Again a single
> > overall percent would be a lot simpler.
> >
> > There were others who were unconvinced by a single percent from the
> initial
> > proposal and were happier with thread units similar to CPU units, so I am
> > ok with going with per-thread quotas (as units or percent). Just not sure
> > it makes it easier for admin in all cases.
> >
> > Regards,
> >
> > Rajini
> >
> >
> > On Fri, Mar 3, 2017 at 6:03 AM, Jun Rao  wrote:
> >
> > > Hi, Rajini,
> > >
> > > Consider modeling as n * 100% unit. For 2), the question is what's
> > causing
> > > the I/O threads to be saturated. It's unlikely that all users'
> > utilization
> > > have increased at the same. A more likely case is that a few isolated
> > > users' utilization have increased. If so, after increasing the number
> of
> > > threads, the admin just needs to adjust the quota for a few isolated
> > users,
> > > which is expected and is less work.
> > >
> > > Consider modeling as 1 * 100% unit. For 1), all users' quota need to be
> > > adjusted, which is unexpected and is more work.
> > >
> > > So, to me, the n * 100% model seems more convenient.
> > >
> > > As for future extension to cover network thread utilization, I was
> > thinking
> > > that one way is to simply model the capacity as (n + m) * 100% unit,
> > where
> > > n and m are the number of network and i/o threads, respectively. Then,
> > for
> > > each user, we can just add up the utilization in the network and the
> i/o
> > > thread. If we do this, we don't need a new type of quota.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Thu, Mar 2, 2017 at 12:27 PM, Rajini Sivaram <
> rajinisiva...@gmail.com
> > >
> > > wrote:
> > >
> > > > Jun,
> > > >
> > > > If we use request.percentage as the percentage used in a single I/O
> > > thread,
> > > > the total percentage being allocated will be num.io.threads * 100 for
> > I/O
> > > > threads and num.network.threads * 100 for network threads. A single
> > quota
> > 

Re: [VOTE] KIP-107: Add purgeDataBefore() API in AdminClient

2017-03-07 Thread Jun Rao
Hi, Dong, Ismael,

1. I just meant that it would be useful to distinguish between removing the
whole log vs removing a portion of the log. The exact naming is less
important.

4. When we move the purgeBefore() api to the Java AdminClient, it would be
great if the api looks comparable to what's in KIP-117. For now, perhaps we
can mark the api in Scala as unstable so that people are aware that it's
subject to change?

Thanks,

Jun

On Fri, Mar 3, 2017 at 11:25 AM, Dong Lin  wrote:

> Hey Ismael,
>
> Thanks for the detailed explanation. Here are my thoughts:
>
> 1. purge vs. delete
>
> We have originally considered purge, delete, truncate and remove. I don't
> have a strong preference among them and would be OK with any choice here.
> That is why I didn't provide specific reasoning for selecting purge and
> instead asked you and Jun for reason to choose between purge/delete.
>
> Can you be more specific where do we use "delete" in AdminClient.scala? I
> couldn't find any usage of "delete" there.
>
> "delete" seems to be the only one that is exposed in the wire protocol and
> script to the user. For example, "delete" as an option for kafka-topics.sh.
> And it is used in the name of "DeleteTopicRequest" and a field name in the
> StopReplicaRequest. That is why I slightly prefer "delete" over "purge".
>
> But all these names have been used in the Java API that is not exposed
> directly to the user. For example, We have Log.truncateTo(),
> DelayedOperation.purgeCompleted(), and MemoryNavigableLRUCache.remove().
> Also, we haven't yet exposed any Java API to user that uses any of these
> choices. Thus there is no unanimous choice here and it should be OK to
> choose any of the "delete", "purge", "truncate" or "remove" and at this
> stage. I personally don't have any obvious difference among them and am OK
> with any of them.
>
> 2. Message vs. Record vs. data in the Java API name.
>
> Both "message" and "record" are used in Kafka, e.g. MemoryRecords,
> ProducerRecord, ConsumerRecords, ReplicaManager.appendRecords(),
> ReplicaManager.fetchMessages(). I remember there was a patch that changed
> method name from using "message" to "record". Since Record is used more
> widely, I think we should use Record instead of Message going forward.
>
> I agree that data is not used anyway and I prefer to change it to record,
> e.g. purgeRecordBefore(). Does anyone have comment on this?
>
>
> 3. PurgeRecordRequest vs. PurgeRequest
>
> As you said, PurgeRequest is consistent with FetchRequest and
> ProduceRequest and it makes sense if we reserve the word
> "Purge" for dealing with records/messages. I am not aware of anything other
> than "record/message" that we may want to purge in the future. Even if
> there is, I am not sure this would be an issue. For example, we can just
> create PurgeXXXRequest similar to DeleteTopicsRequest. If we name the new
> request as PurgeRecordsRequest, it will be different from FetchRequest and
> ProduceRequest which is probably more confusing to user. Thus I prefer to
> keep the request name as PurgeRequest.
>
>
> 4. Change method signature to encapsulate the parameters and result as does
> in KIP-117.
>
> I don't think we should do it in KIP-107. First, KIP-117 is still under
> discussion while KIP-107 has been reviewed for a few rounds and is almost
> ready for commit. Changing the API at this moment will require more
> discussion and delay progress. We should try to avoid that. Second, I think
> it is OK for KIP-107 to have different API from KIP-117. The later KIP is
> free to do what it wants and the earlier KIP should not depend on the later
> KIP. User will need to change API anyway when they switch from Scala
> AdminClient to Java AdminClient.
>
> Dong
>
>
> On Fri, Mar 3, 2017 at 6:34 AM, Ismael Juma  wrote:
>
> > First of all, sorry to arrive late on this.
> >
> > Jun, do you have a reference that states that "purge" means to remove a
> > portion? If I do "define: purge" on Google, one of the definitions is
> > "physically remove (something) completely."
> >
> > In the PR, I was asking about the reasoning more than suggesting a
> change.
> > But let me clarify my thoughts. There are 2 separate things to think
> about:
> >
> > 1. The protocol change.
> >
> > It's currently called Purge with no mention of what it's purging. This is
> > consistent with Fetch and Produce and it makes sense if we reserve the
> word
> > "Purge" for dealing with records/messages. Having said that, I don't
> think
> > this is particularly intuitive for people who are not familiar with Kafka
> > and its history. The number of APIs in the protocol keeps growing and it
> > would be better to be explicit about what is being purged/deleted, in my
> > opinion. If we are explicit, then we need to decide what to call it,
> since
> > there is no precedent. A few options: PurgeRecords, PurgeMessages,
> > PurgeData, DeleteRecords, DeleteMessages, DeleteData (I personally don't
> > 

[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-03-07 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899847#comment-15899847
 ] 

Edoardo Comar commented on KAFKA-4669:
--

[~ijuma] It's the same NPE as

https://issues.apache.org/jira/browse/KAFKA-3689?focusedCommentId=15383936=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15383936

could it be caused by an exotic client?

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked 0.10 code and think this bug does exist in 0.10 versions.
> A real case.  First a correlateId not match exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>   at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>   at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>   at java.lang.Thread.run(Thread.java:745)
> client code 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4844) kafka is holding open file descriptors

2017-03-07 Thread Colin P. McCabe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899848#comment-15899848
 ] 

Colin P. McCabe commented on KAFKA-4844:


http://nfs.sourceforge.net/#faq_d2

bq. D2. What is a "silly rename"? Why do these .nfsX files keep showing up?
bq. A. Unix applications often open a scratch file and then unlink it. They do 
this so that the file is not visible in the file system name space to any other 
applications, and so that the system will automatically clean up (delete) the 
file when the application exits. This is known as "delete on last close", and 
is a tradition among Unix applications. Because of the design of the NFS 
protocol, there is no way for a file to be deleted from the name space but 
still remain in use by an application. Thus NFS clients have to emulate this 
using what already exists in the protocol. If an open file is unlinked, an NFS 
client renames it to a special name that looks like ".nfsX". This "hides" 
the file while it remains in use. This is known as a "silly rename." Note that 
NFS servers have nothing to do with this behavior.  After all applications on a 
client have closed the silly-renamed file, the client automatically finishes 
the unlink by deleting the file on the server. Generally this is effective, but 
if the client crashes before the file is removed, it will leave the .nfsX 
file. If you are sure that the applications using these files are no longer 
running, it is safe to delete these files manually.  The NFS version 4 protocol 
is stateful, and could actually support delete-on-last-close. Unfortunately 
there isn't an easy way to do this and remain backwards-compatible with version 
2 and 3 accessors.

> kafka is holding open file descriptors
> --
>
> Key: KAFKA-4844
> URL: https://issues.apache.org/jira/browse/KAFKA-4844
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: chao
>Priority: Critical
>
> We found a strange issue on Kafka 0.9.0.1: kafka is holding open file 
> descriptors and not allowing disk space to be reclaimed
> my question:
> 1. what does file (nfsX) mean ??? 
> 2. why kafka is holding file ?? 
> $ sudo lsof /nas/kafka_logs/kafka/Order-6/.nfs04550ffcbd61
> COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
> java 97465 kafka mem REG 0,25 10485760 72683516 
> /nas/kafka_logs/kafka/Order-6/.nfs04550ffcbd61



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-03-07 Thread Becket Qin
Hi Dong,

Thanks for the comments.

The patch is mostly a proof of concept, in case there is any concern about
the implementation, which is indeed a little tricky.

The new metric has already been mentioned in the Public Interface Change
section.

I added to the wiki the reasoning behind how the compression ratio
improvement/deterioration steps were chosen.
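
To make the adjustment rule concrete, a minimal per-topic estimator along
these lines could look like the sketch below. Names and bookkeeping are
illustrative only (not the actual patch); the 0.05/0.005 steps are the
values proposed in the KIP discussion.

{code}
import scala.collection.mutable

// Illustrative sketch of a per-topic compression ratio estimator with
// asymmetric adjustment steps; not the actual implementation.
class CompressionRatioEstimator(initialRatio: Double = 1.0) {
  private val ratios = mutable.Map.empty[String, Double].withDefaultValue(initialRatio)

  // Called with the observed compressed/uncompressed ratio of a finished batch.
  def observe(topic: String, observedRatio: Double): Unit = {
    val current = ratios(topic)
    val updated =
      if (observedRatio > current) current + 0.05 // underestimated: back off quickly
      else current - 0.005                        // overestimated: creep back down slowly
    ratios(topic) = math.max(0.0, updated)
  }

  // Used when deciding how many uncompressed bytes to put in a batch.
  def estimate(topic: String): Double = ratios(topic)
}
{code}

The asymmetry makes the estimate turn conservative quickly after a batch
compresses worse than expected (reducing the chance of an oversized batch
that has to be split and resent), while drifting back down only gradually
when compression is better than expected.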

Thanks,

Jiangjie (Becket) Qin


On Mon, Mar 6, 2017 at 4:42 PM, Dong Lin  wrote:

> Hey Becket,
>
> I am wondering if we should first vote for the KIP before reviewing the
> patch. I have two comments below:
>
> - Should we specify the new sensors as part of interface change in the KIP?
> - The KIP proposes to increase estimated compression ratio by 0.05 for each
> underestimation and decrement the estimation by 0.005 for each
> overestimation. Why are these two values chosen? I think there is some
> tradeoff in selecting the value. Can the KIP be more explicit about the
> tradeoff and explain how these two values would impact producer's
> performance?
>
> Thanks,
> Dong
>
>
> On Sat, Mar 4, 2017 at 11:42 AM, Becket Qin  wrote:
>
> > I have updated the KIP based on the latest discussion. Please check and
> let
> > me know if there is any further concern.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sat, Mar 4, 2017 at 10:56 AM, Becket Qin 
> wrote:
> >
> > > Actually second thought on this, rate might be better for two reasons:
> > > 1. Most of the metrics in the producer we already have are using rate
> > > instead of count.
> > > 2. If a service is bounced, the count will be reset to 0, but it does
> not
> > > affect rate.
> > >
> > > I'll make the change.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Sat, Mar 4, 2017 at 10:27 AM, Becket Qin 
> > wrote:
> > >
> > >> Hi Dong,
> > >>
> > >> Yes, there is a sensor in the patch about the split occurrence.
> > >>
> > >> Currently it is a count instead of rate. In practice, it seems count
> is
> > >> easier to use in this case. But I am open to change.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On Fri, Mar 3, 2017 at 7:43 PM, Dong Lin  wrote:
> > >>
> > >>> Hey Becket,
> > >>>
> > >>> I haven't looked at the patch yet. But since we are going to try the
> > >>> split-on-oversize solution, should the KIP also add a sensor that
> shows
> > >>> the
> > >>> rate of split per second and the probability of split?
> > >>>
> > >>> Thanks,
> > >>> Dong
> > >>>
> > >>>
> > >>> On Fri, Mar 3, 2017 at 6:39 PM, Becket Qin 
> > wrote:
> > >>>
> > >>> > Just to clarify, the implementation is basically what I mentioned
> > above
> > >>> > (split/resend + adjusted estimation evolving algorithm) and
> changing
> > >>> the
> > >>> > compression ratio estimation to be per topic.
> > >>> >
> > >>> > Thanks,
> > >>> >
> > >>> > Jiangjie (Becket) Qin
> > >>> >
> > >>> > On Fri, Mar 3, 2017 at 6:36 PM, Becket Qin 
> > >>> wrote:
> > >>> >
> > >>> > > I went ahead and have a patch submitted here:
> > >>> > > https://github.com/apache/kafka/pull/2638
> > >>> > >
> > >>> > > Per Joel's suggestion, I changed the compression ratio to be per
> > >>> topic as
> > >>> > > well. It seems working well. Since there is an important behavior
> > >>> change
> > >>> > > and a new sensor is added, I'll keep the KIP and update it
> > according.
> > >>> > >
> > >>> > > Thanks,
> > >>> > >
> > >>> > > Jiangjie (Becket) Qin
> > >>> > >
> > >>> > > On Mon, Feb 27, 2017 at 3:50 PM, Joel Koshy  >
> > >>> wrote:
> > >>> > >
> > >>> > >> >
> > >>> > >> > Lets say we sent the batch over the wire and received a
> > >>> > >> > RecordTooLargeException, how do we split it as once we add the
> > >>> message
> > >>> > >> to
> > >>> > >> > the batch we loose the message level granularity. We will have
> > to
> > >>> > >> > decompress, do deep iteration and split and again compress.
> > right?
> > >>> > This
> > >>> > >> > looks like a performance bottle neck in case of multi topic
> > >>> producers
> > >>> > >> like
> > >>> > >> > mirror maker.
> > >>> > >> >
> > >>> > >>
> > >>> > >> Yes, but these should be outliers if we do estimation on a
> > per-topic
> > >>> > basis
> > >>> > >> and if we target a conservative-enough compression ratio. The
> > >>> producer
> > >>> > >> should also avoid sending over the wire if it can be made aware
> of
> > >>> the
> > >>> > >> max-message size limit on the broker, and split if it determines
> > >>> that a
> > >>> > >> record exceeds the broker's config. Ideally this should be part
> of
> > >>> topic
> > >>> > >> metadata but is not - so it could be off a periodic
> > describe-configs
> > >>> > >>  > >>> > >> Command+line+and+centralized+administrative+operations#KIP-
> > >>> > >> 

[jira] [Commented] (KAFKA-4566) Can't Symlink to Kafka bins

2017-03-07 Thread Colin P. McCabe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899843#comment-15899843
 ] 

Colin P. McCabe commented on KAFKA-4566:


bq. I know this is a bit more than intended than the original JIRA, but have 
you thought about wrapping these functions into a single CLI, where these kinds 
of reusable methods/tasks would all be in one place?

We have talked about creating a monocommand in the past. As you said, though, 
it's a much bigger scope than this JIRA.

> Can't Symlink to Kafka bins
> ---
>
> Key: KAFKA-4566
> URL: https://issues.apache.org/jira/browse/KAFKA-4566
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.1.1
>Reporter: Stephane Maarek
>Assignee: Akhilesh Naidu
>  Labels: newbie
>
> in the kafka consumer for example, the last line is :
> https://github.com/apache/kafka/blob/trunk/bin/kafka-console-consumer.sh#L21
> {code}
> exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
> {code}
> if I create a symlink using 
> {code}
> ln -s
> {code}
> it doesn't resolve the right directory name because of $(dirname $0) 
> I believe the right way is to do:
> {code}
> "$(dirname "$(readlink -e "$0")")"
> {code}
>  
> Any thoughts on that before I do a PR?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-03-07 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899835#comment-15899835
 ] 

Ismael Juma commented on KAFKA-4669:


It seems plausible that if that NPE happens, we process the next request from 
the producer without responding to the previous one.

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked 0.10 code and think this bug does exist in 0.10 versions.
> A real case.  First a correlateId not match exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>   at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>   at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>   at java.lang.Thread.run(Thread.java:745)
> client code 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-03-07 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899832#comment-15899832
 ] 

Ismael Juma commented on KAFKA-4669:


That NPE looks like KAFKA-3689.

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked 0.10 code and think this bug does exist in 0.10 versions.
> A real case.  First a correlateId not match exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>   at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>   at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>   at java.lang.Thread.run(Thread.java:745)
> client code 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-03-07 Thread Ismael Juma
Hi Dong,

It took me a while, but I finally went through the whole thread. I have a
few minor comments:

1. Regarding the metrics, can we include the full name (e.g.
kafka.cluster:type=Partition,name=InSyncReplicasCount,
topic={topic},partition={partition} was defined in KIP-96)?

2. We talk about changes in operational procedures for people switching
from RAID to JBOD, but what about people who are already using JBOD? Since
disk failures won't necessarily cause broker failures, some adjustments may
be needed.

3. Another point regarding operational procedures, with a large enough
cluster, disk failures may not be that uncommon. It may be worth explaining
the recommended procedure if someone needs to do a rolling bounce of a
cluster with some bad disks. One option is to simply do the bounce and hope
that the bad disks are detected during restart, but we know that this is
not guaranteed to happen immediately. A better option may be to remove the
bad log dirs from the broker config until the disk is replaced.

4. The test plan doesn't mention the number of log directories per broker.
It could be good to specify this. Also, we seem to create one topic with
one partition, which means that only one log directory will be populated.
It seems like we should have partitions in more than one log directory to
verify that the failed log directory doesn't affect the ones that are still
good.

5. In the protocol definition, we have isNewReplica, but it should probably
be is_new_replica.

Thanks,
Ismael


On Thu, Jan 12, 2017 at 6:46 PM, Dong Lin  wrote:

> Hi all,
>
> We created KIP-112: Handle disk failure for JBOD. Please find the KIP wiki
> in the link https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 112%3A+Handle+disk+failure+for+JBOD.
>
> This KIP is related to KIP-113
>  113%3A+Support+replicas+movement+between+log+directories>:
> Support replicas movement between log directories. They are needed in order
> to support JBOD in Kafka. Please help review the KIP. You feedback is
> appreciated!
>
> Thanks,
> Dong
>


[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-03-07 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899827#comment-15899827
 ] 

Edoardo Comar commented on KAFKA-4669:
--

We have found a strong correlation between the clients getting 

{code}
Uncaught error in kafka producer I/O thread: 
java.lang.IllegalStateException: Correlation id for response (703766) does not 
match request (703764)
{code}

and an NPE in one of our 10.0.1 brokers
{code}
[2017-03-06 17:46:29,827] ERROR Processor got uncaught exception. 
(kafka.network.Processor)
java.lang.NullPointerException
at 
kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:486)
at 
kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
at kafka.network.Processor.run(SocketServer.scala:413)
at java.lang.Thread.run(Thread.java:809)
{code}

which suggests that somehow 
{code}
  private def processCompletedReceives() {
selector.completedReceives.asScala.foreach { receive =>
  try {
val channel = selector.channel(receive.source)
val session = RequestChannel.Session(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
//NPE if channel is null
...
{code}

[~ijuma] the only clients that are getting the occasional IllegalStateException 
are the ones producing to a partition that has as leader a broker where that 
NPE is appearing in our logs.
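
To see why a dropped response shows up on the client as a correlation id
mismatch, here is a tiny self-contained simulation (toy code, not Kafka
internals) of the producer's FIFO in-flight request queue:

{code}
import scala.collection.mutable

// Toy model: the client matches responses to in-flight requests strictly in
// send order, so a silently dropped response shifts every later match.
object CorrelationMismatchDemo extends App {
  val inFlight = mutable.Queue(703764, 703765, 703766) // correlation ids awaiting responses

  // Suppose the broker hit the NPE above while handling 703764 and 703765 and
  // never responded to them, but responded normally to 703766.
  val response = 703766
  val oldestPending = inFlight.dequeue()

  if (response != oldestPending)
    println(s"Correlation id for response ($response) does not match request ($oldestPending)")
  // In the real producer this surfaces as an IllegalStateException in the
  // sender thread, the unanswered batches never complete, and flush() can
  // block forever -- matching the symptoms reported in this issue.
}
{code}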

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked 0.10 code and think this bug does exist in 0.10 versions.
> A real case.  First a correlateId not match exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>   at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>   at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>   at java.lang.Thread.run(Thread.java:745)
> client code 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4437) Incremental Batch Processing for Kafka Streams

2017-03-07 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899823#comment-15899823
 ] 

Matthias J. Sax commented on KAFKA-4437:


Kafka 0.10.2 has already been released without this feature. Right now, the 
KIP is still under discussion but nobody is actively working on it. It might 
not even make it into the next release, 0.11.0.0 -- it does not seem that 
anybody finds time to pick it up at the moment :(

> Incremental Batch Processing for Kafka Streams
> --
>
> Key: KAFKA-4437
> URL: https://issues.apache.org/jira/browse/KAFKA-4437
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> We want to add an “auto stop” feature that terminate a stream application 
> when it has processed all the data that was newly available at the time the 
> application started (to at current end-of-log, i.e., current high watermark). 
> This allows to chop the (infinite) log into finite chunks where each run for 
> the application processes one chunk. This feature allows for incremental 
> batch-like processing; think "start-process-stop-restart-process-stop-..."
> For details see KIP-95: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

2017-03-07 Thread Jun Rao
Hi, Guozhang,

Thanks for the KIP. A couple of comments.

1. About the impact on producer batching. My understanding is that
typically different sub-topologies in the same task are publishing to
different topics. Since the producer batching happens at the
topic/partition level, using a producer per task may not impact batching
much.

2. When processing.guarantee is set to exactly_once, do we want to enforce
acks=all in the producer? The default acks is 1 and may cause acked data
to be lost later when the leader changes.
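
As a concrete (and hedged) illustration of point 2, the producer settings
behind exactly_once would presumably need to look something like the sketch
below. The KIP-98 config names used here are assumptions, and whether
Streams enforces or merely defaults them is exactly the open question:

{code}
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

object EosProducerConfigSketch {
  // Sketch only; the idempotence/transactional config names come from the
  // KIP-98 discussion and may differ in the final implementation.
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ProducerConfig.ACKS_CONFIG, "all")     // acks=1 can lose acked data on leader change
  props.put("enable.idempotence", "true")          // required once a TransactionalId is set
  props.put("transactional.id", "my-app-task-0_0") // hypothetical per-task id
}
{code}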

Thanks,

Jun

On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy  wrote:

> Hi Matthias,
>
> Thanks. The perf test is a good start but I don't think it goes far enough.
> 100 partitions is not a lot. What happens when there are thousands of
> partitions? What is the load on the brokers? How much more memory is used
> by the Streams App etc?
>
> Thanks,
> Damian
>
> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax  wrote:
>
> > Hi,
> >
> > I want to give a first respond:
> >
> >
> >
> > 1. Producer per task:
> >
> > First, we did some performance tests, indicating that the performance
> > penalty is small. Please have a look here:
> >
> > https://docs.google.com/spreadsheets/d/18aGOB13-
> ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
> >
> > For the test, we ran with a trunk version and a modified version that
> > uses a producer per task (of course, no transactions, but at-least-once
> > semantics). The scaling factor indicates the number of brokers and
> > (single threaded) Streams instances. We used SimpleBenchmark that is
> > part of AK code base.
> >
> >
> > Second, as the design is "producer per task" (and not "producer per
> > partition") it is possible to specify a custom PartitionGrouper that
> > assigns multiple partitions to a single task. Thus, it allows to reduce
> > the number of tasks for scenarios with many partitions. Right now, this
> > interface must be implemented solely by the user, but we could also add
> > a new config parameter that specifies the max.number.of.tasks or
> > partitions.per.task so that the user can configure this instead of
> > implementing the interface.
> >
> > Third, there is the idea of a "Producer Pool" that would allow to share
> > resources (network connections, memory, etc) over multiple producers.
> > This would allow to separate multiple transaction on the producer level,
> > while resources are shared. There is no detailed design document yet and
> > there would be a KIP for this feature.
> >
> > Thus, if there should be any performance problems for high scale
> > scenarios, there are multiple ways to tackle them while keeping the
> > "producer per task" design.
> >
> > Additionally, a "producer per thread" design would be way more complex
> > and I summarized the issues in a separate document. I will share a link
> > to the document soon.
> >
> >
> >
> > 2. StateStore recovery:
> >
> > Streams EoS will in the first design not allow to exploit the
> > improvements that are added for 0.11 at the moment. However, as 0.10.2
> > faces the same issues of potentially long recovery, there is no
> > regression with this regard. Thus, I see those improvements as
> > orthogonal or add-ons. Nevertheless, we should try to explore those
> > options and if possible get them into 0.11 such that Streams with EoS
> > gets the same improvements as at-least-once scenario.
> >
> >
> >
> > 3. Caching:
> >
> > We might need to do some experiments to quantify the impact on caching.
> > If it's severe, the suggested default commit interval of 100ms could
> > also be increased. Also, EoS will not enforce any commit interval, but
> > only change the default value. Thus, a user can freely trade-off latency
> > vs. caching-effect.
> >
> > Last but not least, there is the idea to allow "read_uncommitted" for
> > intermediate topic. This would be an advance design for Streams EoS that
> > allows downstream sub-topologies to read uncommitted data
> > optimistically. In case of failure, a cascading abort of transactions
> > would be required. This change will need another KIP.
> >
> >
> >
> > 4. Idempotent Producer:
> >
> > The transactional part automatically leverages the idempotent properties
> > of the producer. Idempotency is a requirement:
> >
> > > Note that enable.idempotence must be enabled if a TransactionalId is
> > configured.
> >
> > See
> >
> > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> 0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
> >
> > All idempotent retries, are handled by the producer internally (with or
> > without transaction) if enable.idempotence is set to true.
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 3/3/17 3:34 AM, Eno Thereska wrote:
> > > Another question:
> > >
> > > The KIP doesn’t exactly spell out how it uses the idempotence guarantee
> > from KIP-98. It seems that only the transactional part is needed. Or is
> the
> > idempotence guarantee working behind the scenes and 

Re: [DISCUSS] KIP-124: Request rate quotas

2017-03-07 Thread Jun Rao
Hi, Rajini,

Let's take your example. Let's say a user sets the limit to 50%. I am not
sure if it's better to apply the same percentage separately to network and
io thread pool. For example, for produce requests, most of the time will be
spent in the io threads whereas for fetch requests, most of the time will
be in the network threads. So, using the same percentage in both thread
pools means one of the pools' resource will be over allocated.

An alternative way is to simply model network and io thread pool together.
If you get 10 io threads and 5 network threads, you get 1500% request
processing power. A 50% limit means a total of 750% processing power. We
just add up the time a user request spent in either network or io thread.
If that total exceeds 750% (doesn't matter whether it's spent more in
network or io thread), the request will be throttled. This seems more
general and is not sensitive to the current implementation detail of having
a separate network and io thread pool. In the future, if the threading
model changes, the same concept of quota can still be applied. For now,
since it's a bit tricky to add the delay logic in the network thread pool,
we could probably just do the delaying only in the io threads as you
suggested earlier.

There is still the orthogonal question of whether a quota of 50% is out of
100% or 100% * #total processing threads. My feeling is that the latter is
slightly better based on my explanation earlier. The way to describe this
quota to the users can be "share of elapsed request processing time on a
single CPU" (similar to top).
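
For illustration, the combined-pool accounting could be sketched roughly as
follows (all names and numbers are hypothetical; the real logic would sit in
the existing quota framework):

{code}
// Hypothetical sketch of the combined network + io thread-pool quota model.
object RequestTimeQuotaSketch extends App {
  val numNetworkThreads = 5
  val numIoThreads = 10

  // Total capacity in "percent of one thread": (n + m) * 100 = 1500%.
  val totalCapacityPercent = (numNetworkThreads + numIoThreads) * 100.0

  // A user granted a 50% share of total request-processing capacity: 750%.
  val userSharePercent = 0.5 * totalCapacityPercent

  // Measured over the quota window, normalized to percent of one thread.
  val networkThreadTimePercent = 220.0
  val ioThreadTimePercent = 610.0
  val usedPercent = networkThreadTimePercent + ioThreadTimePercent // 830%

  val shouldThrottle = usedPercent > userSharePercent
  println(s"used=$usedPercent, allowed=$userSharePercent, throttle=$shouldThrottle")
}
{code}

Expressed against the alternative 1 * 100% formulation, the same measurement
is 830 / 15 = ~55.3% against a 50% limit; the two only differ in how the
number is presented to the admin.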

Thanks,

Jun


On Fri, Mar 3, 2017 at 4:22 AM, Rajini Sivaram 
wrote:

> Jun,
>
> Agree about the two scenarios.
>
> But still not sure about a single quota covering both network threads and
> I/O threads with per-thread quota. If there are 10 I/O threads and 5
> network threads and I want to assign half the quota to userA, the quota
> would be 750%. I imagine, internally, we would convert this to 500% for I/O
> and 250% for network threads to allocate 50% of each pool.
>
> A couple of scenarios:
>
> 1. Admin adds 1 extra network thread. To retain 50%, admin needs to now
> allocate 800% for each user. Or increase the quota for a few users. To me,
> it feels like admin needs to convert 50% to 800% and Kafka internally needs
> to convert 800% to (500%, 300%). Everyone using just 50% feels a lot
> simpler.
>
> 2. We decide to add some other thread to this list. Admin needs to know
> exactly how many threads form the maximum quota. And we can be changing
> this between broker versions as we add more to the list. Again a single
> overall percent would be a lot simpler.
>
> There were others who were unconvinced by a single percent from the initial
> proposal and were happier with thread units similar to CPU units, so I am
> ok with going with per-thread quotas (as units or percent). Just not sure
> it makes it easier for admin in all cases.
>
> Regards,
>
> Rajini
>
>
> On Fri, Mar 3, 2017 at 6:03 AM, Jun Rao  wrote:
>
> > Hi, Rajini,
> >
> > Consider modeling as n * 100% unit. For 2), the question is what's
> causing
> > the I/O threads to be saturated. It's unlikely that all users'
> utilization
> > have increased at the same. A more likely case is that a few isolated
> > users' utilization have increased. If so, after increasing the number of
> > threads, the admin just needs to adjust the quota for a few isolated
> users,
> > which is expected and is less work.
> >
> > Consider modeling as 1 * 100% unit. For 1), all users' quota need to be
> > adjusted, which is unexpected and is more work.
> >
> > So, to me, the n * 100% model seems more convenient.
> >
> > As for future extension to cover network thread utilization, I was
> thinking
> > that one way is to simply model the capacity as (n + m) * 100% unit,
> where
> > n and m are the number of network and i/o threads, respectively. Then,
> for
> > each user, we can just add up the utilization in the network and the i/o
> > thread. If we do this, we don't need a new type of quota.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, Mar 2, 2017 at 12:27 PM, Rajini Sivaram  >
> > wrote:
> >
> > > Jun,
> > >
> > > If we use request.percentage as the percentage used in a single I/O
> > thread,
> > > the total percentage being allocated will be num.io.threads * 100 for
> I/O
> > > threads and num.network.threads * 100 for network threads. A single
> quota
> > > covering the two as a percentage wouldn't quite work if you want to
> > > allocate the same proportion in both cases. If we want to treat threads
> > as
> > > separate units, won't we need two quota configurations regardless of
> > > whether we use units or percentage? Perhaps I misunderstood your
> > > suggestion.
> > >
> > > I think there are two cases:
> > >
> > >1. The use case that you mentioned where an admin is adding more
> users
> > >and decides to add more I/O 

[GitHub] kafka pull request #2652: MINOR: add warning when offset topic is not replic...

2017-03-07 Thread enothereska
GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/2652

MINOR: add warning when offset topic is not replicated properly

Added a warning, otherwise this problem is hard to debug if you haven't 
followed the latest KIP closely.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka 
minor-warning-enforcement-offset

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2652.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2652


commit 187f9b6961eb6f54d4d9570a545b3a74cb4bee7e
Author: Eno Thereska 
Date:   2017-03-05T14:57:45Z

Temporarily use only 2 relevant tests for branch builder

commit 87f06a893e799e81a9bffce57c203d331d6cb8b6
Author: Eno Thereska 
Date:   2017-03-05T14:58:52Z

Undo previous

commit 978f925b606d552e047df480658eec542797f853
Author: Eno Thereska 
Date:   2017-03-06T10:12:20Z

Merge remote-tracking branch 'apache-kafka/trunk' into trunk

commit 57856b0ce4e1f9259f8a5a3f10b6cbe44eb013f7
Author: Eno Thereska 
Date:   2017-03-06T14:38:38Z

Merge remote-tracking branch 'apache-kafka/trunk' into trunk

commit 5086dd7a5e5579a0398db914aa8a6d24ca4de187
Author: Eno Thereska 
Date:   2017-03-07T11:21:51Z

Merge remote-tracking branch 'apache-kafka/trunk' into trunk

commit 6160ad81599cdf894255e82ed57f28c814bb4e5e
Author: Eno Thereska 
Date:   2017-03-07T16:43:16Z

Added warning




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4849) Bug in KafkaStreams documentation

2017-03-07 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899743#comment-15899743
 ] 

Matthias J. Sax commented on KAFKA-4849:


[~habdank] Your Jira description says: "2. The client.id and zookeeper.connect 
are marked by high importance, but according to 
http://docs.confluent.io/3.2.0/streams/developer-guide.html none of them are 
important to initialize the stream." I don't understand the second sentence: 
"but according to http://docs.confluent.io/3.2.0/streams/developer-guide.html 
none of them are important to initialize the stream". If I am not wrong, 
Confluent Docs about Streams don't say anything about "importance" of any 
parameter? Can you elaborate on what inconsistency you mean?

> Bug in KafkaStreams documentation
> -
>
> Key: KAFKA-4849
> URL: https://issues.apache.org/jira/browse/KAFKA-4849
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Seweryn Habdank-Wojewodzki
>Assignee: Matthias J. Sax
>Priority: Minor
>
> At the page: https://kafka.apache.org/documentation/streams
>  
> In the chapter titled Application Configuration and Execution, in the example 
> there is a line:
>  
> settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
>  
> but ZOOKEEPER_CONNECT_CONFIG is deprecated in the Kafka version 0.10.2.0.
>  
> Also the table on the page: 
> https://kafka.apache.org/0102/documentation/#streamsconfigs is a bit 
> misleading.
> 1. Again zookeeper.connect is deprecated.
> 2. The client.id and zookeeper.connect are marked by high importance, 
> but according to http://docs.confluent.io/3.2.0/streams/developer-guide.html 
> none of them are important to initialize the stream.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4857) Improve Client handling

2017-03-07 Thread Sharad (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sharad reassigned KAFKA-4857:
-

Assignee: Sharad

> Improve Client handling
> ---
>
> Key: KAFKA-4857
> URL: https://issues.apache.org/jira/browse/KAFKA-4857
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sharad
>  Labels: needs-kip
>
> Streams uses {{KafkaClientSupplier}} to get 
> consumer/restore-consumer/producer clients. Streams also uses one more client 
> for admin purposes, namely {{StreamsKafkaClient}}, which is instantiated 
> "manually".
> With the newly upcoming {{AdminClient}} from KIP-117, we can simplify (or 
> even replace) {{StreamsKafkaClient}} with the new {{AdminClient}}. We 
> furthermore want to unify how the client is generated and extend 
> {{KafkaClientSupplier}} with a method that returns this client.
> As this is a public API change, a KIP is required.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-07 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian resolved KAFKA-4845.

Resolution: Duplicate

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark 
> streaming
> 
>
> Key: KAFKA-4845
> URL: https://issues.apache.org/jira/browse/KAFKA-4845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Dan
>Assignee: Vahid Hashemian
>
> When integrating with spark streaming, kafka consumer cannot get the latest 
> offsets except for one partition. The  code snippet is as follows: 
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
> val c = consumer
> c.poll(0)
> val parts = c.assignment().asScala
> val newPartitions = parts.diff(currentOffsets.keySet)
> currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> 
> c.position(tp)).toMap
> c.pause(newPartitions.asJava)
> c.seekToEnd(currentOffsets.keySet.asJava)
> parts.map(tp => tp -> c.position(tp)).toMap
>   }
> {code}
> When calling consumer.position(topicPartition), it will call 
> updateFetchPositions(Collections.singleton(partition)):
> The bug lies in updateFetchPositions(Set<TopicPartition> partitions):
> {code}
> fetcher.resetOffsetsIfNeeded(partitions);// reset to latest 
> offset for current partition
> if (!subscriptions.hasAllFetchPositions()) {  // called seekToEnd for 
> all partitions before, so this sentence will be true 
> coordinator.refreshCommittedOffsetsIfNeeded();
> fetcher.updateFetchPositions(partitions);  // reset to committed 
> offsets for current partition
> }
> {code}
> So eventually there is only one partition(the last partition in assignment) 
> can get latest offset while all the others get the committed offset.
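
For reference, on 0.10.1.0 and later the log-end offsets can also be read
directly with KafkaConsumer#endOffsets (added by KIP-79), which sidesteps
the seekToEnd/position interaction described above. A rough sketch, with
error handling omitted:

{code}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object LatestOffsetsSketch {
  def latestOffsets(consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Map[TopicPartition, Long] = {
    consumer.poll(0)                  // make sure the assignment is established
    val parts = consumer.assignment() // java.util.Set[TopicPartition]
    consumer.endOffsets(parts).asScala.map { case (tp, off) => tp -> off.longValue() }.toMap
  }
}
{code}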



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-07 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899634#comment-15899634
 ] 

Vahid Hashemian commented on KAFKA-4845:


Great, then I'll mark this one as a duplicate of KAFKA-4547. Thanks for 
confirming.

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark 
> streaming
> 
>
> Key: KAFKA-4845
> URL: https://issues.apache.org/jira/browse/KAFKA-4845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Dan
>Assignee: Vahid Hashemian
>
> When integrating with spark streaming, kafka consumer cannot get the latest 
> offsets except for one partition. The  code snippet is as follows: 
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
> val c = consumer
> c.poll(0)
> val parts = c.assignment().asScala
> val newPartitions = parts.diff(currentOffsets.keySet)
> currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> 
> c.position(tp)).toMap
> c.pause(newPartitions.asJava)
> c.seekToEnd(currentOffsets.keySet.asJava)
> parts.map(tp => tp -> c.position(tp)).toMap
>   }
> {code}
> When calling consumer.position(topicPartition), it will call 
> updateFetchPositions(Collections.singleton(partition)):
> The bug lies in updateFetchPositions(Set<TopicPartition> partitions):
> {code}
> fetcher.resetOffsetsIfNeeded(partitions);// reset to latest 
> offset for current partition
> if (!subscriptions.hasAllFetchPositions()) {  // called seekToEnd for 
> all partitions before, so this sentence will be true 
> coordinator.refreshCommittedOffsetsIfNeeded();
> fetcher.updateFetchPositions(partitions);  // reset to committed 
> offsets for current partition
> }
> {code}
> So eventually there is only one partition(the last partition in assignment) 
> can get latest offset while all the others get the committed offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Wiki edit permission

2017-03-07 Thread Molnár Bálint
Thanks, Guozhang:)

2017-03-07 16:03 GMT+01:00 Guozhang Wang :

> Done.
>
> Cheers,
> Guozhang
>
> On Tue, Mar 7, 2017 at 2:59 AM, Molnár Bálint 
> wrote:
>
> > Just a kind reminder.
> >
> > Thanks,
> > Balint
> >
> > 2017-03-03 8:44 GMT+01:00 Molnár Bálint :
> >
> > > Hi,
> > >
> > > I would like to create a KIP, please add create permission to the wiki
> > > page.
> > > My username is baluchicken.
> > >
> > > Thanks
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Wiki edit permission

2017-03-07 Thread Guozhang Wang
Done.

Cheers,
Guozhang

On Tue, Mar 7, 2017 at 2:59 AM, Molnár Bálint 
wrote:

> Just a kind reminder.
>
> Thanks,
> Balint
>
> 2017-03-03 8:44 GMT+01:00 Molnár Bálint :
>
> > Hi,
> >
> > I would like to create a KIP, please add create permission to the wiki
> > page.
> > My username is baluchicken.
> >
> > Thanks
> >
>



-- 
-- Guozhang


[jira] [Updated] (KAFKA-4657) Improve test coverage of CompositeReadOnlyWindowStore

2017-03-07 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4657:
--
Description: 
There are no tests that cover what happens when {{windowStore.fetch(...)}} throws an 
{{InvalidStateStoreException}}; i.e., we should check that we get an 
{{InvalidStateStoreException}} with the modified message.

There are no tests that cover the {{WindowStoreIterator}} that is returned when 
there are no results. In this case we should at least add tests that show the 
expected behaviour for {{peekNextKey}}, {{hasNext}}, and {{next}}.

  was:exceptions not covered and internal iterator


> Improve test coverage of CompositeReadOnlyWindowStore
> -
>
> Key: KAFKA-4657
> URL: https://issues.apache.org/jira/browse/KAFKA-4657
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
> Fix For: 0.11.0.0
>
>
> There are no tests that cover what happens when {{windowStore.fetch(...)}} throws an 
> {{InvalidStateStoreException}}; i.e., we should check that we get an 
> {{InvalidStateStoreException}} with the modified message.
> There are no tests that cover the {{WindowStoreIterator}} that is returned 
> when there are no results. In this case we should at least add tests that 
> show the expected behaviour for {{peekNextKey}}, {{hasNext}}, and {{next}}.
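A minimal sketch of what such tests might look like; this is not the actual Kafka test class, and the {{store}} / {{storeThatThrows}} fixtures are hypothetical handles that a real test would wire up in a setUp() against an empty store and a provider that throws, respectively. The empty-iterator expectations are precisely what the ticket asks to pin down; NoSuchElementException is assumed here.
{code}
import static org.junit.Assert.assertFalse;

import java.util.NoSuchElementException;

import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.junit.Test;

public class WindowStoreFetchBehaviourSketch {

    // Hypothetical fixtures: `store` backed by an empty window store,
    // `storeThatThrows` backed by a provider that throws InvalidStateStoreException.
    private ReadOnlyWindowStore<String, String> store;
    private ReadOnlyWindowStore<String, String> storeThatThrows;

    @Test
    public void emptyFetchShouldReturnIteratorWithNoResults() {
        try (WindowStoreIterator<String> iterator = store.fetch("no-such-key", 0L, 100L)) {
            assertFalse(iterator.hasNext());
        }
    }

    @Test(expected = NoSuchElementException.class)
    public void nextOnEmptyIteratorShouldThrow() {
        store.fetch("no-such-key", 0L, 100L).next();
    }

    @Test(expected = NoSuchElementException.class)
    public void peekNextKeyOnEmptyIteratorShouldThrow() {
        store.fetch("no-such-key", 0L, 100L).peekNextKey();
    }

    @Test(expected = InvalidStateStoreException.class)
    public void fetchShouldRethrowInvalidStateStoreExceptionWithModifiedMessage() {
        storeThatThrows.fetch("key", 0L, 100L);
    }
}
{code}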



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4437) Incremental Batch Processing for Kafka Streams

2017-03-07 Thread Ritesh Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899518#comment-15899518
 ] 

Ritesh Agarwal commented on KAFKA-4437:
---

Dear Team,

Could you please let us know the status of this issue? We are highly 
interested in this feature and we were under the assumption that it would be 
released with version 0.10.2.0.

Thanks,
Ritesh Agarwal
+31 616245389

> Incremental Batch Processing for Kafka Streams
> --
>
> Key: KAFKA-4437
> URL: https://issues.apache.org/jira/browse/KAFKA-4437
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> We want to add an “auto stop” feature that terminates a stream application 
> when it has processed all the data that was newly available at the time the 
> application started (i.e., up to the then-current end of log / high watermark). 
> This allows chopping the (infinite) log into finite chunks, where each run of 
> the application processes one chunk. This feature allows for incremental 
> batch-like processing; think "start-process-stop-restart-process-stop-..."
> For details see KIP-95: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
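Until KIP-95 lands, a rough illustration of the idea with a plain consumer; this is not the KIP-95 design and not Streams code, and the topic name, configs, and process() step are assumptions for the sketch. It captures the end offsets at startup, polls until every partition has reached them, and then stops, so repeated runs process the log one chunk at a time.
{code}
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class IncrementalBatchSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // example value
        props.put("group.id", "incremental-batch-example");  // example value
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");          // first run starts at the beginning of the log
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = consumer.partitionsFor("input-topic").stream()
                    .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);

            // High watermarks as of "now": this run processes exactly up to here, then stops.
            Map<TopicPartition, Long> stopOffsets = consumer.endOffsets(partitions);

            boolean caughtUp = false;
            while (!caughtUp) {
                for (ConsumerRecord<String, String> record : consumer.poll(500)) {
                    process(record);
                }
                consumer.commitSync();   // next run resumes from here, i.e. the next chunk
                caughtUp = partitions.stream()
                        .allMatch(tp -> consumer.position(tp) >= stopOffsets.get(tp));
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // hypothetical per-record processing step
        System.out.println(record.key() + " -> " + record.value());
    }
}
{code}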



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2651: MINOR: reduce() javadocs: clarify position of argu...

2017-03-07 Thread miguno
GitHub user miguno opened a pull request:

https://github.com/apache/kafka/pull/2651

MINOR: reduce() javadocs: clarify position of arguments



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/miguno/kafka trunk-reduce-javadocs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2651.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2651


commit 3252434ebc8c12666484d6420ce83e17923b842c
Author: Michael G. Noll 
Date:   2017-03-07T12:44:01Z

reduce() javadocs: clarify position of arguments




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

2017-03-07 Thread Damian Guy
Hi Matthias,

Thanks. The perf test is a good start but I don't think it goes far enough.
100 partitions is not a lot. What happens when there are thousands of
partitions? What is the load on the brokers? How much more memory is used
by the Streams App etc?

Thanks,
Damian

On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax  wrote:

> Hi,
>
> I want to give a first respond:
>
>
>
> 1. Producer per task:
>
> First, we did some performance tests, indicating that the performance
> penalty is small. Please have a look here:
>
> https://docs.google.com/spreadsheets/d/18aGOB13-ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
>
> For the test, we ran with a trunk version and a modified version that
> uses a producer per task (of course, no transactions, but at-least-once
> semantics). The scaling factor indicates the number of brokers and
> (single threaded) Streams instances. We used SimpleBenchmark that is
> part of AK code base.
>
>
> Second, as the design is "producer per task" (and not "producer per
> partition") it is possible to specify a custom PartitionGrouper that
> assigns multiple partitions to a single task. Thus, it makes it possible to reduce
> the number of tasks for scenarios with many partitions. Right now, this
> interface must be implemented solely by the user, but we could also add
> a new config parameter that specifies the max.number.of.tasks or
> partitions.per.task so that the user can configure this instead of
> implementing the interface.
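To make the second point concrete, a rough sketch against the 0.10.x PartitionGrouper interface; the cap of 8 tasks per topic group is an arbitrary example standing in for the suggested (not yet existing) max.number.of.tasks / partitions.per.task setting:
{code}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;

public class CappedPartitionGrouper implements PartitionGrouper {

    private static final int MAX_TASKS_PER_TOPIC_GROUP = 8;   // example cap; a real grouper would make this configurable

    @Override
    public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
        Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
        for (Map.Entry<Integer, Set<String>> topicGroup : topicGroups.entrySet()) {
            int topicGroupId = topicGroup.getKey();
            for (String topic : topicGroup.getValue()) {
                for (PartitionInfo partition : metadata.partitionsForTopic(topic)) {
                    // Fold partition N into task (N mod cap) so that many partitions share one task,
                    // and hence one producer under the "producer per task" design.
                    TaskId taskId = new TaskId(topicGroupId, partition.partition() % MAX_TASKS_PER_TOPIC_GROUP);
                    groups.computeIfAbsent(taskId, id -> new HashSet<>())
                          .add(new TopicPartition(partition.topic(), partition.partition()));
                }
            }
        }
        return groups;
    }
}
{code}
Such a class would be plugged in through the existing partition.grouper (StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG) setting.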
>
> Third, there is the idea of a "Producer Pool" that would allow sharing
> resources (network connections, memory, etc.) across multiple producers.
> This would allow separating multiple transactions at the producer level,
> while resources are shared. There is no detailed design document yet and
> there would be a KIP for this feature.
>
> Thus, if there should be any performance problems for high scale
> scenarios, there are multiple ways to tackle them while keeping the
> "producer per task" design.
>
> Additionally, a "producer per thread" design would be way more complex
> and I summarized the issues in a separate document. I will share a link
> to the document soon.
>
>
>
> 2. StateStore recovery:
>
> Streams EoS will, in its first design, not be able to exploit the
> improvements that are being added for 0.11 at the moment. However, as 0.10.2
> faces the same issues of potentially long recovery, there is no
> regression with this regard. Thus, I see those improvements as
> orthogonal or add-ons. Nevertheless, we should try to explore those
> options and if possible get them into 0.11 such that Streams with EoS
> gets the same improvements as at-least-once scenario.
>
>
>
> 3. Caching:
>
> We might need to do some experiments to quantify the impact on caching.
> If it's severe, the suggested default commit interval of 100ms could
> also be increased. Also, EoS will not enforce any commit interval, but
> only change the default value. Thus, a user can freely trade-off latency
> vs. caching-effect.
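In config terms, that trade-off is just the existing Streams settings; the values below are examples, not recommendations:
{code}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class EosCommitIntervalExample {
    static Properties streamsProps() {
        Properties props = new Properties();
        // Example values only: raise the commit interval (and with it, under EoS, the span of
        // each transaction) when the caching effect matters more than end-to-end latency.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        return props;
    }
}
{code}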
>
> Last but not least, there is the idea to allow "read_uncommitted" for
> intermediate topic. This would be an advance design for Streams EoS that
> allows downstream sub-topologies to read uncommitted data
> optimistically. In case of failure, a cascading abort of transactions
> would be required. This change will need another KIP.
>
>
>
> 4. Idempotent Producer:
>
> The transactional part automatically leverages the idempotent properties
> of the producer. Idempotency is a requirement:
>
> > Note that enable.idempotence must be enabled if a TransactionalId is
> configured.
>
> See
>
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
>
> All idempotent retries are handled by the producer internally (with or
> without transaction) if enable.idempotence is set to true.
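In plain producer-config terms, using the names from the KIP-98 design document quoted above (these are part of the proposal, not of a released client at the time of this thread):
{code}
import java.util.Properties;

public class TransactionalProducerConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // example value
        // Per the KIP-98 design doc: idempotence must be enabled whenever a TransactionalId is set,
        // and all idempotent retries are then handled internally by the producer.
        props.put("enable.idempotence", "true");
        props.put("transactional.id", "my-app-task-0_0");   // example id; Streams would derive one per task
        return props;
    }
}
{code}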
>
>
>
> -Matthias
>
>
>
> On 3/3/17 3:34 AM, Eno Thereska wrote:
> > Another question:
> >
> > The KIP doesn’t exactly spell out how it uses the idempotence guarantee
> from KIP-98. It seems that only the transactional part is needed. Or is the
> idempotence guarantee working behind the scenes and helping for some
> scenarios for which it is not worthwhile aborting a transaction (e.g.,
> retransmitting a record after a temporary network glitch)?
> >
> > Thanks
> > Eno
> >
> >> On Mar 2, 2017, at 4:56 PM, Jay Kreps  wrote:
> >>
> >> I second the concern with the one producer per task approach. At a
> >> high-level it seems to make sense but I think Damian is exactly right
> that
> >> that cuts against the general design of the producer. Many people have
> high
> >> input partition counts and will have high task counts as a result. I
> think
> >> processing 1000 partitions should not be an unreasonable thing to want
> to
> >> do.
> >>
> >> The tricky bits will be:
> >>
> >>   - Reduced effectiveness of batching (or more latency and memory to get
> >>   equivalent batching). This doesn't show up in simple 

[jira] [Commented] (KAFKA-4834) Kafka cannot delete topic with ReplicaStateMachine went wrong

2017-03-07 Thread Dan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899306#comment-15899306
 ] 

Dan commented on KAFKA-4834:


No. I found in another broker's log that it changed the same partition from New 
to Online and succeeded. So I guess there exist two controllers watching for 
the ZooKeeper events.

> Kafka cannot delete topic with ReplicaStateMachine went wrong
> -
>
> Key: KAFKA-4834
> URL: https://issues.apache.org/jira/browse/KAFKA-4834
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
>Reporter: Dan
>  Labels: reliability
>
> It happened several times that some topics cannot be deleted in our 
> production environment. By analyzing the log, we found that ReplicaStateMachine 
> went wrong. Here are the error messages:
> In state-change.log:
> ERROR Controller 2 epoch 201 initiated state change of replica 1 for 
> partition [test_create_topic1,1] from OnlineReplica to ReplicaDeletionStarted 
> failed (state.change.logger)
> java.lang.AssertionError: assertion failed: Replica 
> [Topic=test_create_topic1,Partition=1,Replica=1] should be in the 
> OfflineReplica states before moving to ReplicaDeletionStarted state. Instead 
> it is in OnlineReplica state
> at scala.Predef$.assert(Predef.scala:179)
> at 
> kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:309)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:190)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at 
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:344)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at 
> kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313)
> at 
> kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403)
> at 
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at 
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> In controller.log:
> INFO Leader not yet assigned for partition [test_create_topic1,1]. Skip 
> sending UpdateMetadataRequest. (kafka.controller.ControllerBrokerRequestBatch)
> There may exist two controllers in the cluster because creating a new topic 
> may trigger two machines to change the state of the same partition, e.g., 
> NonExistentPartition -> NewPartition.
> On the other controller, we found the following messages in controller.log from 
> several days earlier:
> 

[jira] [Commented] (KAFKA-4845) KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming

2017-03-07 Thread Dan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899299#comment-15899299
 ] 

Dan commented on KAFKA-4845:


I checked 0.10.2.0 again and it was indeed fixed. Thanks a lot!

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark 
> streaming
> 
>
> Key: KAFKA-4845
> URL: https://issues.apache.org/jira/browse/KAFKA-4845
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Dan
>Assignee: Vahid Hashemian
>
> When integrating with Spark Streaming, the Kafka consumer cannot get the latest 
> offsets except for one partition. The code snippet is as follows: 
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
> val c = consumer
> c.poll(0)
> val parts = c.assignment().asScala
> val newPartitions = parts.diff(currentOffsets.keySet)
> currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> 
> c.position(tp)).toMap
> c.pause(newPartitions.asJava)
> c.seekToEnd(currentOffsets.keySet.asJava)
> parts.map(tp => tp -> c.position(tp)).toMap
>   }
> {code}
> When calling consumer.position(topicPartition), it will call 
> updateFetchPositions(Collections.singleton(partition)):
> The bug lies in updateFetchPositions(Set partitions):
> {code}
> fetcher.resetOffsetsIfNeeded(partitions);// reset to latest 
> offset for current partition
> if (!subscriptions.hasAllFetchPositions()) {  // called seekToEnd for 
> all partitions before, so this sentence will be true 
> coordinator.refreshCommittedOffsetsIfNeeded();
> fetcher.updateFetchPositions(partitions);  // reset to committed 
> offsets for current partition
> }
> {code}
> So eventually only one partition (the last partition in the assignment) can get
> the latest offset while all the others get the committed offset.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2650: [WIP]: add internal leave.group.on.close config to...

2017-03-07 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2650

[WIP]: add internal leave.group.on.close config to consumer



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka consumer-leave-group-config

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2650.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2650


commit a9d8f26fc0b23e7f02e71237e334749230e2380a
Author: Damian Guy 
Date:   2017-03-01T17:20:53Z

add internal leave.group.on.close config to consumer




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: Wiki edit permission

2017-03-07 Thread Molnár Bálint
Just a kind reminder.

Thanks,
Balint

2017-03-03 8:44 GMT+01:00 Molnár Bálint :

> Hi,
>
> I would like to create a KIP, please add create permission to the wiki
> page.
> My username is baluchicken.
>
> Thanks
>


[GitHub] kafka pull request #2627: [WIP]: add a consumer leave group config

2017-03-07 Thread dguy
Github user dguy closed the pull request at:

https://github.com/apache/kafka/pull/2627


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: Allow to replace Zookeeper with different Coordination Service (etcd/consul)

2017-03-07 Thread Molnár Bálint
Thanks for pointing me in the right direction :). I will take a look at
this KIP.

2017-03-06 16:48 GMT+01:00 Ofir Manor :

> I suggest you check KIP-30 and the extensive discussion about it in the
> mailing list from around December 2015 called "[DISCUSS] KIP-30 Allow for
> brokers to have plug-able consensus and meta data storage sub systems".
> If I remember correctly, it ran into some objections, as the existing
> committers thought at the time that ZK was significantly more reliable, so
> it was not worth the effort to add inferior options.
> I personally think nowadays, when a lot of other critical cluster infra
> relies on these coordination services anyway, this KIP makes a lot of
> sense. The current dependency on ZK creates a large, unneeded operational
> overhead for those who have already deployed and rely on etcd/consul for
> the rest of their stack (including other stateful services).
> Just my two cents,
>
> Ofir Manor
>
> Co-Founder & CTO | Equalum
>
> Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io
>
> On Mon, Mar 6, 2017 at 12:15 PM, Alexander Binzberger <
> alexander.binzber...@wingcon.com> wrote:
>
> > I would also be interested in this. (etcd)
> >
> >
> > Am 02.03.2017 um 12:24 schrieb Molnár Bálint:
> >
> >> Hi,
> >>
> >> I was wonderring to refactor Kafka core code to be able to use different
> >> Coordination Service than Zookeeper. I know I will need to create a KIP
> >> for
> >> that.
> >> I think the first part of this task to refactor the classes which are
> >> using
> >> the ZkUtil methods to use a zookeeper independent trait instead.
> >> After that I think it will be possible to replace Zookeeper with
> >> etcd/Consul or even with a Raft implementation.
> >> Even without additional implementation it would help to test the code
> >> without starting an embedded zookeeper.
> >> I have already started to implement a POC and it seems doable, even if
> >> it's
> >> not a small patch.
> >>
> >> Balint
> >>
> >>
> > --
> > Alexander Binzberger
> > System Designer - WINGcon AG
> > Tel. +49 7543 966-119
> >
> > Sitz der Gesellschaft: Langenargen
> > Registergericht: ULM, HRB 734260
> > USt-Id.: DE232931635, WEEE-Id.: DE74015979
> > Vorstand: Thomas Ehrle (Vorsitz), Fritz R. Paul (Stellvertreter),
> > Tobias Treß
> > Aufsichtsrat: Jürgen Maucher (Vorsitz), Andreas Paul (Stellvertreter),
> > Martin Sauter
> >
> >
>