[jira] [Comment Edited] (KAFKA-4489) Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections

2016-12-05 Thread Niranjan Nanda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724657#comment-15724657
 ] 

Niranjan Nanda edited comment on KAFKA-4489 at 12/6/16 7:15 AM:


Ismael, I did understand the issue; the point is why OOM? It's a case where 
"server running on both PLAINTEXT and SSL port, and, consumer is trying to 
connect to SSL port using PLAINTEXT protocol". Something similar to using 443 
port for a HTTP (instead of HTTPS) connection. So, it should result in either 
connection timeout or some sort of connection failure message instead of OOM. 
This is very confusing!

Per your explanation, it also looks like the socket listener does not 
understand the difference between SSL handshake packets and message packets. I 
am not sure if this is the right implementation.


was (Author: nnanda):
Ismael, I did understand the issue; the point is why OOM? It's a case where 
"server running on both PLAINTEXT and SSL port, and, consumer is trying to 
connect to SSL port using PLAINTEXT protocol". Something similar to using 443 
port for a HTTP (instead of HTTPS) connection. So, it should result in either 
connection timeout or some sort of connection failure message instead of OOM. 
This is very confusing!

> Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections
> 
>
> Key: KAFKA-4489
> URL: https://issues.apache.org/jira/browse/KAFKA-4489
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Niranjan Nanda
>
> Configured Kafka brokers on SSL. At consumer side, I configured following 
> properties:
> kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "host1:9093,host2:9093,host3:9093");
> kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group1");
> kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "host_name_of_machine");
> On starting, I could following exceptions in the kafka log
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -3 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -3 at 
> :9093.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Sending metadata request 
> {topics=[my_topic]} to node -1
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -1 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -1 at 
> :9093.
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-14] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.common.network.Selector# - Connection with / 
> disconnected
> java.io.EOFException
> at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
> at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>

[jira] [Comment Edited] (KAFKA-4489) Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections

2016-12-05 Thread Niranjan Nanda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724657#comment-15724657
 ] 

Niranjan Nanda edited comment on KAFKA-4489 at 12/6/16 7:13 AM:


Ismael, I did understand the issue; the point is why OOM? It's a case where 
"server running on both PLAINTEXT and SSL port, and, consumer is trying to 
connect to SSL port using PLAINTEXT protocol". Something similar to using 443 
port for a HTTP (instead of HTTPS) connection. So, it should result in either 
connection timeout or some sort of connection failure message instead of OOM. 
This is very confusing!


was (Author: nnanda):
Ismael, I understood the issue; the point is why OOM? It's a case where "server 
running on both PLAINTEXT and SSL port, and, consumer is trying to connect to 
SSL port using PLAINTEXT protocol". Something similar to using 443 port for a 
HTTP (instead of HTTPS) connection. So, it should result in either connection 
timeout or some sort of connection failure message instead of OOM. This is very 
confusing!

> Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections
> 
>
> Key: KAFKA-4489
> URL: https://issues.apache.org/jira/browse/KAFKA-4489
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Niranjan Nanda
>
> Configured Kafka brokers on SSL. At consumer side, I configured following 
> properties:
> kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "host1:9093,host2:9093,host3:9093");
> kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group1");
> kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "host_name_of_machine");
> On starting, I could following exceptions in the kafka log
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -3 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -3 at 
> :9093.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Sending metadata request 
> {topics=[my_topic]} to node -1
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -1 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -1 at 
> :9093.
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-14] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.common.network.Selector# - Connection with / 
> disconnected
> java.io.EOFException
> at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
> at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> 

[jira] [Commented] (KAFKA-4489) Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections

2016-12-05 Thread Niranjan Nanda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724657#comment-15724657
 ] 

Niranjan Nanda commented on KAFKA-4489:
---

Ismael, I understood the issue; the point is why OOM? It's a case where "server 
running on both PLAINTEXT and SSL port, and, consumer is trying to connect to 
SSL port using PLAINTEXT protocol". Something similar to using 443 port for a 
HTTP (instead of HTTPS) connection. So, it should result in either connection 
timeout or some sort of connection failure message instead of OOM. This is very 
confusing!

> Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections
> 
>
> Key: KAFKA-4489
> URL: https://issues.apache.org/jira/browse/KAFKA-4489
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Niranjan Nanda
>
> Configured Kafka brokers on SSL. At consumer side, I configured following 
> properties:
> kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "host1:9093,host2:9093,host3:9093");
> kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group1");
> kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "host_name_of_machine");
> On starting, I could following exceptions in the kafka log
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -3 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -3 at 
> :9093.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Sending metadata request 
> {topics=[my_topic]} to node -1
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -1 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -1 at 
> :9093.
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-14] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.common.network.Selector# - Connection with / 
> disconnected
> java.io.EOFException
> at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
> at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:973)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at com.demo.consumer.Consumer.run(Consumer.java:71)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 

[jira] [Commented] (KAFKA-4489) Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections

2016-12-05 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724538#comment-15724538
 ] 

Ismael Juma commented on KAFKA-4489:


You haven't told the consumer to use SSL for the security protocol so it 
assumes PLAINTEXT. That causes an OOM because the client misinterprets the 
meaning of a SSL negotiation packet (it thinks it's the message size).

> Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections
> 
>
> Key: KAFKA-4489
> URL: https://issues.apache.org/jira/browse/KAFKA-4489
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Niranjan Nanda
>
> Configured Kafka brokers on SSL. At consumer side, I configured following 
> properties:
> kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "host1:9093,host2:9093,host3:9093");
> kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group1");
> kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "host_name_of_machine");
> On starting, I could following exceptions in the kafka log
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -3 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -3 at 
> :9093.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Sending metadata request 
> {topics=[my_topic]} to node -1
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -1 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -1 at 
> :9093.
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-14] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.common.network.Selector# - Connection with / 
> disconnected
> java.io.EOFException
> at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
> at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:973)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at com.demo.consumer.Consumer.run(Consumer.java:71)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> 

[jira] [Comment Edited] (KAFKA-4489) Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections

2016-12-05 Thread Niranjan Nanda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724308#comment-15724308
 ] 

Niranjan Nanda edited comment on KAFKA-4489 at 12/6/16 4:38 AM:


JDK options used:
- Version: 1.8
- Heap config: -Xmx4g
- GC config: default for JDK 8 (no explicit config for GC).

Kafka configs:
- one topic
- 20 partitions
- 20 KafkaConsumer instances in pool (to poll 20 partitions)

We have been using this same configuration for last 3 months and never faced 
this issue. The only change we made recently is to use SSL for Kafka and thats 
when we got this issue.



was (Author: nnanda):
JDK options used:
- Version: 1.8
- Heap config: -Xmx4g
- GC config: default for JDK 8 (no explicit config for GC).

About KafkaConsumer instances, we are creating 20 in our pool because there are 
20 partitions in our topic. We have been using this same configuration for last 
3 months and never faced this issue. The only change we made recently is to use 
SSL for Kafka and thats when we got this issue.


> Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections
> 
>
> Key: KAFKA-4489
> URL: https://issues.apache.org/jira/browse/KAFKA-4489
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Niranjan Nanda
>
> Configured Kafka brokers on SSL. At consumer side, I configured following 
> properties:
> kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "host1:9093,host2:9093,host3:9093");
> kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group1");
> kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "host_name_of_machine");
> On starting, I could following exceptions in the kafka log
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -3 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -3 at 
> :9093.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Sending metadata request 
> {topics=[my_topic]} to node -1
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -1 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -1 at 
> :9093.
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-14] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.common.network.Selector# - Connection with / 
> disconnected
> java.io.EOFException
> at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
> at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> 

[jira] [Commented] (KAFKA-4489) Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections

2016-12-05 Thread Niranjan Nanda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724308#comment-15724308
 ] 

Niranjan Nanda commented on KAFKA-4489:
---

JDK options used:
- Version: 1.8
- Heap config: -Xmx4g
- GC config: default for JDK 8 (no explicit config for GC).

About KafkaConsumer instances, we are creating 20 in our pool because there are 
20 partitions in our topic. We have been using this same configuration for last 
3 months and never faced this issue. The only change we made recently is to use 
SSL for Kafka and thats when we got this issue.


> Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections
> 
>
> Key: KAFKA-4489
> URL: https://issues.apache.org/jira/browse/KAFKA-4489
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Niranjan Nanda
>
> Configured Kafka brokers on SSL. At consumer side, I configured following 
> properties:
> kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "host1:9093,host2:9093,host3:9093");
> kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group1");
> kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "host_name_of_machine");
> On starting, I could following exceptions in the kafka log
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -3 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -3 at 
> :9093.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Sending metadata request 
> {topics=[my_topic]} to node -1
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -1 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -1 at 
> :9093.
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-14] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.common.network.Selector# - Connection with / 
> disconnected
> java.io.EOFException
> at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
> at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:973)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at com.demo.consumer.Consumer.run(Consumer.java:71)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   

[jira] [Commented] (KAFKA-2526) Console Producer / Consumer's serde config is not working

2016-12-05 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724184#comment-15724184
 ] 

huxi commented on KAFKA-2526:
-

For the new producer, serde config is hard coded as 

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer")

> Console Producer / Consumer's serde config is not working
> -
>
> Key: KAFKA-2526
> URL: https://issues.apache.org/jira/browse/KAFKA-2526
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Mayuresh Gharat
>  Labels: newbie
> Fix For: 0.10.2.0
>
>
> Although in the console producer one can specify the key value serializer, 
> they are actually not used since 1) it always serialize the input string as 
> String.getBytes (hence always pre-assume the string serializer) and 2) it is 
> actually only passed into the old producer. The same issues exist in console 
> consumer.
> In addition the configs in the console producer is messy: we have 1) some 
> config values exposed as cmd parameters, and 2) some config values in 
> --producer-property and 3) some in --property.
> It will be great to clean the configs up in both console producer and 
> consumer, and put them into a single --property parameter which could 
> possibly take a file to reading in property values as well, and only leave 
> --new-producer as the other command line parameter.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk7 #1731

2016-12-05 Thread Apache Jenkins Server
See 

Changes:

[me] HOTFIX: Fix bug in readToLogEnd in KafkaBasedLog.

--
[...truncated 12555 lines...]
:streams:checkstyleMain
:streams:compileTestJavaNote: Some input files use unchecked or unsafe 
operations.
Note: Recompile with -Xlint:unchecked for details.

:streams:processTestResources
:streams:testClasses
:streams:checkstyleTest
:streams:test

org.apache.kafka.streams.KeyValueTest > shouldHaveSaneEqualsAndHashCode STARTED

org.apache.kafka.streams.KeyValueTest > shouldHaveSaneEqualsAndHashCode PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenCreated STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenCreated PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testMultipleConsumersCanReadFromPartitionedTopic STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testMultipleConsumersCanReadFromPartitionedTopic PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenDeleted STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenDeleted PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testNoMessagesSentExceptionFromOverlappingPatterns STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testNoMessagesSentExceptionFromOverlappingPatterns PASSED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduce STARTED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduce PASSED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldGroupByKey STARTED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldGroupByKey PASSED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduceWindowed STARTED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduceWindowed PASSED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[0] STARTED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[0] PASSED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[1] STARTED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[1] PASSED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[2] STARTED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[2] PASSED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[3] STARTED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[3] PASSED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[4] STARTED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[4] PASSED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[5] STARTED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[5] PASSED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[6] STARTED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[6] PASSED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[7] STARTED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[7] PASSED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[8] STARTED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
KTableKTableJoin[8] PASSED

org.apache.kafka.streams.integration.InternalTopicIntegrationTest > 
shouldCompactTopicsForStateChangelogs STARTED

org.apache.kafka.streams.integration.InternalTopicIntegrationTest > 
shouldCompactTopicsForStateChangelogs PASSED

org.apache.kafka.streams.integration.InternalTopicIntegrationTest > 
shouldUseCompactAndDeleteForWindowStoreChangelogs STARTED

org.apache.kafka.streams.integration.InternalTopicIntegrationTest > 
shouldUseCompactAndDeleteForWindowStoreChangelogs PASSED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[0] STARTED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[0] PASSED


[jira] [Assigned] (KAFKA-4480) kafka-configs will execute the removal of an invalid property and not report an error

2016-12-05 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-4480:
--

Assignee: Vahid Hashemian

> kafka-configs will execute the removal of an invalid property and not report 
> an error
> -
>
> Key: KAFKA-4480
> URL: https://issues.apache.org/jira/browse/KAFKA-4480
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 0.10.0.0
> Environment: CentOS Linux release 7.2.1511 (Core)
> java version "1.8.0_102"
>Reporter: Justin
>Assignee: Vahid Hashemian
>
> Problem:
> kafka-configs will execute the removal of an invalid property and not report 
> an error
> Steps to Reproduce:
> 1. Add a config property to a topic:
> kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name 
> test1 --alter --add-config max.message.bytes=128000
> 2. Confirm config is present:
> kafka-topics --zookeeper localhost:2181 --describe --topic test1
> Topic:test1 PartitionCount:1 ReplicationFactor:1 
> Configs:max.message.bytes=128000
>  Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
> 3. Remove config:
> kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name 
> test1 --alter --delete-config  max.message.bytes=128000
> Updated config for topic: "test1".
> 4. Config is still present and no error is thrown:
> kafka-topics --zookeeper localhost:2181 --describe --topic test1
> Topic:test1 PartitionCount:1 ReplicationFactor:1 
> Configs:max.message.bytes=128000
>  Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
> This is due to the "=128000" in the removal statement.
> Impact:
> 1. We would expect there to be an error statement if an invalid property is 
> specified for removal.  This can lead to unforeseen consequences in heavily 
> automated environments.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4471) KafkaConsumer unpauses partitions after subscribe()

2016-12-05 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724068#comment-15724068
 ] 

Vahid Hashemian commented on KAFKA-4471:


[~hachikuji] Thanks for the clarification.
[~salaev] I'll close this JIRA based on [~hachikuji]'s explanation. Please 
advise if you disagree.

> KafkaConsumer unpauses partitions after subscribe()
> ---
>
> Key: KAFKA-4471
> URL: https://issues.apache.org/jira/browse/KAFKA-4471
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Sergey Alaev
>Assignee: Vahid Hashemian
>
> How to reproduce:
> 1. initialize KafkaConsumer and subscribe to some topics by calling `void 
> subscribe(Collection topics)`
> 2. pause some subscribed partitions
> 3. call `void subscribe(Collection topics)` with one more topic
> 4. paused partitions will unpause.
> We are using 3-node Kafka cluster, server version = client version = 
> 0.10.0.0., one partition per topic
> Note:
> There was another problem with 0.9.x - that client did not purge unsubscribed 
> queues if they are paused. Probably that got 'fixed'.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-4471) KafkaConsumer unpauses partitions after subscribe()

2016-12-05 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian resolved KAFKA-4471.

Resolution: Not A Bug

> KafkaConsumer unpauses partitions after subscribe()
> ---
>
> Key: KAFKA-4471
> URL: https://issues.apache.org/jira/browse/KAFKA-4471
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Sergey Alaev
>Assignee: Vahid Hashemian
>
> How to reproduce:
> 1. initialize KafkaConsumer and subscribe to some topics by calling `void 
> subscribe(Collection topics)`
> 2. pause some subscribed partitions
> 3. call `void subscribe(Collection topics)` with one more topic
> 4. paused partitions will unpause.
> We are using 3-node Kafka cluster, server version = client version = 
> 0.10.0.0., one partition per topic
> Note:
> There was another problem with 0.9.x - that client did not purge unsubscribed 
> queues if they are paused. Probably that got 'fixed'.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

2016-12-05 Thread radai
+1 (non-binding).

