Re: Metrics package discussion

2015-03-25 Thread Jay Kreps
Here was my understanding of the issue last time.

The Yammer metrics use a random sample of requests to estimate the
histogram. This allocates a fairly large array of longs (their values are
longs rather than floats). A reasonable sample might be 8k entries, which
would give about 64KB per histogram. There are bounds on accuracy, but they
are only probabilistic: if you try to keep 99% of estimates within 5 ms of
inaccuracy, you will get more than that 1% of the time. This is okay until
you try to alert on these values, at which point you realize that being
wrong 1% of the time is a lot if you are computing stats every second,
continuously, on many metrics (i.e. 1 in 100 estimates will be outside your
bound). This array is also copied in full every time you check the metric,
which is the other cause of the memory pressure.

The better approach to histograms is to calculate bucket boundaries up front
and record arbitrarily many values in those buckets. A simple bucketing
approach for latency would be 0, 5ms, 10ms, 15ms, etc., and you just count
how many values fall in each bucket. Your precision is deterministically
bounded by the bucket boundaries, so with 5ms buckets you would never have
more than 5ms loss of precision. By using non-uniform bucket sizes you can
make this work even better (e.g. give ~1ms precision for latencies in the
1ms range, but only 1 second precision for latencies in the 30 second
range). That is what is implemented in that metrics package.
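
A minimal sketch of that bucketing idea (illustrative only; not the actual Kafka metrics code, and the bucket bounds are made up):

{code:title=BucketedHistogram.java|borderStyle=solid}
import java.util.concurrent.atomic.AtomicLongArray;

// Fixed-bucket histogram: O(#buckets) memory no matter how many values are
// recorded, and precision deterministically bounded by the bucket widths.
public class BucketedHistogram {
    private final long[] upperBounds;     // inclusive upper bound per bucket, e.g. in ms
    private final AtomicLongArray counts; // one counter per bucket, plus an overflow bucket

    public BucketedHistogram(long[] upperBounds) {
        this.upperBounds = upperBounds;
        this.counts = new AtomicLongArray(upperBounds.length + 1);
    }

    public void record(long value) {
        int i = 0;
        while (i < upperBounds.length && value > upperBounds[i]) i++;
        counts.incrementAndGet(i);
    }

    // Returns the upper bound of the bucket containing the q-th quantile;
    // the error is at most one bucket width.
    public long quantileUpperBound(double q) {
        long total = 0;
        for (int i = 0; i < counts.length(); i++) total += counts.get(i);
        long target = (long) Math.ceil(q * total), seen = 0;
        for (int i = 0; i < upperBounds.length; i++) {
            seen += counts.get(i);
            if (seen >= target) return upperBounds[i];
        }
        return Long.MAX_VALUE; // value fell in the overflow bucket
    }

    public static void main(String[] args) {
        BucketedHistogram h = new BucketedHistogram(new long[]{5, 10, 15, 20, 50, 100});
        for (long v : new long[]{1, 2, 3, 7, 12, 40}) h.record(v);
        System.out.println("p99 <= " + h.quantileUpperBound(0.99) + " ms");
    }
}
{code}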

I think this bucketing approach is popular now. There is a whole "HDR
histogram" library that gives lots of different bucketing methods and
implements dynamic resizing so you don't have to specify an upper bound.
 https://github.com/HdrHistogram/HdrHistogram
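
For a sense of the API, usage is along these lines (a sketch against HdrHistogram's public interface; the bounds and recorded values are made up):

{code:title=HdrExample.java|borderStyle=solid}
import org.HdrHistogram.Histogram;

public class HdrExample {
    public static void main(String[] args) {
        // Track values up to 1 hour (in nanoseconds) with 3 significant digits.
        Histogram histogram = new Histogram(3_600_000_000_000L, 3);
        histogram.recordValue(1_500_000L);   // e.g. a 1.5 ms latency, in ns
        histogram.recordValue(42_000_000L);  // a 42 ms latency
        System.out.println("p99 = " + histogram.getValueAtPercentile(99.0) + " ns");
    }
}
{code}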

Whether this matters depends entirely on whether you want histograms broken
down at the client, topic, partition, or broker level, or just want overall
metrics. If we just want per-server aggregates for histograms, then I think
the memory usage is not a huge issue. If you want a histogram per topic,
client, or partition and have 10k of these, then that is where you start
talking about something like 1GB of memory with the Yammer package, which is
what we hit last time. Getting percentiles at the client level is nice, and
percentiles are definitely better than averages, but I'm not sure it is
required.

-Jay

On Wed, Mar 25, 2015 at 9:43 PM, Neha Narkhede  wrote:

> Aditya,
>
> If we are doing a deep dive, one of the things to investigate would be
> memory/GC performance. IIRC, when I was looking into codahale at LinkedIn,
> I remember it having quite a few memory management and GC issues while
> using histograms. In comparison, histograms in the new metrics package
> aren't very well tested.
>
> Thanks,
> Neha
>
> On Wed, Mar 25, 2015 at 8:25 AM, Aditya Auradkar <
> aaurad...@linkedin.com.invalid> wrote:
>
> > Hey everyone,
> >
> > Picking up this discussion after yesterday's KIP hangout. For anyone who
> > did not join the meeting, we have 2 different metrics packages being used
> > by the clients (custom package) and the server (codahale). We are
> > discussing whether to migrate the server to the new package.
> >
> > What information do we need in order to make a decision?
> >
> > Some pros of the new package:
> > - Using the most recent information by combining data from previous and
> > current samples. I'm not sure how codahale does this so I'll investigate.
> > - We can quota on anything we measure. This is pretty cool IMO. I'll
> > investigate the feasibility of adding this feature in codahale.
> > - Hierarchical metrics. For example: we can define a sensor for overall
> > bytes-in/bytes-out and also per-client. Updating the client sensor will
> > cause the global byte rate sensor to get modified too.
> >
> > What are some of the issues with codahale? One previous discussion
> > mentions high memory usage but I don't have any experience with it
> myself.
> >
> > Thanks,
> > Aditya
> >
> >
> >
> >
> >
>
>
> --
> Thanks,
> Neha
>


[jira] [Updated] (KAFKA-2052) zookeeper.connect does not work when specifying multiple zk nodes with chroot

2015-03-25 Thread Alex Jiao Ziheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Jiao Ziheng updated KAFKA-2052:

Description: 
I wish to specify multiple ZooKeeper nodes with chroot paths in the
server.properties config file, but Kafka is unable to separate the ZK nodes
correctly when chroot strings are present:

zookeeper.connect=zookeeper-staging-a-1.bezurk.org:2181/kafka,zookeeper-staging-b-1.bezurk.org:2181/kafka

Error logs:

==> kafka_init_stdout.log <==
[2015-03-26 13:58:21,330] INFO [Kafka Server 1], Connecting to zookeeper on 
zookeeper-staging-b-1.bezurk.org:2181/kafka,zookeeper-staging-a-1.bezurk.org:2181/kafka
 (kafka.server.KafkaServer)
[2015-03-26 13:58:21,438] INFO Starting ZkClient event thread. 
(org.I0Itec.zkclient.ZkEventThread)
[2015-03-26 13:58:21,454] INFO Client 
environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,454] INFO Client 
environment:host.name=ip-10-0-11-38.ap-southeast-1.compute.internal 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,454] INFO Client environment:java.version=1.7.0_75 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,454] INFO Client environment:java.vendor=Oracle 
Corporation (org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,454] INFO Client 
environment:java.home=/usr/lib/jvm/jdk1.7.0_75/jre 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,455] INFO Client 
environment:java.class.path=:/opt/kafka/bin/../core/build/dependant-libs-2.8.0/*.jar:/opt/kafka/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/opt/kafka/bin/../clients/build/libs//kafka-clients*.jar:/opt/kafka/bin/../examples/build/libs//kafka-examples*.jar:/opt/kafka/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/opt/kafka/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/opt/kafka/bin/../libs/jopt-simple-3.2.jar:/opt/kafka/bin/../libs/kafka_2.9.2-0.8.1.1.jar:/opt/kafka/bin/../libs/kafka_2.9.2-0.8.1.1-javadoc.jar:/opt/kafka/bin/../libs/kafka_2.9.2-0.8.1.1-scaladoc.jar:/opt/kafka/bin/../libs/kafka_2.9.2-0.8.1.1-sources.jar:/opt/kafka/bin/../libs/log4j-1.2.15.jar:/opt/kafka/bin/../libs/metrics-core-2.2.0.jar:/opt/kafka/bin/../libs/scala-library-2.9.2.jar:/opt/kafka/bin/../libs/slf4j-api-1.7.2.jar:/opt/kafka/bin/../libs/snappy-java-1.0.5.jar:/opt/kafka/bin/../libs/zkclient-0.3.jar:/opt/kafka/bin/../libs/zookeeper-3.3.4.jar:/opt/kafka/bin/../core/build/libs/kafka_2.8.0*.jar
 (org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,455] INFO Client 
environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
 (org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,455] INFO Client environment:java.io.tmpdir=/tmp 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,455] INFO Client environment:java.compiler= 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,455] INFO Client environment:os.name=Linux 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,455] INFO Client environment:os.arch=amd64 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,456] INFO Client environment:os.version=3.2.0-58-virtual 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,456] INFO Client environment:user.name=kafka 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,456] INFO Client environment:user.home=/home/kafka 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,456] INFO Client environment:user.dir=/home/kafka 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,457] INFO Initiating client connection, 
connectString=zookeeper-staging-b-1.bezurk.org:2181/kafka,zookeeper-staging-a-1.bezurk.org:2181/kafka
 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@c0d7 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,564] INFO Opening socket connection to server 
zookeeper-staging-b-1.bezurk.org/10.0.29.191:2181 
(org.apache.zookeeper.ClientCnxn)
[2015-03-26 13:58:21,578] INFO Socket connection established to 
zookeeper-staging-b-1.bezurk.org/10.0.29.191:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2015-03-26 13:58:21,596] INFO Session establishment complete on server 
zookeeper-staging-b-1.bezurk.org/10.0.29.191:2181, sessionid = 
0x24c50668a78000d, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2015-03-26 13:58:21,604] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)

==> server.log <==
2015-03-26 13:58:21,750 FATAL kafka.server.KafkaServerStartable: Fatal error 
during KafkaServerStable startup. Prepare to shutdown
java.lang.IllegalArgumentException: Path length must be > 0
at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48)
at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35)
at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:626)
at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
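
For reference, the connect-string format Kafka documents puts the chroot once, after the entire host list, rather than after each host. A sketch of that documented form, reusing the host names from the report above:

{code:title=server.properties|borderStyle=solid}
# The chroot path appears once, at the end of the whole connect string.
zookeeper.connect=zookeeper-staging-a-1.bezurk.org:2181,zookeeper-staging-b-1.bezurk.org:2181/kafka
{code}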

[jira] [Created] (KAFKA-2052) zookeeper.connect does not work when specifying multiple zk nodes with chroot

2015-03-25 Thread Alex Jiao Ziheng (JIRA)
Alex Jiao Ziheng created KAFKA-2052:
---

 Summary: zookeeper.connect does not work when specifying multiple 
zk nodes with chroot 
 Key: KAFKA-2052
 URL: https://issues.apache.org/jira/browse/KAFKA-2052
 Project: Kafka
  Issue Type: Bug
  Components: config
Reporter: Alex Jiao Ziheng
Priority: Minor


I wish to specify multiple ZooKeeper nodes in the server.properties config file:

zookeeper.connect=zookeeper-staging-a-1.bezurk.org:2181/kafka,zookeeper-staging-b-1.bezurk.org:2181/kafka

Error logs:

==> kafka_init_stdout.log <==
[2015-03-26 13:58:21,330] INFO [Kafka Server 1], Connecting to zookeeper on 
zookeeper-staging-b-1.bezurk.org:2181/kafka,zookeeper-staging-a-1.bezurk.org:2181/kafka
 (kafka.server.KafkaServer)
[2015-03-26 13:58:21,438] INFO Starting ZkClient event thread. 
(org.I0Itec.zkclient.ZkEventThread)
[2015-03-26 13:58:21,454] INFO Client 
environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,454] INFO Client 
environment:host.name=ip-10-0-11-38.ap-southeast-1.compute.internal 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,454] INFO Client environment:java.version=1.7.0_75 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,454] INFO Client environment:java.vendor=Oracle 
Corporation (org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,454] INFO Client 
environment:java.home=/usr/lib/jvm/jdk1.7.0_75/jre 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,455] INFO Client 
environment:java.class.path=:/opt/kafka/bin/../core/build/dependant-libs-2.8.0/*.jar:/opt/kafka/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/opt/kafka/bin/../clients/build/libs//kafka-clients*.jar:/opt/kafka/bin/../examples/build/libs//kafka-examples*.jar:/opt/kafka/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/opt/kafka/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/opt/kafka/bin/../libs/jopt-simple-3.2.jar:/opt/kafka/bin/../libs/kafka_2.9.2-0.8.1.1.jar:/opt/kafka/bin/../libs/kafka_2.9.2-0.8.1.1-javadoc.jar:/opt/kafka/bin/../libs/kafka_2.9.2-0.8.1.1-scaladoc.jar:/opt/kafka/bin/../libs/kafka_2.9.2-0.8.1.1-sources.jar:/opt/kafka/bin/../libs/log4j-1.2.15.jar:/opt/kafka/bin/../libs/metrics-core-2.2.0.jar:/opt/kafka/bin/../libs/scala-library-2.9.2.jar:/opt/kafka/bin/../libs/slf4j-api-1.7.2.jar:/opt/kafka/bin/../libs/snappy-java-1.0.5.jar:/opt/kafka/bin/../libs/zkclient-0.3.jar:/opt/kafka/bin/../libs/zookeeper-3.3.4.jar:/opt/kafka/bin/../core/build/libs/kafka_2.8.0*.jar
 (org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,455] INFO Client 
environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
 (org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,455] INFO Client environment:java.io.tmpdir=/tmp 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,455] INFO Client environment:java.compiler= 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,455] INFO Client environment:os.name=Linux 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,455] INFO Client environment:os.arch=amd64 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,456] INFO Client environment:os.version=3.2.0-58-virtual 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,456] INFO Client environment:user.name=kafka 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,456] INFO Client environment:user.home=/home/kafka 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,456] INFO Client environment:user.dir=/home/kafka 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,457] INFO Initiating client connection, 
connectString=zookeeper-staging-b-1.bezurk.org:2181/kafka,zookeeper-staging-a-1.bezurk.org:2181/kafka
 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@c0d7 
(org.apache.zookeeper.ZooKeeper)
[2015-03-26 13:58:21,564] INFO Opening socket connection to server 
zookeeper-staging-b-1.bezurk.org/10.0.29.191:2181 
(org.apache.zookeeper.ClientCnxn)
[2015-03-26 13:58:21,578] INFO Socket connection established to 
zookeeper-staging-b-1.bezurk.org/10.0.29.191:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2015-03-26 13:58:21,596] INFO Session establishment complete on server 
zookeeper-staging-b-1.bezurk.org/10.0.29.191:2181, sessionid = 
0x24c50668a78000d, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2015-03-26 13:58:21,604] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)

==> server.log <==
2015-03-26 13:58:21,750 FATAL kafka.server.KafkaServerStartable: Fatal error 
during KafkaServerStable startup. Prepare to shutdown
java.lang.IllegalArgumentException: Path length must be > 0
at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48)
at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35)
at org.apache.zookeeper.ZooKeeper.cr

Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-03-25 Thread Neha Narkhede
Parth,

We can make some 15 mins or so to discuss this at the next KIP hangout.

Thanks,
Neha

On Wed, Mar 25, 2015 at 1:07 PM, Parth Brahmbhatt <
pbrahmbh...@hortonworks.com> wrote:

> Hi all,
>
> I have modified the KIP to reflect the recent change requests from the
> reviewers. I have been working on the code and I have the server-side code
> for authorization ready. I am now modifying the command-line utilities. I
> would really appreciate it if some of the committers could spend some time
> reviewing the KIP so we can make progress on this.
>
> Thanks
> Parth
>
> On 3/18/15, 2:20 PM, "Michael Herstine" 
> wrote:
>
> >Hi Parth,
> >
> >Thanks! A few questions:
> >
> >1. Do you want to permit rules in your ACLs that DENY access as well as
> >ALLOW? This can be handy for setting up rules that have exceptions. E.g.
> >“Allow principal P to READ resource R from all hosts” with “Deny principal
> >P READ access to resource R from host H1” in combination would allow P to
> >READ R from all hosts *except* H1.
> >
> >2. When a topic is newly created, will there be an ACL created for it? If
> >not, would that not deny subsequent access to it?
> >
> >(nit) Maybe use Principal instead of String to represent principals?
> >
> >
> >On 3/9/15, 11:48 AM, "Don Bosco Durai"  wrote:
> >
> >>Parth
> >>
> >>Overall it is looking good. Couple of questions…
> >>
> >>- Can you give an example of how the policies will look in the default
> >>implementation?
> >>- In the operations, can we support “CONNECT” also? This can be used
> >>during session connection
> >>- Regarding access control for “Topic Creation”: since we can’t do it on
> >>the server side, can we de-scope it for now, and plan it as a future
> >>feature request?
> >>
> >>Thanks
> >>
> >>Bosco
> >>
> >>
> >>
> >>On 3/6/15, 8:10 AM, "Harsha"  wrote:
> >>
> >>>Hi Parth,
> >>>Thanks for putting this together. Overall it looks good to
> >>>me. Although AdminUtils is a concern KIP-4  can probably fix
> >>>that part.
> >>>Thanks,
> >>>Harsha
> >>>
> >>>On Thu, Mar 5, 2015, at 10:39 AM, Parth Brahmbhatt wrote:
>  Forgot to add links to wiki and jira.
> 
>  Link to wiki:
> 
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface
>  Link to Jira: https://issues.apache.org/jira/browse/KAFKA-1688
> 
>  Thanks
>  Parth
> 
>  From: Parth Brahmbhatt
>  mailto:pbrahmbh...@hortonworks.com>>
>  Date: Thursday, March 5, 2015 at 10:33 AM
>  To: "dev@kafka.apache.org"
>  mailto:dev@kafka.apache.org>>
>  Subject: [DISCUSS] KIP-11- Authorization design for kafka security
> 
>  Hi,
> 
>  KIP-11 is open for discussion , I have updated the wiki with the
> design
>  and open questions.
> 
>  Thanks
>  Parth
> >>
> >>
> >
>
>


-- 
Thanks,
Neha
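
A hypothetical sketch of the allow/deny semantics from Michael Herstine's first question above: a matching DENY rule overrides any matching ALLOW rule, with default deny when nothing matches. The types and names here are illustrative, not the actual KIP-11 interface.

{code:title=SimpleAuthorizer.java|borderStyle=solid}
import java.util.Arrays;
import java.util.List;

// Illustrative only: not the KIP-11 Authorizer interface.
public class SimpleAuthorizer {
    static final class Acl {
        final boolean deny;
        final String principal, operation, resource, host; // "*" = any host
        Acl(boolean deny, String principal, String operation, String resource, String host) {
            this.deny = deny; this.principal = principal; this.operation = operation;
            this.resource = resource; this.host = host;
        }
        boolean matches(String p, String op, String r, String h) {
            return principal.equals(p) && operation.equals(op) && resource.equals(r)
                    && (host.equals("*") || host.equals(h));
        }
    }

    static boolean authorize(List<Acl> acls, String p, String op, String r, String h) {
        boolean allowed = false;
        for (Acl acl : acls) {
            if (!acl.matches(p, op, r, h)) continue;
            if (acl.deny) return false; // any matching DENY wins
            allowed = true;             // remember a matching ALLOW
        }
        return allowed;                 // default deny when nothing matches
    }

    public static void main(String[] args) {
        List<Acl> acls = Arrays.asList(
                new Acl(false, "P", "READ", "R", "*"),   // allow P to READ R from all hosts
                new Acl(true,  "P", "READ", "R", "H1")); // ...except from host H1
        System.out.println(authorize(acls, "P", "READ", "R", "H2")); // true
        System.out.println(authorize(acls, "P", "READ", "R", "H1")); // false
    }
}
{code}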


Re: Plan of Controlled Shutdown

2015-03-25 Thread Neha Narkhede
Removed the mention of the controlled shutdown tool from the wiki since we
don't support it anymore -
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-1.ControlledShutdown
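
(For reference, the broker setting Harsha describes below is a single server.properties entry; a minimal sketch, showing the 0.8.2 default:)

{code:title=server.properties|borderStyle=solid}
# Default is true in 0.8.2; a normal shutdown signal (e.g. Ctrl-C / SIGTERM)
# then triggers controlled shutdown before the broker exits.
controlled.shutdown.enable=true
{code}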

On Wed, Mar 25, 2015 at 3:07 PM, Guozhang Wang  wrote:

> Mingjie,
>
> Controlled shutdown has been fully implemented and is regularly exercised
> at LinkedIn for version upgrades, rolling bounce tests, etc. As Harsha
> said, you just need to turn on the config in the brokers, and a normal
> shutdown command like Ctrl-C will trigger the controlled shutdown.
>
> Guozhang
>
> On Wed, Mar 25, 2015 at 12:39 PM, Harsha  wrote:
>
> > Hi,
> > You can enable/disable controlled shutdown using
> > controlled.shutdown.enable in server.properties. This is set to “true”
> > by default in 0.8.2. You can go ahead and do a rolling restart; you
> > don’t need the ShutdownBroker command, which has been removed. KAFKA-2029
> > is about improvements to controlled shutdown, especially in the case of
> > many partitions per broker, where performance might degrade as per the
> > JIRA.
> >
> > Thanks,
> > Harsha
> >
> >
> > On March 25, 2015 at 11:15:21 AM, Mingjie Lai (m...@apache.org) wrote:
> >
> > Hi.
> >
> > I've been trying to figure out the best way to do a Kafka broker rolling
> > restart, and read the controlled shutdown wiki page:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-1.ControlledShutdown
> >
> > However I cannot find this kafka.admin.ShutdownBroker class in 0.8.2:
> >
> > /usr/lib/kafka $ bin/kafka-run-class.sh kafka.admin.ShutdownBroker
> >
> > Exception in thread "main" java.lang.NoClassDefFoundError:
> > kafka/admin/ShutdownBroker
> > Caused by: java.lang.ClassNotFoundException: kafka.admin.ShutdownBroker
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> > Could not find the main class: kafka.admin.ShutdownBroker. Program will
> > exit.
> >
> > Some other commands are there:
> >
> > /usr/lib/kafka $ bin/kafka-topics.sh --list --zookeeper zk1:2181/kafka |
> > grep test
> >
> > test
> >
> > However, I also see some jira about the bug fixes for ControlledShutdown
> > like https://issues.apache.org/jira/browse/KAFKA-2029. So I kind of got
> > confused.
> >
> > What's the plan for this feature in Kafka? I still see the value of it:
> > for example, we can deliberately move some of the leader partitions
> > around for a scheduled upgrade or config update, minimizing the
> > disruption as much as possible.
> >
> > Thanks,
> > Mingjie
> >
>
>
>
> --
> -- Guozhang
>



-- 
Thanks,
Neha


[jira] [Commented] (KAFKA-2050) Avoid calling .size() on java.util.ConcurrentLinkedQueue

2015-03-25 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14381337#comment-14381337
 ] 

Jay Kreps commented on KAFKA-2050:
--

Nice catch.

> Avoid calling .size() on java.util.ConcurrentLinkedQueue
> 
>
> Key: KAFKA-2050
> URL: https://issues.apache.org/jira/browse/KAFKA-2050
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Tim Brooks
>Assignee: Jun Rao
> Attachments: KAFKA-2050.patch, dont_call_queue_size.patch
>
>
> Generally, it seems to be preferred to avoid calling .size() on a Java 
> ConcurrentLinkedQueue. This is an O(N) operation as it must iterate through 
> all the nodes.
> Calling this every time through the loop makes this issue worse under high 
> load. It seems like the same functionality can be attained by just polling 
> and checking for null.
> This is more imperative and less functional, but it seems alright since this 
> is in lower-level networking code.
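
A minimal sketch of the poll-and-check-for-null pattern described above (illustrative only; not the actual Kafka networking code):

{code:title=QueueDrain.java|borderStyle=solid}
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueDrain {
    // ConcurrentLinkedQueue.size() is O(n): it walks every node. Draining
    // with poll() until null avoids that traversal entirely.
    static void drain(ConcurrentLinkedQueue<Runnable> queue) {
        Runnable task;
        while ((task = queue.poll()) != null) { // O(1) per element
            task.run();
        }
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<Runnable> q = new ConcurrentLinkedQueue<>();
        q.add(() -> System.out.println("hello"));
        drain(q);
    }
}
{code}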



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2051) Add hashCode() and equals() to ProducerRecord

2015-03-25 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14381321#comment-14381321
 ] 

Ewen Cheslack-Postava commented on KAFKA-2051:
--

This was already addressed in KAFKA-1805, you can see that the version on trunk 
has those methods: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java

> Add hashCode() and equals() to ProducerRecord
> -
>
> Key: KAFKA-2051
> URL: https://issues.apache.org/jira/browse/KAFKA-2051
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.8.2.0
>Reporter: Justin Plock
>Priority: Trivial
>
> Would it be possible to add both {{hashCode()}} and {{equals()}} methods to 
> the {{ProducerRecord}} object?
> https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
> I can submit a pull request via Github if that would be easier.
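
For reference, a minimal sketch of what such methods typically look like on a simple value class (illustrative only; the trunk ProducerRecord linked above is the authoritative version):

{code:title=RecordEquality.java|borderStyle=solid}
import java.util.Objects;

// A stand-in value class; ProducerRecord's real fields are topic,
// partition, key, and value.
public final class RecordEquality {
    private final String topic;
    private final Integer partition;
    private final Object key;
    private final Object value;

    public RecordEquality(String topic, Integer partition, Object key, Object value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof RecordEquality)) return false;
        RecordEquality that = (RecordEquality) o;
        return Objects.equals(topic, that.topic)
                && Objects.equals(partition, that.partition)
                && Objects.equals(key, that.key)
                && Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition, key, value);
    }
}
{code}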



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2051) Add hashCode() and equals() to ProducerRecord

2015-03-25 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-2051.
--
Resolution: Duplicate

> Add hashCode() and equals() to ProducerRecord
> -
>
> Key: KAFKA-2051
> URL: https://issues.apache.org/jira/browse/KAFKA-2051
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.8.2.0
>Reporter: Justin Plock
>Priority: Trivial
>
> Would it be possible to add both {{hashCode()}} and {{equals()}} methods to 
> the {{ProducerRecord}} object?
> https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
> I can submit a pull request via Github if that would be easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Contributing a KIP

2015-03-25 Thread Tom Lee
Hi all,

I'd like to put forth a KIP but don't seem to have write access to cwiki.
Do I need to get a special permission bit set, or should I simply open up
the discussion here on the mailing list?

Cheers,
Tom


Re: [VOTE] KIP-15 add a close with timeout to new producer

2015-03-25 Thread Neha Narkhede
>
> We have agreed that we will have an error log to inform the user about this
> misuse. The options differ in how we can force the user to take a
> look at that error log.


Since we have to detect the problem in order to log an appropriate error
message, we have a way to tell if the user is doing something that will
cause their application to block indefinitely, which can't be ideal
behavior in any case I can imagine.

My concern is that this might add to this long list
 of confusing
Kafka behaviors, so I'd vote to log an appropriate error message and exit.
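
A hypothetical sketch of the misuse under discussion: a blocking close() issued from a send callback, which runs on the producer's sender (I/O) thread. The close(timeout, unit) form is the KIP-15 interface quoted below; the broker address and topic name are made up:

{code:title=CloseFromCallback.java|borderStyle=solid}
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CloseFromCallback {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        final Producer<String, String> producer = new KafkaProducer<>(props);

        producer.send(new ProducerRecord<>("test-topic", "key", "value"),
                (metadata, exception) -> {
                    if (exception != null) {
                        // A plain close() here would wait on the sender thread,
                        // i.e. the very thread running this callback. The
                        // proposal is to use the non-blocking form instead:
                        producer.close(0, TimeUnit.MILLISECONDS);
                    }
                });

        // Normal path: bounded wait from the user thread.
        producer.close(5, TimeUnit.SECONDS);
    }
}
{code}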

On Wed, Mar 25, 2015 at 10:04 AM, Jiangjie Qin 
wrote:

> Hi Jay,
>
> The reason we keep the blocking behavior if close() or close(timeout) is
> called from callback is discussed in another thread.
> I copy/paste the message here:
>
> It looks like the problem we want to solve and the purpose we want to
> achieve are:
> If the user calls close() in a callback, we want to make them aware that
> they should use close(0) instead of close() in the callback.
>
> We have agreed that we will have an error log to inform the user about this
> misuse. The options differ in how we can force the user to take a
> look at that error log.
> There are two scenarios:
> 1. User does not expect the program to exit.
> 2. User expects the program to exit.
>
> For scenario 1), blocking will probably delay the discovery of the
> problem. Calling close(0) exposes the problem more quickly. In this
> scenario the producer just encounters a send failure when running normally.
> For scenario 2), blocking will expose the problem quickly. Calling
> close(-1) might hide the problem. This scenario might include: a) a unit
> test for a send failure, or b) a message sent during a close() call from a
> user thread.
>
> So as a summary table:
>
>             Scenario 1)                  Scenario 2)
> Blocking    Delayed problem discovery    Guaranteed problem discovery
> Close(-1)   Immediate problem discovery  Problem might be hidden
>
>
> Personally I prefer blocking because it seems to provide more guarantees
> and to be safer.
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
>
>
> On 3/25/15, 9:20 AM, "Jay Kreps"  wrote:
>
> >Wait, actually, why would the thread block forever? I would understand
> >throwing an error and failing immediately (fail fast) and I would
> >understand logging an error and blocking for the time they specified
> >(since that is what they asked for), but logging an error and then
> >blocking forever when they only specified a wait time of, say, 15 ms just
> >seems weird, right? There is no other error condition where we
> >intentionally block forever as far as I know.
> >
> >Sorry to keep going around and around on this minor point; I just want to
> >clarify that that is what you mean.
> >
> >-Jay
> >
> >On Wed, Mar 25, 2015 at 9:17 AM, Jay Kreps  wrote:
> >
> >> +1
> >>
> >> -Jay
> >>
> >> On Tue, Mar 24, 2015 at 2:43 PM, Guozhang Wang 
> >>wrote:
> >>
> >>> +1.
> >>>
> >>> On Tue, Mar 24, 2015 at 2:15 PM, Jiangjie Qin
> >>>
> >>> wrote:
> >>>
> >>> >
> >>> >
> >>>
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-15+-+Add+a+close+method+with+a+timeout+in+the+producer
> >>> >
> >>> > As a short summary, the new interface will be as follows:
> >>> > close(long timeout, TimeUnit timeUnit)
> >>> >
> >>> >   1.  When timeout > 0, it will try to wait up to the timeout for the
> >>> > sender thread to complete all the requests, then join the sender
> >>> > thread. If the sender thread is not able to finish its work before the
> >>> > timeout, the method force-closes the producer by failing all the
> >>> > incomplete requests and then joins the sender thread.
> >>> >   2.  When timeout = 0, it will be a non-blocking call that just
> >>> > initiates the force close and DOES NOT join the sender thread.
> >>> >
> >>> > If close(timeout) is called from a callback, an error message will be
> >>> > logged and the producer sender thread will block forever.
> >>> >
> >>> >
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >>
>
>


-- 
Thanks,
Neha
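
A minimal usage sketch of the close-with-timeout semantics summarized above,
assuming the KIP-15 signature lands as close(long timeout, TimeUnit timeUnit)
(the final API may differ):

{code:title=CloseWithTimeoutSketch.java|borderStyle=solid}
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CloseWithTimeoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

        producer.send(new ProducerRecord<byte[], byte[]>("test", "hello".getBytes()));

        // timeout > 0: wait up to the timeout for the sender thread to
        // finish outstanding requests, then force-close anything left.
        producer.close(5, TimeUnit.SECONDS);

        // timeout = 0 would instead initiate a non-blocking force close:
        // producer.close(0, TimeUnit.MILLISECONDS);
    }
}
{code}

Per the summary above, calling a blocking close from a send callback is the
misuse case being debated: the callback runs on the sender thread itself, so
only the non-blocking force close (timeout = 0) is safe there.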


Re: MirrorMaker improvements

2015-03-25 Thread Jiangjie Qin
You can set num.consumer.fetchers to a larger number (e.g. 3) and more
fetchers will be created to fetch from different partitions on the same
broker. Each fetcher will have its own TCP connection.
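
For reference, a consumer.properties sketch of the setting described above
(the right value is workload-dependent; 1 is the default):

{code:title=consumer.properties|borderStyle=solid}
# Number of fetcher threads used to fetch from each source broker.
# Each fetcher thread opens its own TCP connection.
num.consumer.fetchers=3
{code}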


WRT the congestion window, yes, if the link has a high packet drop rate
then congestion avoidance will occur before the bandwidth gets fully used.
I thought that in normal cases packets only get dropped occasionally, even
on a long link (that’s why congestion avoidance only occurs when there are
3 duplicate ACKs or a timeout). So if the packet drop rate is high, it
sounds more like a link-quality issue, or the network is really congested -
like in the paper you mentioned, where one of the internet routers enforces
a drop-tail policy because its buffer is full due to bursty traffic.

Jiangjie (Becket) Qin

On 3/25/15, 4:08 PM, "vlad...@gmail.com"  wrote:

>Hi Jiangjie,
>
>I only noticed a single TCP connection between a MM process and a single
>broker. Is there something I could have done to open up more connections?
>
>TCP can actually cap out before saturating the network, which is why it is
>hard to utilize a high-bandwidth, high-latency link with a single TCP
>connection. There is an equation that links the MSS, RTT and loss rate of
>the link to the achievable TCP throughput. Notice that the link bandwidth
>does not come into play, since the only way it can affect throughput is by
>increasing the loss rate due to drops when the link is congested. On WAN
>links, however, a single connection will usually cap out (due to random
>losses and high RTT) long before achieving the capacity of the link. Here
>is a reference for this:
>http://www.ece.virginia.edu/~mv/edu/715/lectures/TCP/padhye98modeling.pdf
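
For reference, the simplified form of the relation being cited (the Mathis et
al. approximation; the Padhye et al. paper linked above gives the refined
model) is, with p the packet loss rate and C a constant close to 1:

{code}
throughput ~ (MSS / RTT) * (C / sqrt(p))
{code}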
>
>Regards,
>Vlad
>
>On Wed, Mar 25, 2015 at 3:43 PM, Jiangjie Qin 
>wrote:
>
>> Hi Vlad,
>>
>> I am not sure I understand the congestion window part. TCP congestion
>> control will only kick in when you are saturating the network. If that is
>> the case, bandwidth has already become the bottleneck. But we are talking
>> about network under-utilization, no?
>>
>> Another thing is that each fetcher thread has its own BlockingChannel to
>> the broker, so they have dedicated TCP connections. Could you explain more
>> about the mux?
>>
>> Jiangjie (Becket) Qin
>>
>> On 3/25/15, 2:59 PM, "vlad...@gmail.com"  wrote:
>>
>> >@Guozhang
>> >We actually have separate topics depending on the source of the message
>> >and the multicast distribution group (the set of destinations). Our
>> >topics are named: source_multicast-group. We do not aggregate data, but
>> >we do static routing based on the destination and the destination set
>> >(that is, we set up a tree of mirrormakers to copy the topic from the
>> >original datacenter to the others). This gives us a static topology (no
>> >path-failure resilience) and limits the number of multicast groups
>> >(since each multicast group needs a different topic for every source),
>> >but it is a good match for our data replication pattern. It also helps
>> >that the order of writes in our system is not important, so we do not
>> >need a single point of aggregation :)
>> >
>> >@Jun
>> >The actual problem is the congestion window; I do not think that we are
>> >suffering due to the transmit/receive socket buffers (we are using the
>> >same buffers over different links with similar RTT but different loss
>> >rates, and the TCP connection throughput varies a lot; this would not be
>> >the case if the amount of in-flight data were limited by the buffer
>> >size). The socket-level cwnd metrics also support our hypothesis, and we
>> >have also measured using iperf what a single connection can transport
>> >across a lossy inter-DC link. Jiangjie seems to be suggesting a different
>> >blocking scenario, similar to head-of-line blocking because of other
>> >requests; however, increasing the number of fetchers will not necessarily
>> >help, since all fetchers will mux their requests over a single TCP
>> >connection when sending requests to a single broker. The TCP connection's
>> >congestion window will continue to be the limiting factor. I would say
>> >that the only way out of this is to pool multiple TCP connections from a
>> >single consumer to a broker.
>> >
>> >For identical mirroring, I thought that when asking for data between a
>> >pair of offsets the result should always be the same. Would it be
>> >possible to also indicate, when producing, the offsets where the data
>> >should go?
>> >
>> >Regards,
>> >Vlad
>> >
>> >On Wed, Mar 25, 2015 at 10:21 AM, Jiangjie Qin
>>> >
>> >wrote:
>> >
>> >> Hi Jun, I think bumping up socket.receive.buffer.bytes will help but
>> >> might not be sufficient.
>> >> There are actually two related factors here:
>> >> 1. Pipelining TCP packets when sending a single request/response.
>> >> 2. Pipelining multiple requests/responses.
>> >> Bumping up socket.receive.buffer.bytes helps with 1) but does not
>> >>help
>> >> 

[jira] [Created] (KAFKA-2051) Add hashCode() and equals() to ProducerRecord

2015-03-25 Thread Justin Plock (JIRA)
Justin Plock created KAFKA-2051:
---

 Summary: Add hashCode() and equals() to ProducerRecord
 Key: KAFKA-2051
 URL: https://issues.apache.org/jira/browse/KAFKA-2051
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 0.8.2.0
Reporter: Justin Plock
Priority: Trivial


Would it be possible to add both {{hashCode()}} and {{equals()}} methods to 
the {{ProducerRecord}} object?

https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java

I can submit a pull request via Github if that would be easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32519: Patch for KAFKA-2050

2015-03-25 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32519/#review77855
---

Ship it!


Ship It!

- Sriharsha Chintalapani


On March 26, 2015, 2:36 a.m., Tim Brooks wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32519/
> ---
> 
> (Updated March 26, 2015, 2:36 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2050
> https://issues.apache.org/jira/browse/KAFKA-2050
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Avoid calling size() on concurrent queue.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> 76ce41aed6e04ac5ba88395c4d5008aca17f9a73 
> 
> Diff: https://reviews.apache.org/r/32519/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Tim Brooks
> 
>



[jira] [Commented] (KAFKA-2050) Avoid calling .size() on java.util.ConcurrentLinkedQueue

2015-03-25 Thread Tim Brooks (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14381251#comment-14381251
 ] 

Tim Brooks commented on KAFKA-2050:
---

Okay, I submitted it. I had some trouble, but I think I was able to work it out.

> Avoid calling .size() on java.util.ConcurrentLinkedQueue
> 
>
> Key: KAFKA-2050
> URL: https://issues.apache.org/jira/browse/KAFKA-2050
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Tim Brooks
>Assignee: Jun Rao
> Attachments: KAFKA-2050.patch, dont_call_queue_size.patch
>
>
> Generally, it is preferable to avoid calling .size() on a Java 
> ConcurrentLinkedQueue. This is an O(n) operation, as it must iterate through 
> all the nodes.
> Calling this every time through the loop makes this issue worse under high 
> load. It seems like the same functionality can be attained by just polling 
> and checking for null.
> This is more imperative and less functional, but it seems alright since this 
> is in lower-level networking code.
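
A minimal sketch of the polling pattern the description proposes (illustrative
names, not the actual SocketServer code):

{code:title=PollInsteadOfSize.java|borderStyle=solid}
import java.util.concurrent.ConcurrentLinkedQueue;

public class PollInsteadOfSize {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.add("a");
        queue.add("b");

        // Costly under load: size() is O(n) on ConcurrentLinkedQueue,
        // traversing every node on each loop iteration:
        //   while (queue.size() > 0) { process(queue.poll()); }

        // Equivalent and O(1) per step: poll() returns null when empty.
        String item;
        while ((item = queue.poll()) != null) {
            System.out.println(item);
        }
    }
}
{code}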



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2050) Avoid calling .size() on java.util.ConcurrentLinkedQueue

2015-03-25 Thread Tim Brooks (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tim Brooks updated KAFKA-2050:
--
Attachment: KAFKA-2050.patch

> Avoid calling .size() on java.util.ConcurrentLinkedQueue
> 
>
> Key: KAFKA-2050
> URL: https://issues.apache.org/jira/browse/KAFKA-2050
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Tim Brooks
>Assignee: Jun Rao
> Attachments: KAFKA-2050.patch, dont_call_queue_size.patch
>
>
> Generally, it is preferable to avoid calling .size() on a Java 
> ConcurrentLinkedQueue. This is an O(n) operation, as it must iterate through 
> all the nodes.
> Calling this every time through the loop makes this issue worse under high 
> load. It seems like the same functionality can be attained by just polling 
> and checking for null.
> This is more imperative and less functional, but it seems alright since this 
> is in lower-level networking code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2050) Avoid calling .size() on java.util.ConcurrentLinkedQueue

2015-03-25 Thread Tim Brooks (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14381250#comment-14381250
 ] 

Tim Brooks commented on KAFKA-2050:
---

Created reviewboard https://reviews.apache.org/r/32519/diff/
 against branch origin/trunk

> Avoid calling .size() on java.util.ConcurrentLinkedQueue
> 
>
> Key: KAFKA-2050
> URL: https://issues.apache.org/jira/browse/KAFKA-2050
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Tim Brooks
>Assignee: Jun Rao
> Attachments: KAFKA-2050.patch, dont_call_queue_size.patch
>
>
> Generally, it is preferable to avoid calling .size() on a Java 
> ConcurrentLinkedQueue. This is an O(n) operation, as it must iterate through 
> all the nodes.
> Calling this every time through the loop makes this issue worse under high 
> load. It seems like the same functionality can be attained by just polling 
> and checking for null.
> This is more imperative and less functional, but it seems alright since this 
> is in lower-level networking code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 32519: Patch for KAFKA-2050

2015-03-25 Thread Tim Brooks

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32519/
---

Review request for kafka.


Bugs: KAFKA-2050
https://issues.apache.org/jira/browse/KAFKA-2050


Repository: kafka


Description
---

Avoid calling size() on concurrent queue.


Diffs
-

  core/src/main/scala/kafka/network/SocketServer.scala 
76ce41aed6e04ac5ba88395c4d5008aca17f9a73 

Diff: https://reviews.apache.org/r/32519/diff/


Testing
---


Thanks,

Tim Brooks



[jira] [Commented] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-25 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14381232#comment-14381232
 ] 

Gwen Shapira commented on KAFKA-2044:
-

Updated reviewboard https://reviews.apache.org/r/32459/diff/
 against branch trunk

> Support requests and responses from o.a.k.common in KafkaApis
> -
>
> Key: KAFKA-2044
> URL: https://issues.apache.org/jira/browse/KAFKA-2044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-2044.patch, KAFKA-2044_2015-03-25_16:48:01.patch, 
> KAFKA-2044_2015-03-25_16:48:49.patch, KAFKA-2044_2015-03-25_16:53:05.patch, 
> KAFKA-2044_2015-03-25_18:49:24.patch, KAFKA-2044_2015-03-25_19:20:16.patch
>
>
> As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
> support handling of requests and responses from o.a.k.common in KafkaApis.
> This will allow us to add new Api calls just in o.a.k.common and to gradually 
> migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32459: Patch for KAFKA-2044

2015-03-25 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32459/
---

(Updated March 26, 2015, 2:20 a.m.)


Review request for kafka.


Bugs: KAFKA-2044
https://issues.apache.org/jira/browse/KAFKA-2044


Repository: kafka


Description (updated)
---

support requests and responses using Common api in core modules (missing files)


added error handling and factory method for requests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1927-v2


made getErrorResponse required for requests by adding another abstract class


added serialization tests for error responses and fixed related issues


Fix checkstyle complaint


Diffs (updated)
-

  checkstyle/import-control.xml cca4b38ec766028a604f88a1c63228e40df24573 
  clients/src/main/java/org/apache/kafka/common/Node.java 
88c3b2425e42d9fc9c716a8e093d2ff1a12e28dd 
  clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
321da8afc73941292f743e1c22fc3788df3d12dd 
  clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
 1651e75dedf32931eeff75f3ae6ef23db37acdc3 
  clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
721e7d3f53247f5ae1ea4315fb3c466a94880b59 
  clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
f020aaa05525153f39dfda187f0c8174f83a6b95 
  clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
6943878116a97c02b758d273d93976019688830e 
  clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
1ebc188742fd65e5e744003b4579324874fd81a9 
  clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
e5dc92e9bb2aa5e291a99a67422ba3b0b80b31f7 
  clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
5d5f52c644e9ba3e9571c48e3e06b62edbb07fb5 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
94e9d376235b3288836807d8e8d2547b3743aad5 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java 
16c807c01628b9408dcf20ca946373927246f7b0 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 
f10c2463b53e157bc9f7ac3f017682fb3d1ace0e 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
995f89f25b621484ddc3f3e4779ab7446a20124f 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
4b67f7025fb613344ad65903f7bc8e3f61b165b4 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
13237fd72da5448a3d596b882fef141f336f827d 
  core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
  core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
9a71faae3138af1b4fb48125db619ddc3ad13c5a 
  core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
3651e8603dd0ed0d2ea059786c68cf0722aa094b 
  core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
  core/src/main/scala/kafka/api/RequestKeys.scala 
c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
55ecac285e00abf38d7131368bb46b4c4010dc87 
  core/src/main/scala/kafka/network/RequestChannel.scala 
7b1db3dbbb2c0676f166890f566c14aa248467ab 
  core/src/main/scala/kafka/server/KafkaApis.scala 
35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 

Diff: https://reviews.apache.org/r/32459/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Updated] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-25 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2044:

Attachment: KAFKA-2044_2015-03-25_19:20:16.patch

> Support requests and responses from o.a.k.common in KafkaApis
> -
>
> Key: KAFKA-2044
> URL: https://issues.apache.org/jira/browse/KAFKA-2044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-2044.patch, KAFKA-2044_2015-03-25_16:48:01.patch, 
> KAFKA-2044_2015-03-25_16:48:49.patch, KAFKA-2044_2015-03-25_16:53:05.patch, 
> KAFKA-2044_2015-03-25_18:49:24.patch, KAFKA-2044_2015-03-25_19:20:16.patch
>
>
> As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
> support handling of requests and responses from o.a.k.common in KafkaApis.
> This will allow us to add new Api calls just in o.a.k.common and to gradually 
> migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32459: Patch for KAFKA-2044

2015-03-25 Thread Gwen Shapira


> On March 26, 2015, 12:47 a.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java,
> >  line 53
> > 
> >
> > Cluster may not serialize properly with those nulls in PartitionInfo.
> 
> Gwen Shapira wrote:
> Oh wow, great catch. Added tests and fixed the serialization issues.
> 
> Gwen Shapira wrote:
> I'm hitting a style-check error for the test: 
> /Users/gshapira/workspace/kafka/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:20:1:
>  Disallowed import - org.apache.kafka.common.errors.UnknownServerException.
> 
> I need an exception to test "getErrorResponse()"; any idea why I'm not 
> allowed to import it here? Both packages are in o.a.k.common...
> 
> Jun Rao wrote:
> Probably because of the following rule in checkstyle/import-control.xml?
> 
> 
> 
> 
> 

Does it make sense to change the rule here?


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32459/#review77832
---


On March 26, 2015, 1:49 a.m., Gwen Shapira wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32459/
> ---
> 
> (Updated March 26, 2015, 1:49 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2044
> https://issues.apache.org/jira/browse/KAFKA-2044
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> support requests and responses using Common api in core modules (missing 
> files)
> 
> 
> added error handling and factory method for requests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1927-v2
> 
> 
> made getErrorResponse required for requests by adding another abstract class
> 
> 
> added serialization tests for error responses and fixed related issues
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/common/Node.java 
> 88c3b2425e42d9fc9c716a8e093d2ff1a12e28dd 
>   clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
> 321da8afc73941292f743e1c22fc3788df3d12dd 
>   clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
>  1651e75dedf32931eeff75f3ae6ef23db37acdc3 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
> 721e7d3f53247f5ae1ea4315fb3c466a94880b59 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
> f020aaa05525153f39dfda187f0c8174f83a6b95 
>   
> clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
> 6943878116a97c02b758d273d93976019688830e 
>   
> clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
> 1ebc188742fd65e5e744003b4579324874fd81a9 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
> e5dc92e9bb2aa5e291a99a67422ba3b0b80b31f7 
>   clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
> 5d5f52c644e9ba3e9571c48e3e06b62edbb07fb5 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
>  94e9d376235b3288836807d8e8d2547b3743aad5 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
>  16c807c01628b9408dcf20ca946373927246f7b0 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
>  f10c2463b53e157bc9f7ac3f017682fb3d1ace0e 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
> 995f89f25b621484ddc3f3e4779ab7446a20124f 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
> 4b67f7025fb613344ad65903f7bc8e3f61b165b4 
>   
> clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
>  13237fd72da5448a3d596b882fef141f336f827d 
>   core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
> f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
>   core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
> 9a71faae3138af1b4fb48125db619ddc3ad13c5a 
>   core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
> 3651e8603dd0ed0d2ea059786c68cf0722aa094b 
>   core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
> d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
>   core/src/main/scala/kafka/api/RequestKeys.scala 
> c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
> 55ecac285e00abf38d7131368bb46b4c4010dc87 
>   core/src/main/scala/kafka/network/RequestChannel.scala 
> 7b1db3dbbb2c0676f166890f566c14aa248467a

Re: Review Request 32459: Patch for KAFKA-2044

2015-03-25 Thread Jun Rao


> On March 26, 2015, 12:47 a.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java,
> >  line 53
> > 
> >
> > Cluster may not serialize properly with those nulls in PartitionInfo.
> 
> Gwen Shapira wrote:
> Oh wow, great catch. Added tests and fixed the serialization issues.
> 
> Gwen Shapira wrote:
> I'm hitting a style-check error for the test: 
> /Users/gshapira/workspace/kafka/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:20:1:
>  Disallowed import - org.apache.kafka.common.errors.UnknownServerException.
> 
> I need an exception to test "getErrorResponse()"; any idea why I'm not 
> allowed to import it here? Both packages are in o.a.k.common...

Probably because of the following rule in checkstyle/import-control.xml?
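
For context, checkstyle import-control rules are XML of roughly this shape (a
hypothetical illustration only; the actual rule was stripped from this message
by the mail archive):

{code:title=import-control.xml (hypothetical)|borderStyle=solid}
<subpackage name="common">
    <subpackage name="requests">
        <!-- Only explicitly allowed packages may be imported;
             anything else is reported as a disallowed import. -->
        <allow pkg="org.apache.kafka.common.protocol" />
    </subpackage>
</subpackage>
{code}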







- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32459/#review77832
---


On March 26, 2015, 1:49 a.m., Gwen Shapira wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32459/
> ---
> 
> (Updated March 26, 2015, 1:49 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2044
> https://issues.apache.org/jira/browse/KAFKA-2044
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> support requests and responses using Common api in core modules (missing 
> files)
> 
> 
> added error handling and factory method for requests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1927-v2
> 
> 
> made getErrorResponse required for requests by adding another abstract class
> 
> 
> added serialization tests for error responses and fixed related issues
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/common/Node.java 
> 88c3b2425e42d9fc9c716a8e093d2ff1a12e28dd 
>   clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
> 321da8afc73941292f743e1c22fc3788df3d12dd 
>   clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
>  1651e75dedf32931eeff75f3ae6ef23db37acdc3 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
> 721e7d3f53247f5ae1ea4315fb3c466a94880b59 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
> f020aaa05525153f39dfda187f0c8174f83a6b95 
>   
> clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
> 6943878116a97c02b758d273d93976019688830e 
>   
> clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
> 1ebc188742fd65e5e744003b4579324874fd81a9 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
> e5dc92e9bb2aa5e291a99a67422ba3b0b80b31f7 
>   clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
> 5d5f52c644e9ba3e9571c48e3e06b62edbb07fb5 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
>  94e9d376235b3288836807d8e8d2547b3743aad5 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
>  16c807c01628b9408dcf20ca946373927246f7b0 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
>  f10c2463b53e157bc9f7ac3f017682fb3d1ace0e 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
> 995f89f25b621484ddc3f3e4779ab7446a20124f 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
> 4b67f7025fb613344ad65903f7bc8e3f61b165b4 
>   
> clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
>  13237fd72da5448a3d596b882fef141f336f827d 
>   core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
> f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
>   core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
> 9a71faae3138af1b4fb48125db619ddc3ad13c5a 
>   core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
> 3651e8603dd0ed0d2ea059786c68cf0722aa094b 
>   core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
> d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
>   core/src/main/scala/kafka/api/RequestKeys.scala 
> c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
> 55ecac285e00abf38d7131368bb46b4c4010dc87 
>   core/src/main/scala/kafka/network/RequestChannel.scala 
> 7b1db3dbbb2c0676f166890f566c14aa248467ab 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 35af98f0bc1b6a50bd1d97a30147593f8c6a422d 

Re: Review Request 32459: Patch for KAFKA-2044

2015-03-25 Thread Gwen Shapira


> On March 26, 2015, 12:47 a.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java,
> >  line 53
> > 
> >
> > Cluster may not serialize properly with those nulls in PartitionInfo.
> 
> Gwen Shapira wrote:
> Oh wow, great catch. Added tests and fixed the serialization issues.

I'm hitting a style-check error for the test: 
/Users/gshapira/workspace/kafka/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:20:1:
 Disallowed import - org.apache.kafka.common.errors.UnknownServerException.

I need an exception to test "getErrorResponse()"; any idea why I'm not allowed 
to import it here? Both packages are in o.a.k.common...


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32459/#review77832
---


On March 26, 2015, 1:49 a.m., Gwen Shapira wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32459/
> ---
> 
> (Updated March 26, 2015, 1:49 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2044
> https://issues.apache.org/jira/browse/KAFKA-2044
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> support requests and responses using Common api in core modules (missing 
> files)
> 
> 
> added error handling and factory method for requests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1927-v2
> 
> 
> made getErrorResponse required for requests by adding another abstract class
> 
> 
> added serialization tests for error responses and fixed related issues
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/common/Node.java 
> 88c3b2425e42d9fc9c716a8e093d2ff1a12e28dd 
>   clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
> 321da8afc73941292f743e1c22fc3788df3d12dd 
>   clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
>  1651e75dedf32931eeff75f3ae6ef23db37acdc3 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
> 721e7d3f53247f5ae1ea4315fb3c466a94880b59 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
> f020aaa05525153f39dfda187f0c8174f83a6b95 
>   
> clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
> 6943878116a97c02b758d273d93976019688830e 
>   
> clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
> 1ebc188742fd65e5e744003b4579324874fd81a9 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
> e5dc92e9bb2aa5e291a99a67422ba3b0b80b31f7 
>   clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
> 5d5f52c644e9ba3e9571c48e3e06b62edbb07fb5 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
>  94e9d376235b3288836807d8e8d2547b3743aad5 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
>  16c807c01628b9408dcf20ca946373927246f7b0 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
>  f10c2463b53e157bc9f7ac3f017682fb3d1ace0e 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
> 995f89f25b621484ddc3f3e4779ab7446a20124f 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
> 4b67f7025fb613344ad65903f7bc8e3f61b165b4 
>   
> clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
>  13237fd72da5448a3d596b882fef141f336f827d 
>   core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
> f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
>   core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
> 9a71faae3138af1b4fb48125db619ddc3ad13c5a 
>   core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
> 3651e8603dd0ed0d2ea059786c68cf0722aa094b 
>   core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
> d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
>   core/src/main/scala/kafka/api/RequestKeys.scala 
> c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
> 55ecac285e00abf38d7131368bb46b4c4010dc87 
>   core/src/main/scala/kafka/network/RequestChannel.scala 
> 7b1db3dbbb2c0676f166890f566c14aa248467ab 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 
> 
> Diff: https://reviews.apache.org/r/32459/diff/
> 
> 
> Testing
> ---
> 
> 

[jira] [Commented] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-25 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14381206#comment-14381206
 ] 

Gwen Shapira commented on KAFKA-2044:
-

Updated reviewboard https://reviews.apache.org/r/32459/diff/
 against branch trunk

> Support requests and responses from o.a.k.common in KafkaApis
> -
>
> Key: KAFKA-2044
> URL: https://issues.apache.org/jira/browse/KAFKA-2044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-2044.patch, KAFKA-2044_2015-03-25_16:48:01.patch, 
> KAFKA-2044_2015-03-25_16:48:49.patch, KAFKA-2044_2015-03-25_16:53:05.patch, 
> KAFKA-2044_2015-03-25_18:49:24.patch
>
>
> As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
> support handling of requests and responses from o.a.k.common in KafkaApis.
> This will allow us to add new Api calls just in o.a.k.common and to gradually 
> migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-25 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2044:

Attachment: KAFKA-2044_2015-03-25_18:49:24.patch

> Support requests and responses from o.a.k.common in KafkaApis
> -
>
> Key: KAFKA-2044
> URL: https://issues.apache.org/jira/browse/KAFKA-2044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-2044.patch, KAFKA-2044_2015-03-25_16:48:01.patch, 
> KAFKA-2044_2015-03-25_16:48:49.patch, KAFKA-2044_2015-03-25_16:53:05.patch, 
> KAFKA-2044_2015-03-25_18:49:24.patch
>
>
> As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
> support handling of requests and responses from o.a.k.common in KafkaApis.
> This will allow us to add new Api calls just in o.a.k.common and to gradually 
> migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32459: Patch for KAFKA-2044

2015-03-25 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32459/
---

(Updated March 26, 2015, 1:49 a.m.)


Review request for kafka.


Bugs: KAFKA-2044
https://issues.apache.org/jira/browse/KAFKA-2044


Repository: kafka


Description (updated)
---

support requests and responses using Common api in core modules (missing files)


added error handling and factory method for requests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1927-v2


made getErrorResponse required for requests by adding another abstract class


added serialization tests for error responses and fixed related issues


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/Node.java 
88c3b2425e42d9fc9c716a8e093d2ff1a12e28dd 
  clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
321da8afc73941292f743e1c22fc3788df3d12dd 
  clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
 1651e75dedf32931eeff75f3ae6ef23db37acdc3 
  clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
721e7d3f53247f5ae1ea4315fb3c466a94880b59 
  clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
f020aaa05525153f39dfda187f0c8174f83a6b95 
  clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
6943878116a97c02b758d273d93976019688830e 
  clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
1ebc188742fd65e5e744003b4579324874fd81a9 
  clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
e5dc92e9bb2aa5e291a99a67422ba3b0b80b31f7 
  clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
5d5f52c644e9ba3e9571c48e3e06b62edbb07fb5 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
94e9d376235b3288836807d8e8d2547b3743aad5 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java 
16c807c01628b9408dcf20ca946373927246f7b0 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 
f10c2463b53e157bc9f7ac3f017682fb3d1ace0e 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
995f89f25b621484ddc3f3e4779ab7446a20124f 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
4b67f7025fb613344ad65903f7bc8e3f61b165b4 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
13237fd72da5448a3d596b882fef141f336f827d 
  core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
  core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
9a71faae3138af1b4fb48125db619ddc3ad13c5a 
  core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
3651e8603dd0ed0d2ea059786c68cf0722aa094b 
  core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
  core/src/main/scala/kafka/api/RequestKeys.scala 
c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
55ecac285e00abf38d7131368bb46b4c4010dc87 
  core/src/main/scala/kafka/network/RequestChannel.scala 
7b1db3dbbb2c0676f166890f566c14aa248467ab 
  core/src/main/scala/kafka/server/KafkaApis.scala 
35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 

Diff: https://reviews.apache.org/r/32459/diff/


Testing
---


Thanks,

Gwen Shapira



Re: Review Request 32459: Patch for KAFKA-2044

2015-03-25 Thread Gwen Shapira


> On March 26, 2015, 12:47 a.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java,
> >  line 53
> > 
> >
> > Cluster may not serialize properly with those nulls in PartitionInfo.

Oh wow, great catch. Added tests and fixed the serialization issues.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32459/#review77832
---


On March 25, 2015, 11:53 p.m., Gwen Shapira wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32459/
> ---
> 
> (Updated March 25, 2015, 11:53 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2044
> https://issues.apache.org/jira/browse/KAFKA-2044
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> support requests and responses using Common api in core modules (missing 
> files)
> 
> 
> added error handling and factory method for requests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1927-v2
> 
> 
> made getErrorResponse required for requests by adding another abstract class
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
> 321da8afc73941292f743e1c22fc3788df3d12dd 
>   clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
>  1651e75dedf32931eeff75f3ae6ef23db37acdc3 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
> 721e7d3f53247f5ae1ea4315fb3c466a94880b59 
>   
> clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
> 6943878116a97c02b758d273d93976019688830e 
>   
> clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
> 1ebc188742fd65e5e744003b4579324874fd81a9 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
> e5dc92e9bb2aa5e291a99a67422ba3b0b80b31f7 
>   clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
> 5d5f52c644e9ba3e9571c48e3e06b62edbb07fb5 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
>  94e9d376235b3288836807d8e8d2547b3743aad5 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
>  16c807c01628b9408dcf20ca946373927246f7b0 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
>  f10c2463b53e157bc9f7ac3f017682fb3d1ace0e 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
> 995f89f25b621484ddc3f3e4779ab7446a20124f 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
> 4b67f7025fb613344ad65903f7bc8e3f61b165b4 
>   core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
> f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
>   core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
> 9a71faae3138af1b4fb48125db619ddc3ad13c5a 
>   core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
> 3651e8603dd0ed0d2ea059786c68cf0722aa094b 
>   core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
> d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
>   core/src/main/scala/kafka/api/RequestKeys.scala 
> c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
> 55ecac285e00abf38d7131368bb46b4c4010dc87 
>   core/src/main/scala/kafka/network/RequestChannel.scala 
> 7b1db3dbbb2c0676f166890f566c14aa248467ab 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 
> 
> Diff: https://reviews.apache.org/r/32459/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Gwen Shapira
> 
>



[jira] [Commented] (KAFKA-1293) Mirror maker housecleaning

2015-03-25 Thread Matt Warhaftig (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14381136#comment-14381136
 ] 

Matt Warhaftig commented on KAFKA-1293:
---

Thanks for pointing out KIP-14 [~becket_qin]; it is a very similar proposal.

I would be happy to update KIP-14 with change details and continue its KIP 
process - can you give me wiki edit access (username mwarhaftig) or point me 
towards an admin who can?

> Mirror maker housecleaning
> --
>
> Key: KAFKA-1293
> URL: https://issues.apache.org/jira/browse/KAFKA-1293
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1
>Reporter: Jay Kreps
>Priority: Minor
>  Labels: usability
> Attachments: KAFKA-1293.patch
>
>
> Mirror maker uses its own convention for command-line arguments, e.g. 
> --num.producers, whereas everything else follows the unix convention like 
> --num-producers. This is annoying because when running different tools you 
> have to constantly remember the quirks of whoever wrote that tool.
> Mirror maker should also have a top-level wrapper script in bin/ to make tab 
> completion work and so you don't have to remember the fully qualified class 
> name.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32459: Patch for KAFKA-2044

2015-03-25 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32459/#review77832
---


Thanks for the new patch. A few comments below. Perhaps we can add a unit test 
to make sure that all error responses can be serialized properly.


clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java


It seems that ConsumerMetadataResponse won't work with a null broker. So we 
will have to create a fake broker as in ConsumerMetadataResponse.scala.



clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java


Perhaps we can define -1 and an empty ByteBuffer as constants.



clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java


ListOffsetResponse.PartitionData doesn't seem to work with a null offset 
list. Probably passing in an empty list?



clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java


Cluster may not serialize properly with those nulls in PartitionInfo.


- Jun Rao


On March 25, 2015, 11:53 p.m., Gwen Shapira wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32459/
> ---
> 
> (Updated March 25, 2015, 11:53 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2044
> https://issues.apache.org/jira/browse/KAFKA-2044
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> support requests and responses using Common api in core modules (missing 
> files)
> 
> 
> added error handling and factory method for requests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1927-v2
> 
> 
> made getErrorResponse required for requests by adding another abstract class
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
> 321da8afc73941292f743e1c22fc3788df3d12dd 
>   clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
>  1651e75dedf32931eeff75f3ae6ef23db37acdc3 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
> 721e7d3f53247f5ae1ea4315fb3c466a94880b59 
>   
> clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
> 6943878116a97c02b758d273d93976019688830e 
>   
> clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
> 1ebc188742fd65e5e744003b4579324874fd81a9 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
> e5dc92e9bb2aa5e291a99a67422ba3b0b80b31f7 
>   clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
> 5d5f52c644e9ba3e9571c48e3e06b62edbb07fb5 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
>  94e9d376235b3288836807d8e8d2547b3743aad5 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
>  16c807c01628b9408dcf20ca946373927246f7b0 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
>  f10c2463b53e157bc9f7ac3f017682fb3d1ace0e 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
> 995f89f25b621484ddc3f3e4779ab7446a20124f 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
> 4b67f7025fb613344ad65903f7bc8e3f61b165b4 
>   core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
> f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
>   core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
> 9a71faae3138af1b4fb48125db619ddc3ad13c5a 
>   core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
> 3651e8603dd0ed0d2ea059786c68cf0722aa094b 
>   core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
> d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
>   core/src/main/scala/kafka/api/RequestKeys.scala 
> c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
> 55ecac285e00abf38d7131368bb46b4c4010dc87 
>   core/src/main/scala/kafka/network/RequestChannel.scala 
> 7b1db3dbbb2c0676f166890f566c14aa248467ab 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 
> 
> Diff: https://reviews.apache.org/r/32459/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Gwen Shapira
> 
>



Re: Review Request 31967: Patch for KAFKA-1546

2015-03-25 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review77810
---


Thanks for the new patch. A couple of more comments.


core/src/main/scala/kafka/cluster/Partition.scala


I think the condition for both cases can now be combined.

If we maintain a lastCaughtUpTime (see the comment below) in Replica, then 
a replica r is out of sync if currentTime - r.lastCaughtUpTime > maxLagMs.

We can probably also get rid of Replica.logEndOffsetUpdateTimeMs.



core/src/main/scala/kafka/cluster/Replica.scala


Would it be simpler to instead keep track of a lastCaughtUpTime and update 
it every time readToEndOfEnd is true? If readToEndOfEnd is false, we would 
just leave that value untouched.
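
A tiny sketch of the bookkeeping being suggested (illustrative Java with
assumed names; the actual change would live in Replica.scala and
Partition.scala):

{code:title=LagCheckSketch.java|borderStyle=solid}
public class LagCheckSketch {
    // Refreshed whenever a follower fetch read up to the log end offset.
    private volatile long lastCaughtUpTimeMs;

    // Called from the fetch path; only a read that reached the end of
    // the log counts as "caught up".
    public void maybeUpdate(boolean readToEndOfLog, long nowMs) {
        if (readToEndOfLog)
            lastCaughtUpTimeMs = nowMs;
    }

    // Combined out-of-sync condition: the replica has not caught up
    // within maxLagMs, regardless of how it fell behind.
    public boolean isOutOfSync(long nowMs, long maxLagMs) {
        return nowMs - lastCaughtUpTimeMs > maxLagMs;
    }
}
{code}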


- Jun Rao


On March 25, 2015, 8:27 p.m., Aditya Auradkar wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> ---
> 
> (Updated March 25, 2015, 8:27 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
> https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> PATCH for KAFKA-1546
> 
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time 
> since the replica last read from the LEO
> - Using the lag-begin value in the check for ISR expansion and shrinkage
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the 
> LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> Updated KAFKA-1546 patch to reflect Neha and Jun's comments
> 
> 
> Addressing Joel's comments
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala 
> bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/log/Log.scala 
> 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala 
> 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> 46d21c73f1feb3410751899380b35da0c37c975c 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
> 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
> 191251d1340b5e5b2d649b37af3c6c1896d07e6e 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
> 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>



[jira] [Commented] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-25 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14381056#comment-14381056
 ] 

Gwen Shapira commented on KAFKA-2044:
-

Updated reviewboard https://reviews.apache.org/r/32459/diff/
 against branch trunk

> Support requests and responses from o.a.k.common in KafkaApis
> -
>
> Key: KAFKA-2044
> URL: https://issues.apache.org/jira/browse/KAFKA-2044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-2044.patch, KAFKA-2044_2015-03-25_16:48:01.patch, 
> KAFKA-2044_2015-03-25_16:48:49.patch, KAFKA-2044_2015-03-25_16:53:05.patch
>
>
> As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
> support handling of requests and responses from o.a.k.common in KafkaApis.
> This will allow us to add new Api calls just in o.a.k.common and to gradually 
> migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-25 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2044:

Attachment: KAFKA-2044_2015-03-25_16:53:05.patch

> Support requests and responses from o.a.k.common in KafkaApis
> -
>
> Key: KAFKA-2044
> URL: https://issues.apache.org/jira/browse/KAFKA-2044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-2044.patch, KAFKA-2044_2015-03-25_16:48:01.patch, 
> KAFKA-2044_2015-03-25_16:48:49.patch, KAFKA-2044_2015-03-25_16:53:05.patch
>
>
> As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
> support handling of requests and responses from o.a.k.common in KafkaApis.
> This will allow us to add new Api calls just in o.a.k.common and to gradually 
> migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32459: Patch for KAFKA-2044

2015-03-25 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32459/
---

(Updated March 25, 2015, 11:53 p.m.)


Review request for kafka.


Bugs: KAFKA-2044
https://issues.apache.org/jira/browse/KAFKA-2044


Repository: kafka


Description (updated)
---

support requests and responses using Common api in core modules (missing files)


added error handling and factory method for requests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1927-v2


made getErrorResponse required for requests by adding another abstract class


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
321da8afc73941292f743e1c22fc3788df3d12dd 
  clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
 1651e75dedf32931eeff75f3ae6ef23db37acdc3 
  clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
721e7d3f53247f5ae1ea4315fb3c466a94880b59 
  clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
6943878116a97c02b758d273d93976019688830e 
  clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
1ebc188742fd65e5e744003b4579324874fd81a9 
  clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
e5dc92e9bb2aa5e291a99a67422ba3b0b80b31f7 
  clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
5d5f52c644e9ba3e9571c48e3e06b62edbb07fb5 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
94e9d376235b3288836807d8e8d2547b3743aad5 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java 
16c807c01628b9408dcf20ca946373927246f7b0 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 
f10c2463b53e157bc9f7ac3f017682fb3d1ace0e 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
995f89f25b621484ddc3f3e4779ab7446a20124f 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
4b67f7025fb613344ad65903f7bc8e3f61b165b4 
  core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
  core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
9a71faae3138af1b4fb48125db619ddc3ad13c5a 
  core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
3651e8603dd0ed0d2ea059786c68cf0722aa094b 
  core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
  core/src/main/scala/kafka/api/RequestKeys.scala 
c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
55ecac285e00abf38d7131368bb46b4c4010dc87 
  core/src/main/scala/kafka/network/RequestChannel.scala 
7b1db3dbbb2c0676f166890f566c14aa248467ab 
  core/src/main/scala/kafka/server/KafkaApis.scala 
35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 

Diff: https://reviews.apache.org/r/32459/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Updated] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-25 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2044:

Attachment: KAFKA-2044_2015-03-25_16:48:49.patch

> Support requests and responses from o.a.k.common in KafkaApis
> -
>
> Key: KAFKA-2044
> URL: https://issues.apache.org/jira/browse/KAFKA-2044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-2044.patch, KAFKA-2044_2015-03-25_16:48:01.patch, 
> KAFKA-2044_2015-03-25_16:48:49.patch
>
>
> As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
> support handling of requests and responses from o.a.k.common in KafkaApis.
> This will allow us to add new Api calls just in o.a.k.common and to gradually 
> migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-25 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14381051#comment-14381051
 ] 

Gwen Shapira commented on KAFKA-2044:
-

Updated reviewboard https://reviews.apache.org/r/32459/diff/
 against branch trunk

> Support requests and responses from o.a.k.common in KafkaApis
> -
>
> Key: KAFKA-2044
> URL: https://issues.apache.org/jira/browse/KAFKA-2044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-2044.patch, KAFKA-2044_2015-03-25_16:48:01.patch, 
> KAFKA-2044_2015-03-25_16:48:49.patch
>
>
> As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
> support handling of requests and responses from o.a.k.common in KafkaApis.
> This will allow us to add new Api calls just in o.a.k.common and to gradually 
> migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-25 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14381049#comment-14381049
 ] 

Gwen Shapira commented on KAFKA-2044:
-

Updated reviewboard https://reviews.apache.org/r/32459/diff/
 against branch trunk

> Support requests and responses from o.a.k.common in KafkaApis
> -
>
> Key: KAFKA-2044
> URL: https://issues.apache.org/jira/browse/KAFKA-2044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-2044.patch, KAFKA-2044_2015-03-25_16:48:01.patch
>
>
> As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
> support handling of requests and responses from o.a.k.common in KafkaApis.
> This will allow us to add new Api calls just in o.a.k.common and to gradually 
> migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32459: Patch for KAFKA-2044

2015-03-25 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32459/
---

(Updated March 25, 2015, 11:48 p.m.)


Review request for kafka.


Bugs: KAFKA-2044
https://issues.apache.org/jira/browse/KAFKA-2044


Repository: kafka


Description (updated)
---

support requests and responses using Common api in core modules (missing files)


added error handling and factory method for requests


KAFKA-2047; Move the stream creation into concurrent mirror maker threads; 
reviewed by Guozhang Wang


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1927-v2


made getErrorResponse required for requests by adding another abstract class


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
321da8afc73941292f743e1c22fc3788df3d12dd 
  clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
 1651e75dedf32931eeff75f3ae6ef23db37acdc3 
  clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
721e7d3f53247f5ae1ea4315fb3c466a94880b59 
  clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
6943878116a97c02b758d273d93976019688830e 
  clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
1ebc188742fd65e5e744003b4579324874fd81a9 
  clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
e5dc92e9bb2aa5e291a99a67422ba3b0b80b31f7 
  clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
5d5f52c644e9ba3e9571c48e3e06b62edbb07fb5 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
94e9d376235b3288836807d8e8d2547b3743aad5 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java 
16c807c01628b9408dcf20ca946373927246f7b0 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 
f10c2463b53e157bc9f7ac3f017682fb3d1ace0e 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
995f89f25b621484ddc3f3e4779ab7446a20124f 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
4b67f7025fb613344ad65903f7bc8e3f61b165b4 
  core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
  core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
9a71faae3138af1b4fb48125db619ddc3ad13c5a 
  core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
3651e8603dd0ed0d2ea059786c68cf0722aa094b 
  core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
  core/src/main/scala/kafka/api/RequestKeys.scala 
c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
55ecac285e00abf38d7131368bb46b4c4010dc87 
  core/src/main/scala/kafka/network/RequestChannel.scala 
7b1db3dbbb2c0676f166890f566c14aa248467ab 
  core/src/main/scala/kafka/server/KafkaApis.scala 
35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
4f3c4c872e144195bb4b742b802fa3b931edb534 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 

Diff: https://reviews.apache.org/r/32459/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Updated] (KAFKA-2044) Support requests and responses from o.a.k.common in KafkaApis

2015-03-25 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2044:

Attachment: KAFKA-2044_2015-03-25_16:48:01.patch

> Support requests and responses from o.a.k.common in KafkaApis
> -
>
> Key: KAFKA-2044
> URL: https://issues.apache.org/jira/browse/KAFKA-2044
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-2044.patch, KAFKA-2044_2015-03-25_16:48:01.patch
>
>
> As groundwork for KIP-4 and for KAFKA-1927, we'll add some plumbing to 
> support handling of requests and responses from o.a.k.common in KafkaApis.
> This will allow us to add new Api calls just in o.a.k.common and to gradually 
> migrate existing requests and responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32459: Patch for KAFKA-2044

2015-03-25 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32459/
---

(Updated March 25, 2015, 11:48 p.m.)


Review request for kafka.


Bugs: KAFKA-2044
https://issues.apache.org/jira/browse/KAFKA-2044


Repository: kafka


Description (updated)
---

support requests and responses using Common api in core modules (missing files)


added error handling and factory method for requests


KAFKA-2047; Move the stream creation into concurrent mirror maker threads; 
reviewed by Guozhang Wang


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-1927-v2


Diffs (updated)
-

  
clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
 37aff6c0fd2ec2da8551aa74b166ca49b224ddd3 
  clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
6943878116a97c02b758d273d93976019688830e 
  clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
1ebc188742fd65e5e744003b4579324874fd81a9 
  core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
  core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
9a71faae3138af1b4fb48125db619ddc3ad13c5a 
  core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
3651e8603dd0ed0d2ea059786c68cf0722aa094b 
  core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
  core/src/main/scala/kafka/api/RequestKeys.scala 
c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
55ecac285e00abf38d7131368bb46b4c4010dc87 
  core/src/main/scala/kafka/network/RequestChannel.scala 
7b1db3dbbb2c0676f166890f566c14aa248467ab 
  core/src/main/scala/kafka/server/KafkaApis.scala 
35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
4f3c4c872e144195bb4b742b802fa3b931edb534 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 

Diff: https://reviews.apache.org/r/32459/diff/


Testing
---


Thanks,

Gwen Shapira



Re: MirrorMaker improvements

2015-03-25 Thread vlad...@gmail.com
Hi Jiangjie,

I only noticed a single TCP connection from a MM process to a single
broker. Is there something I could have done to open up more connections?

TCP throughput can actually cap out before the network is saturated, which is
why it is hard to utilize a high bandwidth-delay-product link with a single TCP
connection. There is an equation that links the MSS, RTT and loss rate of
the link to the TCP achievable throughput. Notice that the link bandwidth
does not come into play, since the only way it can affect throughput is by
increasing the loss rate due to drops when the link is congested. On WAN
links, however, usually a single connection will cap (due to random losses
and high RTT), long before achieving the capacity of the link. Here is a
reference for this:
http://www.ece.virginia.edu/~mv/edu/715/lectures/TCP/padhye98modeling.pdf
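As a back-of-the-envelope sketch of that relationship, using the simpler
Mathis et al. approximation rather than the full Padhye model (all constants
below are illustrative):

{code}
// Mathis et al. approximation: throughput <= (MSS / RTT) * (C / sqrt(p)).
// Link bandwidth does not appear; only MSS, RTT and loss rate p do.
public class TcpThroughputSketch {
    public static void main(String[] args) {
        double mssBytes = 1460.0;   // typical Ethernet MSS
        double rttSeconds = 0.100;  // 100 ms inter-DC round trip
        double lossRate = 1e-4;     // 0.01% random loss on the WAN link
        double c = Math.sqrt(1.5);  // constant from the Mathis model
        double bytesPerSec = (mssBytes / rttSeconds) * (c / Math.sqrt(lossRate));
        // Prints roughly 1.8 MB/s: far below a 1 Gbps link's capacity.
        System.out.printf("~%.1f MB/s per connection%n", bytesPerSec / 1e6);
    }
}
{code}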

Regards,
Vlad

On Wed, Mar 25, 2015 at 3:43 PM, Jiangjie Qin 
wrote:

> Hi Vlad,
>
> I am not sure I understand the congestion window part. So TCP congestion
> control will only occur when you are saturating the network. If that is
> the case, bandwidth has already become the bottleneck. But we are talking
> about network underutilization, no?
>
> Another thing is that each fetcher thread has its own BlockingChannel to
> the broker, so they have dedicated TCP connections. Could you explain more
> on the Mux?
>
> Jiangjie (Becket) Qin
>
> On 3/25/15, 2:59 PM, "vlad...@gmail.com"  wrote:
>
> >@Guozhang
> >We actually have separate topics depending on the source of the message
> >and
> >the multicast distribution group (the set of destinations). Our topics are
> >named: source_multicast-group. We do not aggregate data but we do static
> >routing based on the destination and the destination set (that is, we set
> >up a tree of mirrormakers to copy the topic from the original datacenter
> >to
> >the others). This gives us a static topology (no path failure resilience)
> >and limits the number of multicast groups (since each multicast group
> >needs
> >a different topic for every source), but it is a good match for our data
> >replication pattern. It also helps that the order of writes in our system is
> >not important, so we do not need a single point of aggregation :)
> >
> >@Jun
> >The actual problem is the congestion window; I do not think that we
> >are
> >suffering due to the transmit/receive socket buffers (we are using the
> >same
> >buffers over different links with similar RTT but different loss rates and
> >the TCP connection throughput varies a lot, this would not be the case if
> >the amount of in-flight data would be limited by buffer size). The
> >socket-level cwnd metrics also support our hypothesis and we also have
> >measured using iperf what a single connection can transport across a lossy
> >inter-DC link. Jiangjie seems to be suggesting a different blocking
> >scenario, similar to head-of-line blocking because of other requests,
> >however increasing the number of fetchers will not necessarily help since
> >all fetchers will mux their requests over a single TCP connection when
> >sending requests to a single broker. The TCP connection's congestion
> >window
> >will continue to be the limiting factor. I would say that the only way out
> >of this is to pool multiple TCP connections from a single consumer to a
> >broker.
> >
> >For identical mirroring, I thought that when asking for data between a
> >pair
> >of offsets the result should always be the same. Would it be possible to
> >produce also indicating the offsets where the data should go?
> >
> >Regards,
> >Vlad
> >
> >On Wed, Mar 25, 2015 at 10:21 AM, Jiangjie Qin  >
> >wrote:
> >
> >> Hi Jun, I think bumping up socket.receive.buffer.bytes will help but
> >>might
> >> not be sufficient.
> >> There are actually two related factors here:
> >> 1. Pipelining TCP packets when sending a single request/response.
> >> 2. Pipelining multiple requests/responses.
> >> Bumping up socket.receive.buffer.bytes helps with 1) but does not
> >>help
> >> with 2).
> >>
> >> For example, consider the following scenario.
> >> RTT = 100 ms
> >> Bandwidth = 1 Gbps(128 MBps).
> >> Request size = 10KB
> >> Response size = 1MB
> >> If we only have a single fetcher working in a blocking way, the
> >> max number of requests we can achieve is 10 requests/sec because it is
> >> restricted by the RTT. In this case, bumping up socket buffer size will
> >> not help. I think this is the situation Vlad mentioned.
> >>
> >> One option might be to increase num.consumer.fetchers, so we might have
> >> more fetcher threads for a single consumer instance (due to the
> >> implementation, num.consumer.fetchers actually means "at most
> >> num.consumer.fetchers").
> >>
> >> One thing that might be worth considering is whether we can enforce
> >> pipelining in the new consumer like we do for the new producer. Since we
> >> have correlation IDs, reordering should be easily handled. I haven't got a
> >> chance to read the new consumer code, but I think it is worth doing if we
> >> haven't done so.
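To spell out the arithmetic behind the 10 requests/sec figure quoted above,
here is a small sketch (not code from the thread):

{code}
// One blocking fetcher issues one fetch per round trip, so the RTT alone
// caps the request rate regardless of link bandwidth.
public class BlockingFetcherMath {
    public static void main(String[] args) {
        double rttSeconds = 0.100;       // 100 ms RTT
        double responseBytes = 1 << 20;  // 1 MB response
        double requestsPerSec = 1.0 / rttSeconds;            // 10 req/sec
        double bytesPerSec = requestsPerSec * responseBytes; // ~10 MB/sec
        // ~10 MB/s consumed of a ~128 MB/s (1 Gbps) link.
        System.out.printf("%.0f req/s, ~%.0f MB/s%n",
                requestsPerSec, bytesPerSec / (1 << 20));
    }
}
{code}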

Re: MirrorMaker improvements

2015-03-25 Thread Jiangjie Qin
Hi Vlad,

I am not sure I understand the congestion window part. So TCP congestion
control will only occur when you are saturating the network. If that is
the case, bandwidth has already become the bottleneck. But we are talking
about network underutilization, no?

Another thing is that each fetcher thread has its own BlockingChannel to
the broker, so they have dedicated TCP connections. Could you explain more
on the Mux?

Jiangjie (Becket) Qin

On 3/25/15, 2:59 PM, "vlad...@gmail.com"  wrote:

>@Guozhang
>We actually have separate topics depending on the source of the message
>and
>the multicast distribution group (the set of destinations). Our topics are
>named: source_multicast-group. We do not aggregate data but we do static
>routing based on the destination and the destination set (that is, we set
>up a tree of mirrormakers to copy the topic from the original datacenter
>to
>the others). This gives us a static topology (no path failure resilience)
>and limits the number of multicast groups (since each multicast group
>needs
>a different topic for every source), but it is a good match for our data
>replication pattern. It also helps that the order of writes in our system is
>not important, so we do not need a single point of aggregation :)
>
>@Jun
>The actual problem is the congestion window; I do not think that we
>are
>suffering due to the transmit/receive socket buffers (we are using the
>same
>buffers over different links with similar RTT but different loss rates and
>the TCP connection throughput varies a lot, this would not be the case if
>the amount of in-flight data would be limited by buffer size). The
>socket-level cwnd metrics also support our hypothesis and we also have
>measured using iperf what a single connection can transport across a lossy
>inter-DC link. Jiangjie seems to be suggesting a different blocking
>scenario, similar to head-of-line blocking because of other requests,
>however increasing the number of fetchers will not necessarily help since
>all fetchers will mux their requests over a single TCP connection when
>sending requests to a single broker. The TCP connection's congestion
>window
>will continue to be the limiting factor. I would say that the only way out
>of this is to pool multiple TCP connections from a single consumer to a
>broker.
>
>For identical mirroring, I thought that when asking for data between a
>pair
>of offsets the result should always be the same. Would it be possible to
>produce also indicating the offsets where the data should go?
>
>Regards,
>Vlad
>
>On Wed, Mar 25, 2015 at 10:21 AM, Jiangjie Qin 
>wrote:
>
>> Hi Jun, I think bumping up socket.receive.buffer.bytes will help but
>>might
>> not be sufficient.
>> There are actually two related factors here:
>> 1. Pipelining TCP packets when sending a single request/response.
>> 2. Pipelining multiple requests/responses.
>> Bumping up socket.receive.buffer.bytes helps with 1) but does not
>>help
>> with 2).
>>
>> For example, consider the following scenario.
>> RTT = 100 ms
>> Bandwidth = 1 Gbps(128 MBps).
>> Request size = 10KB
>> Response size = 1MB
>> If we only have a single fetcher working in a blocking way, the
>> max number of requests we can achieve is 10 requests/sec because it is
>> restricted by the RTT. In this case, bumping up socket buffer size will
>> not help. I think this is the situation Vlad mentioned.
>>
>> One option might be to increase num.consumer.fetchers, so we might have
>> more fetcher threads for a single consumer instance (due to the
>> implementation, num.consumer.fetchers actually means "at most
>> num.consumer.fetchers").
>>
>> One thing that might be worth considering is whether we can enforce
>> pipelining in the new consumer like we do for the new producer. Since we
>> have correlation IDs, reordering should be easily handled. I haven't got a
>> chance to read the new consumer code, but I think it is worth doing if we
>> haven't done so.
>>
>> Jiangjie (Becket) Qin
>>
>> On 3/25/15, 9:50 AM, "Jun Rao"  wrote:
>>
>> >To amortize the long RTT across data centers, you can tune the TCP
>>window
>> >size by configuring a larger socket.receive.buffer.bytes in the
>>consumer.
>> >
>> >For the last one, it seems that you want identical mirroring. The
>>tricky
>> >thing is to figure out how to avoid duplicates when there is a
>>failure. We
>> >had some related discussion in the context of transactional messaging (
>> >
>> 
>>https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging
>>+
>> >in+Kafka
>> >).
>> >
>> >Thanks,
>> >
>> >Jun
>> >
>> >On Tue, Mar 24, 2015 at 11:44 AM, vlad...@gmail.com 
>> >wrote:
>> >
>> >> Dear all,
>> >>
>> >> I had a short discussion with Jay yesterday at the ACM meetup and he
>> >> suggested writing an email regarding a few possible MirrorMaker
>> >> improvements.
>> >>
> >> At Turn, we have been using MirrorMaker for a few months now to
>> >> asynchronously replicate our key/value store data between our
>> >>datacenters.
>> >> In a way, our system i
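The consumer-side knob Jun refers to in the quoted text maps to a single line
in the consumer config; the value here is only illustrative:

{code:title=consumer.properties|borderStyle=solid}
socket.receive.buffer.bytes=1048576
{code}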

Re: [jira] [Commented] (KAFKA-2038) Unable to restart brokers after it went down with no space left on disk

2015-03-25 Thread Clark Haskins
Today the Kafka broker does not have any tools for moving data between
volumes on a single broker. It also doesn't do any fancy calculation to
determine where to place new partitions; it simply does round-robin
placement across the disks.

The way we tackle this at LinkedIn is to run with a single volume that is
in RAID 10. This gets rid of any single disk size limitation, and pushes
the problem to a broker level. Kafka does have tools to move partitions
around between brokers.
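As a sketch, the inter-broker move works roughly like this with the stock
reassignment tool (the JSON file names and broker list are illustrative):

{code}
# Generate a candidate assignment for the topics listed in topics.json
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181/kafka \
  --topics-to-move-json-file topics.json --broker-list "1,2,3" --generate

# Save the proposed assignment to reassign.json, then apply it
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181/kafka \
  --reassignment-json-file reassign.json --execute
{code}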

Let me know if you would like more details.

-Clark



On Wed, Mar 25, 2015 at 2:48 PM, K Zakee (JIRA)  wrote:

>
> [
> https://issues.apache.org/jira/browse/KAFKA-2038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380850#comment-14380850
> ]
>
> K Zakee commented on KAFKA-2038:
> 
>
> Thanks, I was able to start the broker after manually moving some
> partitions to directories on other volumes.
>
> Wondering if we would always have to do manual steps every time we reach a
> certain disk usage on a volume/directory. Or is there a better way to
> distribute the storage.
>
> In our case, out of 40 topics, only 6 will have about 80 percent of total
> data. So what is the best way to choose the config so we can avoid these
> manual steps going forward?
>
> > Unable to restart brokers after it went down with no space left on disk
> > ---
> >
> > Key: KAFKA-2038
> > URL: https://issues.apache.org/jira/browse/KAFKA-2038
> > Project: Kafka
> >  Issue Type: Bug
> >  Components: core
> >Affects Versions: 0.8.2.1
> >Reporter: K Zakee
> >Priority: Blocker
> >
> > What should happen if one of the log directories configured with the broker
> is 100% full? Is it expected that brokers will shut down themselves?
> > We ran into the full disk space on one of the volumes (out of 8) on each
> of 5 brokers, and the brokers shut down themselves. We still have about 60% of
> total disk space provided by 8 volumes/directories. Shouldn’t the brokers
> continue to function as long as they have space left on the last log
> directory?
> > In this case, how do I fix and restart the broker. Trying to restart
> also failed with fatal error.
> > Error stack traces:
> > =
> > [2015-03-21 03:12:21,433] FATAL [app=broker] [ReplicaFetcherThread-6-3]
> [ReplicaFetcherThread-6-3], Disk error while replicating data.
> (kafka.server.ReplicaFetcherThread)
> > kafka.common.KafkaStorageException: I/O exception in append to log
> 'Topic-11'
> > at kafka.log.Log.append(Log.scala:266)
> > at
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:54)
> > at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
> > at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
> > at
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
> > at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
> > at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
> > at kafka.utils.Utils$.inLock(Utils.scala:535)
> > at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
> > at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > at
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > Caused by: java.io.IOException: No space left on device
> > at sun.nio.ch.FileDispatcher.write0(Native Method)
> > at sun.nio.ch.FileDispatcher.write(FileDispatcher.java:39)
> > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:69)
> > at sun.nio.ch.IOUtil.write(IOUtil.java:40)
> > at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:198)
> > at
> kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:133)
> > at kafka.log.FileMessageSet.append(FileMessageSet.scala:210)
> > at kafka.log.LogSegment.append(LogSegment.scala:85)
> > at kafka.log.Log.append(Log.scala:309)
> > ... 12 more
> > =
> > [2015-03-21 10:38:25,244] INFO [app=broker] [main] [Kafka Server 5],
> shut down completed (kafka.server.KafkaServer)
> > [2015-03-21 10:38:25,245] FATAL [app=broker] [main] Fatal error during
> KafkaServerStartable startup. Prepare to shutdown
> (kafka.server.KafkaServerStartable)
> > java.lang.Int

Re: Plan of Controlled Shutdown

2015-03-25 Thread Guozhang Wang
Mingjie,

Controlled shutdown has been fully implemented and is regularly exercised
at LinkedIn for version upgrades / rolling bounce tests, etc. As Harsha
said, you just need to turn on the config in the brokers, and a normal
shutdown command like Ctrl-C will trigger the controlled shutdown.

Guozhang
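For reference, the broker setting being discussed (it defaults to true in
0.8.2, shown here explicitly):

{code:title=server.properties|borderStyle=solid}
controlled.shutdown.enable=true
{code}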

On Wed, Mar 25, 2015 at 12:39 PM, Harsha  wrote:

> Hi ,
>you can enable/disable controlled shutdown using
> controlled.shutdown.enable in server.properties. This is by default set to
> “true” in 0.8.2. You can go ahead and do a rolling restart, and you don’t need
> the ShutdownBroker command, which has been removed. Regarding KAFKA-2029: it covers
> improvements to controlled shutdown, especially the case of many partitions per
> broker, which per the JIRA might degrade performance.
>
> Thanks,
> Harsha
>
>
> On March 25, 2015 at 11:15:21 AM, Mingjie Lai (m...@apache.org) wrote:
>
> Hi.
>
> I've been trying to figure out the best way to do kafka broker rolling
> restart, and read the controlled shutdown wiki page:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-1.ControlledShutdown
>
> However I cannot find this kafka.admin.ShutdownBroker class in 0.8.2:
>
> /usr/lib/kafka $ bin/kafka-run-class.sh kafka.admin.ShutdownBroker
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> kafka/admin/ShutdownBroker
> Caused by: java.lang.ClassNotFoundException: kafka.admin.ShutdownBroker
> at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> Could not find the main class: kafka.admin.ShutdownBroker. Program will
> exit.
>
> Some other commands are there:
>
> /usr/lib/kafka $ bin/kafka-topics.sh --list --zookeeper zk1:2181/kafka |
> grep test
>
> test
>
> However, I also see some jira about the bug fixes for ControlledShutdown
> like https://issues.apache.org/jira/browse/KAFKA-2029. So I kind of got
> confused.
>
> What's the plan for this feature in kafka? I still see the value of it, for
> example, we can move some of the leader partitions around on purpose for a
> scheduled upgrade or config update, minimizing the transition as much as
> possible.
>
> Thanks,
> Mingjie
>



-- 
-- Guozhang


Re: MirrorMaker improvements

2015-03-25 Thread vlad...@gmail.com
@Guozhang
We actually have separate topics depending on the source of the message and
the multicast distribution group (the set of destinations). Our topics are
named: source_multicast-group. We do not aggregate data but we do static
routing based on the destination and the destination set (that is, we set
up a tree of mirrormakers to copy the topic from the original datacenter to
the others). This gives us a static topology (no path failure resilience)
and limits the number of multicast groups (since each multicast group needs
a different topic for every source), but it is a good match for our data
replication pattern. It also helps that the order of writes in our system is
not important, so we do not need a single point of aggregation :)

@Jun
The actual problem is the congestion window; I do not think that we are
suffering due to the transmit/receive socket buffers (we are using the same
buffers over different links with similar RTT but different loss rates and
the TCP connection throughput varies a lot, this would not be the case if
the amount of in-flight data would be limited by buffer size). The
socket-level cwnd metrics also support our hypothesis and we also have
measured using iperf what a single connection can transport across a lossy
inter-DC link. Jiangjie seems to be suggesting a different blocking
scenario, similar to head-of-line blocking because of other requests,
however increasing the number of fetchers will not necessarily help since
all fetchers will mux their requests over a single TCP connection when
sending requests to a single broker. The TCP connection's congestion window
will continue to be the limiting factor. I would say that the only way out
of this is to pool multiple TCP connections from a single consumer to a
broker.

For identical mirroring, I thought that when asking for data between a pair
of offsets the result should always be the same. Would it be possible to
produce also indicating the offsets where the data should go?

Regards,
Vlad

On Wed, Mar 25, 2015 at 10:21 AM, Jiangjie Qin 
wrote:

> Hi Jun, I think bumping up socket.receive.buffer.bytes will help but might
> not be sufficient.
> There are actually two related factors here:
> 1. Pipelining TCP packets when sending a single request/response.
> 2. Pipelining multiple requests/responses.
> Bumping up socket.receive.buffer.bytes helps with 1) but does not help
> with 2).
>
> For example, consider the following scenario.
> RTT = 100 ms
> Bandwidth = 1 Gbps(128 MBps).
> Request size = 10KB
> Response size = 1MB
> If we only have a single fetcher working in a blocking way, the
> max number of requests we can achieve is 10 requests/sec because it is
> restricted by the RTT. In this case, bumping up socket buffer size will
> not help. I think this is the situation Vlad mentioned.
>
> One option might be to increase num.consumer.fetchers, so we might have more
> fetcher threads for a single consumer instance (due to the implementation,
> num.consumer.fetchers actually means "at most num.consumer.fetchers").
>
> One thing that might be worth considering is whether we can enforce
> pipelining in the new consumer like we do for the new producer. Since we have
> correlation IDs, reordering should be easily handled. I haven't got a chance
> to read the new consumer code, but I think it is worth doing if we haven't
> done so.
>
> Jiangjie (Becket) Qin
>
> On 3/25/15, 9:50 AM, "Jun Rao"  wrote:
>
> >To amortize the long RTT across data centers, you can tune the TCP window
> >size by configuring a larger socket.receive.buffer.bytes in the consumer.
> >
> >For the last one, it seems that you want identical mirroring. The tricky
> >thing is to figure out how to avoid duplicates when there is a failure. We
> >had some related discussion in the context of transactional messaging (
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+
> >in+Kafka
> >).
> >
> >Thanks,
> >
> >Jun
> >
> >On Tue, Mar 24, 2015 at 11:44 AM, vlad...@gmail.com 
> >wrote:
> >
> >> Dear all,
> >>
> >> I had a short discussion with Jay yesterday at the ACM meetup and he
> >> suggested writing an email regarding a few possible MirrorMaker
> >> improvements.
> >>
> >> At Turn, we have been using MirrorMaker for a few months now to
> >> asynchronously replicate our key/value store data between our
> >>datacenters.
> >> In a way, our system is similar to Linkedin's Databus, but it uses Kafka
> >> clusters and MirrorMaker as its building blocks. Our overall message
> >>rate
> >> peaks at about 650K/sec and, when pushing data over high bandwidth delay
> >> product links, we have found some minor bottlenecks.
> >>
> >> The MirrorMaker process uses a standard consumer to pull data from a
> >>remote
> >> datacenter. This implies that it opens a single TCP connection to each
> >>of
> >> the remote brokers and muxes requests for different topics and
> >>partitions
> >> over this connection. While this is a good thing in terms of maintaining
> >> the congestion window

Re: Review Request 32459: Patch for KAFKA-2044

2015-03-25 Thread Jun Rao


> On March 25, 2015, 9:43 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java,
> >  lines 46-54
> > 
> >
> > Instead of adding this method here, would it be better to add it to an 
> > abstract subclass AbstractRequest? All requests will now be a subclass of 
> > AbstractRequest and they will be forced to implement this method.
> 
> Gwen Shapira wrote:
> This means adding a handleError to all Requests in this patch (even 
> though the method will not be used until the request is actually used within 
> KafkaApis).
> If you think it's safer (and I agree it may be), I'll go ahead and do this.

Yes, it does mean that we will add this method to all existing requests. I 
think this makes the code a bit cleaner and safer.
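A minimal, self-contained sketch of the structure under discussion; the names
AbstractRequest and getErrorResponse come from this review, while the bodies
below are purely illustrative:

{code}
// Illustrative only: the real classes live in o.a.k.common.requests and
// carry Struct-based serialization that is omitted here.
abstract class AbstractRequestResponse { }

abstract class AbstractRequest extends AbstractRequestResponse {
    // Every concrete request type is forced to supply an error response,
    // even ones not yet routed through KafkaApis.
    public abstract AbstractRequestResponse getErrorResponse(Throwable cause);
}

class HeartbeatRequest extends AbstractRequest {
    @Override
    public AbstractRequestResponse getErrorResponse(Throwable cause) {
        // A real implementation would build a HeartbeatResponse carrying
        // an error code derived from the throwable.
        return new AbstractRequestResponse() { };
    }
}
{code}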


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32459/#review77805
---


On March 24, 2015, 10:39 p.m., Gwen Shapira wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32459/
> ---
> 
> (Updated March 24, 2015, 10:39 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2044
> https://issues.apache.org/jira/browse/KAFKA-2044
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> support requests and responses using Common api in core modules (missing 
> files)
> 
> 
> added error handling and factory method for requests
> 
> 
> Diffs
> -
> 
>   
> clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
>  37aff6c0fd2ec2da8551aa74b166ca49b224ddd3 
>   
> clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
> 6943878116a97c02b758d273d93976019688830e 
>   
> clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
> 1ebc188742fd65e5e744003b4579324874fd81a9 
>   core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
> f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
>   core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
> 9a71faae3138af1b4fb48125db619ddc3ad13c5a 
>   core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
> 3651e8603dd0ed0d2ea059786c68cf0722aa094b 
>   core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
> d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
>   core/src/main/scala/kafka/api/RequestKeys.scala 
> c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
> 55ecac285e00abf38d7131368bb46b4c4010dc87 
>   core/src/main/scala/kafka/network/RequestChannel.scala 
> 7b1db3dbbb2c0676f166890f566c14aa248467ab 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 
> 
> Diff: https://reviews.apache.org/r/32459/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Gwen Shapira
> 
>



Re: Review Request 31606: Patch for KAFKA-1416

2015-03-25 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31606/#review77804
---



core/src/test/scala/unit/kafka/integration/FetcherTest.scala


We can get rid of this function and just use

sendMessages(..).size



core/src/test/scala/unit/kafka/integration/FetcherTest.scala


Import TestUtils.sendMessages



core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala


Can we use TestUtils.sendMessages to replace this function?



core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala


Can we get rid of this function since it is only called once, and hence we 
can just put the logic there?



core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala


We can import TestUtils.getMessages in this class.



core/src/test/scala/unit/kafka/utils/TestUtils.scala


Can we replace this function also?



core/src/test/scala/unit/kafka/utils/TestUtils.scala


The compression codec is no longer used, which seems incorrect?


A few more general comments:

1. Could you rebase the patch?
2. Could we also replace LogRecoveryTest.sendMessages?
3. Could we also replace TestLogCleaning.produceMessages?
4. Could we also replace UncleanLeaderElectionTest.produceMessage?

- Guozhang Wang


On March 2, 2015, 1:25 a.m., Flutra Osmani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31606/
> ---
> 
> (Updated March 2, 2015, 1:25 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1416
> https://issues.apache.org/jira/browse/KAFKA-1416
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Initial Patch
> 
> 
> Diffs
> -
> 
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
> a17e8532c44aadf84b8da3a57bcc797a848b5020 
>   core/src/test/scala/unit/kafka/integration/FetcherTest.scala 
> 25845abbcad2e79f56f729e59239b738d3ddbc9d 
>   core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala 
> ba3bcdcd1de9843e75e5395dff2fc31b39a5a9d5 
>   
> core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
>  d6248b09bb0f86ee7d3bd0ebce5b99135491453b 
>   core/src/test/scala/unit/kafka/metrics/MetricsTest.scala 
> 111e4a26c1efb6f7c151ca9217dbe107c27ab617 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 6ce18076f6b5deb05b51c25be5bed9957e6b4339 
> 
> Diff: https://reviews.apache.org/r/31606/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Flutra Osmani
> 
>



[jira] [Commented] (KAFKA-2038) Unable to restart brokers after it went down with no space left on disk

2015-03-25 Thread K Zakee (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380850#comment-14380850
 ] 

K Zakee commented on KAFKA-2038:


Thanks, I was able to start the broker after manually moving some partitions to 
directories on other volumes. 

Wondering if we would always have to do manual steps every time we reach a 
certain disk usage on a volume/directory. Or is there a better way to 
distribute the storage. 

In our case, out of 40 topics, only 6 will have about 80 percent of total data. 
So what is the best way to choose the config so we can avoid these manual steps going forward?

> Unable to restart brokers after it went down with no space left on disk
> ---
>
> Key: KAFKA-2038
> URL: https://issues.apache.org/jira/browse/KAFKA-2038
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.1
>Reporter: K Zakee
>Priority: Blocker
>
> What should happen if one of the log directories configured with the broker is 
> 100% full? Is it expected that brokers will shut down themselves?
> We ran into the full disk space on one of the volumes (out of 8) on each of 5 
> brokers, and the brokers shut down themselves. We still have about 60% of total 
> disk space provided by 8 volumes/directories. Shouldn’t the brokers continue 
> to function as long as they have space left on the last log directory?
> In this case, how do I fix and restart the broker. Trying to restart also 
> failed with fatal error.
> Error stack traces:
> =
> [2015-03-21 03:12:21,433] FATAL [app=broker] [ReplicaFetcherThread-6-3] 
> [ReplicaFetcherThread-6-3], Disk error while replicating data. 
> (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaStorageException: I/O exception in append to log 'Topic-11'
> at kafka.log.Log.append(Log.scala:266)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:54)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
> at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
> at kafka.utils.Utils$.inLock(Utils.scala:535)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> Caused by: java.io.IOException: No space left on device
> at sun.nio.ch.FileDispatcher.write0(Native Method)
> at sun.nio.ch.FileDispatcher.write(FileDispatcher.java:39)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:69)
> at sun.nio.ch.IOUtil.write(IOUtil.java:40)
> at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:198)
> at 
> kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:133)
> at kafka.log.FileMessageSet.append(FileMessageSet.scala:210)
> at kafka.log.LogSegment.append(LogSegment.scala:85)
> at kafka.log.Log.append(Log.scala:309)
> ... 12 more
> =
> [2015-03-21 10:38:25,244] INFO [app=broker] [main] [Kafka Server 5], shut 
> down completed (kafka.server.KafkaServer)
> [2015-03-21 10:38:25,245] FATAL [app=broker] [main] Fatal error during 
> KafkaServerStartable startup. Prepare to shutdown 
> (kafka.server.KafkaServerStartable)
> java.lang.InternalError: a fault occurred in a recent unsafe memory access 
> operation in compiled Java code
> at java.nio.HeapByteBuffer.(HeapByteBuffer.java:39)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:312)
> at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188)
> at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165)
> at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> at kafka.log.LogSegment.recover(LogSegment.scala:175)
> at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:162)
> at

Re: Review Request 32459: Patch for KAFKA-2044

2015-03-25 Thread Gwen Shapira


> On March 25, 2015, 9:43 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java,
> >  lines 46-54
> > 
> >
> > Instead of adding this method here, would it be better to add it to an 
> > abstract subclass AbstractRequest? All requests will now be a subclass of 
> > AbstractRequest and they will be forced to implement this method.

This means adding a handleError to all Requests in this patch (even though the 
method will not be used until the request is actually used within KafkaApis).
If you think it's safer (and I agree it may be), I'll go ahead and do this.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32459/#review77805
---


On March 24, 2015, 10:39 p.m., Gwen Shapira wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32459/
> ---
> 
> (Updated March 24, 2015, 10:39 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2044
> https://issues.apache.org/jira/browse/KAFKA-2044
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> support requests and responses using Common api in core modules (missing 
> files)
> 
> 
> added error handling and factory method for requests
> 
> 
> Diffs
> -
> 
>   
> clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
>  37aff6c0fd2ec2da8551aa74b166ca49b224ddd3 
>   
> clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
> 6943878116a97c02b758d273d93976019688830e 
>   
> clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
> 1ebc188742fd65e5e744003b4579324874fd81a9 
>   core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
> f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
>   core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
> 9a71faae3138af1b4fb48125db619ddc3ad13c5a 
>   core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
> 3651e8603dd0ed0d2ea059786c68cf0722aa094b 
>   core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
> d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
>   core/src/main/scala/kafka/api/RequestKeys.scala 
> c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
> 55ecac285e00abf38d7131368bb46b4c4010dc87 
>   core/src/main/scala/kafka/network/RequestChannel.scala 
> 7b1db3dbbb2c0676f166890f566c14aa248467ab 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 
> 
> Diff: https://reviews.apache.org/r/32459/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Gwen Shapira
> 
>



Re: Review Request 32459: Patch for KAFKA-2044

2015-03-25 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32459/#review77805
---


Thanks for the patch. Looks good to me overall. A few comments below.


clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java


Instead of adding this method here, would it be better to add it to an 
abstract subclass AbstractRequest? All requests will now be a subclass of 
AbstractRequest and they will be forced to implement this method.



core/src/main/scala/kafka/network/RequestChannel.scala


Needs indentation.



core/src/main/scala/kafka/network/RequestChannel.scala


Needs indentation, and should be consistent with the previous statement.



core/src/main/scala/kafka/network/RequestChannel.scala


Ditto for the above.


- Jun Rao


On March 24, 2015, 10:39 p.m., Gwen Shapira wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32459/
> ---
> 
> (Updated March 24, 2015, 10:39 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2044
> https://issues.apache.org/jira/browse/KAFKA-2044
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> support requests and responses using Common api in core modules (missing 
> files)
> 
> 
> added error handling and factory method for requests
> 
> 
> Diffs
> -
> 
>   
> clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
>  37aff6c0fd2ec2da8551aa74b166ca49b224ddd3 
>   
> clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
> 6943878116a97c02b758d273d93976019688830e 
>   
> clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
> 1ebc188742fd65e5e744003b4579324874fd81a9 
>   core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala 
> f168d9fc99ce51b8b41b7f7db2a06f371b1a44e5 
>   core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala 
> 9a71faae3138af1b4fb48125db619ddc3ad13c5a 
>   core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala 
> 3651e8603dd0ed0d2ea059786c68cf0722aa094b 
>   core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala 
> d0f07e0cbbdacf9ff8287e901ecabde3921bbab3 
>   core/src/main/scala/kafka/api/RequestKeys.scala 
> c24c0345feedc7b9e2e9f40af11bfa1b8d328c43 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
> 55ecac285e00abf38d7131368bb46b4c4010dc87 
>   core/src/main/scala/kafka/network/RequestChannel.scala 
> 7b1db3dbbb2c0676f166890f566c14aa248467ab 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> fba852afa1b2f46b61e2fd12c38c821ba04e9cc6 
> 
> Diff: https://reviews.apache.org/r/32459/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Gwen Shapira
> 
>



Jenkins build is back to normal : KafkaPreCommit #45

2015-03-25 Thread Apache Jenkins Server
See 



[jira] [Commented] (KAFKA-2050) Avoid calling .size() on java.util.ConcurrentLinkedQueue

2015-03-25 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380809#comment-14380809
 ] 

Sriharsha Chintalapani commented on KAFKA-2050:
---

[~timbrooks] The changes look good to me. But can you send the patch using 
kafka-patch-review.py 
https://cwiki.apache.org/confluence/display/KAFKA/Patch+submission+and+review

> Avoid calling .size() on java.util.ConcurrentLinkedQueue
> 
>
> Key: KAFKA-2050
> URL: https://issues.apache.org/jira/browse/KAFKA-2050
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Tim Brooks
>Assignee: Jun Rao
> Attachments: dont_call_queue_size.patch
>
>
> Generally, it seems to be preferred to avoid calling .size() on a Java 
> ConcurrentLinkedQueue. This is an O(N) operation as it must iterate through 
> all the nodes.
> Calling this every time through the loop makes this issue worse under high 
> load. It seems like the same functionality can be attained by just polling 
> and checking for null.
> This is more imperative and less functional, but it seems alright since this 
> is in lower-level networking code.
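A small illustrative sketch of the pattern the description suggests (not the
attached patch):

{code}
import java.util.concurrent.ConcurrentLinkedQueue;

public class PollInsteadOfSize {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
        queue.add(new Runnable() {
            public void run() { System.out.println("task"); }
        });

        // Before: queue.size() is O(n) on ConcurrentLinkedQueue, and it was
        // being called on every pass through the loop:
        // while (queue.size() > 0) { queue.poll().run(); }

        // After: one O(1) poll per iteration; null signals an empty queue.
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();
        }
    }
}
{code}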



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31967: Patch for KAFKA-1546

2015-03-25 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review77799
---



core/src/main/scala/kafka/cluster/Replica.scala


Instead of passing these fields of logReadResult all the way up to the replica, 
I think we can just push this comparison logic down to 
replicaManager.readFromLocalLog around line 507, and just keep a boolean in the 
logReadResult like isReadFromLogEnd. That value can then be passed all the way 
up to Replica as something like isLEOCaughtUp.

Also we can change the names of updateReplicaLEO and updateFollowerLEOs as 
they now carry the isLEOCaughtUp information as well as LEOs.


- Guozhang Wang


On March 25, 2015, 8:27 p.m., Aditya Auradkar wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> ---
> 
> (Updated March 25, 2015, 8:27 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
> https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> PATCH for KAFKA-1546
> 
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time 
> since the replica did not read from the LEO
> - Using lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the 
> LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> Updated KAFKA-1546 patch to reflect Neha and Jun's comments
> 
> 
> Addressing Joel's comments
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala 
> bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/log/Log.scala 
> 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala 
> 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> 46d21c73f1feb3410751899380b35da0c37c975c 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
> 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
> 191251d1340b5e5b2d649b37af3c6c1896d07e6e 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
> 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>



Build failed in Jenkins: Kafka-trunk #433

2015-03-25 Thread Apache Jenkins Server
See 

Changes:

[junrao] kafka-527; Compression support does numerous byte copies; patched by 
Yasuhiro Matsuda; reviewed by Guozhang Wang and Jun Rao

[wangguoz] KAFKA-2047; Move the stream creation into concurrent mirror maker 
threads; reviewed by Guozhang Wang

--
[...truncated 706 lines...]
kafka.utils.SchedulerTest > testPeriodicTask PASSED

kafka.utils.SchedulerTest > testRestart PASSED

kafka.utils.JsonTest > testJsonEncoding PASSED

kafka.utils.ReplicationUtilsTest > testUpdateLeaderAndIsr PASSED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition PASSED

kafka.metrics.KafkaTimerTest > testKafkaTimer PASSED

kafka.log.FileMessageSetTest > testWrittenEqualsRead PASSED

kafka.log.FileMessageSetTest > testIteratorIsConsistent PASSED

kafka.log.FileMessageSetTest > testSizeInBytes PASSED

kafka.log.FileMessageSetTest > testWriteTo PASSED

kafka.log.FileMessageSetTest > testFileSize PASSED

kafka.log.FileMessageSetTest > testIterationOverPartialAndTruncation PASSED

kafka.log.FileMessageSetTest > testIterationDoesntChangePosition PASSED

kafka.log.FileMessageSetTest > testRead PASSED

kafka.log.FileMessageSetTest > testSearch PASSED

kafka.log.FileMessageSetTest > testIteratorWithLimits PASSED

kafka.log.FileMessageSetTest > testTruncate PASSED

kafka.log.LogConfigTest > testFromPropsDefaults PASSED

kafka.log.LogConfigTest > testFromPropsEmpty PASSED

kafka.log.LogConfigTest > testFromPropsToProps PASSED

kafka.log.LogConfigTest > testFromPropsInvalid PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[0] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[1] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[2] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[3] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[4] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[5] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[6] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[7] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[8] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[9] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[10] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[11] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[12] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[13] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[14] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[15] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[16] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[17] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[18] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[19] PASSED

kafka.log.LogCleanerIntegrationTest > cleanerTest PASSED

kafka.log.LogManagerTest > testCreateLog PASSED

kafka.log.LogManagerTest > testGetNonExistentLog PASSED

kafka.log.LogManagerTest > testCleanupExpiredSegments PASSED

kafka.log.LogManagerTest > testCleanupSegmentsToMaintainSize PASSED

kafka.log.LogManagerTest > testTimeBasedFlush PASSED

kafka.log.LogManagerTest > testLeastLoadedAssignment PASSED

kafka.log.LogManagerTest > testTwoLogManagersUsingSameDirFails PASSED

kafka.log.LogManagerTest > testCheckpointRecoveryPoints PASSED

kafka.log.LogManagerTest > testRecoveryDirectoryMappingWithTrailingSlash PASSED

kafka.log.LogManagerTest > testRecoveryDirectoryMappingWithRelativeDirectory 
PASSED

kafka.log.LogTest > testTimeBasedLogRoll PASSED

kafka.log.LogTest > testTimeBasedLogRollJitter PASSED

kafka.log.LogTest > testSizeBasedLogRoll PASSED

kafka.log.LogTest > testLoadEmptyLog PASSED

kafka.log.LogTest > testAppendAndReadWithSequentialOffsets PASSED

kafka.log.LogTest > testAppendAndReadWithNonSequentialOffsets PASSED

kafka.log.LogTest > testReadAtLogGap PASSED

kafka.log.LogTest > testReadOutOfRange PASSED

kafka.log.LogTest > testLogRolls PASSED

kafka.log.LogTest > testCompressedMessages PASSED

kafka.log.LogTest > testThatGarbageCollectingSegmentsDoesntChangeOffset PASSED

kafka.log.LogTest > testMessageSetSizeCheck PASSED

kafka.log.LogTest > testCompactedTopicConstraints PASSED

kafka.log.LogTest > testMessageSizeCheck PASSED

kafka.log.LogTest > testLogRecoversToCorrectOffset PASSED

kafka.log.LogTest > testIndexRebuild PASSED

kafka.log.LogTest > testTruncateTo PASSED

kafka.log.LogTest > testIndexResizingAtTruncation PASSED

kafka.log.LogTest > testBogusIndexSegmentsAreRemoved PASSED

kafka.log.LogTest > testReopenThenTruncate PASSED

kafka.log.LogTest > testAsyncDelete PASSED

kafka.log.LogTest > testOpenDeletesObsoleteFiles PASSED

kafka.log.LogTest > testAppendMessageWithNullPayload PASSED

kafka.log.LogTe

[jira] [Updated] (KAFKA-2050) Avoid calling .size() on java.util.ConcurrentLinkedQueue

2015-03-25 Thread Tim Brooks (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tim Brooks updated KAFKA-2050:
--
Attachment: dont_call_queue_size.patch

> Avoid calling .size() on java.util.ConcurrentLinkedQueue
> 
>
> Key: KAFKA-2050
> URL: https://issues.apache.org/jira/browse/KAFKA-2050
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Tim Brooks
>Assignee: Jun Rao
> Attachments: dont_call_queue_size.patch
>
>
> Generally, it seems to be preferred to avoid calling .size() on a Java 
> ConcurrentLinkedQueue. This is an O(N) operation as it must iterate through 
> all the nodes.
> Calling this every time through the loop makes this issue worse under high 
> load. It seems like the same functionality can be attained by just polling 
> and checking for null.
> This is more imperative and less functional, but it seems alright since this 
> is in lower-level networking code.





[jira] [Created] (KAFKA-2050) Avoid calling .size() on java.util.ConcurrentLinkedQueue

2015-03-25 Thread Tim Brooks (JIRA)
Tim Brooks created KAFKA-2050:
-

 Summary: Avoid calling .size() on java.util.ConcurrentLinkedQueue
 Key: KAFKA-2050
 URL: https://issues.apache.org/jira/browse/KAFKA-2050
 Project: Kafka
  Issue Type: Bug
  Components: network
Reporter: Tim Brooks
Assignee: Jun Rao


Generally, it seems to be preferred to avoid calling .size() on a Java 
ConcurrentLinkedQueue. This is an O(N) operation as it must iterate through all 
the nodes.

Calling this every time through the loop makes this issue worse under high 
load. It seems like the same functionality can be attained by just polling and 
checking for null.

This is more imperative and less functional, but it seems alright since this is 
in lower-level networking code.
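
A minimal sketch of the two drain styles, using an illustrative queue of 
Runnables (hypothetical names; not the actual Kafka networking code):

{code}
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueDrainSketch {
    private final ConcurrentLinkedQueue<Runnable> responses = new ConcurrentLinkedQueue<>();

    // Costly pattern: size() walks every node, so each emptiness check is O(N).
    void drainUsingSize() {
        while (responses.size() > 0) {            // O(N) on every iteration
            Runnable r = responses.poll();
            if (r != null) {
                r.run();
            }
        }
    }

    // Preferred pattern: poll() is O(1); a null return means the queue is empty.
    void drainUsingPoll() {
        Runnable r;
        while ((r = responses.poll()) != null) {  // one O(1) call does both steps
            r.run();
        }
    }
}
{code}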





[jira] [Commented] (KAFKA-2046) Delete topic still doesn't work

2015-03-25 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380776#comment-14380776
 ] 

Onur Karaman commented on KAFKA-2046:
-

It looks like I hit a deadlock yesterday between the DeleteTopicsThread and 
RequestSendThread on the controller when controller.message.queue.size was 
small.

When the blocking queue shared between the DeleteTopicsThread and the 
RequestSendThread is full, the DeleteTopicsThread's puts block while it holds 
the controller lock, until the RequestSendThread takes items from the queue. 
But after sending a request, the RequestSendThread runs a callback that needs 
the controller lock in order to finish processing the request, causing the 
hang. Delete topic performs a state transition to ReplicaDeletionStarted, and 
this state transition involves a callback (deleteTopicStopReplicaCallback) 
that waits on the controller lock. (A reduced sketch of the cycle follows the 
thread dump below.)

This explains why I had seen only one replica when grepping for "handling stop 
replica (delete=true)" in kafka-state-change.log: the thread hung on the 
callback of that replica's transition to ReplicaDeletionStarted.

Bumping up the controller.message.queue.size doesn't get rid of the deadlock 
but should make it less common. I think this should only be considered a 
temporary fix, as tweaking a config value shouldn't decide whether or not we 
hit a deadlock.

Here are snippets of the thread dump:
{code}
"Controller-xyz-to-broker-abc-send-thread" ...
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
...
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
...
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at kafka.utils.Utils$.inLock(Utils.scala:564)
at 
kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$deleteTopicStopReplicaCallback(TopicDeletionManager.scala:371)
at 
kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:338)
at 
kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:338)
at 
kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:231)
at 
kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:231)
at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:161)

"delete-topics-thread-xyz" ...
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
...
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at 
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
at 
kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
...
at 
kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:310)
at 
kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:115)
...
at 
kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:327)
at 
kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:360)
...
at 
kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:305)
...
at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390)
at kafka.utils.Utils$.inLock(Utils.scala:566)
at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:390)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
{code}
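
As an illustration only, here is a hypothetical Java reduction of that 
lock-plus-bounded-queue cycle (stand-in names, not the actual controller code):

{code}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ControllerDeadlockSketch {
    private final Object controllerLock = new Object();
    // Deliberately tiny queue, standing in for a small controller.message.queue.size
    private final BlockingQueue<String> requestQueue = new ArrayBlockingQueue<>(1);

    // Analogue of the DeleteTopicsThread: enqueues requests while holding the lock.
    void deleteTopics() throws InterruptedException {
        synchronized (controllerLock) {
            for (int i = 0; i < 3; i++) {
                requestQueue.put("request-" + i);  // blocks when full, lock still held
            }
        }
    }

    // Analogue of the RequestSendThread: after each request it runs a callback
    // that needs the controller lock before it can take the next request.
    void sendLoop() throws InterruptedException {
        while (true) {
            String request = requestQueue.take();
            synchronized (controllerLock) {        // deleteTopicStopReplicaCallback analogue
                // finish processing `request`; blocks forever once deleteTopics()
                // is parked on a full queue while still holding controllerLock
            }
        }
    }
}
{code}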

> Delete topic still doesn't work
> ---
>
> Key: KAFKA-2046
> URL: https://issues.apache.org/jira/browse/KAFKA-2046
> Project: Kafka
>  Issue Type: Bug
>Reporter: Clark Haskins
>Assignee: Onur Karaman
>
> I just attempted to delete a 128-partition topic with all inbound producers 
> stopped.
> The result was as follows:
> The /admin/delete_topics znode was empty
> the topic under /brokers/topics was removed
> The Kafka topics command showed that the topic was removed
> However, the data on disk on each broker was not deleted and the topic has 
> not yet been re-created by starting up the inbound mirror maker.
> Let me know what additiona

[jira] [Commented] (KAFKA-2047) Accelerate consumer rebalance in mirror maker.

2015-03-25 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380758#comment-14380758
 ] 

Guozhang Wang commented on KAFKA-2047:
--

Thanks for the patch, +1 and committed to trunk.

> Accelerate consumer rebalance in mirror maker.
> --
>
> Key: KAFKA-2047
> URL: https://issues.apache.org/jira/browse/KAFKA-2047
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-2047.patch, KAFKA-2047_2015-03-25_13:48:34.patch
>
>
> In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became 
> longer because there are more zookeeper consumer connectors doing rebalance 
> serially. Rebalance would be faster if the bootstrap of 
> ZookeeperConsumerConnectors is parallelized.
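
A minimal sketch of the parallelized bootstrap idea, with a hypothetical 
Connector interface standing in for ZookeeperConsumerConnector (not the actual 
MirrorMaker code):

{code}
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelBootstrapSketch {
    interface Connector {
        void start();  // stand-in for a connector's bootstrap + rebalance
    }

    static void bootstrap(List<Connector> connectors) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(connectors.size());
        for (Connector c : connectors) {
            pool.submit(c::start);  // each connector rebalances in its own thread
        }
        pool.shutdown();
        // Wait until every connector has finished bootstrapping
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }
}
{code}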





[jira] [Updated] (KAFKA-2047) Accelerate consumer rebalance in mirror maker.

2015-03-25 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2047:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

> Accelerate consumer rebalance in mirror maker.
> --
>
> Key: KAFKA-2047
> URL: https://issues.apache.org/jira/browse/KAFKA-2047
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-2047.patch, KAFKA-2047_2015-03-25_13:48:34.patch
>
>
> In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became 
> longer because there are more zookeeper consumer connectors doing rebalance 
> serially. Rebalance would be faster if the bootstrap of 
> ZookeeperConsumerConnectors is parallelized.





Re: Review Request 32465: Patch for KAFKA-2047

2015-03-25 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32465/#review77794
---



core/src/main/scala/kafka/tools/MirrorMaker.scala


Also here: "just one"


- Guozhang Wang


On March 25, 2015, 8:48 p.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32465/
> ---
> 
> (Updated March 25, 2015, 8:48 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2047
> https://issues.apache.org/jira/browse/KAFKA-2047
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Guozhang's comment.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 4f3c4c872e144195bb4b742b802fa3b931edb534 
> 
> Diff: https://reviews.apache.org/r/32465/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



Re: Review Request 32465: Patch for KAFKA-2047

2015-03-25 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32465/#review77791
---

Ship it!


Will make the minor change upon checkin.


core/src/main/scala/kafka/tools/MirrorMaker.scala


Changing the comment to "Mirror maker thread ..."


- Guozhang Wang


On March 25, 2015, 8:48 p.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32465/
> ---
> 
> (Updated March 25, 2015, 8:48 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2047
> https://issues.apache.org/jira/browse/KAFKA-2047
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Guozhang's comment.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 4f3c4c872e144195bb4b742b802fa3b931edb534 
> 
> Diff: https://reviews.apache.org/r/32465/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[jira] [Updated] (KAFKA-2047) Accelerate consumer rebalance in mirror maker.

2015-03-25 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2047:

Attachment: KAFKA-2047_2015-03-25_13:48:34.patch

> Accelerate consumer rebalance in mirror maker.
> --
>
> Key: KAFKA-2047
> URL: https://issues.apache.org/jira/browse/KAFKA-2047
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-2047.patch, KAFKA-2047_2015-03-25_13:48:34.patch
>
>
> In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became 
> longer because there are more zookeeper consumer connectors doing rebalance 
> serially. Rebalance would be faster if the bootstrap of 
> ZookeeperConsumerConnectors is parallelized.





Re: Review Request 32465: Patch for KAFKA-2047

2015-03-25 Thread Jiangjie Qin


> On March 25, 2015, 8:37 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 325
> > 
> >
> > Wondering: if we call System.exit(-1) directly from one of the MM 
> > threads, and we have some shutdown hook in the container, will that 
> > cause a deadlock?

Good point. Actually we can just put everything into the big try/catch/finally 
block.
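
For illustration, a hypothetical reduction of the concern (not MirrorMaker 
code): System.exit() blocks the calling thread until all shutdown hooks 
finish, so a hook that joins the exiting thread can never complete.

{code}
public class ExitFromWorkerDeadlock {
    public static void main(String[] args) {
        Thread worker = new Thread(() -> {
            // ... fatal error in the worker ...
            System.exit(-1);       // runs shutdown hooks and waits for them
        }, "mm-worker");

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                worker.join();     // waits for the worker, which is inside exit(): deadlock
            } catch (InterruptedException ignored) {
            }
        }));

        worker.start();
    }
}
{code}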


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32465/#review77789
---


On March 25, 2015, 8:48 p.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32465/
> ---
> 
> (Updated March 25, 2015, 8:48 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2047
> https://issues.apache.org/jira/browse/KAFKA-2047
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Guozhang's comment.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 4f3c4c872e144195bb4b742b802fa3b931edb534 
> 
> Diff: https://reviews.apache.org/r/32465/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[jira] [Commented] (KAFKA-2047) Accelerate consumer rebalance in mirror maker.

2015-03-25 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380737#comment-14380737
 ] 

Jiangjie Qin commented on KAFKA-2047:
-

Updated reviewboard https://reviews.apache.org/r/32465/diff/
 against branch origin/trunk

> Accelerate consumer rebalance in mirror maker.
> --
>
> Key: KAFKA-2047
> URL: https://issues.apache.org/jira/browse/KAFKA-2047
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-2047.patch, KAFKA-2047_2015-03-25_13:48:34.patch
>
>
> In the new mirror maker in KAFKA-1997, the bootstrap rebalance time became 
> longer because there are more zookeeper consumer connectors doing rebalance 
> serially. Rebalance would be faster if the bootstrap of 
> ZookeeperConsumerConnectors is parallelized.





Re: Review Request 32465: Patch for KAFKA-2047

2015-03-25 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32465/
---

(Updated March 25, 2015, 8:48 p.m.)


Review request for kafka.


Bugs: KAFKA-2047
https://issues.apache.org/jira/browse/KAFKA-2047


Repository: kafka


Description (updated)
---

Addressed Guozhang's comment.


Diffs (updated)
-

  core/src/main/scala/kafka/tools/MirrorMaker.scala 
4f3c4c872e144195bb4b742b802fa3b931edb534 

Diff: https://reviews.apache.org/r/32465/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 32465: Patch for KAFKA-2047

2015-03-25 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32465/#review77789
---



core/src/main/scala/kafka/tools/MirrorMaker.scala


Wondering: if we call System.exit(-1) directly from one of the MM threads, 
and we have some shutdown hook in the container, will that cause a deadlock?


- Guozhang Wang


On March 24, 2015, 11:42 p.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32465/
> ---
> 
> (Updated March 24, 2015, 11:42 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2047
> https://issues.apache.org/jira/browse/KAFKA-2047
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Fix for KAFKA-2047: Accelerate consumer bootstrap rebalance in mirror 
> maker.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 4f3c4c872e144195bb4b742b802fa3b931edb534 
> 
> Diff: https://reviews.apache.org/r/32465/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[jira] [Updated] (KAFKA-527) Compression support does numerous byte copies

2015-03-25 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-527:

Attachment: KAFKA-527_2015-03-25_13:26:36.patch

> Compression support does numerous byte copies
> -
>
> Key: KAFKA-527
> URL: https://issues.apache.org/jira/browse/KAFKA-527
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Jay Kreps
>Assignee: Yasuhiro Matsuda
>Priority: Critical
> Attachments: KAFKA-527.message-copy.history, KAFKA-527.patch, 
> KAFKA-527_2015-03-16_15:19:29.patch, KAFKA-527_2015-03-19_21:32:24.patch, 
> KAFKA-527_2015-03-25_12:08:00.patch, KAFKA-527_2015-03-25_13:26:36.patch, 
> java.hprof.no-compression.txt, java.hprof.snappy.text
>
>
> The data path for compressing or decompressing messages is extremely 
> inefficient. We do something like 7 (?) complete copies of the data, often 
> for simple things like adding a 4 byte size to the front. I am not sure how 
> this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk 
> recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the 
> Message/MessageSet interfaces which are based on byte buffers is the cause of 
> many of these.
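
As one concrete illustration of the kind of copy being discussed, here is a 
hypothetical sketch of length-prefixing a payload with and without an extra 
copy (not the actual Kafka code):

{code}
import java.nio.ByteBuffer;

public class SizePrefixSketch {
    // Copying approach: allocates a fresh buffer and copies every payload byte.
    static ByteBuffer prefixWithCopy(byte[] payload) {
        ByteBuffer out = ByteBuffer.allocate(4 + payload.length);
        out.putInt(payload.length);
        out.put(payload);             // full copy just to add a 4 byte size
        out.flip();
        return out;
    }

    // Copy-avoiding approach: reserve 4 bytes up front, write the payload
    // directly into the buffer once, then backfill the size field in place.
    static ByteBuffer buildInPlace(ByteBuffer buf) {
        buf.position(4);              // leave room for the size prefix
        // ... write the (possibly compressed) payload straight into buf ...
        int size = buf.position() - 4;
        buf.putInt(0, size);          // backfill the prefix, no extra copy
        buf.flip();
        return buf;
    }
}
{code}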





[jira] [Commented] (KAFKA-2045) Memory Management on the consumer

2015-03-25 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380718#comment-14380718
 ] 

Guozhang Wang commented on KAFKA-2045:
--

Agreed, and I remember we did have some discussions regarding (1) and proposed 
(b).

> Memory Management on the consumer
> -
>
> Key: KAFKA-2045
> URL: https://issues.apache.org/jira/browse/KAFKA-2045
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>
> We need to add the memory management on the new consumer like we did in the 
> new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.
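
A minimal sketch of byte-buffer reuse, assuming a simple fixed-size pool (an 
illustration only; the actual design may differ, e.g. the new producer's 
buffer management):

{code}
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;

class SimpleBufferPool {
    private final ArrayBlockingQueue<ByteBuffer> free;

    SimpleBufferPool(int poolSize, int bufferSize) {
        free = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            free.offer(ByteBuffer.allocate(bufferSize));
        }
    }

    // Borrow a buffer for fetch response data or decompression scratch space.
    ByteBuffer acquire() throws InterruptedException {
        return free.take();       // blocks until a buffer is released
    }

    // Return the buffer instead of letting it become garbage.
    void release(ByteBuffer buf) {
        buf.clear();              // reset position/limit before reuse
        free.offer(buf);
    }
}
{code}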





[jira] [Commented] (KAFKA-527) Compression support does numerous byte copies

2015-03-25 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380706#comment-14380706
 ] 

Jay Kreps commented on KAFKA-527:
-

Yeah, it would be great to get [~guozhang]'s patch in as well and be able to 
summarize the improvement from the producer's point of view: that is, repeat 
the perf test Yasuhiro did, but using the producer performance test, so it is 
more representative of what the user will actually see. That would be a nice 
tidbit to have in the release notes for the next release.

> Compression support does numerous byte copies
> -
>
> Key: KAFKA-527
> URL: https://issues.apache.org/jira/browse/KAFKA-527
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Jay Kreps
>Assignee: Yasuhiro Matsuda
>Priority: Critical
> Attachments: KAFKA-527.message-copy.history, KAFKA-527.patch, 
> KAFKA-527_2015-03-16_15:19:29.patch, KAFKA-527_2015-03-19_21:32:24.patch, 
> KAFKA-527_2015-03-25_12:08:00.patch, KAFKA-527_2015-03-25_13:26:36.patch, 
> java.hprof.no-compression.txt, java.hprof.snappy.text
>
>
> The data path for compressing or decompressing messages is extremely 
> inefficient. We do something like 7 (?) complete copies of the data, often 
> for simple things like adding a 4 byte size to the front. I am not sure how 
> this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk 
> recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the 
> Message/MessageSet interfaces which are based on byte buffers is the cause of 
> many of these.





[jira] [Updated] (KAFKA-1546) Automate replica lag tuning

2015-03-25 Thread Aditya A Auradkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya A Auradkar updated KAFKA-1546:
-
Attachment: KAFKA-1546_2015-03-25_13:27:40.patch

> Automate replica lag tuning
> ---
>
> Key: KAFKA-1546
> URL: https://issues.apache.org/jira/browse/KAFKA-1546
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.0, 0.8.1, 0.8.1.1
>Reporter: Neha Narkhede
>Assignee: Aditya Auradkar
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1546.patch, KAFKA-1546_2015-03-11_18:48:09.patch, 
> KAFKA-1546_2015-03-12_13:42:01.patch, KAFKA-1546_2015-03-16_11:31:39.patch, 
> KAFKA-1546_2015-03-17_14:46:10.patch, KAFKA-1546_2015-03-25_13:27:40.patch
>
>
> Currently, there is no good way to tune the replica lag configs to 
> automatically account for high and low volume topics on the same cluster. 
> For the low-volume topic it will take a very long time to detect a lagging
> replica, and for the high-volume topic it will have false-positives.
> One approach to making this easier would be to have the configuration
> be something like replica.lag.max.ms and translate this into a number
> of messages dynamically based on the throughput of the partition.





Re: Review Request 31967: Patch for KAFKA-1546

2015-03-25 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
---

(Updated March 25, 2015, 8:27 p.m.)


Review request for kafka.


Bugs: KAFKA-1546
https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description
---

PATCH for KAFKA-1546


PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track how long the replica has 
gone without reading up to the LEO (see the sketch after this list)
- Using the lagBegin value in the checks for ISR expansion and shrinkage
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the LEO 
before actually reading from the log
- Unit test cases to test ISR shrinkage and expansion
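
A minimal sketch of the lagBegin idea, under the assumption that a follower is 
out of sync once it has not reached the leader's LEO for longer than a 
configured time (hypothetical names; the real change lives in the patch):

{code}
public class ReplicaLagSketch {
    private final long lagTimeMaxMs;   // analogue of a time-based replica lag config
    private long lagBeginMs = -1;      // -1 means the replica is currently at the LEO

    public ReplicaLagSketch(long lagTimeMaxMs) {
        this.lagTimeMaxMs = lagTimeMaxMs;
    }

    // Call on every follower fetch: records when the replica started lagging.
    public void onFetch(long followerLeo, long leaderLeo, long nowMs) {
        if (followerLeo >= leaderLeo) {
            lagBeginMs = -1;           // caught up: reset the lag clock
        } else if (lagBeginMs < 0) {
            lagBeginMs = nowMs;        // just fell behind: start the clock
        }
    }

    // True if the replica should be shrunk out of the ISR.
    public boolean isOutOfSync(long nowMs) {
        return lagBeginMs >= 0 && (nowMs - lagBeginMs) > lagTimeMaxMs;
    }
}
{code}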

Updated KAFKA-1546 patch to reflect Neha and Jun's comments


Addressing Joel's comments


Diffs (updated)
-

  core/src/main/scala/kafka/cluster/Partition.scala 
c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala 
bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/log/Log.scala 
06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
  core/src/main/scala/kafka/server/FetchDataInfo.scala 
26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
46d21c73f1feb3410751899380b35da0c37c975c 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
191251d1340b5e5b2d649b37af3c6c1896d07e6e 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
---


Thanks,

Aditya Auradkar



[jira] [Commented] (KAFKA-1546) Automate replica lag tuning

2015-03-25 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380696#comment-14380696
 ] 

Aditya A Auradkar commented on KAFKA-1546:
--

Updated reviewboard https://reviews.apache.org/r/31967/diff/
 against branch origin/trunk

> Automate replica lag tuning
> ---
>
> Key: KAFKA-1546
> URL: https://issues.apache.org/jira/browse/KAFKA-1546
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.0, 0.8.1, 0.8.1.1
>Reporter: Neha Narkhede
>Assignee: Aditya Auradkar
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1546.patch, KAFKA-1546_2015-03-11_18:48:09.patch, 
> KAFKA-1546_2015-03-12_13:42:01.patch, KAFKA-1546_2015-03-16_11:31:39.patch, 
> KAFKA-1546_2015-03-17_14:46:10.patch, KAFKA-1546_2015-03-25_13:27:40.patch
>
>
> Currently, there is no good way to tune the replica lag configs to 
> automatically account for high and low volume topics on the same cluster. 
> For the low-volume topic it will take a very long time to detect a lagging
> replica, and for the high-volume topic it will have false-positives.
> One approach to making this easier would be to have the configuration
> be something like replica.lag.max.ms and translate this into a number
> of messages dynamically based on the throughput of the partition.





Re: Review Request 31816: Fix decompression regarding KAFKA-527

2015-03-25 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31816/
---

(Updated March 25, 2015, 8:26 p.m.)


Review request for kafka.


Summary (updated)
-

Fix decompression regarding KAFKA-527


Bugs: KAFKA-527
https://issues.apache.org/jira/browse/KAFKA-527


Repository: kafka


Description (updated)
---

Incorporated Jun and Joel's comments


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerIterator.scala 
78fbf75651583e390258af2d9f09df6911a97b59 
  core/src/main/scala/kafka/log/LogSegment.scala 
ac9643423a28d189133705ba69b16cfce23f0049 
  core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
9c694719dc9b515fb3c3ae96435a87b334044272 
  core/src/main/scala/kafka/tools/DumpLogSegments.scala 
fe2cc11b75f370beb9cb87ebc9ed01b63fd65f87 
  core/src/test/scala/unit/kafka/log/LogTest.scala 
8cd5f2fa4a1a536c3983c5b6eac3d80de49d5a94 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
b5208a5f1186bc089cd89527c1eb7f95b2e76c75 

Diff: https://reviews.apache.org/r/31816/diff/


Testing
---

Unit tests


Thanks,

Guozhang Wang



[jira] [Commented] (KAFKA-527) Compression support does numerous byte copies

2015-03-25 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380693#comment-14380693
 ] 

Guozhang Wang commented on KAFKA-527:
-

Updated reviewboard https://reviews.apache.org/r/31816/diff/
 against branch origin/trunk

> Compression support does numerous byte copies
> -
>
> Key: KAFKA-527
> URL: https://issues.apache.org/jira/browse/KAFKA-527
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Jay Kreps
>Assignee: Yasuhiro Matsuda
>Priority: Critical
> Attachments: KAFKA-527.message-copy.history, KAFKA-527.patch, 
> KAFKA-527_2015-03-16_15:19:29.patch, KAFKA-527_2015-03-19_21:32:24.patch, 
> KAFKA-527_2015-03-25_12:08:00.patch, KAFKA-527_2015-03-25_13:26:36.patch, 
> java.hprof.no-compression.txt, java.hprof.snappy.text
>
>
> The data path for compressing or decompressing messages is extremely 
> inefficient. We do something like 7 (?) complete copies of the data, often 
> for simple things like adding a 4 byte size to the front. I am not sure how 
> this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk 
> recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the 
> Message/MessageSet interfaces which are based on byte buffers is the cause of 
> many of these.





Re: Review Request 32440: Patch for KAFKA-2043

2015-03-25 Thread Mayuresh Gharat

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32440/#review77787
---

Ship it!


- Mayuresh Gharat


On March 25, 2015, 6:29 p.m., Grant Henke wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32440/
> ---
> 
> (Updated March 25, 2015, 6:29 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2043
> https://issues.apache.org/jira/browse/KAFKA-2043
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Promote compressionType to class variable
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> ab263423ff1d33170effb71acdef3fc501fa072a 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
>  e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
>  24274a64885fadd0e9318de2beb232218ddd52cd 
> 
> Diff: https://reviews.apache.org/r/32440/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Grant Henke
> 
>



[jira] [Commented] (KAFKA-527) Compression support does numerous byte copies

2015-03-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380657#comment-14380657
 ] 

Jun Rao commented on KAFKA-527:
---

[~ymatsuda], thanks for the patch. +1 and committed to trunk. Leave this jira 
open for the other patch from Guozhang.

> Compression support does numerous byte copies
> -
>
> Key: KAFKA-527
> URL: https://issues.apache.org/jira/browse/KAFKA-527
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Jay Kreps
>Assignee: Yasuhiro Matsuda
>Priority: Critical
> Attachments: KAFKA-527.message-copy.history, KAFKA-527.patch, 
> KAFKA-527_2015-03-16_15:19:29.patch, KAFKA-527_2015-03-19_21:32:24.patch, 
> KAFKA-527_2015-03-25_12:08:00.patch, java.hprof.no-compression.txt, 
> java.hprof.snappy.text
>
>
> The data path for compressing or decompressing messages is extremely 
> inefficient. We do something like 7 (?) complete copies of the data, often 
> for simple things like adding a 4 byte size to the front. I am not sure how 
> this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk 
> recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the 
> Message/MessageSet interfaces which are based on byte buffers is the cause of 
> many of these.





Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-03-25 Thread Parth Brahmbhatt
Hi all,

I have modified the KIP to reflect the recent change requests from the
reviewers. I have been working on the code and have the server-side
authorization code ready; I am now modifying the command-line utilities. I
would really appreciate it if some of the committers could spend some time
reviewing the KIP so we can make progress on this.

Thanks
Parth

On 3/18/15, 2:20 PM, "Michael Herstine" 
wrote:

>Hi Parth,
>
>Thanks! A few questions:
>
>1. Do you want to permit rules in your ACLs that DENY access as well as
>ALLOW? This can be handy for setting up rules that have exceptions. E.g.
>“Allow principal P to READ resource R from all hosts” with “Deny principal
>P READ access to resource R from host H1” in combination would allow P to
>READ R from all hosts *except* H1.
>
>2. When a topic is newly created, will there be an ACL created for it? If
>not, would that not deny subsequent access to it?
>
>(nit) Maybe use Principal instead of String to represent principals?
>
>
>On 3/9/15, 11:48 AM, "Don Bosco Durai"  wrote:
>
>>Parth
>>
>>Overall it is looking good. Couple of questions…
>>
>>- Can you give an example of how the policies will look in the default
>>implementation?
>>- In the operations, can we support “CONNECT” also? This can be used
>>during session connection.
>>- Regarding access control for “Topic Creation”, since we can't do it on
>>the server side, can we de-scope it for now and plan it as a future
>>feature request?
>>
>>Thanks
>>
>>Bosco
>>
>> 
>>
>>On 3/6/15, 8:10 AM, "Harsha"  wrote:
>>
>>>Hi Parth,
>>>Thanks for putting this together. Overall it looks good to
>>>me. Although AdminUtils is a concern, KIP-4 can probably fix
>>>that part.
>>>Thanks,
>>>Harsha
>>>
>>>On Thu, Mar 5, 2015, at 10:39 AM, Parth Brahmbhatt wrote:
 Forgot to add links to wiki and jira.
 
 Link to wiki:
 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface
 Link to Jira: https://issues.apache.org/jira/browse/KAFKA-1688
 
 Thanks
 Parth
 
 From: Parth Brahmbhatt
 mailto:pbrahmbh...@hortonworks.com>>
 Date: Thursday, March 5, 2015 at 10:33 AM
 To: "dev@kafka.apache.org"
 mailto:dev@kafka.apache.org>>
 Subject: [DISCUSS] KIP-11- Authorization design for kafka security
 
 Hi,
 
 KIP-11 is open for discussion , I have updated the wiki with the
design
 and open questions.
 
 Thanks
 Parth
>>
>>
>



Re: kafka system tests

2015-03-25 Thread Geoffrey Anderson
Hi Gwen,

Sorry about that, the ducttape repository was not yet public, but now it is.

Cheers,
Geoff

On Wed, Mar 25, 2015 at 12:08 PM, Gwen Shapira 
wrote:

> Thanks for summarizing! I think we are all feeling the pain here and want
> to make life easier moving forward.
>
> The framework discussion is particularly interesting - unfortunately, the
> link to ducttape is broken at the moment.
>
> Gwen
>
> On Wed, Mar 25, 2015 at 11:46 AM, Geoffrey Anderson 
> wrote:
>
> > Hi dev list,
> >
> > I've been discussing the current state of system tests with Jun and
> others,
> > and have summarized goals moving forward at:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/System+Test+Improvements
> >
> > Feedback is welcome!
> >
> > Thanks,
> > Geoff
> >
>


Re: Plan of Controlled Shutdown

2015-03-25 Thread Harsha
Hi ,
       You can enable/disable controlled shutdown using 
controlled.shutdown.enable in server.properties; it is set to “true” by 
default in 0.8.2. You can go ahead and do a rolling restart, and you don't 
need the ShutdownBroker command, which has been removed. KAFKA-2029 is about 
improving controlled shutdown, especially in the case of a high partition 
count per broker, which can degrade performance as described in the JIRA.
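
For reference, a minimal snippet showing just this property (per the above, it 
already defaults to true in 0.8.2):

{code:title=server.properties|borderStyle=solid}
# Migrate partition leadership off a broker before it shuts down
controlled.shutdown.enable=true
{code}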

Thanks,
Harsha


On March 25, 2015 at 11:15:21 AM, Mingjie Lai (m...@apache.org) wrote:

Hi.  

I've been trying to figure out the best way to do a Kafka broker rolling
restart, and read the controlled shutdown wiki page:

https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-1.ControlledShutdown
  

However I cannot find this kafka.admin.ShutdownBroker class in 0.8.2:  

/usr/lib/kafka $ bin/kafka-run-class.sh kafka.admin.ShutdownBroker  

Exception in thread "main" java.lang.NoClassDefFoundError:  
kafka/admin/ShutdownBroker  
Caused by: java.lang.ClassNotFoundException: kafka.admin.ShutdownBroker  
at java.net.URLClassLoader$1.run(URLClassLoader.java:217)  
at java.security.AccessController.doPrivileged(Native Method)  
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)  
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)  
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)  
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)  
Could not find the main class: kafka.admin.ShutdownBroker. Program will  
exit.  

Some other commands are there:  

/usr/lib/kafka $ bin/kafka-topics.sh --list --zookeeper zk1:2181/kafka |  
grep test  

test  

However, I also see some jiras about bug fixes for ControlledShutdown, like
https://issues.apache.org/jira/browse/KAFKA-2029, so I got a bit confused.

What's the plan for this feature in Kafka? I still see value in it: for
example, we can deliberately move leader partitions around for a scheduled
upgrade or config update, minimizing the disruption as much as possible.

Thanks,  
Mingjie  


Producer Behavior When one or more Brokers' Disk is Full.

2015-03-25 Thread Bhavesh Mistry
Hello Kafka Community,



What is the expected behavior on the producer side when the disk on one or
more brokers is full, but the topics have not yet hit their retention limits
(by size or by time)?



Does the producer keep sending data to those brokers, and/or does the producer
queue fill up and always throw a queue-full error, depending on configuration?
(I have a producer configured to be non-blocking when the queue is full, with
acks of 0 or 1 and retries set to 3.)



What is the expected behavior of the old (Scala-based) producer vs. the pure-Java producer?


Here is a reference to a past discussion:
http://grokbase.com/t/kafka/users/147h4958k8/how-to-recover-from-a-disk-full-situation-in-kafka-cluster


Is there a wiki or cookbook with steps to recover from such a situation?



Thanks,

Bhavesh


Re: C++ Client Library -- libkafka-asio

2015-03-25 Thread Daniel Joos
Hi,

Thanks for your comment and thank you for adding the library to the wiki
page.

I haven't done any measurements of the library's performance yet.
This, plus completing the documentation, is what's missing before
version 0.1.0.
Of course, compression also needs to be added at some point, but it
kind of breaks my "header-only" approach for this library.

Best regards,
Daniel



[jira] [Commented] (KAFKA-527) Compression support does numerous byte copies

2015-03-25 Thread Yasuhiro Matsuda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380547#comment-14380547
 ] 

Yasuhiro Matsuda commented on KAFKA-527:


Updated reviewboard https://reviews.apache.org/r/31742/diff/
 against branch origin/trunk

> Compression support does numerous byte copies
> -
>
> Key: KAFKA-527
> URL: https://issues.apache.org/jira/browse/KAFKA-527
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Jay Kreps
>Assignee: Yasuhiro Matsuda
>Priority: Critical
> Attachments: KAFKA-527.message-copy.history, KAFKA-527.patch, 
> KAFKA-527_2015-03-16_15:19:29.patch, KAFKA-527_2015-03-19_21:32:24.patch, 
> KAFKA-527_2015-03-25_12:08:00.patch, java.hprof.no-compression.txt, 
> java.hprof.snappy.text
>
>
> The data path for compressing or decompressing messages is extremely 
> inefficient. We do something like 7 (?) complete copies of the data, often 
> for simple things like adding a 4 byte size to the front. I am not sure how 
> this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk 
> recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the 
> Message/MessageSet interfaces which are based on byte buffers is the cause of 
> many of these.





[jira] [Updated] (KAFKA-527) Compression support does numerous byte copies

2015-03-25 Thread Yasuhiro Matsuda (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yasuhiro Matsuda updated KAFKA-527:
---
Attachment: KAFKA-527_2015-03-25_12:08:00.patch

> Compression support does numerous byte copies
> -
>
> Key: KAFKA-527
> URL: https://issues.apache.org/jira/browse/KAFKA-527
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Jay Kreps
>Assignee: Yasuhiro Matsuda
>Priority: Critical
> Attachments: KAFKA-527.message-copy.history, KAFKA-527.patch, 
> KAFKA-527_2015-03-16_15:19:29.patch, KAFKA-527_2015-03-19_21:32:24.patch, 
> KAFKA-527_2015-03-25_12:08:00.patch, java.hprof.no-compression.txt, 
> java.hprof.snappy.text
>
>
> The data path for compressing or decompressing messages is extremely 
> inefficient. We do something like 7 (?) complete copies of the data, often 
> for simple things like adding a 4 byte size to the front. I am not sure how 
> this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk 
> recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the 
> Message/MessageSet interfaces which are based on byte buffers is the cause of 
> many of these.





Re: kafka system tests

2015-03-25 Thread Gwen Shapira
Thanks for summarizing! I think we are all feeling the pain here and want
to make life easier moving forward.

The framework discussion is particularly interesting - unfortunately, the
link to ducttape is broken at the moment.

Gwen

On Wed, Mar 25, 2015 at 11:46 AM, Geoffrey Anderson 
wrote:

> Hi dev list,
>
> I've been discussing the current state of system tests with Jun and others,
> and have summarized goals moving forward at:
>
> https://cwiki.apache.org/confluence/display/KAFKA/System+Test+Improvements
>
> Feedback is welcome!
>
> Thanks,
> Geoff
>


Re: Review Request 31742: Patch for KAFKA-527

2015-03-25 Thread Yasuhiro Matsuda

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31742/
---

(Updated March 25, 2015, 7:08 p.m.)


Review request for kafka.


Bugs: KAFKA-527
https://issues.apache.org/jira/browse/KAFKA-527


Repository: kafka


Description
---

fewer byte copies


Diffs (updated)
-

  core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
9c694719dc9b515fb3c3ae96435a87b334044272 
  core/src/main/scala/kafka/message/MessageWriter.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/31742/diff/


Testing
---


Thanks,

Yasuhiro Matsuda



kafka system tests

2015-03-25 Thread Geoffrey Anderson
Hi dev list,

I've been discussing the current state of system tests with Jun and others,
and have summarized goals moving forward at:

https://cwiki.apache.org/confluence/display/KAFKA/System+Test+Improvements

Feedback is welcome!

Thanks,
Geoff


Re: Review Request 31816: Fix for KAFKA-527

2015-03-25 Thread Guozhang Wang


> On March 21, 2015, 1:42 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/message/ByteBufferMessageSet.scala, lines 241-243
> > 
> >
> > In the normal path, this should only happen when reading the offset. So 
> > we should probably handle the EOFException there. If we get EOFException in 
> > other places, it should be treated as an error.

I think EOF can be thrown both while reading the header (offset, size) and 
while reading the message itself; as in the old internalIterator.makeNextOuter, 
we check whether the remaining bytes are insufficient and return allDone for 
both the header and the message content.
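
A minimal sketch of that "return allDone on a short read" pattern, assuming a 
12-byte header (8-byte offset plus 4-byte size); this is an illustration, not 
the iterator itself:

{code}
import java.nio.ByteBuffer;

class MessageSetScanSketch {
    static final int HEADER_SIZE = 12;  // 8-byte offset + 4-byte size

    // Count complete messages; stop cleanly (the "allDone" case) on a partial
    // header or a partial message instead of treating it as an error.
    static int countCompleteMessages(ByteBuffer buf) {
        int count = 0;
        while (true) {
            if (buf.remaining() < HEADER_SIZE) {
                return count;           // partial header: all done
            }
            buf.getLong();              // offset (unused here)
            int size = buf.getInt();    // message size
            if (buf.remaining() < size) {
                return count;           // partial message: all done
            }
            buf.position(buf.position() + size);  // skip the message body
            count++;
        }
    }
}
{code}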


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31816/#review77328
---


On March 6, 2015, 11:48 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31816/
> ---
> 
> (Updated March 6, 2015, 11:48 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-527
> https://issues.apache.org/jira/browse/KAFKA-527
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Avoid double copying on decompress
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/ConsumerIterator.scala 
> ac491b4da2583ef7227c67f5b8bc0fd731d705c3 
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
> 788c7864bc881b935975ab4a4e877b690e65f1f1 
>   core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 
> 6f0addcea64f1e78a4de50ec8135f4d02cebd305 
>   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
> 24deea06753e5358aa341c589ca7a7704317e29c 
> 
> Diff: https://reviews.apache.org/r/31816/diff/
> 
> 
> Testing
> ---
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



Re: Review Request 31816: Fix for KAFKA-527

2015-03-25 Thread Guozhang Wang


> On March 21, 2015, 1:42 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/message/ByteBufferMessageSet.scala, line 208
> > 
> >
> > Can we just use wrapperMessage directly?

I think we can, as we are just accessing its underlying buffer for reading 
data; it is the same pattern we previously used in 
ByteBufferMessageSet.decompress().


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31816/#review77328
---


On March 6, 2015, 11:48 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31816/
> ---
> 
> (Updated March 6, 2015, 11:48 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-527
> https://issues.apache.org/jira/browse/KAFKA-527
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Avoid double copying on decompress
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/ConsumerIterator.scala 
> ac491b4da2583ef7227c67f5b8bc0fd731d705c3 
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
> 788c7864bc881b935975ab4a4e877b690e65f1f1 
>   core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 
> 6f0addcea64f1e78a4de50ec8135f4d02cebd305 
>   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
> 24deea06753e5358aa341c589ca7a7704317e29c 
> 
> Diff: https://reviews.apache.org/r/31816/diff/
> 
> 
> Testing
> ---
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



[jira] [Commented] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append

2015-03-25 Thread Grant Henke (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380433#comment-14380433
 ] 

Grant Henke commented on KAFKA-2043:


Updated reviewboard https://reviews.apache.org/r/32440/diff/
 against branch origin/trunk

> CompressionType is passed in each RecordAccumulator append
> --
>
> Key: KAFKA-2043
> URL: https://issues.apache.org/jira/browse/KAFKA-2043
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.2.0
>Reporter: Grant Henke
>Assignee: Grant Henke
>Priority: Minor
> Attachments: KAFKA-2043.patch, KAFKA-2043_2015-03-25_13:28:52.patch
>
>
> Currently org.apache.kafka.clients.producer.internals.RecordAccumulator 
> append method accepts the compressionType on a per record basis. It looks 
> like the code would only work on a per batch basis because the 
> CompressionType is only used when creating a new RecordBatch. My 
> understanding is this should only support setting per batch at most. 
> public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] 
> value, CompressionType compression, Callback callback) throws 
> InterruptedException;
> The compression type is a producer
> level config. Instead of passing it in for each append, we probably should
> just pass it in once during the creation of the RecordAccumulator.





[jira] [Updated] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append

2015-03-25 Thread Grant Henke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke updated KAFKA-2043:
---
Attachment: KAFKA-2043_2015-03-25_13:28:52.patch

> CompressionType is passed in each RecordAccumulator append
> --
>
> Key: KAFKA-2043
> URL: https://issues.apache.org/jira/browse/KAFKA-2043
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.2.0
>Reporter: Grant Henke
>Assignee: Grant Henke
>Priority: Minor
> Attachments: KAFKA-2043.patch, KAFKA-2043_2015-03-25_13:28:52.patch
>
>
> Currently org.apache.kafka.clients.producer.internals.RecordAccumulator 
> append method accepts the compressionType on a per record basis. It looks 
> like the code would only work on a per batch basis because the 
> CompressionType is only used when creating a new RecordBatch. My 
> understanding is this should only support setting per batch at most. 
> public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] 
> value, CompressionType compression, Callback callback) throws 
> InterruptedException;
> The compression type is a producer
> level config. Instead of passing it in for each append, we probably should
> just pass it in once during the creation of the RecordAccumulator.
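
A hypothetical before/after sketch of promoting the per-call parameter to a 
class variable (simplified stand-in types, not the actual client code):

{code}
// Before: compression is threaded through every append() call.
final class AccumulatorBefore {
    void append(String topicPartition, byte[] key, byte[] value,
                String compressionType) {
        // compressionType is only consulted when a new batch is created
    }
}

// After: compression is supplied once, at construction, and stored on the class.
final class AccumulatorAfter {
    private final String compressionType;   // promoted to a class variable

    AccumulatorAfter(String compressionType) {
        this.compressionType = compressionType;
    }

    void append(String topicPartition, byte[] key, byte[] value) {
        // new batches pick up this.compressionType
    }
}
{code}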





Re: Review Request 32440: Patch for KAFKA-2043

2015-03-25 Thread Grant Henke

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32440/
---

(Updated March 25, 2015, 6:29 p.m.)


Review request for kafka.


Bugs: KAFKA-2043
https://issues.apache.org/jira/browse/KAFKA-2043


Repository: kafka


Description (updated)
---

Promote compressionType to class variable


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
ab263423ff1d33170effb71acdef3fc501fa072a 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 24274a64885fadd0e9318de2beb232218ddd52cd 

Diff: https://reviews.apache.org/r/32440/diff/


Testing
---


Thanks,

Grant Henke



Plan of Controlled Shutdown

2015-03-25 Thread Mingjie Lai
Hi.

I've been trying to figure out the best way to do a Kafka broker rolling
restart, and read the controlled shutdown wiki page:

https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-1.ControlledShutdown

However I cannot find this kafka.admin.ShutdownBroker class in 0.8.2:

/usr/lib/kafka $ bin/kafka-run-class.sh kafka.admin.ShutdownBroker

Exception in thread "main" java.lang.NoClassDefFoundError:
kafka/admin/ShutdownBroker
Caused by: java.lang.ClassNotFoundException: kafka.admin.ShutdownBroker
at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
Could not find the main class: kafka.admin.ShutdownBroker. Program will
exit.

Some other commands are there:

/usr/lib/kafka $ bin/kafka-topics.sh --list --zookeeper zk1:2181/kafka |
grep test

test

However, I also see some jiras about bug fixes for ControlledShutdown, like
https://issues.apache.org/jira/browse/KAFKA-2029, so I got a bit confused.

What's the plan for this feature in Kafka? I still see value in it: for
example, we can deliberately move leader partitions around for a scheduled
upgrade or config update, minimizing the disruption as much as possible.

Thanks,
Mingjie


[jira] [Updated] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.

2015-03-25 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1660:

Attachment: KAFKA-1660_2015-03-25_10:55:42.patch

> Ability to call close() with a timeout on the Java Kafka Producer. 
> ---
>
> Key: KAFKA-1660
> URL: https://issues.apache.org/jira/browse/KAFKA-1660
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, producer 
>Affects Versions: 0.8.2.0
>Reporter: Andrew Stein
>Assignee: Jiangjie Qin
> Fix For: 0.8.3
>
> Attachments: KAFKA-1660.patch, KAFKA-1660.patch, 
> KAFKA-1660_2015-02-17_16:41:19.patch, KAFKA-1660_2015-03-02_10:41:49.patch, 
> KAFKA-1660_2015-03-08_21:14:50.patch, KAFKA-1660_2015-03-09_12:56:39.patch, 
> KAFKA-1660_2015-03-25_10:55:42.patch
>
>
> I would like the ability to call {{close}} with a timeout on the Java 
> Client's KafkaProducer.
> h6. Workaround
> Currently, it is possible to ensure that {{close}} will return quickly by 
> first doing a {{future.get(timeout)}} on the last future produced on each 
> partition, but this means that the user has to define the partitions up front 
> at the time of {{send}} and track the returned {{future}}s.
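
A minimal sketch of that workaround, assuming the caller has tracked the last 
future per partition (hypothetical helper, not part of the client API):

{code}
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedCloseSketch {
    static void closeWithBoundedWait(AutoCloseable producer,
                                     Map<Integer, Future<?>> lastFuturePerPartition,
                                     long timeoutMs) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        for (Future<?> f : lastFuturePerPartition.values()) {
            long remainingMs = deadline - System.currentTimeMillis();
            if (remainingMs <= 0) {
                break;                                      // time budget exhausted
            }
            try {
                f.get(remainingMs, TimeUnit.MILLISECONDS);  // wait within the budget
            } catch (TimeoutException e) {
                break;                                      // stop waiting, close anyway
            }
        }
        producer.close();  // should now return quickly if the last sends completed
    }
}
{code}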





Re: Review Request 31850: Patch for KAFKA-1660

2015-03-25 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
---

(Updated March 25, 2015, 5:55 p.m.)


Review request for kafka.


Bugs: KAFKA-1660
https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description (updated)
---

A minor fix.


Incorporated Guozhang's comments.


Modify according to the latest conclusion.


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
ab263423ff1d33170effb71acdef3fc501fa072a 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
fee322fa0dd9704374db4a6964246a7d2918d3e4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
c2fdc23239bd2196cd912c3d121b591f21393eab 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
3df450784592b894008e7507b2737f9bb07f7bd2 

Diff: https://reviews.apache.org/r/31850/diff/


Testing
---

Unit tests passed.


Thanks,

Jiangjie Qin