small nit pick - just because you returned a response to user doesnt mean
the memory id no longer used. for some cases the actual "point of
termination" may be the deserializer (really impl-dependant), but
generally, wouldnt it be "nice" to have an explicit dispose() call on
responses (with the addition that getting the next batch of data from a
consumer automatically disposes the previous results)

On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar  wrote:

> +1 (non binding)
> --
> Edoardo Comar
> IBM MessageHub
> eco...@uk.ibm.com
> IBM UK Ltd, Hursley Park, SO21 2JN
>
> IBM United Kingdom Limited Registered in England and Wales with number
> 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
> 3AU
>
>
>
> From:   Mickael Maison 
> To: dev@kafka.apache.org
> Date:   05/12/2016 14:38
> Subject:[VOTE] KIP-81: Bound Fetch memory usage in the consumer
>
>
>
> Hi all,
>
> I'd like to start the vote for KIP-81:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>
>
> Thank you
>
>
>
>
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598.
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
>


[jira] [Commented] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.

2016-12-05 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724000#comment-15724000
 ] 

Jun Rao commented on KAFKA-4454:


[~mgharat], I understand the problem that you described. However, I am not sure 
about the approach that you took. 

The reason that KafkaPrincipal only has a simple string name is that it makes 
defining ACL rules through the default AclCommand easy. If you include 
channelPrincipal in KafkaPrincipal, ideally you want to include 
channelPrincipal when doing equality test between two KafkaPrincipals (It's 
kind of hacky to only do equality test on 2 of the 3 fields). This means that 
AclCommand needs to specify channelPrincipal as well and I am not sure how to 
do that.

It seems that LinkedIn uses a customized authorization module for both defining 
and verifying ACL rules (instead of the default AclCommand and 
SimpleAclAuthorizer) and wants more context for the customized authorization to 
use. Perhaps, we could extend the Session object with channelPrincipal instead. 
It would be good to think through if there is any other extension that we may 
want too. In any case, since this affects the user facing authorization module, 
perhaps we should do a KIP so that more people are aware of the changes.

> Authorizer should also include the Principal generated by the 
> PrincipalBuilder.
> ---
>
> Key: KAFKA-4454
> URL: https://issues.apache.org/jira/browse/KAFKA-4454
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
> Fix For: 0.10.2.0
>
>
> Currently kafka allows users to plugin a custom PrincipalBuilder and a custom 
> Authorizer.
> The Authorizer.authorize() object takes in a Session object that wraps 
> KafkaPrincipal and InetAddress.
> The KafkaPrincipal currently has a PrincipalType and Principal name, which is 
> the name of Principal generated by the PrincipalBuilder. 
> This Principal, generated by the pluggedin PrincipalBuilder might have other 
> fields that might be required by the pluggedin Authorizer but currently we 
> loose this information since we only extract the name of Principal while 
> creating KaflkaPrincipal in SocketServer.  
> It would be great if KafkaPrincipal has an additional field 
> "channelPrincipal" which is used to store the Principal generated by the 
> plugged in PrincipalBuilder.
> The pluggedin Authorizer can then use this "channelPrincipal" to do 
> authorization.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4489) Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections

2016-12-05 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723994#comment-15723994
 ] 

huxi commented on KAFKA-4489:
-

Seems a thread pool holding multiple KafkaConsumer instances was spawned to 
consume messages in parallel. How many KafkaConsumer instance did you create in 
the pool? Besides, could you give more information about the JVM you used and a 
heap dump file might be a greater help.

For a quick check, check JVM version(7 or 8), gc collector (throughput 
collector, CMS or G1) and gc logs to see what's happening during the heap 
exhausting.

> Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections
> 
>
> Key: KAFKA-4489
> URL: https://issues.apache.org/jira/browse/KAFKA-4489
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Niranjan Nanda
>
> Configured Kafka brokers on SSL. At consumer side, I configured following 
> properties:
> kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "host1:9093,host2:9093,host3:9093");
> kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group1");
> kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "host_name_of_machine");
> On starting, I could following exceptions in the kafka log
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -3 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -3 at 
> :9093.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Sending metadata request 
> {topics=[my_topic]} to node -1
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] WARN 
> org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
> disconnected
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initialize connection to node -1 
> for sending metadata request
> 2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
> org.apache.kafka.clients.NetworkClient# - Initiating connection to node -1 at 
> :9093.
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-14] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
> auto-commit offsets for group my_group1 since the coordinator is unknown
> 2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
> org.apache.kafka.common.network.Selector# - Connection with / 
> disconnected
> java.io.EOFException
> at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
> at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:973)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at com.demo.consumer.Consumer.run(Consumer.java:71)
> at 

[VOTE] KIP-97: Improved Kafka Client RPC Compatibility Policy

2016-12-05 Thread Colin McCabe
Hi all,

I'd like to start voting on KIP-97
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-97%3A+Improved+Kafka+Client+RPC+Compatibility+Policy
).

The discussion thread can be found here: 
https://www.mail-archive.com/dev@kafka.apache.org/msg60955.html

Thanks for your feedback.
 
best,
Colin McCabe


[jira] [Created] (KAFKA-4489) Kafka Consumer throws Java Heap Out of Space Error on failed SSL connections

2016-12-05 Thread Niranjan Nanda (JIRA)
Niranjan Nanda created KAFKA-4489:
-

 Summary: Kafka Consumer throws Java Heap Out of Space Error on 
failed SSL connections
 Key: KAFKA-4489
 URL: https://issues.apache.org/jira/browse/KAFKA-4489
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.10.0.0
Reporter: Niranjan Nanda


Configured Kafka brokers on SSL. At consumer side, I configured following 
properties:

kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"host1:9093,host2:9093,host3:9093");
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group1");
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "host_name_of_machine");

On starting, I could following exceptions in the kafka log

2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
org.apache.kafka.clients.NetworkClient# - Initialize connection to node -3 for 
sending metadata request
2016-12-05 22:44:23.277 [Thread: pool-2-thread-4] DEBUG 
org.apache.kafka.clients.NetworkClient# - Initiating connection to node -3 at 
:9093.
2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] WARN 
org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
disconnected
2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
org.apache.kafka.clients.NetworkClient# - Node -2 disconnected.
2016-12-05 22:44:23.277 [Thread: pool-2-thread-10] DEBUG 
org.apache.kafka.clients.NetworkClient# - Sending metadata request 
{topics=[my_topic]} to node -1
2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] WARN 
org.apache.kafka.clients.NetworkClient# - Bootstrap broker :9093 
disconnected
2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
org.apache.kafka.clients.NetworkClient# - Initialize connection to node -1 for 
sending metadata request
2016-12-05 22:44:23.277 [Thread: pool-2-thread-6] DEBUG 
org.apache.kafka.clients.NetworkClient# - Initiating connection to node -1 at 
:9093.
2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
auto-commit offsets for group my_group1 since the coordinator is unknown
2016-12-05 22:44:23.547 [Thread: pool-2-thread-14] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator# - Cannot 
auto-commit offsets for group my_group1 since the coordinator is unknown
2016-12-05 22:44:23.547 [Thread: pool-2-thread-12] DEBUG 
org.apache.kafka.common.network.Selector# - Connection with / 
disconnected
java.io.EOFException
at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at 
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:973)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at com.demo.consumer.Consumer.run(Consumer.java:71)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


And, following stack traces are present in my app log

java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at 

Re: [DISCUSS] KIP-95: Incremental Batch Processing for Kafka Streams

2016-12-05 Thread Jay Kreps
I'd like to second the discouragement of adding a new topic per job. We
went down this path in Samza and I think the result was quite a mess. You
had to read the full topic every time a job started and so it added a lot
of overhead and polluted the topic space.

What if we did the following:

   1. Use timestamp instead of offset
   2. Store the "stopping timestamp" in the metadata field associated with
   the existing offset storage mechanism
   3. Don't worry about fully processing the entire DAG. After all,
   partially processing a tuple isn't much different from not processing it,
   and in any case the stopping point is a heuristic so no point in being
   overly precise here.

Probably I'm missing something, though, I haven't thought through the
implications of using time instead of offset.

-Jay

On Mon, Nov 28, 2016 at 10:47 AM, Matthias J. Sax 
wrote:

> Hi all,
>
> I want to start a discussion about KIP-95:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 95%3A+Incremental+Batch+Processing+for+Kafka+Streams
>
> Looking forward to your feedback.
>
>
> -Matthias
>
>
>


Build failed in Jenkins: kafka-trunk-jdk8 #1080

2016-12-05 Thread Apache Jenkins Server
See 

Changes:

[me] HOTFIX: Fix bug in readToLogEnd in KafkaBasedLog.

--
[...truncated 3899 lines...]

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow PASSED

kafka.integration.AutoOffsetResetTest > 

Re: [DISCUSS] KIP-97: Improved Kafka Client RPC Compatibility Policy

2016-12-05 Thread Colin McCabe
On Fri, Dec 2, 2016, at 13:41, Ashish Singh wrote:
> Colin,
> 
> I just rebased KAFKA-3600's PR on trunk.
> 

Thanks, Ashish!

> KAFKA-4457 is a good idea, however it is probably doing some redundant
> work. AdminClient has a network client that will already have api
> versions
> info (after KAFKA-3600 goes in), so I do not think we need to send out
> another ApiVersions request over wire.
> 

That's a good point.  As long as we can talk to all brokers.  Let's
continue the discussion on Jira.

I'll start a vote thread for KIP-97.

Best,
Colin

> On Wed, Nov 30, 2016 at 5:33 PM, Colin McCabe  wrote:
> 
> > Hi all,
> >
> > I updated the KIP to include a command to print out the version
> > information of brokers (KAFKA-4457).  This will be a useful command for
> > administrators.
> >
> > best,
> > Colin
> >
> >
> > On Wed, Nov 30, 2016, at 11:17, Colin McCabe wrote:
> > > Thanks, Ashish.  I think the idea of having the client make an
> > > ApiVersionRequest call when it starts up is a good one.  This idea is
> > > described in both KIP-97, and the KAFKA-3600 patches.  I also think we
> > > ought to maintain per-node version information.  It would be good to get
> > > that in so that we can use it as a building block for the stuff
> > > described in this KIP.  I'd be happy to review it.
> > >
> > > Colin
> > >
> > >
> > > On Wed, Nov 30, 2016, at 10:40, Ashish Singh wrote:
> > > > Hello Ismael,
> > > >
> > > > It is good to know that you are willing to review KAFKA-3600 again. As
> > > > before, we at Cloudera are highly in support of client compatibility,
> > and
> > > > KAFKA-3600 has always been a building block for that. Now that client
> > > > compatibility is at forefront again, thanks to Colin, I will be happy
> > to
> > > > rebase KAFKA-3600 on trunk. However, any comments/ feedback on
> > > > KAFKA-3600's
> > > > approach on maintaining api version info for each connected broker is
> > > > highly welcome.
> > > >
> > > > On Wed, Nov 30, 2016 at 5:47 AM, Ismael Juma 
> > wrote:
> > > >
> > > > > Thanks Colin, I think this is a good improvement.
> > > > >
> > > > > Ashish, some of the concerns with regards to KAFKA-3600 were related
> > to the
> > > > > cost versus benefit. Once one adds client compatibility, the benefit
> > is
> > > > > much higher. I would be happy to review and merge KAFKA-3600 if we
> > think it
> > > > > serves as a good first step towards client compatibility (if the
> > vote for
> > > > > this passes). Colin, maybe you can review the PR for KAFKA-3600 and
> > see if
> > > > > you can build on that? Ashish, it may be worth merging trunk into
> > your
> > > > > branch and fixing the conflicts.
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Tue, Nov 29, 2016 at 7:39 PM, Ashish Singh 
> > wrote:
> > > > >
> > > > > > Hello Colin,
> > > > > >
> > > > > > In the KIP you mentioned that currently the client uses supported
> > api
> > > > > > versions information to check if the server supports its desired
> > > > > versions.
> > > > > > Not sure, if that is true. I had put together a PR for KAFKA-3600,
> > to do
> > > > > > that, but it never went in. Also, I could not find how you plan to
> > > > > perform
> > > > > > version check on client side. In KAFKA-3600, I am maintaining api
> > version
> > > > > > for each live connection, and that made a few folks think it is
> > too big
> > > > > of
> > > > > > a change.
> > > > > >
> > > > > > On Tue, Nov 29, 2016 at 11:05 AM, Colin McCabe  > >
> > > > > wrote:
> > > > > >
> > > > > > > Sorry, that link should be:
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > > 97%3A+Improved+Kafka+Client+RPC+Compatibility+Policy
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Nov 29, 2016, at 11:04, Colin McCabe wrote:
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I've been thinking about a KIP to improve the Kafka client's
> > > > > > > > compatibility policy.  If you're interested, please check out:
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > > 97%3A+Improved+Kafka+Compatibility+Policy
> > > > > > > >
> > > > > > > > cheers,
> > > > > > > > Colin
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Regards,
> > > > > > Ashish
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Regards,
> > > > Ashish
> >
> 
> 
> 
> -- 
> 
> Regards,
> Ashish


[GitHub] kafka pull request #2211: HOTFIX: Fix bug in readToLogEnd in KafkaBasedLog.

2016-12-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2211


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4486) Kafka Streams - exception in process still commits offsets

2016-12-05 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723370#comment-15723370
 ] 

Guozhang Wang commented on KAFKA-4486:
--

The distinguishment is not in Streams, but in the embedded Producer / Consumer 
clients. Anyways, I think your raised issue is indeed a bug that we should fix: 
if shutdown is called through the raised exception, commit offsets step should 
not be triggered, though other steps like flushing the store, releasing state 
store dir lock should still complete.

We will start to propose a fix asap on this JIRA.

> Kafka Streams - exception in process still commits offsets
> --
>
> Key: KAFKA-4486
> URL: https://issues.apache.org/jira/browse/KAFKA-4486
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Java 8
>Reporter: Joel Lundell
>
> I'm building a streams application and would like to be able to control the 
> commits manually using ProcessorContext#commit() from an instance of 
> org.apache.kafka.streams.processor.Processor.
> My use case is that I want to read messages from a topic and push them to AWS 
> SQS and I need to be able to guarantee that all messages reach the queue at 
> least once. I also want to use SQS batching support so my approach at the 
> moment is that in Processor#process i'm saving X records in a data structure 
> and when I have a full batch I send it off and if successful i commit. If I 
> for any reason can't deliver the records I don't want the offsets being 
> committed so that when processing works again I can start processing from the 
> last successful record.
> When I was trying out the error handling I noticed that if I create a 
> Processor and in the process method always throw an exception that will 
> trigger StreamThread#shutdownTaskAndState which calls 
> StreamThread#commitOffsets and next time I run the application it starts as 
> if the previous "record" was successfully processed.
> Is there a way to achieve what I'm looking for?
> I found a similar discussion in 
> https://issues.apache.org/jira/browse/KAFKA-3491 but that issue is still open.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk7 #1730

2016-12-05 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-3994: Fix deadlock in Watchers by calling tryComplete without 
any

--
[...truncated 14446 lines...]
org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldCount PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldGroupByKey STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldGroupByKey PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldReduceWindowed STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldReduceWindowed PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldAggregateWindowed STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldAggregateWindowed PASSED

org.apache.kafka.streams.integration.FanoutIntegrationTest > 
shouldFanoutTheInput[0] STARTED

org.apache.kafka.streams.integration.FanoutIntegrationTest > 
shouldFanoutTheInput[0] PASSED

org.apache.kafka.streams.integration.FanoutIntegrationTest > 
shouldFanoutTheInput[1] STARTED

org.apache.kafka.streams.integration.FanoutIntegrationTest > 
shouldFanoutTheInput[1] PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenCreated STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenCreated PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testMultipleConsumersCanReadFromPartitionedTopic STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testMultipleConsumersCanReadFromPartitionedTopic PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenDeleted STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenDeleted PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testNoMessagesSentExceptionFromOverlappingPatterns STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testNoMessagesSentExceptionFromOverlappingPatterns PASSED

org.apache.kafka.streams.integration.InternalTopicIntegrationTest > 
shouldCompactTopicsForStateChangelogs STARTED

org.apache.kafka.streams.integration.InternalTopicIntegrationTest > 
shouldCompactTopicsForStateChangelogs PASSED

org.apache.kafka.streams.integration.InternalTopicIntegrationTest > 
shouldUseCompactAndDeleteForWindowStoreChangelogs STARTED

org.apache.kafka.streams.integration.InternalTopicIntegrationTest > 
shouldUseCompactAndDeleteForWindowStoreChangelogs PASSED

org.apache.kafka.streams.integration.KStreamRepartitionJoinTest > 
shouldCorrectlyRepartitionOnJoinOperations[0] STARTED

org.apache.kafka.streams.integration.KStreamRepartitionJoinTest > 
shouldCorrectlyRepartitionOnJoinOperations[0] PASSED

org.apache.kafka.streams.integration.KStreamRepartitionJoinTest > 
shouldCorrectlyRepartitionOnJoinOperations[1] STARTED

org.apache.kafka.streams.integration.KStreamRepartitionJoinTest > 
shouldCorrectlyRepartitionOnJoinOperations[1] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[1] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[1] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[1] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[1] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[1] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 

[jira] [Updated] (KAFKA-3994) Deadlock between consumer heartbeat expiration and offset commit.

2016-12-05 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3994:
-
   Resolution: Fixed
Fix Version/s: 0.10.2.0
   Status: Resolved  (was: Patch Available)

> Deadlock between consumer heartbeat expiration and offset commit.
> -
>
> Key: KAFKA-3994
> URL: https://issues.apache.org/jira/browse/KAFKA-3994
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Jiangjie Qin
>Assignee: Jason Gustafson
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.1.1, 0.10.2.0
>
>
> I got the following stacktraces from ConsumerBounceTest
> {code}
> ...
> "Test worker" #12 prio=5 os_prio=0 tid=0x7fbb28b7f000 nid=0x427c runnable 
> [0x7fbb06445000]
>java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x0003d48bcbc0> (a sun.nio.ch.Util$2)
> - locked <0x0003d48bcbb0> (a 
> java.util.Collections$UnmodifiableSet)
> - locked <0x0003d48bbd28> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.apache.kafka.common.network.Selector.select(Selector.java:454)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:411)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1086)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1054)
> at 
> kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:103)
> at 
> kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:70)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
> at 
> 

Build failed in Jenkins: kafka-trunk-jdk8 #1079

2016-12-05 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-3994: Fix deadlock in Watchers by calling tryComplete without 
any

--
[...truncated 7994 lines...]

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow PASSED


[GitHub] kafka pull request #2195: KAFKA-3994: Fix deadlock in Watchers by calling tr...

2016-12-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2195


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3994) Deadlock between consumer heartbeat expiration and offset commit.

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15723075#comment-15723075
 ] 

ASF GitHub Bot commented on KAFKA-3994:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2195


> Deadlock between consumer heartbeat expiration and offset commit.
> -
>
> Key: KAFKA-3994
> URL: https://issues.apache.org/jira/browse/KAFKA-3994
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Jiangjie Qin
>Assignee: Jason Gustafson
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.1.1
>
>
> I got the following stacktraces from ConsumerBounceTest
> {code}
> ...
> "Test worker" #12 prio=5 os_prio=0 tid=0x7fbb28b7f000 nid=0x427c runnable 
> [0x7fbb06445000]
>java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x0003d48bcbc0> (a sun.nio.ch.Util$2)
> - locked <0x0003d48bcbb0> (a 
> java.util.Collections$UnmodifiableSet)
> - locked <0x0003d48bbd28> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.apache.kafka.common.network.Selector.select(Selector.java:454)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:411)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1086)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1054)
> at 
> kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:103)
> at 
> kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:70)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
> at 
> 

[jira] [Updated] (KAFKA-4485) Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader

2016-12-05 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-4485:

Description: 
As of current implementation, we will exclude follower from ISR if the begin 
offset of FetchRequest from this follower is always smaller than logEndOffset 
of leader for more than replicaLagTimeMaxMs.

Also, we will add a follower to ISR if the beginOffset of FetchRequest from 
this follower is equal or larger than high watermark of this partition.

This is problematic for the following reasons:

1) The criteria for ISR is inconsistent between maybeExpandIsr() and 
maybeShrinkIsr(). A follower may be repeatedly remove and added to the ISR 
(e.g. in the scenario described below).

2) A follower may be removed from the ISR even if its fetch rate can keep up 
with produce rate. Suppose a produce keeps producing a lot of small requests at 
high request rate but low byte rate (e.g. many mirror makers), and the follower 
is always able to read all the available data at the time leader receives it. 
However, the begin offset of fetch request will always be smaller than 
logEndOffset of leader. Thus the follower will be removed from ISR after 
replicaLagTimeMaxMs.

The solution to the problem is the following:

A follower should be in ISR if begin offset of its FetchRequest >= max(high 
watermark of partition, log end offset of leader at the time the leader 
receives the previous FetchRequest). The follower should be removed from ISR if 
this criteria is not met for more than replicaLagTimeMaxMs. Note that we are 
comparing begin offset of FetchRequest with log end offset of leader at the 
time the leader receives the previous FetchRequest as an approximate way to 
compare the end offset of fetched data with log end offset of leader. This is 
because we can not easily know the end offset of fetched data at the time 
broker receives fetch request.

This solution makes the following guarantee:

1) If a follower is in ISR, then its log end offset >= high watermark of 
partition at least sometime in the last replicaLagTimeMaxMs.

2) If a follower is not in ISR, then the end offset of its FetchRequest can not 
catch up with log end offset of leader for more than replicaLagTimeMaxMs. 
Either follower is in bootstrap phase, or the follower's average fetch rate is 
smaller than average produce rate into the partition for the last 
replicaLagTimeMaxMs.








  was:
As of current implementation, we will exclude follower from ISR if the begin 
offset of FetchRequest from this follower is always smaller than logEndOffset 
of leader for more than replicaLagTimeMaxMs.

Also, we will add a follower to ISR if the beginOffset of FetchRequest from 
this follower is equal or larger than high watermark of this partition.

This is problematic for the following reasons:

1) The criteria for ISR is inconsistent between maybeExpandIsr() and 
maybeShrinkIsr(). A follower may be repeatedly remove and added to the ISR 
(e.g. in the scenario described below).

2) A follower may be removed from the ISR even if its fetch rate can keep up 
with produce rate. Suppose a produce keeps producing a lot of small requests at 
high request but low byte rate, the fetch request is always able to read all 
the available data at the time leader receives it. However, the begin offset of 
fetch request will always be smaller than logEndOffset of leader. Thus the 
follower will be removed from ISR.

The solution to the problem is the following:

A follower should be in ISR if begin offset of its FetchRequest >= max(high 
watermark of partition, log end offset of leader at the time the leader 
receives the previous FetchRequest). The follower should be removed from ISR if 
this criteria is not met for more than replicaLagTimeMaxMs.

This solution makes the following guarantee:

1) If a follower is in ISR, then its log end offset >= high watermark of 
partition at least sometime in the last replicaLagTimeMaxMs.

2) If a follower is not in ISR, then the end offset of its FetchRequest can not 
catch up with log end offset of leader for more than replicaLagTimeMaxMs. 
Either follower is in bootstrap phase, or the follower's average fetch rate < 
produce rate into the partition for more than replicaLagTimeMaxMs.









> Follower should be in the isr if its FetchRequest has fetched up to the 
> logEndOffset of leader
> --
>
> Key: KAFKA-4485
> URL: https://issues.apache.org/jira/browse/KAFKA-4485
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>
> As of current implementation, we will exclude follower from ISR if the begin 
> offset of FetchRequest from this follower is always smaller than logEndOffset 
> of leader for more than replicaLagTimeMaxMs.
> Also, we will 

[VOTE] KIP-88: OffsetFetch Protocol Update

2016-12-05 Thread Vahid S Hashemian
Happy Monday,

I'd like to start voting on KIP-88 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update
).
The discussion thread can be found here: 
https://www.mail-archive.com/dev@kafka.apache.org/msg59608.html

Thank you for your feedback.
 
Regards,
--Vahid



Re: [DISCUSS] KIP-95: Incremental Batch Processing for Kafka Streams

2016-12-05 Thread Matthias J. Sax
1) The change to consume the metadata topic by all instances should not
be big. We want to leverage the "restore consumer" that does manual
partitions assignments already.

2) I understand your concern about adding the metadata topic... For a
single instance with one thread, it would be easy to go in-memory -- for
multi-threaded single instance, it would also be possible, even if we
need to do thread synchronization. However, going in-memory solves only
the issue with intermediate topics -- not the "moving EOL" is case of
failure. IMHO, having fixed "stop offsets" is already worth to save it
reliable (for single instance, we could go to disk though).

So, strictly speaking we do not need to persists the data in a topic, we
could also implement our own network broadcast (the required host
information is already there via IQ feature). Thus, the leader would
send the information via an extra network connection and all instances
save it to disk. For intermediate topics, all running instances must
broadcast the "stop offsets" to all other during runtime, too. Overall,
I think this would not simplify the solution compared to using a topic.

However, I cannot follow here:

> I understand the need if we have an application with multiple instances on 
> different servers
> (but I still don't think we need to handle that case).

Why do you think we do not need to handle the distributed case? Isn't
this the most relevant one? I would assume that only a minority of
application will be single instance (even in a single instance might be
good enough from a performance point of view, I guess people tend to
start at least two for fault-tolerance).



-Matthias



On 12/4/16 2:45 AM, Eno Thereska wrote:
> A couple of remaining questions:
> 
> - it says in the KIP: "because the metadata topic must be consumed by all 
> instances, we need to assign the topic’s partitions manually and do not 
> commit offsets -- we also need to seekToBeginning() each time we consume the 
> metadata topic)" . How big of a change is allowing a topic to be consumed by 
> all instances?
> 
> - for the case when we have a single instance, with intermediate topics etc, 
> could we keep the data that we want to persist in the metadata topic in 
> memory instead? What is the advantage of persisting this data? I understand 
> the need if we have an application with multiple instances on different 
> servers (but I still don't think we need to handle that case). Is there a 
> need to persist data for a single instance? It would help if we enumerate the 
> exact failure scenarios and how persisting the data helps. So I think you 
> convinced me that this metadata is useful in the previous email, now I'm 
> asking if it needs persisting.
> 
> I'm really trying to avoid having the metadata topic. It's one more topic 
> that needs to be kept around and maintained carefully with all failure cases 
> considered. With EoS around the corner introducing its own internal topic(s), 
> and atomicity when writing to multiple topics, in my mind there is real value 
> if we can have a solution without an extra topic for now.
> 
> 
> Thanks
> Eno
> 
> 
> 
>> On 28 Nov 2016, at 18:47, Matthias J. Sax  wrote:
>>
>> Hi all,
>>
>> I want to start a discussion about KIP-95:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
>>
>> Looking forward to your feedback.
>>
>>
>> -Matthias
>>
>>
> 



signature.asc
Description: OpenPGP digital signature


[jira] [Commented] (KAFKA-4488) UnsupportedOperationException during initialization of StandbyTask

2016-12-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722759#comment-15722759
 ] 

ASF GitHub Bot commented on KAFKA-4488:
---

GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2212

KAFKA-4488: UnsupportedOperationException during initialization of 
StandbyTask

Instead of throwing `UnsupportedOperationException` from 
`StandbyTask.recordCollector()` return a No-op implementation of 
`RecordCollector`.
Refactored `RecordCollector` to have an interface and impl.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka standby-task

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2212.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2212


commit e2aec74032b3f0312a9969c9a97beaf8919eaafe
Author: Damian Guy 
Date:   2016-12-05T17:07:44Z

provide a no-op record collector in StandbyContext to avoid 
UnsupportedOperationException




> UnsupportedOperationException during initialization of StandbyTask
> --
>
> Key: KAFKA-4488
> URL: https://issues.apache.org/jira/browse/KAFKA-4488
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.1.1
>
>
> When initializing StandbyTasks an `UnsupportedOperationException` is thrown 
> from `StandbyContext.recordCollector()`.
> This occurs when `StateStore.init()` is called and logging is enabled. In 
> these cases, we attempt to create a `StoreChangeLogger` that in-turn attempts 
> to get the `RecordCollector`



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2212: KAFKA-4488: UnsupportedOperationException during i...

2016-12-05 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2212

KAFKA-4488: UnsupportedOperationException during initialization of 
StandbyTask

Instead of throwing `UnsupportedOperationException` from 
`StandbyTask.recordCollector()` return a No-op implementation of 
`RecordCollector`.
Refactored `RecordCollector` to have an interface and impl.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka standby-task

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2212.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2212


commit e2aec74032b3f0312a9969c9a97beaf8919eaafe
Author: Damian Guy 
Date:   2016-12-05T17:07:44Z

provide a no-op record collector in StandbyContext to avoid 
UnsupportedOperationException




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Work started] (KAFKA-4488) UnsupportedOperationException during initialization of StandbyTask

2016-12-05 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4488 started by Damian Guy.
-
> UnsupportedOperationException during initialization of StandbyTask
> --
>
> Key: KAFKA-4488
> URL: https://issues.apache.org/jira/browse/KAFKA-4488
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.1.1
>
>
> When initializing StandbyTasks an `UnsupportedOperationException` is thrown 
> from `StandbyContext.recordCollector()`.
> This occurs when `StateStore.init()` is called and logging is enabled. In 
> these cases, we attempt to create a `StoreChangeLogger` that in-turn attempts 
> to get the `RecordCollector`



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4488) UnsupportedOperationException during initialization of StandbyTask

2016-12-05 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-4488:
-

 Summary: UnsupportedOperationException during initialization of 
StandbyTask
 Key: KAFKA-4488
 URL: https://issues.apache.org/jira/browse/KAFKA-4488
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
Reporter: Damian Guy
Assignee: Damian Guy
 Fix For: 0.10.1.1


When initializing StandbyTasks an `UnsupportedOperationException` is thrown 
from `StandbyContext.recordCollector()`.
This occurs when `StateStore.init()` is called and logging is enabled. In these 
cases, we attempt to create a `StoreChangeLogger` that in-turn attempts to get 
the `RecordCollector`



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-12-05 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska resolved KAFKA-3811.
-
Resolution: Duplicate

https://issues.apache.org/jira/browse/KAFKA-3715

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Eno Thereska
> Attachments: Muon-Snapshot.zip, Muon-latency.zip, screenshot-1.png, 
> screenshot-latency.png
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-12-05 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722752#comment-15722752
 ] 

Eno Thereska commented on KAFKA-3811:
-

This JIRA will be resolved as part of 
https://issues.apache.org/jira/browse/KAFKA-3715

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Eno Thereska
> Attachments: Muon-Snapshot.zip, Muon-latency.zip, screenshot-1.png, 
> screenshot-latency.png
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-4486) Kafka Streams - exception in process still commits offsets

2016-12-05 Thread Joel Lundell (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722715#comment-15722715
 ] 

Joel Lundell edited comment on KAFKA-4486 at 12/5/16 4:48 PM:
--

I debugged a little more and the exception escapes the run loop in StreamThread 
but independent on the type of exception they end up in the same finally clause 
and the thread shuts down. Can you point to where it tries to distinguish 
retriable from fatal? 

{code:java}
/**
 * Execute the stream processors
 * @throws KafkaException for any Kafka-related exceptions
 * @throws Exception for any other non-Kafka exceptions
 */
@Override
public void run() {
log.info("{} Starting", logPrefix);

try {
runLoop();
} catch (KafkaException e) {
// just re-throw the exception as it should be logged already
throw e;
} catch (Exception e) {
// we have caught all Kafka related exceptions, and other runtime 
exceptions
// should be due to user application errors
log.error("{} Streams application error during processing: ", 
logPrefix, e);
throw e;
} finally {
shutdown();
}
}
{code}


was (Author: joel.lundell):
I debugged a little more and the exception escapes the run loop in StreamThread 
but independent on the type of exception they end up in the same finally clause 
and the thread shuts down. Can you point to where it tries to distinguish 
retriable from fatal? 

/**
 * Execute the stream processors
 * @throws KafkaException for any Kafka-related exceptions
 * @throws Exception for any other non-Kafka exceptions
 */
@Override
public void run() {
log.info("{} Starting", logPrefix);

try {
runLoop();
} catch (KafkaException e) {
// just re-throw the exception as it should be logged already
throw e;
} catch (Exception e) {
// we have caught all Kafka related exceptions, and other runtime 
exceptions
// should be due to user application errors
log.error("{} Streams application error during processing: ", 
logPrefix, e);
throw e;
} finally {
shutdown();
}
}

> Kafka Streams - exception in process still commits offsets
> --
>
> Key: KAFKA-4486
> URL: https://issues.apache.org/jira/browse/KAFKA-4486
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Java 8
>Reporter: Joel Lundell
>
> I'm building a streams application and would like to be able to control the 
> commits manually using ProcessorContext#commit() from an instance of 
> org.apache.kafka.streams.processor.Processor.
> My use case is that I want to read messages from a topic and push them to AWS 
> SQS and I need to be able to guarantee that all messages reach the queue at 
> least once. I also want to use SQS batching support so my approach at the 
> moment is that in Processor#process i'm saving X records in a data structure 
> and when I have a full batch I send it off and if successful i commit. If I 
> for any reason can't deliver the records I don't want the offsets being 
> committed so that when processing works again I can start processing from the 
> last successful record.
> When I was trying out the error handling I noticed that if I create a 
> Processor and in the process method always throw an exception that will 
> trigger StreamThread#shutdownTaskAndState which calls 
> StreamThread#commitOffsets and next time I run the application it starts as 
> if the previous "record" was successfully processed.
> Is there a way to achieve what I'm looking for?
> I found a similar discussion in 
> https://issues.apache.org/jira/browse/KAFKA-3491 but that issue is still open.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4486) Kafka Streams - exception in process still commits offsets

2016-12-05 Thread Joel Lundell (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722633#comment-15722633
 ] 

Joel Lundell commented on KAFKA-4486:
-

[~guozhang], I'm throwing a org.apache.kafka.streams.errors.StreamsException. 

Let me know if I can help out in any way. 

Thank you, Joel.

> Kafka Streams - exception in process still commits offsets
> --
>
> Key: KAFKA-4486
> URL: https://issues.apache.org/jira/browse/KAFKA-4486
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Java 8
>Reporter: Joel Lundell
>
> I'm building a streams application and would like to be able to control the 
> commits manually using ProcessorContext#commit() from an instance of 
> org.apache.kafka.streams.processor.Processor.
> My use case is that I want to read messages from a topic and push them to AWS 
> SQS and I need to be able to guarantee that all messages reach the queue at 
> least once. I also want to use SQS batching support so my approach at the 
> moment is that in Processor#process i'm saving X records in a data structure 
> and when I have a full batch I send it off and if successful i commit. If I 
> for any reason can't deliver the records I don't want the offsets being 
> committed so that when processing works again I can start processing from the 
> last successful record.
> When I was trying out the error handling I noticed that if I create a 
> Processor and in the process method always throw an exception that will 
> trigger StreamThread#shutdownTaskAndState which calls 
> StreamThread#commitOffsets and next time I run the application it starts as 
> if the previous "record" was successfully processed.
> Is there a way to achieve what I'm looking for?
> I found a similar discussion in 
> https://issues.apache.org/jira/browse/KAFKA-3491 but that issue is still open.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-12-05 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-3811:
---

Assignee: Eno Thereska  (was: aarti gupta)

> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Eno Thereska
> Attachments: Muon-Snapshot.zip, Muon-latency.zip, screenshot-1.png, 
> screenshot-latency.png
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-3811) Introduce Kafka Streams metrics recording levels

2016-12-05 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3811 started by Eno Thereska.
---
> Introduce Kafka Streams metrics recording levels
> 
>
> Key: KAFKA-3811
> URL: https://issues.apache.org/jira/browse/KAFKA-3811
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Eno Thereska
> Attachments: Muon-Snapshot.zip, Muon-latency.zip, screenshot-1.png, 
> screenshot-latency.png
>
>
> Follow-up from the discussions here:
> https://github.com/apache/kafka/pull/1447
> https://issues.apache.org/jira/browse/KAFKA-3769
> The proposal is to introduce configuration to control the granularity/volumes 
> of metrics emitted by Kafka Streams jobs, since the per-record level metrics 
> introduce non-trivial overhead and are possibly less useful once a job has 
> been optimized. 
> Proposal from guozhangwang:
> level0 (stream thread global): per-record process / punctuate latency, commit 
> latency, poll latency, etc
> level1 (per processor node, and per state store): IO latency, per-record .. 
> latency, forward throughput, etc.
> And by default we only turn on level0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

2016-12-05 Thread Edoardo Comar
+1 (non binding)
--
Edoardo Comar
IBM MessageHub
eco...@uk.ibm.com
IBM UK Ltd, Hursley Park, SO21 2JN

IBM United Kingdom Limited Registered in England and Wales with number 
741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 
3AU



From:   Mickael Maison 
To: dev@kafka.apache.org
Date:   05/12/2016 14:38
Subject:[VOTE] KIP-81: Bound Fetch memory usage in the consumer



Hi all,

I'd like to start the vote for KIP-81:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer


Thank you




Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU


[VOTE] KIP-81: Bound Fetch memory usage in the consumer

2016-12-05 Thread Mickael Maison
Hi all,

I'd like to start the vote for KIP-81:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer

Thank you


Re: [VOTE] KIP-96 - Add per partition metrics for in-sync and replica count

2016-12-05 Thread Bill Bejeck
+1

On Mon, Dec 5, 2016 at 8:07 AM, Ismael Juma  wrote:

> Thanks for the KIP, +1 (binding).
>
> Ismael
>
> On Thu, Dec 1, 2016 at 12:59 PM, Tom Crayford 
> wrote:
>
> > +1 (non-binding)
> >
> > On Thu, Dec 1, 2016 at 12:11 AM, Apurva Mehta 
> wrote:
> >
> > > +1 (non-binding)
> > >
> > > On Wed, Nov 30, 2016 at 2:00 PM, Jason Gustafson 
> > > wrote:
> > >
> > > > +1. Thanks for the KIP!
> > > >
> > > > On Wed, Nov 30, 2016 at 1:47 PM, Gwen Shapira 
> > wrote:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > On Wed, Nov 30, 2016 at 1:34 PM, Xavier Léauté <
> xav...@confluent.io>
> > > > > wrote:
> > > > > > Based on the feedback KIP-96 seems pretty uncontroversial, so I'd
> > > like
> > > > to
> > > > > > initiate a vote on it.
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 96+-+Add+per+partition+metrics+for+in-sync+and+
> > assigned+replica+count
> > > > > >
> > > > > > Xavier
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Gwen Shapira
> > > > > Product Manager | Confluent
> > > > > 650.450.2760 | @gwenshap
> > > > > Follow us: Twitter | blog
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] KIP-96 - Add per partition metrics for in-sync and replica count

2016-12-05 Thread Jim Jagielski
+1
> On Nov 30, 2016, at 4:34 PM, Xavier Léauté  wrote:
> 
> Based on the feedback KIP-96 seems pretty uncontroversial, so I'd like to
> initiate a vote on it.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-96+-+Add+per+partition+metrics+for+in-sync+and+assigned+replica+count
> 
> Xavier



[jira] [Commented] (KAFKA-3228) Partition reassignment failure for brokers freshly added to cluster

2016-12-05 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722362#comment-15722362
 ] 

Ismael Juma commented on KAFKA-3228:


Thanks for confirming. I removed the fix version to avoid having duplicates in 
the release notes, it's enough to have the issue that has the "Fixed" 
resolution.

> Partition reassignment failure for brokers freshly added to cluster
> ---
>
> Key: KAFKA-3228
> URL: https://issues.apache.org/jira/browse/KAFKA-3228
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.1
>Reporter: Andrew Olson
>Assignee: Neha Narkhede
>
> After adding about new 20 brokers to double the size of an existing 
> production Kafka deployment, when attempting to rebalance partitions we were 
> initially unable to reassign any partitions to 5 of the 20. There was no 
> problem with the other 15. The controller broker logged error messages like:
> {noformat}
> ERROR kafka.controller.KafkaController: [Controller 19]: Error completing 
> reassignment of partition [TOPIC-NAME,2]
> kafka.common.KafkaException: Only 4,33 replicas out of the new set of 
> replicas 4,34,33 for partition [TOPIC-NAME,2]
> to be reassigned are alive. Failing partition reassignment
>   at 
> kafka.controller.KafkaController.initiateReassignReplicasForTopicPartition(KafkaController.scala:611)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply$mcV$sp(KafkaController.scala:1203)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply(KafkaController.scala:1197)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply(KafkaController.scala:1197)
>   at kafka.utils.Utils$.inLock(Utils.scala:535)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4.apply(KafkaController.scala:1196)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4.apply(KafkaController.scala:1195)
>   at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>   at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>   at 
> kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:1195)
>   at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:751)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}
> We reattempted the reassignment to one of these new brokers, with the same 
> result.
> We also saw these messages in the controller's log. There was a "Broken pipe" 
> error for each of the new brokers.
> {noformat}
> 2016-02-09 12:13:22,082 WARN kafka.controller.RequestSendThread: 
> [Controller-19-to-broker-34-send-thread],
> Controller 19 epoch 28 fails to send request Name:UpdateMetadataRequest...
> java.io.IOException: Broken pipe
>   at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
>   at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:148)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
>   at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
>   at 
> kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
>   at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
>   at 
> kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
>   at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>   at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> {noformat}
> {noformat}
> WARN kafka.controller.RequestSendThread: 
> [Controller-19-to-broker-34-send-thread],
> Controller 19 epoch 28 fails to send request Name:UpdateMetadataRequest... to 
> broker id:34...
> Reconnecting to broker.
> java.io.EOFException: Received -1 when reading from channel, socket has 
> likely been closed.
>   at kafka.utils.Utils$.read(Utils.scala:381)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:133)
>   at 
> 

[jira] [Updated] (KAFKA-3228) Partition reassignment failure for brokers freshly added to cluster

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3228:
---
Fix Version/s: (was: 0.10.1.0)

> Partition reassignment failure for brokers freshly added to cluster
> ---
>
> Key: KAFKA-3228
> URL: https://issues.apache.org/jira/browse/KAFKA-3228
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.1
>Reporter: Andrew Olson
>Assignee: Neha Narkhede
>
> After adding about new 20 brokers to double the size of an existing 
> production Kafka deployment, when attempting to rebalance partitions we were 
> initially unable to reassign any partitions to 5 of the 20. There was no 
> problem with the other 15. The controller broker logged error messages like:
> {noformat}
> ERROR kafka.controller.KafkaController: [Controller 19]: Error completing 
> reassignment of partition [TOPIC-NAME,2]
> kafka.common.KafkaException: Only 4,33 replicas out of the new set of 
> replicas 4,34,33 for partition [TOPIC-NAME,2]
> to be reassigned are alive. Failing partition reassignment
>   at 
> kafka.controller.KafkaController.initiateReassignReplicasForTopicPartition(KafkaController.scala:611)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply$mcV$sp(KafkaController.scala:1203)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply(KafkaController.scala:1197)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply(KafkaController.scala:1197)
>   at kafka.utils.Utils$.inLock(Utils.scala:535)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4.apply(KafkaController.scala:1196)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4.apply(KafkaController.scala:1195)
>   at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>   at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>   at 
> kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:1195)
>   at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:751)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}
> We reattempted the reassignment to one of these new brokers, with the same 
> result.
> We also saw these messages in the controller's log. There was a "Broken pipe" 
> error for each of the new brokers.
> {noformat}
> 2016-02-09 12:13:22,082 WARN kafka.controller.RequestSendThread: 
> [Controller-19-to-broker-34-send-thread],
> Controller 19 epoch 28 fails to send request Name:UpdateMetadataRequest...
> java.io.IOException: Broken pipe
>   at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
>   at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:148)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
>   at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
>   at 
> kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
>   at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
>   at 
> kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
>   at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>   at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> {noformat}
> {noformat}
> WARN kafka.controller.RequestSendThread: 
> [Controller-19-to-broker-34-send-thread],
> Controller 19 epoch 28 fails to send request Name:UpdateMetadataRequest... to 
> broker id:34...
> Reconnecting to broker.
> java.io.EOFException: Received -1 when reading from channel, socket has 
> likely been closed.
>   at kafka.utils.Utils$.read(Utils.scala:381)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:133)
>   at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> {noformat}
> {noformat}
> INFO 

[jira] [Updated] (KAFKA-3228) Partition reassignment failure for brokers freshly added to cluster

2016-12-05 Thread Andrew Olson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-3228:

Fix Version/s: 0.10.1.0

> Partition reassignment failure for brokers freshly added to cluster
> ---
>
> Key: KAFKA-3228
> URL: https://issues.apache.org/jira/browse/KAFKA-3228
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.1
>Reporter: Andrew Olson
>Assignee: Neha Narkhede
> Fix For: 0.10.1.0
>
>
> After adding about new 20 brokers to double the size of an existing 
> production Kafka deployment, when attempting to rebalance partitions we were 
> initially unable to reassign any partitions to 5 of the 20. There was no 
> problem with the other 15. The controller broker logged error messages like:
> {noformat}
> ERROR kafka.controller.KafkaController: [Controller 19]: Error completing 
> reassignment of partition [TOPIC-NAME,2]
> kafka.common.KafkaException: Only 4,33 replicas out of the new set of 
> replicas 4,34,33 for partition [TOPIC-NAME,2]
> to be reassigned are alive. Failing partition reassignment
>   at 
> kafka.controller.KafkaController.initiateReassignReplicasForTopicPartition(KafkaController.scala:611)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply$mcV$sp(KafkaController.scala:1203)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply(KafkaController.scala:1197)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply(KafkaController.scala:1197)
>   at kafka.utils.Utils$.inLock(Utils.scala:535)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4.apply(KafkaController.scala:1196)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4.apply(KafkaController.scala:1195)
>   at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>   at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>   at 
> kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:1195)
>   at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:751)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}
> We reattempted the reassignment to one of these new brokers, with the same 
> result.
> We also saw these messages in the controller's log. There was a "Broken pipe" 
> error for each of the new brokers.
> {noformat}
> 2016-02-09 12:13:22,082 WARN kafka.controller.RequestSendThread: 
> [Controller-19-to-broker-34-send-thread],
> Controller 19 epoch 28 fails to send request Name:UpdateMetadataRequest...
> java.io.IOException: Broken pipe
>   at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
>   at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:148)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
>   at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
>   at 
> kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
>   at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
>   at 
> kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
>   at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>   at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> {noformat}
> {noformat}
> WARN kafka.controller.RequestSendThread: 
> [Controller-19-to-broker-34-send-thread],
> Controller 19 epoch 28 fails to send request Name:UpdateMetadataRequest... to 
> broker id:34...
> Reconnecting to broker.
> java.io.EOFException: Received -1 when reading from channel, socket has 
> likely been closed.
>   at kafka.utils.Utils$.read(Utils.scala:381)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:133)
>   at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> {noformat}
> 

[jira] [Commented] (KAFKA-3228) Partition reassignment failure for brokers freshly added to cluster

2016-12-05 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722343#comment-15722343
 ] 

Andrew Olson commented on KAFKA-3228:
-

Added fix version and duplicate link.

> Partition reassignment failure for brokers freshly added to cluster
> ---
>
> Key: KAFKA-3228
> URL: https://issues.apache.org/jira/browse/KAFKA-3228
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.1
>Reporter: Andrew Olson
>Assignee: Neha Narkhede
> Fix For: 0.10.1.0
>
>
> After adding about new 20 brokers to double the size of an existing 
> production Kafka deployment, when attempting to rebalance partitions we were 
> initially unable to reassign any partitions to 5 of the 20. There was no 
> problem with the other 15. The controller broker logged error messages like:
> {noformat}
> ERROR kafka.controller.KafkaController: [Controller 19]: Error completing 
> reassignment of partition [TOPIC-NAME,2]
> kafka.common.KafkaException: Only 4,33 replicas out of the new set of 
> replicas 4,34,33 for partition [TOPIC-NAME,2]
> to be reassigned are alive. Failing partition reassignment
>   at 
> kafka.controller.KafkaController.initiateReassignReplicasForTopicPartition(KafkaController.scala:611)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply$mcV$sp(KafkaController.scala:1203)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply(KafkaController.scala:1197)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply(KafkaController.scala:1197)
>   at kafka.utils.Utils$.inLock(Utils.scala:535)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4.apply(KafkaController.scala:1196)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4.apply(KafkaController.scala:1195)
>   at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>   at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>   at 
> kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:1195)
>   at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:751)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}
> We reattempted the reassignment to one of these new brokers, with the same 
> result.
> We also saw these messages in the controller's log. There was a "Broken pipe" 
> error for each of the new brokers.
> {noformat}
> 2016-02-09 12:13:22,082 WARN kafka.controller.RequestSendThread: 
> [Controller-19-to-broker-34-send-thread],
> Controller 19 epoch 28 fails to send request Name:UpdateMetadataRequest...
> java.io.IOException: Broken pipe
>   at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
>   at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:148)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
>   at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
>   at 
> kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
>   at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
>   at 
> kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
>   at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>   at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> {noformat}
> {noformat}
> WARN kafka.controller.RequestSendThread: 
> [Controller-19-to-broker-34-send-thread],
> Controller 19 epoch 28 fails to send request Name:UpdateMetadataRequest... to 
> broker id:34...
> Reconnecting to broker.
> java.io.EOFException: Received -1 when reading from channel, socket has 
> likely been closed.
>   at kafka.utils.Utils$.read(Utils.scala:381)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:133)
>   at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>   at 

[jira] [Commented] (KAFKA-3228) Partition reassignment failure for brokers freshly added to cluster

2016-12-05 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722341#comment-15722341
 ] 

Andrew Olson commented on KAFKA-3228:
-

Since the exception message that we saw has been removed from the code, I agree 
that this should be marked resolved. The bug description for KAFKA-4214 makes 
sense for applicability to this scenario as well.

> Partition reassignment failure for brokers freshly added to cluster
> ---
>
> Key: KAFKA-3228
> URL: https://issues.apache.org/jira/browse/KAFKA-3228
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.1
>Reporter: Andrew Olson
>Assignee: Neha Narkhede
> Fix For: 0.10.1.0
>
>
> After adding about new 20 brokers to double the size of an existing 
> production Kafka deployment, when attempting to rebalance partitions we were 
> initially unable to reassign any partitions to 5 of the 20. There was no 
> problem with the other 15. The controller broker logged error messages like:
> {noformat}
> ERROR kafka.controller.KafkaController: [Controller 19]: Error completing 
> reassignment of partition [TOPIC-NAME,2]
> kafka.common.KafkaException: Only 4,33 replicas out of the new set of 
> replicas 4,34,33 for partition [TOPIC-NAME,2]
> to be reassigned are alive. Failing partition reassignment
>   at 
> kafka.controller.KafkaController.initiateReassignReplicasForTopicPartition(KafkaController.scala:611)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply$mcV$sp(KafkaController.scala:1203)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply(KafkaController.scala:1197)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply(KafkaController.scala:1197)
>   at kafka.utils.Utils$.inLock(Utils.scala:535)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4.apply(KafkaController.scala:1196)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4.apply(KafkaController.scala:1195)
>   at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>   at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>   at 
> kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:1195)
>   at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:751)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}
> We reattempted the reassignment to one of these new brokers, with the same 
> result.
> We also saw these messages in the controller's log. There was a "Broken pipe" 
> error for each of the new brokers.
> {noformat}
> 2016-02-09 12:13:22,082 WARN kafka.controller.RequestSendThread: 
> [Controller-19-to-broker-34-send-thread],
> Controller 19 epoch 28 fails to send request Name:UpdateMetadataRequest...
> java.io.IOException: Broken pipe
>   at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
>   at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:148)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
>   at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
>   at 
> kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
>   at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
>   at 
> kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
>   at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>   at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> {noformat}
> {noformat}
> WARN kafka.controller.RequestSendThread: 
> [Controller-19-to-broker-34-send-thread],
> Controller 19 epoch 28 fails to send request Name:UpdateMetadataRequest... to 
> broker id:34...
> Reconnecting to broker.
> java.io.EOFException: Received -1 when reading from channel, socket has 
> likely been closed.
>   at kafka.utils.Utils$.read(Utils.scala:381)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at 
> 

Re: [VOTE] KIP-96 - Add per partition metrics for in-sync and replica count

2016-12-05 Thread Ismael Juma
Thanks for the KIP, +1 (binding).

Ismael

On Thu, Dec 1, 2016 at 12:59 PM, Tom Crayford  wrote:

> +1 (non-binding)
>
> On Thu, Dec 1, 2016 at 12:11 AM, Apurva Mehta  wrote:
>
> > +1 (non-binding)
> >
> > On Wed, Nov 30, 2016 at 2:00 PM, Jason Gustafson 
> > wrote:
> >
> > > +1. Thanks for the KIP!
> > >
> > > On Wed, Nov 30, 2016 at 1:47 PM, Gwen Shapira 
> wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > On Wed, Nov 30, 2016 at 1:34 PM, Xavier Léauté 
> > > > wrote:
> > > > > Based on the feedback KIP-96 seems pretty uncontroversial, so I'd
> > like
> > > to
> > > > > initiate a vote on it.
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 96+-+Add+per+partition+metrics+for+in-sync+and+
> assigned+replica+count
> > > > >
> > > > > Xavier
> > > >
> > > >
> > > >
> > > > --
> > > > Gwen Shapira
> > > > Product Manager | Confluent
> > > > 650.450.2760 | @gwenshap
> > > > Follow us: Twitter | blog
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-96 - Add per partition metrics for in-sync and assigned replica count

2016-12-05 Thread Ismael Juma
Hi Gwen,

At this level, that's probably right, so sounds good (at an implementor's
level, I think it's more subtle).

Ismael

On Fri, Dec 2, 2016 at 8:51 PM, Gwen Shapira  wrote:

> All replicas are assigned replicas. Calling it AssignedReplicasCount
> makes it sound like we have non-assigned replicas that we are also
> counting somewhere.
>
> (Guess who requested the change...)
>
> On Fri, Dec 2, 2016 at 5:25 AM, Ismael Juma  wrote:
> > Hi Xavier,
> >
> > Can you please share the reasoning for the name change? Good to record
> such
> > things for posterity. :)
> >
> > Ismael
> >
> > On Wed, Nov 30, 2016 at 9:20 PM, Xavier Léauté 
> wrote:
> >
> >> FYI, Based on internal feedback I renamed AssignedReplicasCount to
> simply
> >> be called ReplicasCount.
> >>
> >> On Tue, Nov 29, 2016 at 7:56 PM Neha Narkhede 
> wrote:
> >>
> >> > This seems useful, +1
> >> >
> >> > On Tue, Nov 29, 2016 at 5:39 AM Ismael Juma 
> wrote:
> >> >
> >> > > Hi Xavier,
> >> > >
> >> > > Thanks for the KIP. Sounds good to me.
> >> > >
> >> > > Ismael
> >> > >
> >> > > On Tue, Nov 29, 2016 at 12:40 AM, Xavier Léauté <
> xav...@confluent.io>
> >> > > wrote:
> >> > >
> >> > > > Hi,
> >> > > >
> >> > > > I created KIP-96 to propose per partition in-sync / assigned
> replica
> >> > > > metrics. Should be straightforward, but submitting it for proposal
> >> > since
> >> > > we
> >> > > > require it for metrics changes.
> >> > > >
> >> > > > Here's the link to the KIP:
> >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > > > 96+-+Add+per+partition+metrics+for+in-sync+and+
> >> assigned+replica+count
> >> > > >
> >> > > > Thank you,
> >> > > > Xavier
> >> > > >
> >> > >
> >> > --
> >> > Thanks,
> >> > Neha
> >> >
> >>
>
>
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


[jira] [Resolved] (KAFKA-2871) Newly replicated brokers don't expire log segments properly

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-2871.

Resolution: Duplicate

This was changed as part of KIP-33.

> Newly replicated brokers don't expire log segments properly
> ---
>
> Key: KAFKA-2871
> URL: https://issues.apache.org/jira/browse/KAFKA-2871
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8.2.1
>Reporter: Evan Huus
>Assignee: Neha Narkhede
>Priority: Minor
>
> We recently brought up a few brokers to replace some existing nodes, and used 
> the provided script to reassign partitions from the retired nodes to the new 
> ones, one at a time.
> A little while after the fact, we noticed extreme disk usage on the new 
> nodes. Tracked this down to the fact that the replicated segments are all 
> timestamped from the moment of replication rather than using whatever 
> timestamp was set on the original node. Since this is the timestamp the log 
> roller uses, it takes a full week (rollover time) before any data is purged 
> from the new brokers.
> In the short term, what is the safest workaround? Can we just `rm` these old 
> segments, or should we be messing with the filesystem metadata so kafka 
> removes them itself?
> In the longer term, the partition mover should be setting timestamps 
> appropriately on the segments it moves.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-87) time based segment index

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-87?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-87.
--
Resolution: Duplicate

Duplicate of KAFKA-3163

> time based segment index
> 
>
> Key: KAFKA-87
> URL: https://issues.apache.org/jira/browse/KAFKA-87
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Chris Burroughs
>Assignee: Chris Burroughs
>
> A time index that:
>  - Has minimal performance impact (such as by being append only)
>  - Is suitable for
>  - Works with getOffsetsBefore
>  - Can have it's granularity configured.  With numbers >= 1 minute being 
> "normal".
>  - Can be disabled
> See mailing list discussion : 
> http://mail-archives.apache.org/mod_mbox/incubator-kafka-dev/201107.mbox/%3c4e2f678e.6060...@gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-740) Improve crash-safety of log segment swap

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-740:
--
Labels: reliability  (was: )

> Improve crash-safety of log segment swap
> 
>
> Key: KAFKA-740
> URL: https://issues.apache.org/jira/browse/KAFKA-740
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: Jay Kreps
>Assignee: Jay Kreps
>  Labels: reliability
> Fix For: 0.10.2.0
>
>
> Currently Log.replaceSegments has a bug that can cause a swap containing 
> multiple segments to partially complete. This would lead to duplicate data in 
> the log.
> The proposed fix is to use a name like offset1_and_offset2.swap for a segment 
> meant to replace segments with base offsets offset1 and offset2.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-746) Snappy compression isn't applied to messages written to disk on broker

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-746.
---
Resolution: Fixed

I am going to mark this as fixed as there have been no reports of this for 
several years.

> Snappy compression isn't applied to messages written to disk on broker
> --
>
> Key: KAFKA-746
> URL: https://issues.apache.org/jira/browse/KAFKA-746
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.7.2
>Reporter: Esko Suomi
>Assignee: Jun Rao
>Priority: Minor
>
> We recently tested 0.7.2 performance using different compression codecs for 
> messages and noticed that when using Snappy (compression.codec = 2 in 
> Producer configuration) the messages written to topic partitions were written 
> without compression. Compared to GZIP (compression.codec = 1) this is 
> especially peculiar since with GZIP the partition files themselves were 
> compressed as expected.
> I did walk through the 0.8 code briefly and didn't spot any obvious reasons 
> why this would be happening so I'm now handing this issue completely into 
> your able hands.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1106) HighwaterMarkCheckpoint failure puting broker into a bad state

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-1106:
---
Labels: reliability  (was: )

> HighwaterMarkCheckpoint failure puting broker into a bad state
> --
>
> Key: KAFKA-1106
> URL: https://issues.apache.org/jira/browse/KAFKA-1106
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.0
>Reporter: David Lao
>  Labels: reliability
> Attachments: KAFKA-1106-patch, kafka.log
>
>
> I'm encountering a case where broker get stuck due to HighwaterMarkCheckpoint 
> failing to recover from reading what appear to be corrupted isr entries. Once 
> in this state, leader election can never succeed and hence stalling the 
> entire cluster. 
> Please see the detailed stack trace from the attached log.  Perhaps failing 
> fast when HighwaterMarkCheckpoint fails to read would force the broker to 
> restart and recover.  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1833) OfflinePartitionLeaderSelector may return null leader when ISR and Assgined Broker have no common

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-1833:
---
Labels: easyfix reliability  (was: easyfix)

> OfflinePartitionLeaderSelector may return null leader when ISR and Assgined 
> Broker have no common
> -
>
> Key: KAFKA-1833
> URL: https://issues.apache.org/jira/browse/KAFKA-1833
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.0
>Reporter: xiajun
>Assignee: Neha Narkhede
>  Labels: easyfix, reliability
> Attachments: KAFKA-1883.patch
>
>
> In OfflinePartitonLeaderSelector::selectLeader, when liveBrokerInIsr is not 
> empty and have no common broker with liveAssignedreplicas, selectLeader will 
> return no leader;



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1907) ZkClient can block controlled shutdown indefinitely

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-1907:
---
Labels: reliability zkclient-problems  (was: zkclient-problems)

> ZkClient can block controlled shutdown indefinitely
> ---
>
> Key: KAFKA-1907
> URL: https://issues.apache.org/jira/browse/KAFKA-1907
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 0.8.2.0
>Reporter: Ewen Cheslack-Postava
>Assignee: jaikiran pai
>  Labels: reliability, zkclient-problems
> Attachments: KAFKA-1907.patch, KAFKA-1907.patch, 
> KAFKA-1907_2015-05-25_09:18:14.patch
>
>
> There are some calls to ZkClient via ZkUtils in 
> KafkaServer.controlledShutdown() that can block indefinitely because they 
> internally call waitUntilConnected. The ZkClient API doesn't provide an 
> alternative with timeouts, so fixing this will require enforcing timeouts in 
> some other way.
> This may be a more general issue if there are any non daemon threads that 
> also call ZkUtils methods.
> Stacktrace showing the issue:
> {code}
> "Thread-2" prio=10 tid=0xb3305000 nid=0x4758 waiting on condition [0x6ad69000]
>java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x70a93368> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:267)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2130)
> at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
> at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> at kafka.utils.ZkUtils$.readDataMaybeNull(ZkUtils.scala:456)
> at kafka.utils.ZkUtils$.getController(ZkUtils.scala:65)
> at 
> kafka.server.KafkaServer.kafka$server$KafkaServer$$controlledShutdown(KafkaServer.scala:194)
> at 
> kafka.server.KafkaServer$$anonfun$shutdown$1.apply$mcV$sp(KafkaServer.scala:269)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
> at kafka.utils.Logging$class.swallow(Logging.scala:94)
> at kafka.utils.Utils$.swallow(Utils.scala:45)
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:269)
> at 
> kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:42)
> at kafka.Kafka$$anon$1.run(Kafka.scala:42)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2158) Close all fetchers in AbstractFetcherManager without blocking

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-2158:
---
Resolution: Duplicate
Status: Resolved  (was: Patch Available)

Duplicate of KAFKA-4319.

> Close all fetchers in AbstractFetcherManager without blocking
> -
>
> Key: KAFKA-2158
> URL: https://issues.apache.org/jira/browse/KAFKA-2158
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.8.2.0
>Reporter: Jiasheng Wang
> Attachments: KAFKA-2158.patch
>
>
> def closeAllFetchers() {
> mapLock synchronized {
>   for ( (_, fetcher) <- fetcherThreadMap) {
> fetcher.shutdown()
>   }
>   fetcherThreadMap.clear()
> }
>   }
> It is time consuming for closeAllFetchers() in AbstractFetcherManager.scala 
> because each time a fetcher calls shutdown method it will block until 
> awaitShutdown() returns. As a result it will slow down the restart of kafka 
> service.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2303) Fix for KAFKA-2235 LogCleaner offset map overflow causes another compaction failures

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-2303.

Resolution: Duplicate

I think this is a duplicate of KAFKA-3894. Please reopen if you disagree.

> Fix for KAFKA-2235 LogCleaner offset map overflow causes another compaction 
> failures
> 
>
> Key: KAFKA-2303
> URL: https://issues.apache.org/jira/browse/KAFKA-2303
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Affects Versions: 0.8.2.1
>Reporter: Alexander Demidko
>Assignee: Guozhang Wang
>
> We have rolled out the patch for KAFKA-2235 to our kafka cluster, and 
> recently instead of 
> {code}
> "kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to
> java.lang.IllegalArgumentException: requirement failed: Attempt to add a new 
> entry to a full offset map." 
> {code}
> we started to see 
> {code}
> kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to
> java.lang.IllegalArgumentException: requirement failed: 131390902 messages in 
> segment -cgstate-8/79840768.log but offset map can 
> fit only 80530612. You can increase log.cleaner.dedupe.buffer.size or 
> decrease log.cleaner.threads
> {code}
> So, we had to roll it back to avoid disk depletion although I'm not sure if 
> it needs to be rolled back in trunk. This patch applies more strict checks 
> than were in place before: even if there is only one unique key for a 
> segment, cleanup will fail if this segment is too big. 
> Does it make sense to eliminate a limit for the offset map slots count, for 
> example to use an offset map backed by a memory mapped file?
> The limit of 80530612 slots comes from memory / bytesPerEntry, where memory 
> is Int.MaxValue (we use only one cleaner thread) and bytesPerEntry is 8 + 
> digest hash size. Might be wrong, but it seems if the overall number of 
> unique keys per partition is more than 80M slots in an OffsetMap, compaction 
> will always fail and cleaner thread will die. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2530) metrics for old replica fetcher thread need to be deregistered

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-2530.

Resolution: Duplicate

I believe this is a duplicate of KAFKA-3632.

> metrics for old replica fetcher thread need to be deregistered
> --
>
> Key: KAFKA-2530
> URL: https://issues.apache.org/jira/browse/KAFKA-2530
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.1
>Reporter: Jun Rao
>
> Currently, the lag metrics in the replica fetcher has the following format 
> where the leader broker id is included in the clientId tag.
> clientId="ReplicaFetcherThread-0-101",partition="0",topic="test",mbean_property_type="FetcherLagMetrics",Value="262"
> There are a couple of issues. (1) When the replica changes from a follower to 
> a leader, we will need to set the lag to 0 or deregister the metric. (2) 
> Similarly, when the follower switch to another leader, we should deregister 
> the metric or clear the value. Also, we probably should remove the leader 
> broker id from the clientId tag. That way, the metric name doesn't change 
> when the follower switches leaders.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2540) Inconsistent constant pertaining to autoCommitEnable

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-2540.

Resolution: Won't Fix

It's a fair point, but the new consumer has been out for a while and the old 
consumers will be deprecated, so we will just live with this inconsistency.

> Inconsistent constant pertaining to autoCommitEnable
> 
>
> Key: KAFKA-2540
> URL: https://issues.apache.org/jira/browse/KAFKA-2540
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.8.2.1
>Reporter: Muthu Jayakumar
>Assignee: Neha Narkhede
>Priority: Minor
>
> Hello there,
> It seems like the class kafka.consumer.ConsumerConfig containing
> {quote}
> val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit)
> {quote}
> is inconsistent with org.apache.kafka.clients.consumer.ConsumerConfig
> {quote}
> public static final String ENABLE_AUTO_COMMIT_CONFIG = 
> "enable.auto.commit";
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2545) SSLConsumerTest.testSeek fails with JDK8u60

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-2545.

Resolution: Not A Problem

Only happens on my laptop (not sure why), so closing.

> SSLConsumerTest.testSeek fails with JDK8u60
> ---
>
> Key: KAFKA-2545
> URL: https://issues.apache.org/jira/browse/KAFKA-2545
> Project: Kafka
>  Issue Type: Bug
>  Components: security
> Environment: OS X 10.11
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Minor
>
> This fails consistently for me with JDK8u60, but passes with JDK7u80. I don't 
> know if this is a real problem with the implementation or just an issue with 
> the test, but we need to investigate before the release.
> Stacktrace follows:
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
> after 3000 ms.
>   at 
> org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.(KafkaProducer.java:639)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:406)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:297)
>   at kafka.api.SSLConsumerTest$$anonfun$1.apply(SSLConsumerTest.scala:212)
>   at kafka.api.SSLConsumerTest$$anonfun$1.apply(SSLConsumerTest.scala:211)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.Range.foreach(Range.scala:141)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at kafka.api.SSLConsumerTest.sendRecords(SSLConsumerTest.scala:211)
>   at kafka.api.SSLConsumerTest.testSeek(SSLConsumerTest.scala:144)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update 
> metadata after 3000 ms.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2627) Kafka Heap Size increase impact performance badly

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-2627.

Resolution: Not A Problem

As stated, this is a GC tuning issue and not a Kafka issue.

> Kafka Heap Size increase impact performance badly
> -
>
> Key: KAFKA-2627
> URL: https://issues.apache.org/jira/browse/KAFKA-2627
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.1
> Environment: CentOS Linux release 7.0.1406 (Core)
> NAME="CentOS Linux"
> VERSION="7 (Core)"
> ID="centos"
> ID_LIKE="rhel fedora"
> VERSION_ID="7"
> PRETTY_NAME="CentOS Linux 7 (Core)"
> ANSI_COLOR="0;31"
> CPE_NAME="cpe:/o:centos:centos:7"
> HOME_URL="https://www.centos.org/;
> BUG_REPORT_URL="https://bugs.centos.org/;
> CentOS Linux release 7.0.1406 (Core)
> CentOS Linux release 7.0.1406 (Core)
>Reporter: Mihir Pandya
>
> Initial Kafka server was configured with 
> KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
> As we have high resource to utilize, we changed it to below value 
> KAFKA_HEAP_OPTS="-Xmx16G -Xms8G"
> Change highly impacted Kafka & Zookeeper, we started getting various issue at 
> both end.
> We were not getting all replica in ISR. And it was an issue with Leader 
> Selection which in-turn throwing Socket Connection Error.
> To debug, we checked kafaServer-gc.log, we were getting GC(Allocation 
> Failure) though we have lot more Memory is avalable.
> == GC Error ===
> 2015-10-08T09:43:08.796+: 4.651: [GC (Allocation Failure) 4.651: [ParNew: 
> 272640K->7265K(306688K), 0.0277514 secs] 272640K->7265K(1014528K), 0.0281243 
> secs] [Times: user=0.03 sys=0.05, real=0.03 secs]
> 2015-10-08T09:43:11.317+: 7.172: [GC (Allocation Failure) 7.172: [ParNew: 
> 279905K->3793K(306688K), 0.0157898 secs] 279905K->3793K(1014528K), 0.0159913 
> secs] [Times: user=0.03 sys=0.01, real=0.02 secs]
> 2015-10-08T09:43:13.522+: 9.377: [GC (Allocation Failure) 9.377: [ParNew: 
> 276433K->2827K(306688K), 0.0064236 secs] 276433K->2827K(1014528K), 0.0066834 
> secs] [Times: user=0.03 sys=0.00, real=0.01 secs]
> 2015-10-08T09:43:15.518+: 11.372: [GC (Allocation Failure) 11.373: 
> [ParNew: 275467K->3090K(306688K), 0.0055454 secs] 275467K->3090K(1014528K), 
> 0.0057979 secs] [Times: user=0.02 sys=0.00, real=0.01 secs]
> 2015-10-08T09:43:17.558+: 13.412: [GC (Allocation Failure) 13.412: 
> [ParNew: 275730K->3346K(306688K), 0.0053757 secs] 275730K->3346K(1014528K), 
> 0.0055039 secs] [Times: user=0.02 sys=0.00, real=0.01 secs]
> 
> = Other Kafka Errors =
> [2015-10-01 15:35:19,039] INFO conflict in /brokers/ids/3 data: 
> {"jmx_port":-1,"timestamp":"1443709506024","host":"","version":1,"port":9092}
>  stored data: 
> {"jmx_port":-1,"timestamp":"1443702430352","host":"","version":1,"port":9092}
>  (kafka.utils.ZkUtils$)
> [2015-10-01 15:35:19,042] INFO I wrote this conflicted ephemeral node 
> [{"jmx_port":-1,"timestamp":"1443709506024","host":"","version":1,"port":9092}]
>  at /brokers/ids/3 a while back in a different session, hence I will backoff 
> for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> [2015-10-01 15:23:12,378] INFO Closing socket connection to /172.28.72.162. 
> (kafka.network.Processor)
> [2015-10-01 15:23:12,378] INFO Closing socket connection to /172.28.72.162. 
> (kafka.network.Processor)
> [2015-10-01 15:21:53,831] ERROR [ReplicaFetcherThread-4-1], Error for 
> partition [workorder-topic,1] to broker 1:class 
> kafka.common.NotLeaderForPartitionException 
> (kafka.server.ReplicaFetcherThread)
> [2015-10-01 15:21:53,834] ERROR [ReplicaFetcherThread-4-1], Error for 
> partition [workorder-topic,1] to broker 1:class 
> kafka.common.NotLeaderForPartitionException 
> (kafka.server.ReplicaFetcherThread)
> [2015-10-01 15:21:53,835] ERROR [ReplicaFetcherThread-4-1], Error for 
> partition [workorder-topic,1] to broker 1:class 
> kafka.common.NotLeaderForPartitionException 
> (kafka.server.ReplicaFetcherThread)
> [2015-10-01 15:21:53,837] ERROR [ReplicaFetcherThread-4-1], Error for 
> partition [workorder-topic,1] to broker 1:class 
> kafka.common.NotLeaderForPartitionException 
> (kafka.server.ReplicaFetcherThread)
> [2015-10-01 15:20:36,210] WARN [ReplicaFetcherThread-0-2], Error in fetch 
> Name: FetchRequest; Version: 0; CorrelationId: 9; ClientId: 
> ReplicaFetcherThread-0-2; ReplicaId: 3; MaxWait: 500 ms; MinBytes: 1 bytes; 
> RequestInfo: [__consumer_offsets,17] -> 
> PartitionFetchInfo(0,1048576),[__consumer_offsets,23] -> 
> PartitionFetchInfo(0,1048576),[__consumer_offsets,29] -> 
> PartitionFetchInfo(0,1048576),[__consumer_offsets,35] -> 
> PartitionFetchInfo(0,1048576),[__consumer_offsets,41] -> 
> 

[jira] [Updated] (KAFKA-3033) Reassigning partition stuck in progress

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3033:
---
Labels: reliability  (was: )

> Reassigning partition stuck in progress
> ---
>
> Key: KAFKA-3033
> URL: https://issues.apache.org/jira/browse/KAFKA-3033
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.9.0.0
> Environment: centos 7.2
>Reporter: Leo Xuzhang Lin
>Assignee: Neha Narkhede
>Priority: Critical
>  Labels: reliability
>
> We were trying to increase the replication factor on a test topic we've 
> created. 
> We followed the documentation's instruction:
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> and received:
> ```
> Current partition replica assignment
> {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1]}]}
> Save this to use as the --reassignment-json-file option during rollback
> Successfully started reassignment of partitions 
> {"version":1,"partitions":[{"topic":"test"
> ,"partition":0,"replicas":["1","2","3"]}]}
> ```
> After that whenever we try verify, it is stuck on:
> ```
> Status of partition reassignment:
> Reassignment of partition [test,0] is still in progress
> ```
> - We tried restarting the cluster and it still did not work.
> - The topic has 1 partition
> - The zookeeper /admin/reassign_partitions znode is empty



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3126) Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr in zookeeper is not updated during controlled shutdown.

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3126:
---
Labels: reliability  (was: )

> Weird behavior in kafkaController on Controlled shutdowns. The leaderAndIsr 
> in zookeeper is not updated during controlled shutdown.
> ---
>
> Key: KAFKA-3126
> URL: https://issues.apache.org/jira/browse/KAFKA-3126
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Mayuresh Gharat
>  Labels: reliability
>
> Consider Broker B is controller, broker A is undergoing shutdown. 
> 2016/01/14 19:49:22.884 [KafkaController] [Controller B]: Shutting down 
> broker A
> 2016/01/14 19:49:22.918 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic1,Partition=1,Replica=A] ---> (1)
> 2016/01/14 19:49:22.930 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic1,1] is {"leader":D,"leader_epoch":1,"isr":[D]} 
> --> (2)
> 2016/01/14 19:49:23.028 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=1,Replica=A] ---> (3)
> 2016/01/14 19:49:23.032 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":10,"isr":[C]} 
> -> (4)
> 2016/01/14 19:49:23.996 [KafkaController] [Controller B]: Broker failure 
> callback for A
> 2016/01/14 19:49:23.997 [PartitionStateMachine] [Partition state machine on 
> Controller B]: Invoking state change to OfflinePartition for partitions 
> 2016/01/14 19:49:23.998 [ReplicaStateMachine] [Replica state machine on 
> controller B]: Invoking state change to OfflineReplica for replicas 
> [Topic=testTopic2,Partition=0,Replica=A],
> [Topic=__consumer_offsets,Partition=5,Replica=A],
> [Topic=testTopic1,Partition=2,Replica=A],
> [Topic=__consumer_offsets,Partition=96,Replica=A],
> [Topic=testTopic2,Partition=1,Replica=A],
> [Topic=__consumer_offsets,Partition=36,Replica=A],
> [Topic=testTopic1,Partition=4,Replica=A],
> [Topic=__consumer_offsets,Partition=85,Replica=A],
> [Topic=testTopic1,Partition=6,Replica=A],
> [Topic=testTopic1,Partition=1,Replica=A]
> 2016/01/14 19:49:24.029 [KafkaController] [Controller B]: New leader and ISR 
> for partition [testTopic2,1] is {"leader":C,"leader_epoch":11,"isr":[C]} 
> --> (5)
> 2016/01/14 19:49:24.212 [KafkaController] [Controller B]: Cannot remove 
> replica A from ISR of partition [testTopic1,1] since it is not in the ISR. 
> Leader = D ; ISR = List(D) --> (6)
> If after (1) and (2) controller gets rid of the replica A from the ISR in 
> zookeeper for [testTopic1-1] as displayed in 6), why doesn't it do the  same 
> for [testTopic2-1] as per (5)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3228) Partition reassignment failure for brokers freshly added to cluster

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-3228.

Resolution: Duplicate

I think this is the same as KAFKA-4214. Please reopen if you disagree.

> Partition reassignment failure for brokers freshly added to cluster
> ---
>
> Key: KAFKA-3228
> URL: https://issues.apache.org/jira/browse/KAFKA-3228
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.1
>Reporter: Andrew Olson
>Assignee: Neha Narkhede
>
> After adding about new 20 brokers to double the size of an existing 
> production Kafka deployment, when attempting to rebalance partitions we were 
> initially unable to reassign any partitions to 5 of the 20. There was no 
> problem with the other 15. The controller broker logged error messages like:
> {noformat}
> ERROR kafka.controller.KafkaController: [Controller 19]: Error completing 
> reassignment of partition [TOPIC-NAME,2]
> kafka.common.KafkaException: Only 4,33 replicas out of the new set of 
> replicas 4,34,33 for partition [TOPIC-NAME,2]
> to be reassigned are alive. Failing partition reassignment
>   at 
> kafka.controller.KafkaController.initiateReassignReplicasForTopicPartition(KafkaController.scala:611)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply$mcV$sp(KafkaController.scala:1203)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply(KafkaController.scala:1197)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4$$anonfun$apply$6.apply(KafkaController.scala:1197)
>   at kafka.utils.Utils$.inLock(Utils.scala:535)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4.apply(KafkaController.scala:1196)
>   at 
> kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$4.apply(KafkaController.scala:1195)
>   at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>   at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>   at 
> kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:1195)
>   at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:751)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}
> We reattempted the reassignment to one of these new brokers, with the same 
> result.
> We also saw these messages in the controller's log. There was a "Broken pipe" 
> error for each of the new brokers.
> {noformat}
> 2016-02-09 12:13:22,082 WARN kafka.controller.RequestSendThread: 
> [Controller-19-to-broker-34-send-thread],
> Controller 19 epoch 28 fails to send request Name:UpdateMetadataRequest...
> java.io.IOException: Broken pipe
>   at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
>   at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:148)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
>   at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
>   at 
> kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
>   at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
>   at 
> kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
>   at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>   at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> {noformat}
> {noformat}
> WARN kafka.controller.RequestSendThread: 
> [Controller-19-to-broker-34-send-thread],
> Controller 19 epoch 28 fails to send request Name:UpdateMetadataRequest... to 
> broker id:34...
> Reconnecting to broker.
> java.io.EOFException: Received -1 when reading from channel, socket has 
> likely been closed.
>   at kafka.utils.Utils$.read(Utils.scala:381)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:133)
>   at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>   at 

[jira] [Updated] (KAFKA-3240) Replication issues

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3240:
---
Labels: reliability  (was: )

> Replication issues
> --
>
> Key: KAFKA-3240
> URL: https://issues.apache.org/jira/browse/KAFKA-3240
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0, 0.8.2.2, 0.9.0.1
> Environment: FreeBSD 10.2-RELEASE-p9
>Reporter: Jan Omar
>  Labels: reliability
>
> Hi,
> We are trying to replace our 3-broker cluster running on 0.6 with a new 
> cluster on 0.9.0.1 (but tried 0.8.2.2 and 0.9.0.0 as well).
> - 3 kafka nodes with one zookeeper instance on each machine
> - FreeBSD 10.2 p9
> - Nagle off (sysctl net.inet.tcp.delayed_ack=0)
> - all kafka machines write a ZFS ZIL to a dedicated SSD
> - 3 producers on 3 machines, writing to 1 topics, partitioning 3, replication 
> factor 3
> - acks all
> - 10 Gigabit Ethernet, all machines on one switch, ping 0.05 ms worst case.
> While using the ProducerPerformance or rdkafka_performance we are seeing very 
> strange Replication errors. Any hint on what's going on would be highly 
> appreciated. Any suggestion on how to debug this properly would help as well.
> This is what our broker config looks like:
> {code}
> broker.id=5
> auto.create.topics.enable=false
> delete.topic.enable=true
> listeners=PLAINTEXT://:9092
> port=9092
> host.name=kafka-five.acc
> advertised.host.name=10.5.3.18
> zookeeper.connect=zookeeper-four.acc:2181,zookeeper-five.acc:2181,zookeeper-six.acc:2181
> zookeeper.connection.timeout.ms=6000
> num.replica.fetchers=1
> replica.fetch.max.bytes=1
> replica.fetch.wait.max.ms=500
> replica.high.watermark.checkpoint.interval.ms=5000
> replica.socket.timeout.ms=30
> replica.socket.receive.buffer.bytes=65536
> replica.lag.time.max.ms=1000
> min.insync.replicas=2
> controller.socket.timeout.ms=3
> controller.message.queue.size=100
> log.dirs=/var/db/kafka
> num.partitions=8
> message.max.bytes=1
> auto.create.topics.enable=false
> log.index.interval.bytes=4096
> log.index.size.max.bytes=10485760
> log.retention.hours=168
> log.flush.interval.ms=1
> log.flush.interval.messages=2
> log.flush.scheduler.interval.ms=2000
> log.roll.hours=168
> log.retention.check.interval.ms=30
> log.segment.bytes=536870912
> zookeeper.connection.timeout.ms=100
> zookeeper.sync.time.ms=5000
> num.io.threads=8
> num.network.threads=4
> socket.request.max.bytes=104857600
> socket.receive.buffer.bytes=1048576
> socket.send.buffer.bytes=1048576
> queued.max.requests=10
> fetch.purgatory.purge.interval.requests=100
> producer.purgatory.purge.interval.requests=100
> replica.lag.max.messages=1000
> {code}
> These are the errors we're seeing:
> {code:borderStyle=solid}
> ERROR [Replica Manager on Broker 5]: Error processing fetch operation on 
> partition [test,0] offset 50727 (kafka.server.ReplicaManager)
> java.lang.IllegalStateException: Invalid message size: 0
>   at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:141)
>   at kafka.log.LogSegment.translateOffset(LogSegment.scala:105)
>   at kafka.log.LogSegment.read(LogSegment.scala:126)
>   at kafka.log.Log.read(Log.scala:506)
>   at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:536)
>   at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:507)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>   at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:507)
>   at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:462)
>   at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:431)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java:745)0
> {code}
> and 
> {code}
> ERROR Found invalid messages during fetch for partition [test,0] offset 2732 
> error Message found with corrupt size (0) in shallow iterator 
> (kafka.server.ReplicaFetcherThread)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3939) add new consumer metrics in docs

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-3939.

Resolution: Duplicate

Will be done as part of KAFKA-3480.

> add new consumer metrics in docs
> 
>
> Key: KAFKA-3939
> URL: https://issues.apache.org/jira/browse/KAFKA-3939
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Jun Rao
>
> In the monitoring section of our documentation, we have metrics for the 
> broker and the producer. It would be useful to add the metrics for the new 
> java consumer as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3900) High CPU util on broker

2016-12-05 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722088#comment-15722088
 ] 

Ismael Juma commented on KAFKA-3900:


Does this still happen with 0.10.1.0? There was a fix in the code appearing in 
the stacktrace.

> High CPU util on broker
> ---
>
> Key: KAFKA-3900
> URL: https://issues.apache.org/jira/browse/KAFKA-3900
> Project: Kafka
>  Issue Type: Bug
>  Components: network, replication
>Affects Versions: 0.10.0.0
> Environment: kafka = 2.11-0.10.0.0
> java version "1.8.0_91"
> amazon linux
>Reporter: Andrey Konyaev
>  Labels: reliability
>
> I start kafka cluster in amazon with m4.xlarge (4 cpu and 16 GB mem (14 
> allocate for kafka in heap)). Have three nodes.
> I haven't high load (6000 message/sec) and we have cpu_idle = 70%, but 
> sometime (about once a day) I see this message in server.log:
> [2016-06-24 14:52:22,299] WARN [ReplicaFetcherThread-0-2], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@6eaa1034 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
> at 
> kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
> at 
> kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
> at 
> kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
> at 
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> I know, this can be network glitch, but why kafka eat all cpu time?
> My config:
> inter.broker.protocol.version=0.10.0.0
> log.message.format.version=0.10.0.0
> default.replication.factor=3
> num.partitions=3
> replica.lag.time.max.ms=15000
> broker.id=0
> listeners=PLAINTEXT://:9092
> log.dirs=/mnt/kafka/kafka
> log.retention.check.interval.ms=30
> log.retention.hours=168
> log.segment.bytes=1073741824
> num.io.threads=20
> num.network.threads=10
> num.partitions=1
> num.recovery.threads.per.data.dir=2
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> socket.send.buffer.bytes=102400
> zookeeper.connection.timeout.ms=6000
> delete.topic.enable = true
> broker.max_heap_size=10 GiB 
>   
> Any ideas?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3900) High CPU util on broker

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3900:
---
Labels: reliability  (was: )

> High CPU util on broker
> ---
>
> Key: KAFKA-3900
> URL: https://issues.apache.org/jira/browse/KAFKA-3900
> Project: Kafka
>  Issue Type: Bug
>  Components: network, replication
>Affects Versions: 0.10.0.0
> Environment: kafka = 2.11-0.10.0.0
> java version "1.8.0_91"
> amazon linux
>Reporter: Andrey Konyaev
>  Labels: reliability
>
> I start kafka cluster in amazon with m4.xlarge (4 cpu and 16 GB mem (14 
> allocate for kafka in heap)). Have three nodes.
> I haven't high load (6000 message/sec) and we have cpu_idle = 70%, but 
> sometime (about once a day) I see this message in server.log:
> [2016-06-24 14:52:22,299] WARN [ReplicaFetcherThread-0-2], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@6eaa1034 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
> at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
> at 
> kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
> at 
> kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
> at 
> kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
> at 
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> I know, this can be network glitch, but why kafka eat all cpu time?
> My config:
> inter.broker.protocol.version=0.10.0.0
> log.message.format.version=0.10.0.0
> default.replication.factor=3
> num.partitions=3
> replica.lag.time.max.ms=15000
> broker.id=0
> listeners=PLAINTEXT://:9092
> log.dirs=/mnt/kafka/kafka
> log.retention.check.interval.ms=30
> log.retention.hours=168
> log.segment.bytes=1073741824
> num.io.threads=20
> num.network.threads=10
> num.partitions=1
> num.recovery.threads.per.data.dir=2
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> socket.send.buffer.bytes=102400
> zookeeper.connection.timeout.ms=6000
> delete.topic.enable = true
> broker.max_heap_size=10 GiB 
>   
> Any ideas?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3917) Some __consumer_offsets replicas grow way too big

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3917:
---
Labels: reliability  (was: )

> Some __consumer_offsets replicas grow way too big
> -
>
> Key: KAFKA-3917
> URL: https://issues.apache.org/jira/browse/KAFKA-3917
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.2
> Environment: Runs with Docker 1.10.1 in a container on 
> Linux 3.13.0-77-generic #121-Ubuntu SMP Wed Jan 20 10:50:42 UTC 2016 x86_64
>Reporter: Maxim Vladimirskiy
>  Labels: reliability
>
> We noticed that some replicas of partitions of the __consumer_offsets topic 
> grow way too big. Looking inside respective folders it became apparent that 
> old segments had not been cleaned up. Please see below example of disk usage 
> data for both affected and not affected partitions:
> Not affected partitions:
> Partition: 0  Leader: 2   Replicas: 2,3,4 Isr: 2,4,3
> 2: 49M
> 3: 49M
> 4: 49M
> Affected partitions:
> Partition: 10 Leader: 2   Replicas: 2,0,1 Isr: 1,2,0
> 0: 86M
> 1: 22G <<< too big!
> 2: 86M
> Partition: 38 Leader: 0   Replicas: 0,4,1 Isr: 1,0,4
> 0: 43M
> 1: 26G <<<  too big!
> 4: 26G <<<  too big!
> As you can see sometimes only one replica is affected, sometimes both 
> replicas are affected.
> When I try to restart a broker that has affected replicas it fails to start 
> with an exception that looks like this:
> [2016-06-28 23:15:20,441] ERROR There was an error in one of the threads 
> during logs loading: java.lang.IllegalArgumentException: requirement failed: 
> Corrupt index found, index file 
> (/var/kafka/__consumer_offsets-38/.index) has non-zero 
> size but the last offset is -676703869 and the base offset is 0 
> (kafka.log.LogManager)
> [2016-06-28 23:15:20,442] FATAL [Kafka Server 1], Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.IllegalArgumentException: requirement failed: Corrupt index found, 
> index file (/var/kafka/__consumer_offsets-38/.index) has 
> non-zero size but the last offset is -676703869 and the base offset is 0
> at scala.Predef$.require(Predef.scala:233)
> at kafka.log.OffsetIndex.sanityCheck(OffsetIndex.scala:352)
> at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:184)
> at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:183)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at kafka.log.Log.loadSegments(Log.scala:183)
> at kafka.log.Log.(Log.scala:67)
> at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:142)
> at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> After the content of the affected partition is deleted broker starts 
> successfully. 
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3944) After the broker restart, fetchers stopped due to a delayed controlled shutdown message

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3944:
---
Labels: reliability  (was: )

> After the broker restart, fetchers stopped due to a delayed controlled 
> shutdown message
> ---
>
> Key: KAFKA-3944
> URL: https://issues.apache.org/jira/browse/KAFKA-3944
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Maysam Yabandeh
>Priority: Minor
>  Labels: reliability
>
> The symptom is that cluster reports under-replicated blocks and some replicas 
> do not seem to catch up ever. It turns out that the corresponding fetchers in 
> those brokers were stopped shortly after the broker's restart. The broker had 
> stopped the fetcher upon receiving stop-replica request from the controller. 
> The controller had issued those request upon processing controlled shutdown 
> request form the same broker. However those requests were all sent before the 
> broker restart but the controller processed them after. Here is the timeline:
> # broker sends controlled shutdown message to controller
> # the process fails and the broker proceeds with an unclean shutdown
> # the broker is restated
> # the controller processes the perviously sent controlled shutdown messages
> # the controller sends stop replica messages to the broker
> # the broker shuts down the fetchers, while it has no intent to shut down 
> again
> # this leads to under-replicated blocks
> Example from logs:
> {code}
> broker19.com:/var/log/kafka$ grep "Retrying controlled shutdow\|unclean 
> shutdown" server.log.2016-07-07.2 
> 2016-07-07 15:58:10,818 WARN server.KafkaServer: [Kafka Server 19], Retrying 
> controlled shutdown after the previous attempt failed...
> 2016-07-07 15:58:45,887 WARN server.KafkaServer: [Kafka Server 19], Retrying 
> controlled shutdown after the previous attempt failed...
> 2016-07-07 15:59:20,927 WARN server.KafkaServer: [Kafka Server 19], Retrying 
> controlled shutdown after the previous attempt failed...
> 2016-07-07 15:59:20,929 WARN server.KafkaServer: [Kafka Server 19], 
> Proceeding to do an unclean shutdown as all the controlled shutdown attempts 
> failed
> broker19.com:/var/log/kafka$ head -1 server.log.2016-07-07.3
> 2016-07-07 16:00:23,191 INFO server.KafkaConfig: KafkaConfig values: 
> {code}
> {code}
> broker13.com:/var/log/kafka$ grep "Shutting down broker 19" 
> controller.log.2016-07-07.1 
> 2016-07-07 15:57:35,822 INFO controller.KafkaController: [Controller 13]: 
> Shutting down broker 19
> 2016-07-07 16:02:45,526 INFO controller.KafkaController: [Controller 13]: 
> Shutting down broker 19
> 2016-07-07 16:05:42,432 INFO controller.KafkaController: [Controller 13]: 
> Shutting down broker 19
> {code}
> which resulted into many stop replica request to broker 19:
> {code}
> broker13.com:/var/log/kafka$ grep "The stop replica request (delete = false) 
> sent to broker 19 is" controller.log.2016-07-07.1 | tail -1
> 2016-07-07 16:06:02,374 DEBUG controller.ControllerBrokerRequestBatch: The 
> stop replica request (delete = false) sent to broker 19 is 
> [Topic=topic-xyz,Partition=6,Replica=19]
> {code}
> broker 19 processes them AFTER its restart:
> {code}
> broker19.com:/var/log/kafka$ grep "handling stop replica (delete=false) for 
> partition .topic-xzy,3." state-change.log.2016-07-07.2 
> 2016-07-07 16:06:00,154 TRACE change.logger: Broker 19 handling stop replica 
> (delete=false) for partition [topic-xzy,3]
> 2016-07-07 16:06:00,154 TRACE change.logger: Broker 19 finished handling stop 
> replica (delete=false) for partition [topic-xyz,3]
> 2016-07-07 16:06:00,155 TRACE change.logger: Broker 19 handling stop replica 
> (delete=false) for partition [topic-xyz,3]
> 2016-07-07 16:06:00,155 TRACE change.logger: Broker 19 finished handling stop 
> replica (delete=false) for partition [topic-xyz,3]
> {code}
> and removes the fetchers:
> {code}
> broker19.com:/var/log/kafka$ grep "Removed fetcher.*topic-xyz.3" 
> server.log.2016-07-07.3 | tail -2
> 2016-07-07 16:06:00,154 INFO server.ReplicaFetcherManager: 
> [ReplicaFetcherManager on broker 19] Removed fetcher for partitions 
> [topic-xyz,3]
> 2016-07-07 16:06:00,155 INFO server.ReplicaFetcherManager: 
> [ReplicaFetcherManager on broker 19] Removed fetcher for partitions 
> [topic-xyz,3]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3951) kafka.common.KafkaStorageException: I/O exception in append to log

2016-12-05 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15722076#comment-15722076
 ] 

Ismael Juma commented on KAFKA-3951:


Is it possible that your `/tmp` folder is being deleted?

> kafka.common.KafkaStorageException: I/O exception in append to log
> --
>
> Key: KAFKA-3951
> URL: https://issues.apache.org/jira/browse/KAFKA-3951
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.9.0.1
>Reporter: wanzi.zhao
> Attachments: server-1.properties, server.properties
>
>
> I have two brokers in the same server using two ports,10.45.33.195:9092 and 
> 10.45.33.195:9093.They use two log directory "log.dirs=/tmp/kafka-logs" and 
> "log.dirs=/tmp/kafka-logs-1".When I shutdown my consumer application(java 
> api)  then change a groupId and restart it,my kafka brokers will stop 
> working, this is the stack trace I get
> [2016-07-11 17:02:47,314] INFO [Group Metadata Manager on Broker 0]: Loading 
> offsets and group metadata from [__consumer_offsets,0] 
> (kafka.coordinator.GroupMetadataManager)
> [2016-07-11 17:02:47,955] FATAL [Replica Manager on Broker 0]: Halting due to 
> unrecoverable I/O error while handling produce request:  
> (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log 
> '__consumer_offsets-38'
> at kafka.log.Log.append(Log.scala:318)
> at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:442)
> at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:428)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
> at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:428)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:401)
> at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:386)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:386)
> at 
> kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:322)
> at 
> kafka.coordinator.GroupMetadataManager.store(GroupMetadataManager.scala:228)
> at 
> kafka.coordinator.GroupCoordinator$$anonfun$handleCommitOffsets$9.apply(GroupCoordinator.scala:429)
> at 
> kafka.coordinator.GroupCoordinator$$anonfun$handleCommitOffsets$9.apply(GroupCoordinator.scala:429)
> at scala.Option.foreach(Option.scala:236)
> at 
> kafka.coordinator.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:429)
> at 
> kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:280)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:76)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /tmp/kafka-logs/__consumer_offsets-38/.index (No such 
> file or directory)
> at java.io.RandomAccessFile.open0(Native Method)
> at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
> at java.io.RandomAccessFile.(RandomAccessFile.java:243)
> at 
> kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:277)
> at 
> kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
> at 
> kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:265)
> at 
> kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
> at 
> kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:264)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3940) Log should check the return value of dir.mkdirs()

2016-12-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3940:
---
Labels: newbie reliability  (was: newbie)

> Log should check the return value of dir.mkdirs()
> -
>
> Key: KAFKA-3940
> URL: https://issues.apache.org/jira/browse/KAFKA-3940
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0
>Reporter: Jun Rao
>Assignee: Ishita Mandhan
>  Labels: newbie, reliability
>
> In Log.loadSegments(), we call dir.mkdirs() w/o checking the return value and 
> just assume the directory will exist after the call. However, if the 
> directory can't be created (e.g. due to no space), we will hit 
> NullPointerException in the next statement, which will be confusing.
>for(file <- dir.listFiles if file.isFile) {



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.

2016-12-05 Thread Michael Andre Pearce (IG) (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15721875#comment-15721875
 ] 

Michael Andre Pearce (IG) commented on KAFKA-4477:
--

This occurred again in a prod environment just after 2am on Sat. 

Ive attached the stack trace that was captured before the node was restarted by 
our platform operations team. Looking at the stack trace, there are no 
deadlocks, unlike the JIRA ticket you mentioned.

> Node reduces its ISR to itself, and doesn't recover. Other nodes do not take 
> leadership, cluster remains sick until node is restarted.
> --
>
> Key: KAFKA-4477
> URL: https://issues.apache.org/jira/browse/KAFKA-4477
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: RHEL7
> java version "1.8.0_66"
> Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
> Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)
>Reporter: Michael Andre Pearce (IG)
>Priority: Critical
>  Labels: reliability
> Attachments: kafka.jstack
>
>
> We have encountered a critical issue that has re-occured in different 
> physical environments. We haven't worked out what is going on. We do though 
> have a nasty work around to keep service alive. 
> We do have not had this issue on clusters still running 0.9.01.
> We have noticed a node randomly shrinking for the partitions it owns the 
> ISR's down to itself, moments later we see other nodes having disconnects, 
> followed by finally app issues, where producing to these partitions is 
> blocked.
> It seems only by restarting the kafka instance java process resolves the 
> issues.
> We have had this occur multiple times and from all network and machine 
> monitoring the machine never left the network, or had any other glitches.
> Below are seen logs from the issue.
> Node 7:
> [2016-12-01 07:01:28,112] INFO Partition 
> [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking 
> ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from 
> 1,2,7 to 7 (kafka.cluster.Partition)
> All other nodes:
> [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 7 was disconnected before the response was 
> read
> All clients:
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.
> After this occurs, we then suddenly see on the sick machine an increasing 
> amount of close_waits and file descriptors.
> As a work around to keep service we are currently putting in an automated 
> process that tails and regex's for: and where new_partitions hit just itself 
> we restart the node. 
> "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for 
> partition \[.*\] from (?P.+) to (?P.+) 
> \(kafka.cluster.Partition\)"



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.

2016-12-05 Thread Michael Andre Pearce (IG) (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Andre Pearce (IG) updated KAFKA-4477:
-
Attachment: kafka.jstack

> Node reduces its ISR to itself, and doesn't recover. Other nodes do not take 
> leadership, cluster remains sick until node is restarted.
> --
>
> Key: KAFKA-4477
> URL: https://issues.apache.org/jira/browse/KAFKA-4477
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: RHEL7
> java version "1.8.0_66"
> Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
> Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)
>Reporter: Michael Andre Pearce (IG)
>Priority: Critical
>  Labels: reliability
> Attachments: kafka.jstack
>
>
> We have encountered a critical issue that has re-occured in different 
> physical environments. We haven't worked out what is going on. We do though 
> have a nasty work around to keep service alive. 
> We do have not had this issue on clusters still running 0.9.01.
> We have noticed a node randomly shrinking for the partitions it owns the 
> ISR's down to itself, moments later we see other nodes having disconnects, 
> followed by finally app issues, where producing to these partitions is 
> blocked.
> It seems only by restarting the kafka instance java process resolves the 
> issues.
> We have had this occur multiple times and from all network and machine 
> monitoring the machine never left the network, or had any other glitches.
> Below are seen logs from the issue.
> Node 7:
> [2016-12-01 07:01:28,112] INFO Partition 
> [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking 
> ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from 
> 1,2,7 to 7 (kafka.cluster.Partition)
> All other nodes:
> [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 7 was disconnected before the response was 
> read
> All clients:
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.
> After this occurs, we then suddenly see on the sick machine an increasing 
> amount of close_waits and file descriptors.
> As a work around to keep service we are currently putting in an automated 
> process that tails and regex's for: and where new_partitions hit just itself 
> we restart the node. 
> "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for 
> partition \[.*\] from (?P.+) to (?P.+) 
> \(kafka.cluster.Partition\)"



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-94: Session Windows

2016-12-05 Thread Damian Guy
Hi Guozhang,

Thanks for the feedback.

1. If we want developers to be able to provide custom SessionStore
implementations then we need this method to be on the public interface. We
are currently allowing this for other operations, so it seems to make
sense, for the sake of consistency, to allow a custom SessionStore. What do
you think?

2. Sure, makes sense. I'll update the KIP.

Thanks,
Damian


On Sat, 3 Dec 2016 at 00:06 Guozhang Wang  wrote:

> Thanks for the KIP Damian, it looks great to me overall. A couple of minor
> comments:
>
> 1. findSessionsToMerge(): does this need to be a public interface? Does
> users ever need to call it from the processor API or even DSL?
>
> 2. SessionMerger: I am wondering if we could make it more general by
> renaming to "Merger" only? This if for future potential work when we have
> alternative aggregation operator implementations where both an aggregator
> V, T -> T and a merger T, T -> T could be provided by users.
>
>
> Guozhang
>
>
> On Wed, Nov 30, 2016 at 2:42 AM, Damian Guy  wrote:
>
> > Thanks Matthias.
> >
> > 1) Yes good suggestion will update.
> > 2) As it is consistent with Aggregator and a developer may want to use
> the
> > key. So why not?
> > 3) Thanks. I'll update the KIP
> >
> > Cheers,
> > Damian
> >
> > On Tue, 29 Nov 2016 at 23:47 Matthias J. Sax 
> > wrote:
> >
> > > Very nice KIP!
> > >
> > >
> > > Couple of comments:
> > >
> > > 1) Current window API is as follows:
> > >
> > > >
> > > JoinWindows.of(timeDifference).before(timeDifference).after(
> > timeDifference)
> > > > TimeWindows.of(size).advanceBy(interval)
> > > > UnlimitedWindow.of().startOn(start)
> > >
> > > To align with this scheme, I think it would be better to use the
> > > following API for SessionWindows
> > >
> > > > SessionWindows.with(inactivityGap)
> > >
> > >
> > > 2) I am wondering, why SessionMerger does need the key?
> > >
> > > 3) You KIP API for SessionWindows and you PR does not align. There are
> > > some getters in you code that are not part of the KIP (not sure how
> > > important this is)
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 11/24/16 7:59 AM, Damian Guy wrote:
> > > > Hi all,
> > > >
> > > > I would like to start the discussion on KIP-94:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 94+Session+Windows
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-05 Thread Michael Pearce
Hi James,

Yes exactly what we do. Alas as it's bespoke to our internal wrapper ATM so we 
cannot share this. This is the eco system argument that's with having native 
headers then we would be able to share these kind of things.

Cheers
Mike

Sent using OWA for iPhone

From: James Cheng 
Sent: Monday, December 5, 2016 8:50:30 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-82 - Add Record Headers

> On Dec 2, 2016, at 4:57 PM, Michael Pearce  wrote:
>
> Hi Jun.
>
> RE Mirroring,
>
> [...]
>
> Lastly around mirroring we have a partionKey field, as the key used for 
> portioning logic != compaction key all the time but we want to preserve it 
> for when we mirror so that if source cluster partition count != destination 
> cluster partition count we can honour the same partitioning logic.
>
>

Michael,

Sorry to be off topic about the KIP, but this is genius. I'm totally stealing 
this idea.

We currently require partition counts to be the same on source and destination 
cluster and do 1-to-1 mapping of partitions (i.e. partition 1 goes to partition 
1, partition 2 goes to partition 2, etc) because otherwise mirrormaker would 
partition based off of the compaction key. Explicitly including the partition 
key would let us work around that.

Does that mean you have a custom partitioner in mirrormaker that uses the 
partitionKey field? Is it implemented inside a MirrorMakerMessageHandler?

Thanks!
-James

The information contained in this email is strictly confidential and for the 
use of the addressee only, unless otherwise indicated. If you are not the 
intended recipient, please do not read, copy, use or disclose to others this 
message or any attachment. Please also notify the sender by replying to this 
email or by telephone (+44(020 7896 0011) and then delete the email and any 
copies of it. Opinions, conclusion (etc) that do not relate to the official 
business of this company shall be understood as neither given nor endorsed by 
it. IG is a trading name of IG Markets Limited (a company registered in England 
and Wales, company number 04008957) and IG Index Limited (a company registered 
in England and Wales, company number 01190902). Registered address at Cannon 
Bridge House, 25 Dowgate Hill, London EC4R 2YA. Both IG Markets Limited 
(register number 195355) and IG Index Limited (register number 114059) are 
authorised and regulated by the Financial Conduct Authority.


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-05 Thread James Cheng

> On Dec 2, 2016, at 4:57 PM, Michael Pearce  wrote:
> 
> Hi Jun.
> 
> RE Mirroring,
> 
> [...]
> 
> Lastly around mirroring we have a partionKey field, as the key used for 
> portioning logic != compaction key all the time but we want to preserve it 
> for when we mirror so that if source cluster partition count != destination 
> cluster partition count we can honour the same partitioning logic.
> 
> 

Michael,

Sorry to be off topic about the KIP, but this is genius. I'm totally stealing 
this idea.

We currently require partition counts to be the same on source and destination 
cluster and do 1-to-1 mapping of partitions (i.e. partition 1 goes to partition 
1, partition 2 goes to partition 2, etc) because otherwise mirrormaker would 
partition based off of the compaction key. Explicitly including the partition 
key would let us work around that.

Does that mean you have a custom partitioner in mirrormaker that uses the 
partitionKey field? Is it implemented inside a MirrorMakerMessageHandler?

Thanks!
-James



[GitHub] kafka pull request #2211: HOTFIX: Fix bug in readToLogEnd in KafkaBasedLog.

2016-12-05 Thread kkonstantine
GitHub user kkonstantine opened a pull request:

https://github.com/apache/kafka/pull/2211

HOTFIX: Fix bug in readToLogEnd in KafkaBasedLog.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kkonstantine/kafka 
HOTFIX-Correctly-read-to-end-of-offsets-log-in-Connect-KafkaBasedLog

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2211.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2211


commit 84d7624441ebfdfe29af6491fbb10cad915e6d7e
Author: Konstantine Karantasis 
Date:   2016-12-05T08:05:29Z

HOTFIX Fix bug in readToLogEnd in KafkaBasedLog.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---