[jira] [Commented] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-04 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17008004#comment-17008004
 ] 

Jonathan Santilli commented on KAFKA-9312:
--

There is a [pending PR|https://github.com/apache/kafka/pull/7877] to address 
this issue, [~guozhang]; [~lucasbradstreet] has been reviewing it. Another pair 
of eyes on that PR would be great.

Thanks in advance.

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007664#comment-17007664
 ] 

Jonathan Santilli commented on KAFKA-9312:
--

Checking a little bit further, it seems that yes, it finishes; but if the batch 
was split, the _future_ gets chained:

 
{code:java}
@Override
public RecordMetadata get() throws InterruptedException, ExecutionException {
    this.result.await(); // if it finishes here
    if (nextRecordMetadata != null)
        return nextRecordMetadata.get();
    return valueOrError();
}

...

/**
 * This method is used when we have to split a large batch in smaller ones. A chained metadata will allow the
 * future that has already returned to the users to wait on the newly created split batches even after the
 * old big batch has been deemed as done.
 */
void chain(FutureRecordMetadata futureRecordMetadata) {
    if (nextRecordMetadata == null)
        nextRecordMetadata = futureRecordMetadata;
    else
        nextRecordMetadata.chain(futureRecordMetadata);
}{code}
And ProducerBatch#tryAppendForSplit calls thunk.future.chain(future);
So, I think it is OK; I will create a test case to verify it.
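
For what it is worth, here is a rough sketch (hypothetical, not the test from the linked commit) of how this could be exercised end to end. It assumes a broker on localhost:9092 and a topic named split-test created with a small max.message.bytes (for example 1024), so the accumulated batch is rejected with MESSAGE_TOO_LARGE and split by the producer:
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SplitBatchFlushCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", 64 * 1024); // accumulate a batch larger than the topic limit
        props.put("linger.ms", 1000);       // give the records time to land in the same batch

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            List<Future<RecordMetadata>> futures = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                // 512-byte values: small enough individually, too large as one accumulated batch
                String value = new String(new char[512]).replace('\0', 'x');
                futures.add(producer.send(new ProducerRecord<>("split-test", Integer.toString(i), value)));
            }
            producer.flush(); // under the reported bug, this may return before split batches complete
            for (Future<RecordMetadata> f : futures) {
                // if the chaining described above works, get() only returns once the
                // (possibly split) batch holding this record has really been acknowledged
                System.out.println("offset=" + f.get().offset());
            }
        }
    }
}{code}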

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007335#comment-17007335
 ] 

Jonathan Santilli edited comment on KAFKA-9312 at 1/3/20 5:39 PM:
--

After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to:
{code:java}
KafkaProducer#send() -> FutureRecordMetadata#get(){code}
will also wait until the request gets completed, but it does not guarantee the 
record has been sent, since the batch could get split as well.

[According to the documentation:|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, my interpretation is that if the `get()` method returns 
successfully, it means the record was sent successfully.
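
For illustration only (mine, not from the Javadoc), a minimal sketch of that interpretation. It assumes a broker on localhost:9092 and an existing topic named events; get() blocks until the request completes and either returns the metadata or throws the exception that failed the send:
{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendAndWait {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            try {
                RecordMetadata md = producer.send(new ProducerRecord<>("events", "key", "value")).get();
                System.out.printf("acked: partition=%d, offset=%d%n", md.partition(), md.offset());
            } catch (ExecutionException e) {
                // the send failed; the cause is the exception that occurred while sending
                e.getCause().printStackTrace();
            }
        }
    }
}{code}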

 

What do you think [~lucasbradstreet]?

 


was (Author: pachilo):
After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to:
{code:java}
KafkaProducer#send() -> ProduceRequestResult#get(){code}
will also wait until the request gets completed, but it does not warranty the 
record has been sent since it could get splitted as well.

[According to the 
documentation:|#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I will interpret that if the `get()` method returns 
successfully, it means the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007335#comment-17007335
 ] 

Jonathan Santilli edited comment on KAFKA-9312 at 1/3/20 9:19 AM:
--

After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to:
{code:java}
KafkaProducer#send() -> ProduceRequestResult#get(){code}
will also wait until the request gets completed, but it does not guarantee the 
record has been sent, since the batch could get split as well.

[According to the documentation:|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, my interpretation is that if the `get()` method returns 
successfully, it means the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 


was (Author: pachilo):
After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to:
{code:java}
KafkaProducer#send()#get(){code}
will also wait until the request gets completed, but it does not warranty the 
record has been sent since it could get splitted as well.

[According to the 
documentation:|#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I will interpret that if the `get()` method returns 
successfully, it means the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007335#comment-17007335
 ] 

Jonathan Santilli edited comment on KAFKA-9312 at 1/3/20 9:18 AM:
--

After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to
{code:java}
KafkaProducer#send()#get(){code}
will also wait until the request gets completed, but it does not guarantee the 
record has been sent, since the batch could get split as well.

[According to the documentation:|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, my interpretation is that if the `get()` method returns 
successfully, it means the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 


was (Author: pachilo):
After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to KafkaProducer#send()#get() will also wait until 
the request gets completed, but it does not warranty the record has been sent 
since it could get splitted as well.

[According to the 
documentation:|#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I will interpret that if the `get()` method returns 
successfully, it means the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007335#comment-17007335
 ] 

Jonathan Santilli edited comment on KAFKA-9312 at 1/3/20 9:18 AM:
--

After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to:
{code:java}
KafkaProducer#send()#get(){code}
will also wait until the request gets completed, but it does not guarantee the 
record has been sent, since the batch could get split as well.

[According to the documentation:|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, my interpretation is that if the `get()` method returns 
successfully, it means the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 


was (Author: pachilo):
After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to 
{code:java}
KafkaProducer#send()#get(){code}
will also wait until the request gets completed, but it does not warranty the 
record has been sent since it could get splitted as well.

[According to the 
documentation:|#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I will interpret that if the `get()` method returns 
successfully, it means the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007335#comment-17007335
 ] 

Jonathan Santilli edited comment on KAFKA-9312 at 1/3/20 9:18 AM:
--

After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to KafkaProducer#send()#get() will also wait until 
the request gets completed, but it does not guarantee the record has been sent, 
since the batch could get split as well.

[According to the documentation:|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, my interpretation is that if the `get()` method returns 
successfully, it means the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 


was (Author: pachilo):
After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to`KafkaProducer#send()#get()` will also wait until 
the request gets completed, but it does not warranty the record has been sent 
since it could get splitted as well.

[According to the 
documentation:|[https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, I will interpret that if the `get()` method returns 
successfully, it means the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2020-01-03 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007335#comment-17007335
 ] 

Jonathan Santilli commented on KAFKA-9312:
--

After interacting with the code to solve the issue addressed by this task, I 
have noticed that the call to `KafkaProducer#send()#get()` will also wait until 
the request gets completed, but it does not guarantee the record has been sent, 
since the batch could get split as well.

[According to the documentation:|https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]

 

 ??Invoking 
[{{get()}}|https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html?is-external=true#get--]
 on this future will block until the associated request completes and then 
return the metadata for the record or throw any exception that occurred while 
sending the record.??

 

After reading that, my interpretation is that if the `get()` method returns 
successfully, it means the record was sent successfully.

 

What do you think [~lucasbradstreet]?

 

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9312) KafkaProducer flush behavior does not guarantee completed sends under record batch splitting

2019-12-25 Thread Jonathan Santilli (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Santilli reassigned KAFKA-9312:


Assignee: Jonathan Santilli

> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> 
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>Reporter: Lucas Bradstreet
>Assignee: Jonathan Santilli
>Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at time of the flush call will be either sent successfully or will result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However the flush behavior relies on 
> the accumulator checking incomplete sends that exist at the time of the flush 
> call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
> try {
> for (ProducerBatch batch : this.incomplete.copyAll())
> batch.produceFuture.await();
> } finally {
> this.flushesInProgress.decrementAndGet();
> }
> }{code}
> When large record batches are split, the batch producerFuture in question is 
> completed, and new batches added to the incomplete list of record batches. 
> This will break the flush guarantee as awaitFlushCompletion will finish 
> without awaiting the new split batches, and any pre-split batches being 
> awaited on above will have been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem is likely present since record batch splitting was added as of 
> KAFKA-3995; KIP-126; 0.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8761) Flaky Test AdminClientIntegrationTest.testIncrementalAlterConfigsForLog4jLogLevels

2019-11-13 Thread Jonathan Santilli (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16973247#comment-16973247
 ] 

Jonathan Santilli commented on KAFKA-8761:
--

Hello [~ableegoldman] and [~mjsax]

I have tried to check the reports:

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6740/testReport/junit/kafka.api/AdminClientIntegrationTest/testIncrementalAlterConfigsForLog4jLogLevels/]

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6834/testReport/junit/kafka.api/AdminClientIntegrationTest/testIncrementalAlterConfigsForLog4jLogLevels/]

But they show a 404. Is it because I do not have permissions to see that 
content, or is the test report page actually gone?

> Flaky Test 
> AdminClientIntegrationTest.testIncrementalAlterConfigsForLog4jLogLevels
> --
>
> Key: KAFKA-8761
> URL: https://issues.apache.org/jira/browse/KAFKA-8761
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, core, unit tests
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 2.5.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6740/testReport/junit/kafka.api/AdminClientIntegrationTest/testIncrementalAlterConfigsForLog4jLogLevels/]
>  
> h3. Error Message
> org.junit.ComparisonFailure: expected:<[FATAL]> but was:<[INFO]>
> h3. Stacktrace
> org.junit.ComparisonFailure: expected:<[FATAL]> but was:<[INFO]> at 
> org.junit.Assert.assertEquals(Assert.java:117) at 
> org.junit.Assert.assertEquals(Assert.java:146) at 
> kafka.api.AdminClientIntegrationTest.testIncrementalAlterConfigsForLog4jLogLevels(AdminClientIntegrationTest.scala:1850)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
> java.base/java.lang.Thread.run(Thread.java:834)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8412) Still a nullpointer exception thrown on shutdown while flushing before closing producers

2019-07-16 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16886441#comment-16886441
 ] 

Jonathan Santilli commented on KAFKA-8412:
--

[~guozhang] also mentioned that we may need to refactor that part of the code 
[in a comment on KAFKA-7678|https://issues.apache.org/jira/browse/KAFKA-7678?focusedCommentId=16715220=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16715220],
 which is aligned with your proposal, [~mjsax].
In your opinion [~mjsax], should we address that first, or could we add a null 
check before calling the Producer#flush method and then do the refactor?
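
For illustration, a hypothetical sketch of that null check (mirroring the guard that close() already has in the code quoted in the description below); this is not a committed change:
{code:java}
public void flush() {
    this.log.debug("Flushing producer");
    if (this.producer != null) {  // the producer may already have been closed and set to null
        this.producer.flush();
    }
    this.checkForException();
}{code}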
 

> Still a nullpointer exception thrown on shutdown while flushing before 
> closing producers
> 
>
> Key: KAFKA-8412
> URL: https://issues.apache.org/jira/browse/KAFKA-8412
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Sebastiaan
>Assignee: Matthias J. Sax
>Priority: Minor
>
> I found a closed issue and replied there but decided to open one myself 
> because although they're related they're slightly different. The original 
> issue is at https://issues.apache.org/jira/browse/KAFKA-7678
> The fix there has been to implement a null check around closing a producer 
> because in some cases the producer is already null there (has been closed 
> already)
> In version 2.1.1 we are getting a very similar exception, but in the 'flush' 
> method that is called pre-close. This is in the log:
> {code:java}
> message: stream-thread 
> [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed 
> while closing StreamTask 1_26 due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> Followed by:
>  
> {code:java}
> message: task [1_26] Could not close task due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.StreamTask
> java.lang.NullPointerException: null
>     at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> If I look at the source code at this point, I see a nice null check in the 
> close method, but not in the flush method that is called just before that:
> {code:java}
> public void flush() {
>     this.log.debug("Flushing producer");
>     this.producer.flush();
>     this.checkForException();
> }
> public void close() {
>     this.log.debug("Closing producer");
>     if (this.producer != null) {
>     this.producer.close();
>     this.producer = null;
>     }
>     this.checkForException();
> }{code}
> Seems to my (ignorant) eye that the flush method should also be wrapped in a 
> null check in the same way as has been done for close.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8615) Change to track partition time breaks TimestampExtractor

2019-07-05 Thread Jonathan Santilli (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Santilli updated KAFKA-8615:
-
Description: 
From the users mailing list, *UPDATED* by Jonathan Santilli:
{noformat}

Am testing the new version 2.3 for Kafka Streams specifically. I have noticed 
that now, the implementation of the method extract from the
interface org.apache.kafka.streams.processor.TimestampExtractor:

public class OwnTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord record, final long 
previousTimestamp) {
        // previousTimestamp is always == -1. For version 2.3
    }
}

Previous version 2.2.1 was returning the correct value for the record partition.
{noformat}
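
For illustration only (not part of the report above), a typical complete extractor showing how previousTimestamp is normally used as a fallback, which is why always receiving -1 is a problem:
{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class OwnTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final long recordTimestamp = record.timestamp();
        if (recordTimestamp >= 0) {
            return recordTimestamp; // use the embedded record timestamp when it is valid
        }
        // fall back to the last known partition time, or to wall-clock time if none is known yet (-1)
        return previousTimestamp >= 0 ? previousTimestamp : System.currentTimeMillis();
    }
}{code}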

  was:
From the users mailing list
{noformat}
am testing the new version 2.3 for Kafka Streams specifically. I have
noticed that now, the implementation of the method extract from the
interface org.apache.kafka.streams.processor.TimestampExtractor

*public* *long* extract(ConsumerRecord record, *long*
previousTimestamp)


is always returning -1 as value.


Previous version 2.2.1 was returning the correct value for the record
partition.
{noformat}


> Change to track partition time breaks TimestampExtractor
> 
>
> Key: KAFKA-8615
> URL: https://issues.apache.org/jira/browse/KAFKA-8615
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> From the users mailing list, *UPDATED* by Jonathan Santilli:
> {noformat}
> Am testing the new version 2.3 for Kafka Streams specifically. I have noticed 
> that now, the implementation of the method extract from the
> interface org.apache.kafka.streams.processor.TimestampExtractor:
> public class OwnTimeExtractor implements TimestampExtractor {
>     @Override
>     public long extract(final ConsumerRecord record, final 
> long previousTimestamp) {
>         // previousTimestamp is always == -1. For version 2.3
>     }
> }
> Previous version 2.2.1 was returning the correct value for the record 
> partition.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow

2019-04-09 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813657#comment-16813657
 ] 

Jonathan Santilli edited comment on KAFKA-7656 at 4/9/19 5:38 PM:
--

Currently 2.2; we have updated from 2.0 recently.

This error started showing up after we updated to version 2.2, [~jagsancio].


was (Author: pachilo):
Currently 2.2, we have updated from 2.0 recently.

This error started showing up since we update to 2.2 version.

> ReplicaManager fetch fails on leader due to long/integer overflow
> -
>
> Key: KAFKA-7656
> URL: https://issues.apache.org/jira/browse/KAFKA-7656
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.1
> Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 
> EDT 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Patrick Haas
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> (Note: From 2.0.1-cp1 from confluent distribution)
> {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error 
> processing fetch operation on partition __consumer_offsets-20, offset 0 
> (kafka.server.ReplicaManager)}}
> {{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log 
> read from segment FileRecords(file= 
> /prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, 
> start=0, end=2147483647)}}
> {{ at kafka.log.LogSegment.read(LogSegment.scala:274)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}}
> {{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}}
> {{ at kafka.log.Log.read(Log.scala:1114)}}
> {{ at 
> kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}}
> {{ at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
> {{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
> {{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}}
> {{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}}
> {{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}}
> {{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}}
> {{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}}
> {{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}}
> {{ at java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow

2019-04-09 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813657#comment-16813657
 ] 

Jonathan Santilli commented on KAFKA-7656:
--

Currently 2.2; we have updated from 2.0 recently.

This error started showing up after we updated to version 2.2.

> ReplicaManager fetch fails on leader due to long/integer overflow
> -
>
> Key: KAFKA-7656
> URL: https://issues.apache.org/jira/browse/KAFKA-7656
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.1
> Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 
> EDT 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Patrick Haas
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> (Note: From 2.0.1-cp1 from confluent distribution)
> {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error 
> processing fetch operation on partition __consumer_offsets-20, offset 0 
> (kafka.server.ReplicaManager)}}
> {{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log 
> read from segment FileRecords(file= 
> /prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, 
> start=0, end=2147483647)}}
> {{ at kafka.log.LogSegment.read(LogSegment.scala:274)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}}
> {{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}}
> {{ at kafka.log.Log.read(Log.scala:1114)}}
> {{ at 
> kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}}
> {{ at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
> {{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
> {{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}}
> {{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}}
> {{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}}
> {{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}}
> {{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}}
> {{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}}
> {{ at java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow

2019-04-09 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813140#comment-16813140
 ] 

Jonathan Santilli edited comment on KAFKA-7656 at 4/9/19 8:53 AM:
--

Hello [~jagsancio], this is the only log I see (on the leader):

 
{code:java}
 [2019-04-08 22:35:27,145] ERROR [ReplicaManager broker=1] Error processing 
fetch with max size -2147483648 from consumer on partition 
__consumer_offsets-14: (fetchOffset=0, logStartOffset=-1, maxBytes=-2147483648, 
currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) 
java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read 
from segment FileRecords(file= 
/opt/kafka/logdata/__consumer_offsets-14/.log, start=0, 
end=2147483647) at kafka.log.LogSegment.read(LogSegment.scala:274) at 
kafka.log.Log.$anonfun$read$2(Log.scala:1245) at 
kafka.log.Log.maybeHandleIOException(Log.scala:2013) at 
kafka.log.Log.read(Log.scala:1200) at 
kafka.cluster.Partition.$anonfun$readRecords$1(Partition.scala:805) at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) at 
kafka.cluster.Partition.readRecords(Partition.scala:781) at 
kafka.server.ReplicaManager.read$1(ReplicaManager.scala:926) at 
kafka.server.ReplicaManager.$anonfun$readFromLocalLog$4(ReplicaManager.scala:991)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:990) at 
kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:840) at 
kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:845) at 
kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:723) at 
kafka.server.KafkaApis.handle(KafkaApis.scala:109) at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at 
java.lang.Thread.run(Thread.java:748){code}
 

It is the only partition with the problem so far.


was (Author: pachilo):
Hello [~jagsancio], this is the only log I see (on the leader):

 
{noformat}
[2019-04-08 22:35:27,145] ERROR [ReplicaManager broker=1] Error processing 
fetch with max size -2147483648 from consumer on partition 
__consumer_offsets-14: (fetchOffset=0, logStartOffset=-1, maxBytes=-2147483648, 
currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) 
java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read 
from segment FileRecords(file= 
/opt/kafka/logdata/__consumer_offsets-14/.log, start=0, 
end=2147483647) at kafka.log.LogSegment.read(LogSegment.scala:274) at 
kafka.log.Log.$anonfun$read$2(Log.scala:1245) at 
kafka.log.Log.maybeHandleIOException(Log.scala:2013) at 
kafka.log.Log.read(Log.scala:1200) at 
kafka.cluster.Partition.$anonfun$readRecords$1(Partition.scala:805) at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) at 
kafka.cluster.Partition.readRecords(Partition.scala:781) at 
kafka.server.ReplicaManager.read$1(ReplicaManager.scala:926) at 
kafka.server.ReplicaManager.$anonfun$readFromLocalLog$4(ReplicaManager.scala:991)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:990) at 
kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:840) at 
kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:845) at 
kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:723) at 
kafka.server.KafkaApis.handle(KafkaApis.scala:109) at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at 
java.lang.Thread.run(Thread.java:748)
{noformat}
 

Is the only partition with the problem so far.

> ReplicaManager fetch fails on leader due to long/integer overflow
> -
>
> Key: KAFKA-7656
> URL: https://issues.apache.org/jira/browse/KAFKA-7656
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.1
> Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 
> EDT 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Patrick Haas
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> (Note: From 2.0.1-cp1 from confluent distribution)
> {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error 
> processing fetch operation on partition __consumer_offsets-20, offset 0 
> (kafka.server.ReplicaManager)}}
> {{java.lang.IllegalArgumentException: Invalid max 

[jira] [Commented] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow

2019-04-09 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813140#comment-16813140
 ] 

Jonathan Santilli commented on KAFKA-7656:
--

Hello [~jagsancio], this is the only log I see (on the leader):

 
{noformat}
[2019-04-08 22:35:27,145] ERROR [ReplicaManager broker=1] Error processing 
fetch with max size -2147483648 from consumer on partition 
__consumer_offsets-14: (fetchOffset=0, logStartOffset=-1, maxBytes=-2147483648, 
currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) 
java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read 
from segment FileRecords(file= 
/opt/kafka/logdata/__consumer_offsets-14/.log, start=0, 
end=2147483647) at kafka.log.LogSegment.read(LogSegment.scala:274) at 
kafka.log.Log.$anonfun$read$2(Log.scala:1245) at 
kafka.log.Log.maybeHandleIOException(Log.scala:2013) at 
kafka.log.Log.read(Log.scala:1200) at 
kafka.cluster.Partition.$anonfun$readRecords$1(Partition.scala:805) at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) at 
kafka.cluster.Partition.readRecords(Partition.scala:781) at 
kafka.server.ReplicaManager.read$1(ReplicaManager.scala:926) at 
kafka.server.ReplicaManager.$anonfun$readFromLocalLog$4(ReplicaManager.scala:991)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:990) at 
kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:840) at 
kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:845) at 
kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:723) at 
kafka.server.KafkaApis.handle(KafkaApis.scala:109) at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at 
java.lang.Thread.run(Thread.java:748)
{noformat}
 

It is the only partition with the problem so far.

> ReplicaManager fetch fails on leader due to long/integer overflow
> -
>
> Key: KAFKA-7656
> URL: https://issues.apache.org/jira/browse/KAFKA-7656
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.1
> Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 
> EDT 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Patrick Haas
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> (Note: From 2.0.1-cp1 from confluent distribution)
> {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error 
> processing fetch operation on partition __consumer_offsets-20, offset 0 
> (kafka.server.ReplicaManager)}}
> {{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log 
> read from segment FileRecords(file= 
> /prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, 
> start=0, end=2147483647)}}
> {{ at kafka.log.LogSegment.read(LogSegment.scala:274)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}}
> {{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}}
> {{ at kafka.log.Log.read(Log.scala:1114)}}
> {{ at 
> kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}}
> {{ at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
> {{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
> {{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}}
> {{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}}
> {{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}}
> {{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}}
> {{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}}
> {{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}}
> {{ at java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow

2019-04-08 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812809#comment-16812809
 ] 

Jonathan Santilli commented on KAFKA-7656:
--

I am facing the same issue, but just after updating from version 2.0 to version 2.2.

I can confirm that it is the replica fetcher.

I have stopped all producers and consumers and stopped the brokers one by one 
until the message stopped. After bringing back all brokers, the error is still 
showing up.

> ReplicaManager fetch fails on leader due to long/integer overflow
> -
>
> Key: KAFKA-7656
> URL: https://issues.apache.org/jira/browse/KAFKA-7656
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.1
> Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 
> EDT 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Patrick Haas
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> (Note: From 2.0.1-cp1 from confluent distribution)
> {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error 
> processing fetch operation on partition __consumer_offsets-20, offset 0 
> (kafka.server.ReplicaManager)}}
> {{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log 
> read from segment FileRecords(file= 
> /prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, 
> start=0, end=2147483647)}}
> {{ at kafka.log.LogSegment.read(LogSegment.scala:274)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}}
> {{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}}
> {{ at kafka.log.Log.read(Log.scala:1114)}}
> {{ at 
> kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}}
> {{ at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
> {{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
> {{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}}
> {{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}}
> {{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}}
> {{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}}
> {{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}}
> {{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}}
> {{ at java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6582) Partitions get underreplicated, with a single ISR, and doesn't recover. Other brokers do not take over and we need to manually restart the broker.

2019-04-05 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16810713#comment-16810713
 ] 

Jonathan Santilli commented on KAFKA-6582:
--

Hello [~pogo], we are facing the same issue in version *2.0.0*

> Partitions get underreplicated, with a single ISR, and doesn't recover. Other 
> brokers do not take over and we need to manually restart the broker.
> --
>
> Key: KAFKA-6582
> URL: https://issues.apache.org/jira/browse/KAFKA-6582
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 1.0.0
> Environment: Ubuntu 16.04
> Linux kafka04 4.4.0-109-generic #132-Ubuntu SMP Tue Jan 9 19:52:39 UTC 2018 
> x86_64 x86_64 x86_64 GNU/Linux
> java version "9.0.1"
> Java(TM) SE Runtime Environment (build 9.0.1+11)
> Java HotSpot(TM) 64-Bit Server VM (build 9.0.1+11, mixed mode) 
> but also tried with the latest JVM 8 before with the same result.
>Reporter: Jurriaan Pruis
>Priority: Major
> Attachments: Screenshot 2019-01-18 at 13.08.17.png, Screenshot 
> 2019-01-18 at 13.16.59.png
>
>
> Partitions get underreplicated, with a single ISR, and doesn't recover. Other 
> brokers do not take over and we need to manually restart the 'single ISR' 
> broker (if you describe the partitions of replicated topic it is clear that 
> some partitions are only in sync on this broker).
> This bug resembles KAFKA-4477 a lot, but since that issue is marked as 
> resolved this is probably something else but similar.
> We have the same issue (or at least it looks pretty similar) on Kafka 1.0. 
> Since upgrading to Kafka 1.0 in November 2017 we've had these issues (we've 
> upgraded from Kafka 0.10.2.1).
> This happens almost every 24-48 hours on a random broker. This is why we 
> currently have a cronjob which restarts every broker every 24 hours. 
> During this issue the ISR shows the following server log: 
> {code:java}
> [2018-02-20 12:02:08,342] WARN Attempting to send response via channel for 
> which there is no open connection, connection id 
> 10.132.0.32:9092-10.14.148.20:56352-96708 (kafka.network.Processor)
> [2018-02-20 12:02:08,364] WARN Attempting to send response via channel for 
> which there is no open connection, connection id 
> 10.132.0.32:9092-10.14.150.25:54412-96715 (kafka.network.Processor)
> [2018-02-20 12:02:08,349] WARN Attempting to send response via channel for 
> which there is no open connection, connection id 
> 10.132.0.32:9092-10.14.149.18:35182-96705 (kafka.network.Processor)
> [2018-02-20 12:02:08,379] WARN Attempting to send response via channel for 
> which there is no open connection, connection id 
> 10.132.0.32:9092-10.14.150.25:54456-96717 (kafka.network.Processor)
> [2018-02-20 12:02:08,448] WARN Attempting to send response via channel for 
> which there is no open connection, connection id 
> 10.132.0.32:9092-10.14.159.20:36388-96720 (kafka.network.Processor)
> [2018-02-20 12:02:08,683] WARN Attempting to send response via channel for 
> which there is no open connection, connection id 
> 10.132.0.32:9092-10.14.157.110:41922-96740 (kafka.network.Processor)
> {code}
> Also on the ISR broker, the controller log shows this:
> {code:java}
> [2018-02-20 12:02:14,927] INFO [Controller-3-to-broker-3-send-thread]: 
> Controller 3 connected to 10.132.0.32:9092 (id: 3 rack: null) for sending 
> state change requests (kafka.controller.RequestSendThread)
> [2018-02-20 12:02:14,927] INFO [Controller-3-to-broker-0-send-thread]: 
> Controller 3 connected to 10.132.0.10:9092 (id: 0 rack: null) for sending 
> state change requests (kafka.controller.RequestSendThread)
> [2018-02-20 12:02:14,928] INFO [Controller-3-to-broker-1-send-thread]: 
> Controller 3 connected to 10.132.0.12:9092 (id: 1 rack: null) for sending 
> state change requests (kafka.controller.RequestSendThread){code}
> And the non-ISR brokers show these kind of errors:
>  
> {code:java}
> 2018-02-20 12:02:29,204] WARN [ReplicaFetcher replicaId=1, leaderId=3, 
> fetcherId=0] Error in fetch to broker 3, request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={..}, isolationLevel=READ_UNCOMMITTED) 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 3 was disconnected before the response was 
> read
>  at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95)
>  at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
>  at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:205)
>  at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:41)
>  at 
> 

[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-12-10 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714439#comment-16714439
 ] 

Jonathan Santilli commented on KAFKA-7678:
--

Hello [~guozhang], I can confirm that the issue about calling the *.close()* method 
on a null reference has been solved.

However, I do not know why that was happening (I did not dig deep enough 
into the code to understand the reason).

Maybe we can create another Jira to explore it, what do you think?

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1, 2.0.1, 2.1.0
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Minor
> Fix For: 1.1.2, 2.2.0, 2.1.1, 2.0.2
>
>
> This occurs when the group is rebalancing in a Kafka Stream application and 
> the process (the Kafka Stream application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
> Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> Although I have checked the code and the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class is expecting any kind of error to happen since is catching 
> `*Throwable*`.
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility at 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> Change it for:
>  
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  if ( Objects.nonNull(producer) ) {
> producer.close();
> producer = null;
>  }
>  checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-12-03 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707879#comment-16707879
 ] 

Jonathan Santilli commented on KAFKA-7678:
--

PR submitted [~mjsax]

[https://github.com/apache/kafka/pull/5993]

 

Cheers!

--

Jonathan

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Minor
>  Labels: bug
>
> This occurs when the group is rebalancing in a Kafka Stream application and 
> the process (the Kafka Stream application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
> Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> Although I have checked the code and the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class is expecting any kind of error to happen since is catching 
> `*Throwable*`.
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility at 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> Change it for:
>  
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  if ( Objects.nonNull(producer) ) {
> producer.close();
> producer = null;
>  }
>  checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-28 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701647#comment-16701647
 ] 

Jonathan Santilli commented on KAFKA-7678:
--

Yes [~mjsax], it is like that; it still finished successfully and gracefully.

Now that we can say it is a bug, I have changed the priority to *minor* and assigned it to myself.

Hope to send the PR soon.

 

Cheers!

--

Jonathan

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jonathan Santilli
>Priority: Minor
>
> This occurs when the group is rebalancing in a Kafka Stream application and 
> the process (the Kafka Stream application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
> Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> Although I have checked the code and the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class is expecting any kind of error to happen since is catching 
> `*Throwable*`.
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility at 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> Change it for:
>  
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  if ( Objects.nonNull(producer) ) {
> producer.close();
> producer = null;
>  }
>  checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-28 Thread Jonathan Santilli (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Santilli reassigned KAFKA-7678:


Assignee: Jonathan Santilli

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Minor
>
> This occurs when the group is rebalancing in a Kafka Stream application and 
> the process (the Kafka Stream application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
> Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> Although I have checked the code and the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class is expecting any kind of error to happen since is catching 
> `*Throwable*`.
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility at 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> Change it for:
>  
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  if ( Objects.nonNull(producer) ) {
> producer.close();
> producer = null;
>  }
>  checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-28 Thread Jonathan Santilli (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Santilli updated KAFKA-7678:
-
Priority: Minor  (was: Major)

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jonathan Santilli
>Priority: Minor
>
> This occurs when the group is rebalancing in a Kafka Stream application and 
> the process (the Kafka Stream application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
> Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> Although I have checked the code and the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class is expecting any kind of error to happen since is catching 
> `*Throwable*`.
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility at 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> Change it for:
>  
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  if ( Objects.nonNull(producer) ) {
> producer.close();
> producer = null;
>  }
>  checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7508) Kafka broker anonymous disconnected from Zookeeper

2018-11-27 Thread Jonathan Santilli (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Santilli resolved KAFKA-7508.
--
Resolution: Not A Problem

Closed since it was requested by the reporter.

Sathish agreed to try the recommended JVM parameters.

> Kafka broker anonymous disconnected from Zookeeper
> --
>
> Key: KAFKA-7508
> URL: https://issues.apache.org/jira/browse/KAFKA-7508
> Project: Kafka
>  Issue Type: Task
>  Components: admin, config
>Reporter: Sathish Yanamala
>Priority: Blocker
>
> Hello Team,
>  
> We are facing below Error , Kafka broker unable to connect Zookeeper , Please 
> check and suggest is there any configuration changes required on Kafka Broker.
>  
>  ERROR:
> 2018-10-15 12:24:07,502 WARN kafka.network.Processor: Attempting to send 
> response via channel for which there is no open connection, connection id 
> - -:9093-- -:47542-25929
> 2018-10-15 12:24:09,428 INFO kafka.coordinator.group.GroupCoordinator: 
> [GroupCoordinator 3]: Group KMOffsetCache-xxx  with generation 9 is now 
> empty (__consumer_offsets-22)
> 2018-10-15 12:24:09,428 INFO kafka.server.epoch.LeaderEpochFileCache: Updated 
> PartitionLeaderEpoch. New: \{epoch:1262, offset:151}, Current: \{epoch:1261, 
> offset144} for Partition: __consumer_offsets-22. Cache now contains 15 
> entries.
> {color:#d04437}*2018-10-15 12:24:10,905 ERROR kafka.utils.KafkaScheduler: 
> Uncaught exception in scheduled task 'highwatermark-checkpoint'*{color}
> {color:#d04437}*java.lang.OutOfMemoryError: Java heap space*{color}
> {color:#d04437}    at{color} 
> scala.collection.convert.DecorateAsScala$$Lambda$214/x.get$Lambda(Unknown 
> Source)
>     at 
> java.lang.invoke.LambdaForm$DMH/xxx.invokeStatic_LL_L(LambdaForm$DMH)
>     at 
> java.lang.invoke.LambdaForm$MH/xx.linkToTargetMethod(LambdaForm$MH)
>     at 
> scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter(DecorateAsScala.scala:45)
>     at 
> scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter$(DecorateAsScala.scala:44)
>     at 
> scala.collection.JavaConverters$.collectionAsScalaIterableConverter(JavaConverters.scala:73)
>     at kafka.utils.Pool.values(Pool.scala:85)
>     at 
> kafka.server.ReplicaManager.nonOfflinePartitionsIterator(ReplicaManager.scala:397)
>     at 
> kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1340)
>     at 
> kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:253)
>     at 
> kafka.server.ReplicaManager$$Lambda$608/xx.apply$mcV$sp(Unknown Source)
>     at 
> kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:110)
>     at 
> kafka.utils.KafkaScheduler$$Lambda$269/.apply$mcV$sp(Unknown Source)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  
> Thank you,
> Sathish Yanamala
> M:832-382-4487



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-27 Thread Jonathan Santilli (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Santilli updated KAFKA-7678:
-
Description: 
This occurs when the group is rebalancing in a Kafka Stream application and the 
process (the Kafka Stream application) receives a *SIGTERM* to stop it 
gracefully.

 

 
{noformat}
ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
Failed to close producer due to the following error:
java.lang.NullPointerException
 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
 at 
org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
 at 
org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
 at 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
 

 

Although I have checked the code and the method 
`*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
class is expecting any kind of error to happen since is catching `*Throwable*`.

 

 

 
{noformat}
try {
 recordCollector.close();
} catch (final Throwable e) {
 log.error("Failed to close producer due to the following error:", e);
} finally {
 producer = null;
}{noformat}
 

Should we consider this a bug?

In my opinion, we could check for the `*null*` possibility at 
`*RecordCollectorImpl*.*java*` class:
{noformat}
@Override
public void close() {
 log.debug("Closing producer");
 producer.close();
 producer = null;
 checkForException();
}{noformat}
 

Change it for:

 
{noformat}
@Override
public void close() {
 log.debug("Closing producer");
 if ( Objects.nonNull(producer) ) {
producer.close();
producer = null;
 }
 checkForException();
}{noformat}
 

How does that sound?

 

Kafka Brokers running 2.0.0

Kafka Stream and client 2.1.0

OpenJDK 8

 

  was:
This occurs when the group is rebalancing in a Kafka Stream application and the 
process (the Kafka Stream application) receives a SIGTERM to stop it gracefully.

 

 
{noformat}
ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
Failed to close producer due to the following error:
java.lang.NullPointerException
 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
 at 
org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
 at 
org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
 at 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
 

 

Although I have checked the code and the method 
`maybeAbortTransactionAndCloseRecordCollector` in the `StreamTask.java` class 
is expecting any kind of error to happen since is catching `Throwable`.

 

 

 
{noformat}
try {
 recordCollector.close();
} catch (final Throwable e) {
 log.error("Failed to close producer due to the following error:", e);
} finally {
 producer = null;
}{noformat}
 

Should we consider this a bug?

In my opinion, we could check for the `null` possibility at 
`RecordCollectorImpl.java` class:
{noformat}
@Override
public void close() {
 log.debug("Closing producer");
 producer.close();
 producer = null;
 checkForException();
}{noformat}
 

Change it for:

 
{noformat}
@Override
public void close() {
 log.debug("Closing producer");
 if ( Objects.nonNull(producer) ) {
producer.close();
producer = null;
 }
 checkForException();
}{noformat}
 

How does that sound?

 

Kafka Brokers running 2.0.0

Kafka Stream and client 2.1.0

OpenJDK 8

 


> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  

[jira] [Created] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-27 Thread Jonathan Santilli (JIRA)
Jonathan Santilli created KAFKA-7678:


 Summary: Failed to close producer due to 
java.lang.NullPointerException
 Key: KAFKA-7678
 URL: https://issues.apache.org/jira/browse/KAFKA-7678
 Project: Kafka
  Issue Type: Bug
Reporter: Jonathan Santilli


This occurs when the group is rebalancing in a Kafka Stream application and the 
process (the Kafka Stream application) receives a SIGTERM to stop it gracefully.

 

 
{noformat}
ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
Failed to close producer due to the following error:
java.lang.NullPointerException
 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
 at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
 at 
org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
 at 
org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
 at 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
 

 

Although I have checked the code and the method 
`maybeAbortTransactionAndCloseRecordCollector` in the `StreamTask.java` class 
is expecting any kind of error to happen since is catching `Throwable`.

 

 

 
{noformat}
try {
 recordCollector.close();
} catch (final Throwable e) {
 log.error("Failed to close producer due to the following error:", e);
} finally {
 producer = null;
}{noformat}
 

Should we consider this a bug?

In my opinion, we could check for the `null` possibility at 
`RecordCollectorImpl.java` class:
{noformat}
@Override
public void close() {
 log.debug("Closing producer");
 producer.close();
 producer = null;
 checkForException();
}{noformat}
 

Change it for:

 
{noformat}
@Override
public void close() {
 log.debug("Closing producer");
 if ( Objects.nonNull(producer) ) {
producer.close();
producer = null;
 }
 checkForException();
}{noformat}
 

How does that sound?

 

Kafka Brokers running 2.0.0

Kafka Stream and client 2.1.0

OpenJDK 8

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7508) Kafka broker anonymous disconnected from Zookeeper

2018-11-27 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16700187#comment-16700187
 ] 

Jonathan Santilli commented on KAFKA-7508:
--

Kafka Brokers expose good metrics through JMX; you can use your favorite JMX client 
to connect to the Broker's JMX port.

LinkedIn has good monitoring tools: [https://github.com/linkedin/streaming]

A Kafka Broker is a process that runs on your server, so you can monitor it like any 
other process (just to check whether the Broker is up or not).

But that is not enough, of course; you also need to monitor the Broker from the 
application point of view, and the JMX metrics will help you with that (see the sketch below).
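
As an example, here is a minimal JMX client sketch in Java that reads a single broker 
metric over the remote JMX port. The host, port (9999) and the exact MBean/attribute 
names are assumptions for illustration only; check the JMX configuration and metric 
names of your own brokers:

{code:java}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class BrokerJmxCheck {
    public static void main(String[] args) throws Exception {
        // Assumes the broker was started with remote JMX enabled on port 9999
        // (for example via the JMX_PORT environment variable).
        JMXServiceURL url =
            new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");

        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();

            // One of the broker metrics; verify the exact MBean and attribute
            // names against your broker version.
            ObjectName messagesIn =
                new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");

            Object rate = mbs.getAttribute(messagesIn, "OneMinuteRate");
            System.out.println("MessagesInPerSec (1-min rate): " + rate);
        } finally {
            connector.close();
        }
    }
}
{code}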

 

BTW, if you consider this issue solved, please let us know so we can close it.

Cheers!

--

Jonathan

> Kafka broker anonymous disconnected from Zookeeper
> --
>
> Key: KAFKA-7508
> URL: https://issues.apache.org/jira/browse/KAFKA-7508
> Project: Kafka
>  Issue Type: Task
>  Components: admin, config
>Reporter: Sathish Yanamala
>Priority: Blocker
>
> Hello Team,
>  
> We are facing below Error , Kafka broker unable to connect Zookeeper , Please 
> check and suggest is there any configuration changes required on Kafka Broker.
>  
>  ERROR:
> 2018-10-15 12:24:07,502 WARN kafka.network.Processor: Attempting to send 
> response via channel for which there is no open connection, connection id 
> - -:9093-- -:47542-25929
> 2018-10-15 12:24:09,428 INFO kafka.coordinator.group.GroupCoordinator: 
> [GroupCoordinator 3]: Group KMOffsetCache-xxx  with generation 9 is now 
> empty (__consumer_offsets-22)
> 2018-10-15 12:24:09,428 INFO kafka.server.epoch.LeaderEpochFileCache: Updated 
> PartitionLeaderEpoch. New: \{epoch:1262, offset:151}, Current: \{epoch:1261, 
> offset144} for Partition: __consumer_offsets-22. Cache now contains 15 
> entries.
> {color:#d04437}*2018-10-15 12:24:10,905 ERROR kafka.utils.KafkaScheduler: 
> Uncaught exception in scheduled task 'highwatermark-checkpoint'*{color}
> {color:#d04437}*java.lang.OutOfMemoryError: Java heap space*{color}
> {color:#d04437}    at{color} 
> scala.collection.convert.DecorateAsScala$$Lambda$214/x.get$Lambda(Unknown 
> Source)
>     at 
> java.lang.invoke.LambdaForm$DMH/xxx.invokeStatic_LL_L(LambdaForm$DMH)
>     at 
> java.lang.invoke.LambdaForm$MH/xx.linkToTargetMethod(LambdaForm$MH)
>     at 
> scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter(DecorateAsScala.scala:45)
>     at 
> scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter$(DecorateAsScala.scala:44)
>     at 
> scala.collection.JavaConverters$.collectionAsScalaIterableConverter(JavaConverters.scala:73)
>     at kafka.utils.Pool.values(Pool.scala:85)
>     at 
> kafka.server.ReplicaManager.nonOfflinePartitionsIterator(ReplicaManager.scala:397)
>     at 
> kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1340)
>     at 
> kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:253)
>     at 
> kafka.server.ReplicaManager$$Lambda$608/xx.apply$mcV$sp(Unknown Source)
>     at 
> kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:110)
>     at 
> kafka.utils.KafkaScheduler$$Lambda$269/.apply$mcV$sp(Unknown Source)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  
> Thank you,
> Sathish Yanamala
> M:832-382-4487



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7508) Kafka broker anonymous disconnected from Zookeeper

2018-11-26 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699269#comment-16699269
 ] 

Jonathan Santilli commented on KAFKA-7508:
--

Hello [~sathish051], since you are using Java 8, have you checked 
[http://kafka.apache.org/documentation/#java] ?

The documentation offers a good starting point with example values for your JVM 
memory configuration.
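
For instance, a sketch of how the heap and GC flags are typically passed to the broker 
via the *KAFKA_HEAP_OPTS* / *KAFKA_JVM_PERFORMANCE_OPTS* environment variables; the 
sizes and flags below are only illustrative and need to be tuned to your machine and workload:

{noformat}
# Illustrative values only -- size the heap to what the machine can really provide
export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
bin/kafka-server-start.sh config/server.properties
{noformat}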

 

Cheers!

--

Jonathan

> Kafka broker anonymous disconnected from Zookeeper
> --
>
> Key: KAFKA-7508
> URL: https://issues.apache.org/jira/browse/KAFKA-7508
> Project: Kafka
>  Issue Type: Task
>  Components: admin, config
>Reporter: Sathish Yanamala
>Priority: Blocker
>
> Hello Team,
>  
> We are facing below Error , Kafka broker unable to connect Zookeeper , Please 
> check and suggest is there any configuration changes required on Kafka Broker.
>  
>  ERROR:
> 2018-10-15 12:24:07,502 WARN kafka.network.Processor: Attempting to send 
> response via channel for which there is no open connection, connection id 
> - -:9093-- -:47542-25929
> 2018-10-15 12:24:09,428 INFO kafka.coordinator.group.GroupCoordinator: 
> [GroupCoordinator 3]: Group KMOffsetCache-xxx  with generation 9 is now 
> empty (__consumer_offsets-22)
> 2018-10-15 12:24:09,428 INFO kafka.server.epoch.LeaderEpochFileCache: Updated 
> PartitionLeaderEpoch. New: \{epoch:1262, offset:151}, Current: \{epoch:1261, 
> offset144} for Partition: __consumer_offsets-22. Cache now contains 15 
> entries.
> {color:#d04437}*2018-10-15 12:24:10,905 ERROR kafka.utils.KafkaScheduler: 
> Uncaught exception in scheduled task 'highwatermark-checkpoint'*{color}
> {color:#d04437}*java.lang.OutOfMemoryError: Java heap space*{color}
> {color:#d04437}    at{color} 
> scala.collection.convert.DecorateAsScala$$Lambda$214/x.get$Lambda(Unknown 
> Source)
>     at 
> java.lang.invoke.LambdaForm$DMH/xxx.invokeStatic_LL_L(LambdaForm$DMH)
>     at 
> java.lang.invoke.LambdaForm$MH/xx.linkToTargetMethod(LambdaForm$MH)
>     at 
> scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter(DecorateAsScala.scala:45)
>     at 
> scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter$(DecorateAsScala.scala:44)
>     at 
> scala.collection.JavaConverters$.collectionAsScalaIterableConverter(JavaConverters.scala:73)
>     at kafka.utils.Pool.values(Pool.scala:85)
>     at 
> kafka.server.ReplicaManager.nonOfflinePartitionsIterator(ReplicaManager.scala:397)
>     at 
> kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1340)
>     at 
> kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:253)
>     at 
> kafka.server.ReplicaManager$$Lambda$608/xx.apply$mcV$sp(Unknown Source)
>     at 
> kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:110)
>     at 
> kafka.utils.KafkaScheduler$$Lambda$269/.apply$mcV$sp(Unknown Source)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  
> Thank you,
> Sathish Yanamala
> M:832-382-4487



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7508) Kafka broker anonymous disconnected from Zookeeper

2018-11-20 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693860#comment-16693860
 ] 

Jonathan Santilli commented on KAFKA-7508:
--

Hello Sathish Yanamala, the logs show a clear java.lang.OutOfMemoryError: Java 
heap space.

Can you please provide the whole java command executed to run the Broker?

Also, is it possible that the machine does not have the 10GB you are trying to 
claim? How many processes are you running on that machine that could be 
consuming resources and preventing the Broker from getting enough capacity?
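
In case it helps, a couple of standard JDK commands (available on Java 8) show the 
actual JVM arguments and flags of the running broker process; the PID below is just 
a placeholder:

{noformat}
# Lists Java processes with their main class and JVM arguments
jps -lvm

# Prints the final JVM flag values for a given process id
jcmd <broker-pid> VM.flags
{noformat}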

> Kafka broker anonymous disconnected from Zookeeper
> --
>
> Key: KAFKA-7508
> URL: https://issues.apache.org/jira/browse/KAFKA-7508
> Project: Kafka
>  Issue Type: Task
>  Components: admin, config
>Reporter: Sathish Yanamala
>Priority: Blocker
>
> Hello Team,
>  
> We are facing below Error , Kafka broker unable to connect Zookeeper , Please 
> check and suggest is there any configuration changes required on Kafka Broker.
>  
>  ERROR:
> 2018-10-15 12:24:07,502 WARN kafka.network.Processor: Attempting to send 
> response via channel for which there is no open connection, connection id 
> - -:9093-- -:47542-25929
> 2018-10-15 12:24:09,428 INFO kafka.coordinator.group.GroupCoordinator: 
> [GroupCoordinator 3]: Group KMOffsetCache-xxx  with generation 9 is now 
> empty (__consumer_offsets-22)
> 2018-10-15 12:24:09,428 INFO kafka.server.epoch.LeaderEpochFileCache: Updated 
> PartitionLeaderEpoch. New: \{epoch:1262, offset:151}, Current: \{epoch:1261, 
> offset144} for Partition: __consumer_offsets-22. Cache now contains 15 
> entries.
> {color:#d04437}*2018-10-15 12:24:10,905 ERROR kafka.utils.KafkaScheduler: 
> Uncaught exception in scheduled task 'highwatermark-checkpoint'*{color}
> {color:#d04437}*java.lang.OutOfMemoryError: Java heap space*{color}
> {color:#d04437}    at{color} 
> scala.collection.convert.DecorateAsScala$$Lambda$214/x.get$Lambda(Unknown 
> Source)
>     at 
> java.lang.invoke.LambdaForm$DMH/xxx.invokeStatic_LL_L(LambdaForm$DMH)
>     at 
> java.lang.invoke.LambdaForm$MH/xx.linkToTargetMethod(LambdaForm$MH)
>     at 
> scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter(DecorateAsScala.scala:45)
>     at 
> scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter$(DecorateAsScala.scala:44)
>     at 
> scala.collection.JavaConverters$.collectionAsScalaIterableConverter(JavaConverters.scala:73)
>     at kafka.utils.Pool.values(Pool.scala:85)
>     at 
> kafka.server.ReplicaManager.nonOfflinePartitionsIterator(ReplicaManager.scala:397)
>     at 
> kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1340)
>     at 
> kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:253)
>     at 
> kafka.server.ReplicaManager$$Lambda$608/xx.apply$mcV$sp(Unknown Source)
>     at 
> kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:110)
>     at 
> kafka.utils.KafkaScheduler$$Lambda$269/.apply$mcV$sp(Unknown Source)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  
> Thank you,
> Sathish Yanamala
> M:832-382-4487



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4277) creating ephemeral node already exist

2018-11-19 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692211#comment-16692211
 ] 

Jonathan Santilli commented on KAFKA-4277:
--

Hello [~birger], KAFKA-7165 will solve the NODEEXISTS issue in case the 
session is regenerated (expired) and the Broker itself was the one that created the 
ephemeral znode in ZooKeeper.

If I understood correctly, I think that is what happened, according to this part of the 
description:

 
{noformat}
...
[2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) 
(org.I0Itec.zkclient.ZkClient)
...
{noformat}
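
Roughly, the idea behind the workaround is to check who owns the already existing 
ephemeral znode before giving up. A simplified sketch with the plain ZooKeeper Java 
client (this is not the actual Kafka code; the path handling and the retry decision 
are illustrative):

{code:java}
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class EphemeralRegistration {

    // Tries to create the broker's ephemeral znode; on NODEEXISTS it checks
    // whether the existing node already belongs to the current session.
    static void register(ZooKeeper zk, String path, byte[] data) throws Exception {
        try {
            zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (KeeperException.NodeExistsException e) {
            Stat stat = zk.exists(path, false);
            if (stat != null && stat.getEphemeralOwner() == zk.getSessionId()) {
                // The node was created by our own (current) session: treat as registered.
                return;
            }
            // Otherwise the node belongs to another (possibly stale) session;
            // the real code decides here whether to wait, retry or fail.
            throw e;
        }
    }
}
{code}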
 

> creating ephemeral node already exist
> -
>
> Key: KAFKA-4277
> URL: https://issues.apache.org/jira/browse/KAFKA-4277
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 0.10.0.0
>Reporter: Feixiang Yan
>Priority: Major
>
> I use zookeeper 3.4.6.
> Zookeeper session time out, zkClient try reconnect failed. Then re-establish 
> the session and re-registering broker info in ZK, throws NODEEXISTS Exception.
>  I think it is because the ephemeral node which created by old session has 
> not removed. 
> I read the 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.8.1, createEphemeralPathExpectConflictHandleZKBug try create node in a 
> while loop until create success. This can solve the issue. But in 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.10.0.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>   0.10.1 the function removed.
> {noformat}
> [2016-10-07 19:00:32,562] INFO Socket connection established to 
> 10.191.155.238/10.191.155.238:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,564] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1576b11f9b201bd has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,564] INFO Initiating client connection, 
> connectString=10.191.155.237:21819,10.191.155.238:21819,10.191.155.239:21819/cluster2
>  sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@ae71be2 
> (org.apache.zookeeper.ZooKeeper)
> [2016-10-07 19:00:32,566] INFO Opening socket connection to server 
> 10.191.155.237/10.191.155.237:21819. Will not attempt to authenticate using 
> SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO Socket connection established to 
> 10.191.155.237/10.191.155.237:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO Session establishment complete on server 
> 10.191.155.237/10.191.155.237:21819, sessionid = 0x1579ecd39c20006, 
> negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,608] INFO re-registering broker info in ZK for broker 3 
> (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2016-10-07 19:00:32,610] INFO Creating /brokers/ids/3 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,611] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,614] ERROR Error handling event ZkEvent[New session 
> event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@324f1bc] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.RuntimeException: A broker is already registered on the path 
> /brokers/ids/3. This probably indicates that you either have configured a 
> brokerid that is already in use, or else you have shutdown this broker and 
> restarted it faster than the zookeeper timeout so it appears to be 
> re-registering.
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:305)
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:291)
> at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
> at 
> kafka.server.KafkaHealthcheck$SessionExpireListener.handleNewSession(KafkaHealthcheck.scala:104)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6584) Session expiration concurrent with ZooKeeper leadership failover may lead to broker registration failure

2018-11-10 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16682503#comment-16682503
 ] 

Jonathan Santilli commented on KAFKA-6584:
--

Good call [~ijuma], I have updated the description accordingly.

> Session expiration concurrent with ZooKeeper leadership failover may lead to 
> broker registration failure
> 
>
> Key: KAFKA-6584
> URL: https://issues.apache.org/jira/browse/KAFKA-6584
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 1.0.0
>Reporter: Chris Thunes
>Priority: Major
>
> It seems that an edge case exists which can lead to sessions "un-expiring" 
> during a ZooKeeper leadership failover. Additional details can be found in 
> ZOOKEEPER-2985.
> This leads to a NODEXISTS error when attempting to re-create the ephemeral 
> brokers/ids/\{id} node in ZkUtils.registerBrokerInZk. We experienced this 
> issue on each node within a 3-node Kafka cluster running 1.0.0. All three 
> nodes continued running (producers and consumers appeared unaffected), but 
> none of the nodes were considered online and partition leadership could be 
> not re-assigned.
> I took a quick look at trunk and I believe the issue is still present, but 
> has moved into KafkaZkClient.checkedEphemeralCreate which will [raise an 
> error|https://github.com/apache/kafka/blob/90e0bbe/core/src/main/scala/kafka/zk/KafkaZkClient.scala#L1512]
>  when it finds that the broker/ids/\{id} node exists, but belongs to the old 
> (believed expired) session.
>  
> *NOTE:* KAFKA-7165 introduce a workaround to cope with the case described 
> here. We decided to keep this issue open to track the ZOOKEEPER-2985 status.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6584) Session expiration concurrent with ZooKeeper leadership failover may lead to broker registration failure

2018-11-10 Thread Jonathan Santilli (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Santilli updated KAFKA-6584:
-
Description: 
It seems that an edge case exists which can lead to sessions "un-expiring" 
during a ZooKeeper leadership failover. Additional details can be found in 
ZOOKEEPER-2985.

This leads to a NODEXISTS error when attempting to re-create the ephemeral 
brokers/ids/\{id} node in ZkUtils.registerBrokerInZk. We experienced this issue 
on each node within a 3-node Kafka cluster running 1.0.0. All three nodes 
continued running (producers and consumers appeared unaffected), but none of 
the nodes were considered online and partition leadership could be not 
re-assigned.

I took a quick look at trunk and I believe the issue is still present, but has 
moved into KafkaZkClient.checkedEphemeralCreate which will [raise an 
error|https://github.com/apache/kafka/blob/90e0bbe/core/src/main/scala/kafka/zk/KafkaZkClient.scala#L1512]
 when it finds that the broker/ids/\{id} node exists, but belongs to the old 
(believed expired) session.

 

*NOTE:* KAFKA-7165 introduce a workaround to cope with the case described here. 
We decided to keep this issue open to track the ZOOKEEPER-2985 status.

  was:
It seems that an edge case exists which can lead to sessions "un-expiring" 
during a ZooKeeper leadership failover. Additional details can be found in 
ZOOKEEPER-2985.

This leads to a NODEXISTS error when attempting to re-create the ephemeral 
brokers/ids/\{id} node in ZkUtils.registerBrokerInZk. We experienced this issue 
on each node within a 3-node Kafka cluster running 1.0.0. All three nodes 
continued running (producers and consumers appeared unaffected), but none of 
the nodes were considered online and partition leadership could be not 
re-assigned.

I took a quick look at trunk and I believe the issue is still present, but has 
moved into KafkaZkClient.checkedEphemeralCreate which will [raise an 
error|https://github.com/apache/kafka/blob/90e0bbe/core/src/main/scala/kafka/zk/KafkaZkClient.scala#L1512]
 when it finds that the broker/ids/\{id} node exists, but belongs to the old 
(believed expired) session.


> Session expiration concurrent with ZooKeeper leadership failover may lead to 
> broker registration failure
> 
>
> Key: KAFKA-6584
> URL: https://issues.apache.org/jira/browse/KAFKA-6584
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 1.0.0
>Reporter: Chris Thunes
>Priority: Major
>
> It seems that an edge case exists which can lead to sessions "un-expiring" 
> during a ZooKeeper leadership failover. Additional details can be found in 
> ZOOKEEPER-2985.
> This leads to a NODEXISTS error when attempting to re-create the ephemeral 
> brokers/ids/\{id} node in ZkUtils.registerBrokerInZk. We experienced this 
> issue on each node within a 3-node Kafka cluster running 1.0.0. All three 
> nodes continued running (producers and consumers appeared unaffected), but 
> none of the nodes were considered online and partition leadership could be 
> not re-assigned.
> I took a quick look at trunk and I believe the issue is still present, but 
> has moved into KafkaZkClient.checkedEphemeralCreate which will [raise an 
> error|https://github.com/apache/kafka/blob/90e0bbe/core/src/main/scala/kafka/zk/KafkaZkClient.scala#L1512]
>  when it finds that the broker/ids/\{id} node exists, but belongs to the old 
> (believed expired) session.
>  
> *NOTE:* KAFKA-7165 introduce a workaround to cope with the case described 
> here. We decided to keep this issue open to track the ZOOKEEPER-2985 status.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6584) Session expiration concurrent with ZooKeeper leadership failover may lead to broker registration failure

2018-11-09 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16681912#comment-16681912
 ] 

Jonathan Santilli commented on KAFKA-6584:
--

Done [~omkreddy], thanks for the confirmation.

> Session expiration concurrent with ZooKeeper leadership failover may lead to 
> broker registration failure
> 
>
> Key: KAFKA-6584
> URL: https://issues.apache.org/jira/browse/KAFKA-6584
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 1.0.0
>Reporter: Chris Thunes
>Priority: Major
>
> It seems that an edge case exists which can lead to sessions "un-expiring" 
> during a ZooKeeper leadership failover. Additional details can be found in 
> ZOOKEEPER-2985.
> This leads to a NODEEXISTS error when attempting to re-create the ephemeral 
> brokers/ids/\{id} node in ZkUtils.registerBrokerInZk. We experienced this 
> issue on each node within a 3-node Kafka cluster running 1.0.0. All three 
> nodes continued running (producers and consumers appeared unaffected), but 
> none of the nodes were considered online and partition leadership could not 
> be re-assigned.
> I took a quick look at trunk and I believe the issue is still present, but 
> has moved into KafkaZkClient.checkedEphemeralCreate which will [raise an 
> error|https://github.com/apache/kafka/blob/90e0bbe/core/src/main/scala/kafka/zk/KafkaZkClient.scala#L1512]
>  when it finds that the broker/ids/\{id} node exists, but belongs to the old 
> (believed expired) session.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4277) creating ephemeral node already exist

2018-11-09 Thread Jonathan Santilli (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Santilli resolved KAFKA-4277.
--
Resolution: Duplicate

This is getting closed since KAFKA-7165 has been resolved.

> creating ephemeral node already exist
> -
>
> Key: KAFKA-4277
> URL: https://issues.apache.org/jira/browse/KAFKA-4277
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 0.10.0.0
>Reporter: Feixiang Yan
>Priority: Major
>
> I use zookeeper 3.4.6.
> The ZooKeeper session timed out and zkClient failed to reconnect. When the 
> session is re-established and the broker info is re-registered in ZK, a 
> NODEEXISTS exception is thrown.
>  I think this is because the ephemeral node created by the old session has 
> not been removed. 
> In 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.8.1, createEphemeralPathExpectConflictHandleZKBug tries to create the 
> node in a while loop until the creation succeeds, which handles this case. 
> But in 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.10.0.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>   0.10.1 that function was removed.
> {noformat}
> [2016-10-07 19:00:32,562] INFO Socket connection established to 
> 10.191.155.238/10.191.155.238:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,564] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1576b11f9b201bd has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,564] INFO Initiating client connection, 
> connectString=10.191.155.237:21819,10.191.155.238:21819,10.191.155.239:21819/cluster2
>  sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@ae71be2 
> (org.apache.zookeeper.ZooKeeper)
> [2016-10-07 19:00:32,566] INFO Opening socket connection to server 
> 10.191.155.237/10.191.155.237:21819. Will not attempt to authenticate using 
> SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO Socket connection established to 
> 10.191.155.237/10.191.155.237:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO Session establishment complete on server 
> 10.191.155.237/10.191.155.237:21819, sessionid = 0x1579ecd39c20006, 
> negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,608] INFO re-registering broker info in ZK for broker 3 
> (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2016-10-07 19:00:32,610] INFO Creating /brokers/ids/3 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,611] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,614] ERROR Error handling event ZkEvent[New session 
> event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@324f1bc] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.RuntimeException: A broker is already registered on the path 
> /brokers/ids/3. This probably indicates that you either have configured a 
> brokerid that is already in use, or else you have shutdown this broker and 
> restarted it faster than the zookeeper timeout so it appears to be 
> re-registering.
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:305)
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:291)
> at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
> at 
> kafka.server.KafkaHealthcheck$SessionExpireListener.handleNewSession(KafkaHealthcheck.scala:104)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5971) Broker keeps running even though not registered in ZK

2018-11-09 Thread Jonathan Santilli (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Santilli resolved KAFKA-5971.
--
Resolution: Duplicate

This is getting closed since KAFKA-7165 has been resolved.

> Broker keeps running even though not registered in ZK
> -
>
> Key: KAFKA-5971
> URL: https://issues.apache.org/jira/browse/KAFKA-5971
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0
>Reporter: Igor Canadi
>Priority: Major
>
> We had a curious situation happen to our kafka cluster running version 
> 0.11.0.0. One of the brokers was happily running, even though its ID was not 
> registered in Zookeeper under `/brokers/ids`.
> Based on the logs, it appears that the broker restarted very quickly and 
> there was a node under `/brokers/ids/2` still present from the previous run. 
> However, in that case I'd expect the broker to try again or just exit. In 
> reality it continued running without any errors in the logs.
> Here's the relevant part of the logs: 
> ```
> [2017-09-06 23:50:26,095] INFO Opening socket connection to server 
> zookeeper.kafka.svc.cluster.local/100.66.99.54:2181. Will not attempt to 
> authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2017-09-06 23:50:26,096] INFO Socket connection established to 
> zookeeper.kafka.svc.cluster.local/100.66.99.54:2181, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2017-09-06 23:50:26,099] WARN Unable to reconnect to ZooKeeper service, 
> session 0x15e4477405f1d40 has expired (org.apache.zookeeper.ClientCnxn)
> [2017-09-06 23:50:26,099] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2017-09-06 23:50:26,099] INFO Unable to reconnect to ZooKeeper service, 
> session 0x15e4477405f1d40 has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2017-09-06 23:50:26,099] INFO Initiating client connection, 
> connectString=zookeeper:2181 sessionTimeout=6000 
> watcher=org.I0Itec.zkclient.ZkClient@2cb4893b (org.apache.zookeeper.ZooKeeper)
> [2017-09-06 23:50:26,102] INFO EventThread shut down for session: 
> 0x15e4477405f1d40 (org.apache.zookeeper.ClientCnxn)
> [2017-09-06 23:50:26,107] INFO Opening socket connection to server 
> zookeeper.kafka.svc.cluster.local/100.66.99.54:2181. Will not attempt to 
> authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2017-09-06 23:50:26,108] INFO Socket connection established to 
> zookeeper.kafka.svc.cluster.local/100.66.99.54:2181, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2017-09-06 23:50:26,111] INFO Session establishment complete on server 
> zookeeper.kafka.svc.cluster.local/100.66.99.54:2181, sessionid = 
> 0x15e599a1a3e0013, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2017-09-06 23:50:26,112] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2017-09-06 23:50:26,114] INFO re-registering broker info in ZK for broker 2 
> (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2017-09-06 23:50:26,115] INFO Creating /brokers/ids/2 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2017-09-06 23:50:26,123] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2017-09-06 23:50:26,124] ERROR Error handling event ZkEvent[New session 
> event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@699f40a0] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.RuntimeException: A broker is already registered on the path 
> /brokers/ids/2. This probably indicates that you either have configured a 
> brokerid that is already in use, or else you have shutdown this broker and 
> restarted it faster than the zookeeper timeout so it
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:417)
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:403)
> at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
> at 
> kafka.server.KafkaHealthcheck$SessionExpireListener.handleNewSession(KafkaHealthcheck.scala:104)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:736)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:72)
> [2017-09-06 23:51:42,257] INFO [Group Metadata Manager on Broker 2]: Removed 
> 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)
> [2017-09-07 00:00:06,198] INFO Unable to read additional data from server 
> sessionid 0x15e599a1a3e0013, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2017-09-07 00:00:06,354] INFO zookeeper state changed (Disconnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2017-09-07 00:00:07,675] INFO Opening socket connection to server 
> 

[jira] [Commented] (KAFKA-6584) Session expiration concurrent with ZooKeeper leadership failover may lead to broker registration failure

2018-11-09 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16681886#comment-16681886
 ] 

Jonathan Santilli commented on KAFKA-6584:
--

Yes, good call [~omkreddy], so shall I close this one, KAFKA-5971, and 
KAFKA-4277 as duplicates of KAFKA-7165?

> Session expiration concurrent with ZooKeeper leadership failover may lead to 
> broker registration failure
> 
>
> Key: KAFKA-6584
> URL: https://issues.apache.org/jira/browse/KAFKA-6584
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 1.0.0
>Reporter: Chris Thunes
>Priority: Major
>
> It seems that an edge case exists which can lead to sessions "un-expiring" 
> during a ZooKeeper leadership failover. Additional details can be found in 
> ZOOKEEPER-2985.
> This leads to a NODEEXISTS error when attempting to re-create the ephemeral 
> brokers/ids/\{id} node in ZkUtils.registerBrokerInZk. We experienced this 
> issue on each node within a 3-node Kafka cluster running 1.0.0. All three 
> nodes continued running (producers and consumers appeared unaffected), but 
> none of the nodes were considered online and partition leadership could not 
> be re-assigned.
> I took a quick look at trunk and I believe the issue is still present, but 
> has moved into KafkaZkClient.checkedEphemeralCreate which will [raise an 
> error|https://github.com/apache/kafka/blob/90e0bbe/core/src/main/scala/kafka/zk/KafkaZkClient.scala#L1512]
>  when it finds that the broker/ids/\{id} node exists, but belongs to the old 
> (believed expired) session.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6584) Session expiration concurrent with ZooKeeper leadership failover may lead to broker registration failure

2018-11-09 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16681861#comment-16681861
 ] 

Jonathan Santilli commented on KAFKA-6584:
--

[~omkreddy] [~junrao] since KAFKA-7165 has been resolved, could this one be 
considered resolved as well? And maybe the others too?

> Session expiration concurrent with ZooKeeper leadership failover may lead to 
> broker registration failure
> 
>
> Key: KAFKA-6584
> URL: https://issues.apache.org/jira/browse/KAFKA-6584
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 1.0.0
>Reporter: Chris Thunes
>Priority: Major
>
> It seems that an edge case exists which can lead to sessions "un-expiring" 
> during a ZooKeeper leadership failover. Additional details can be found in 
> ZOOKEEPER-2985.
> This leads to a NODEEXISTS error when attempting to re-create the ephemeral 
> brokers/ids/\{id} node in ZkUtils.registerBrokerInZk. We experienced this 
> issue on each node within a 3-node Kafka cluster running 1.0.0. All three 
> nodes continued running (producers and consumers appeared unaffected), but 
> none of the nodes were considered online and partition leadership could not 
> be re-assigned.
> I took a quick look at trunk and I believe the issue is still present, but 
> has moved into KafkaZkClient.checkedEphemeralCreate which will [raise an 
> error|https://github.com/apache/kafka/blob/90e0bbe/core/src/main/scala/kafka/zk/KafkaZkClient.scala#L1512]
>  when it finds that the broker/ids/\{id} node exists, but belongs to the old 
> (believed expired) session.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID

2018-10-30 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668672#comment-16668672
 ] 

Jonathan Santilli edited comment on KAFKA-7165 at 10/30/18 12:59 PM:
-

Thanks for your reply [~junrao], that is the only time we have seen the 
_org.apache.zookeeper.KeeperException$SessionExpiredException_ issue so far.

We are now on Kafka version 2.0 and still suffer the *NODEEXISTS* issue from 
time to time.

Maybe the errors were related, but that is difficult to confirm; hopefully with 
the fix we can get rid of the *NODEEXISTS* error.

 

Cheers!


was (Author: pachilo):
Thanks for your reply [~junrao] , that was the only time we had that issue of 
the _org.apache.zookeeper.KeeperException$SessionExpiredException_ so far.

Now we are in version 2.0 of Kafka and from time to time suffering the 
*NODEEXISTS* issue.

Maybe the errors were related but difficult to ensure that, hopefully with the 
fix, we can get rid of the *NODEEXISTS* error*.*

 

Cheers!

> Error while creating ephemeral at /brokers/ids/BROKER_ID
> 
>
> Key: KAFKA-7165
> URL: https://issues.apache.org/jira/browse/KAFKA-7165
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 1.1.0
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Major
>
> Kafka version: 1.1.0
> Zookeeper version: 3.4.12
> 4 Kafka Brokers
> 4 Zookeeper servers
>  
> In one of the 4 brokers of the cluster, we detect the following error:
> [2018-07-14 04:38:23,784] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,509] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,510] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,513] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_2:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_2:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,954] INFO [Partition TOPIC_NAME-PARTITION-# broker=1|#* 
> broker=1] Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition)
>  [2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,445] INFO EventThread shut down for session: 
> 0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. 
> (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session 
> to 
> *ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*.
>  (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO Initiating client connection, 
> connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*
>  sessionTimeout=6000 
> watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 
> (org.apache.zookeeper.ZooKeeper)
>  [2018-07-14 04:38:26,465] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,477] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,484] INFO Session establishment complete on server 
> *ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout 
> = 6000 (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,496] *INFO Creating /brokers/ids/1* (is it secure? 
> false) (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,500] INFO Processing notification(s) to /config/changes 
> (kafka.common.ZkNodeChangeNotificationListener)
>  *[2018-07-14 04:38:26,547] ERROR Error while 

[jira] [Commented] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID

2018-10-30 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668672#comment-16668672
 ] 

Jonathan Santilli commented on KAFKA-7165:
--

Thanks for your reply [~junrao], that is the only time we have seen the 
_org.apache.zookeeper.KeeperException$SessionExpiredException_ issue so far.

We are now on Kafka version 2.0 and still suffer the *NODEEXISTS* issue from 
time to time.

Maybe the errors were related, but that is difficult to confirm; hopefully with 
the fix we can get rid of the *NODEEXISTS* error.

 

Cheers!

> Error while creating ephemeral at /brokers/ids/BROKER_ID
> 
>
> Key: KAFKA-7165
> URL: https://issues.apache.org/jira/browse/KAFKA-7165
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 1.1.0
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Major
>
> Kafka version: 1.1.0
> Zookeeper version: 3.4.12
> 4 Kafka Brokers
> 4 Zookeeper servers
>  
> In one of the 4 brokers of the cluster, we detect the following error:
> [2018-07-14 04:38:23,784] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,509] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,510] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,513] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_2:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_2:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,954] INFO [Partition TOPIC_NAME-PARTITION-# broker=1|#* 
> broker=1] Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition)
>  [2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,445] INFO EventThread shut down for session: 
> 0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. 
> (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session 
> to 
> *ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*.
>  (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO Initiating client connection, 
> connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*
>  sessionTimeout=6000 
> watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 
> (org.apache.zookeeper.ZooKeeper)
>  [2018-07-14 04:38:26,465] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,477] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,484] INFO Session establishment complete on server 
> *ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout 
> = 6000 (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,496] *INFO Creating /brokers/ids/1* (is it secure? 
> false) (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,500] INFO Processing notification(s) to /config/changes 
> (kafka.common.ZkNodeChangeNotificationListener)
>  *[2018-07-14 04:38:26,547] ERROR Error while creating ephemeral at 
> /brokers/ids/1, node already exists and owner '216186131422332301' does not 
> match current session '288330817911521280' 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)*
>  [2018-07-14 04:38:26,547] *INFO Result of znode creation at /brokers/ids/1 
> is: NODEEXISTS* (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,559] ERROR Uncaught exception in scheduled task 
> 'isr-expiration' (kafka.utils.KafkaScheduler)
> 

[jira] [Comment Edited] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID

2018-10-24 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661931#comment-16661931
 ] 

Jonathan Santilli edited comment on KAFKA-7165 at 10/24/18 8:31 AM:


Hello Jun Rao, I would like to continue working on this bug. I hope you can 
find some time to elaborate a little more on your proposal from 
[https://github.com/apache/kafka/pull/5575#issuecomment-416419017]:

 
{noformat}
An alternative approach is to retry the creation of the ephemeral node up to 
sth like twice the session timeout. It may take a bit long for the broker to be 
re-registered. However, it seems it's a bit safer and simpler, until 
ZOOKEEPER-2985 is fixed.{noformat}
 

Cheers,

–

Jonathan


was (Author: pachilo):
Hello Juan Rao, I would like to continue working on this bug, hope you can have 
some time to elaborate a little bit more your proposal from 
[https://github.com/apache/kafka/pull/5575#issuecomment-416419017:]

 
{noformat}
An alternative approach is to retry the creation of the ephemeral node up to 
sth like twice the session timeout. It may take a bit long for the broker to be 
re-registered. However, it seems it's a bit safer and simpler, until 
ZOOKEEPER-2985 is fixed.{noformat}
 

Cheers,

--

Jonathan

> Error while creating ephemeral at /brokers/ids/BROKER_ID
> 
>
> Key: KAFKA-7165
> URL: https://issues.apache.org/jira/browse/KAFKA-7165
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 1.1.0
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Major
>
> Kafka version: 1.1.0
> Zookeeper version: 3.4.12
> 4 Kafka Brokers
> 4 Zookeeper servers
>  
> In one of the 4 brokers of the cluster, we detect the following error:
> [2018-07-14 04:38:23,784] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,509] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,510] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,513] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_2:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_2:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,954] INFO [Partition TOPIC_NAME-PARTITION-# broker=1|#* 
> broker=1] Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition)
>  [2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,445] INFO EventThread shut down for session: 
> 0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. 
> (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session 
> to 
> *ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*.
>  (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO Initiating client connection, 
> connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*
>  sessionTimeout=6000 
> watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 
> (org.apache.zookeeper.ZooKeeper)
>  [2018-07-14 04:38:26,465] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,477] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,484] INFO Session establishment complete on server 
> *ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout 
> = 6000 (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,496] *INFO Creating 

[jira] [Commented] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID

2018-10-24 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661931#comment-16661931
 ] 

Jonathan Santilli commented on KAFKA-7165:
--

Hello Jun Rao, I would like to continue working on this bug. I hope you can 
find some time to elaborate a little more on your proposal from 
[https://github.com/apache/kafka/pull/5575#issuecomment-416419017]:

 
{noformat}
An alternative approach is to retry the creation of the ephemeral node up to 
sth like twice the session timeout. It may take a bit long for the broker to be 
re-registered. However, it seems it's a bit safer and simpler, until 
ZOOKEEPER-2985 is fixed.{noformat}
 

Cheers,

--

Jonathan
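
For what it is worth, below is a rough sketch of the bounded retry described in 
the quote above, i.e. retrying the ephemeral create for up to roughly twice the 
session timeout. It uses the plain ZooKeeper Java client, illustrative names, 
and a simplified fixed back-off, so it is an assumption of how such a retry 
could look rather than the actual patch:

{code:java}
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class RetryingEphemeralCreate {
    // Keep retrying creation of the ephemeral node until it succeeds or until
    // roughly twice the session timeout has elapsed, by which time ZooKeeper
    // should have cleaned up the stale node from the previous session.
    static void createWithRetry(ZooKeeper zk, String path, byte[] data, int sessionTimeoutMs)
            throws KeeperException, InterruptedException {
        long deadline = System.currentTimeMillis() + 2L * sessionTimeoutMs;
        while (true) {
            try {
                zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                return;
            } catch (KeeperException.NodeExistsException e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw e; // still blocked by the stale node, give up
                }
                TimeUnit.MILLISECONDS.sleep(200); // brief back-off before retrying
            }
        }
    }
}
{code}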

> Error while creating ephemeral at /brokers/ids/BROKER_ID
> 
>
> Key: KAFKA-7165
> URL: https://issues.apache.org/jira/browse/KAFKA-7165
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 1.1.0
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Major
>
> Kafka version: 1.1.0
> Zookeeper version: 3.4.12
> 4 Kafka Brokers
> 4 Zookeeper servers
>  
> In one of the 4 brokers of the cluster, we detect the following error:
> [2018-07-14 04:38:23,784] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,509] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,510] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,513] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_2:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_2:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,954] INFO [Partition TOPIC_NAME-PARTITION-# broker=1|#* 
> broker=1] Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition)
>  [2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,445] INFO EventThread shut down for session: 
> 0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. 
> (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session 
> to 
> *ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*.
>  (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO Initiating client connection, 
> connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*
>  sessionTimeout=6000 
> watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 
> (org.apache.zookeeper.ZooKeeper)
>  [2018-07-14 04:38:26,465] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,477] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,484] INFO Session establishment complete on server 
> *ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout 
> = 6000 (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,496] *INFO Creating /brokers/ids/1* (is it secure? 
> false) (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,500] INFO Processing notification(s) to /config/changes 
> (kafka.common.ZkNodeChangeNotificationListener)
>  *[2018-07-14 04:38:26,547] ERROR Error while creating ephemeral at 
> /brokers/ids/1, node already exists and owner '216186131422332301' does not 
> match current session '288330817911521280' 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)*
>  [2018-07-14 04:38:26,547] *INFO Result of znode creation at /brokers/ids/1 
> is: NODEEXISTS* (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,559] 

[jira] [Assigned] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID

2018-08-25 Thread Jonathan Santilli (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Santilli reassigned KAFKA-7165:


Assignee: Jonathan Santilli

> Error while creating ephemeral at /brokers/ids/BROKER_ID
> 
>
> Key: KAFKA-7165
> URL: https://issues.apache.org/jira/browse/KAFKA-7165
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 1.1.0
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Major
>
> Kafka version: 1.1.0
> Zookeeper version: 3.4.12
> 4 Kafka Brokers
> 4 Zookeeper servers
>  
> In one of the 4 brokers of the cluster, we detect the following error:
> [2018-07-14 04:38:23,784] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,509] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,510] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,513] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_2:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_2:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,954] INFO [Partition TOPIC_NAME-PARTITION-# broker=1|#* 
> broker=1] Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition)
>  [2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,445] INFO EventThread shut down for session: 
> 0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. 
> (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session 
> to 
> *ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*.
>  (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO Initiating client connection, 
> connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*
>  sessionTimeout=6000 
> watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 
> (org.apache.zookeeper.ZooKeeper)
>  [2018-07-14 04:38:26,465] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,477] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,484] INFO Session establishment complete on server 
> *ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout 
> = 6000 (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,496] *INFO Creating /brokers/ids/1* (is it secure? 
> false) (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,500] INFO Processing notification(s) to /config/changes 
> (kafka.common.ZkNodeChangeNotificationListener)
>  *[2018-07-14 04:38:26,547] ERROR Error while creating ephemeral at 
> /brokers/ids/1, node already exists and owner '216186131422332301' does not 
> match current session '288330817911521280' 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)*
>  [2018-07-14 04:38:26,547] *INFO Result of znode creation at /brokers/ids/1 
> is: NODEEXISTS* (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,559] ERROR Uncaught exception in scheduled task 
> 'isr-expiration' (kafka.utils.KafkaScheduler)
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode 
> = Session expired for 
> /brokers/topics/*TOPIC_NAME*/partitions/*PARTITION-#*/state
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
>  at kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:465)
>  at 

[jira] [Commented] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID

2018-08-24 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591643#comment-16591643
 ] 

Jonathan Santilli commented on KAFKA-7165:
--

Done [~omkreddy], let's wait, thanks!

> Error while creating ephemeral at /brokers/ids/BROKER_ID
> 
>
> Key: KAFKA-7165
> URL: https://issues.apache.org/jira/browse/KAFKA-7165
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 1.1.0
>Reporter: Jonathan Santilli
>Priority: Major
>
> Kafka version: 1.1.0
> Zookeeper version: 3.4.12
> 4 Kafka Brokers
> 4 Zookeeper servers
>  
> In one of the 4 brokers of the cluster, we detect the following error:
> [2018-07-14 04:38:23,784] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,509] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,510] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,513] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_2:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_2:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,954] INFO [Partition TOPIC_NAME-PARTITION-# broker=1|#* 
> broker=1] Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition)
>  [2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,445] INFO EventThread shut down for session: 
> 0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. 
> (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session 
> to 
> *ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*.
>  (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO Initiating client connection, 
> connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*
>  sessionTimeout=6000 
> watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 
> (org.apache.zookeeper.ZooKeeper)
>  [2018-07-14 04:38:26,465] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,477] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,484] INFO Session establishment complete on server 
> *ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout 
> = 6000 (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,496] *INFO Creating /brokers/ids/1* (is it secure? 
> false) (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,500] INFO Processing notification(s) to /config/changes 
> (kafka.common.ZkNodeChangeNotificationListener)
>  *[2018-07-14 04:38:26,547] ERROR Error while creating ephemeral at 
> /brokers/ids/1, node already exists and owner '216186131422332301' does not 
> match current session '288330817911521280' 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)*
>  [2018-07-14 04:38:26,547] *INFO Result of znode creation at /brokers/ids/1 
> is: NODEEXISTS* (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,559] ERROR Uncaught exception in scheduled task 
> 'isr-expiration' (kafka.utils.KafkaScheduler)
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode 
> = Session expired for 
> /brokers/topics/*TOPIC_NAME*/partitions/*PARTITION-#*/state
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
>  at kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:465)
>  at 

[jira] [Commented] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID

2018-08-07 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16571905#comment-16571905
 ] 

Jonathan Santilli commented on KAFKA-7165:
--

Hello [~omkreddy] and [~cthunes], thanks a lot for the feedback.

Given your opinions, what should be the next step?

 

Cheers!

> Error while creating ephemeral at /brokers/ids/BROKER_ID
> 
>
> Key: KAFKA-7165
> URL: https://issues.apache.org/jira/browse/KAFKA-7165
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 1.1.0
>Reporter: Jonathan Santilli
>Priority: Major
>
> Kafka version: 1.1.0
> Zookeeper version: 3.4.12
> 4 Kafka Brokers
> 4 Zookeeper servers
>  
> In one of the 4 brokers of the cluster, we detect the following error:
> [2018-07-14 04:38:23,784] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,509] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,510] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,513] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_2:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_2:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,954] INFO [Partition TOPIC_NAME-PARTITION-# broker=1|#* 
> broker=1] Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition)
>  [2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,445] INFO EventThread shut down for session: 
> 0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. 
> (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session 
> to 
> *ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*.
>  (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO Initiating client connection, 
> connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*
>  sessionTimeout=6000 
> watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 
> (org.apache.zookeeper.ZooKeeper)
>  [2018-07-14 04:38:26,465] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,477] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,484] INFO Session establishment complete on server 
> *ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout 
> = 6000 (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,496] *INFO Creating /brokers/ids/1* (is it secure? 
> false) (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,500] INFO Processing notification(s) to /config/changes 
> (kafka.common.ZkNodeChangeNotificationListener)
>  *[2018-07-14 04:38:26,547] ERROR Error while creating ephemeral at 
> /brokers/ids/1, node already exists and owner '216186131422332301' does not 
> match current session '288330817911521280' 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)*
>  [2018-07-14 04:38:26,547] *INFO Result of znode creation at /brokers/ids/1 
> is: NODEEXISTS* (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,559] ERROR Uncaught exception in scheduled task 
> 'isr-expiration' (kafka.utils.KafkaScheduler)
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode 
> = Session expired for 
> /brokers/topics/*TOPIC_NAME*/partitions/*PARTITION-#*/state
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
>  at 

[jira] [Commented] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID

2018-08-01 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16564932#comment-16564932
 ] 

Jonathan Santilli commented on KAFKA-7165:
--

Sure [~omkreddy], it makes sense: some misconfiguration could lead to having 
two or more brokers with the same id (not an ideal situation).

I will wait for [~cthunes]'s opinion and considerations.

 

Cheers!

> Error while creating ephemeral at /brokers/ids/BROKER_ID
> 
>
> Key: KAFKA-7165
> URL: https://issues.apache.org/jira/browse/KAFKA-7165
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 1.1.0
>Reporter: Jonathan Santilli
>Priority: Major
>
> Kafka version: 1.1.0
> Zookeeper version: 3.4.12
> 4 Kafka Brokers
> 4 Zookeeper servers
>  
> In one of the 4 brokers of the cluster, we detect the following error:
> [2018-07-14 04:38:23,784] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,509] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,510] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,513] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_2:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_2:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,954] INFO [Partition TOPIC_NAME-PARTITION-# broker=1|#* 
> broker=1] Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition)
>  [2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,445] INFO EventThread shut down for session: 
> 0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. 
> (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session 
> to 
> *ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*.
>  (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO Initiating client connection, 
> connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*
>  sessionTimeout=6000 
> watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 
> (org.apache.zookeeper.ZooKeeper)
>  [2018-07-14 04:38:26,465] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,477] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,484] INFO Session establishment complete on server 
> *ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout 
> = 6000 (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,496] *INFO Creating /brokers/ids/1* (is it secure? 
> false) (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,500] INFO Processing notification(s) to /config/changes 
> (kafka.common.ZkNodeChangeNotificationListener)
>  *[2018-07-14 04:38:26,547] ERROR Error while creating ephemeral at 
> /brokers/ids/1, node already exists and owner '216186131422332301' does not 
> match current session '288330817911521280' 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)*
>  [2018-07-14 04:38:26,547] *INFO Result of znode creation at /brokers/ids/1 
> is: NODEEXISTS* (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,559] ERROR Uncaught exception in scheduled task 
> 'isr-expiration' (kafka.utils.KafkaScheduler)
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode 
> = Session expired for 
> /brokers/topics/*TOPIC_NAME*/partitions/*PARTITION-#*/state
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
>  at 

[jira] [Commented] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID

2018-07-22 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16552161#comment-16552161
 ] 

Jonathan Santilli commented on KAFKA-7165:
--

Ok [~omkreddy], I will familiarize myself with the code to understand the edge 
case you are mentioning.

> Error while creating ephemeral at /brokers/ids/BROKER_ID
> 
>
> Key: KAFKA-7165
> URL: https://issues.apache.org/jira/browse/KAFKA-7165
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 1.1.0
>Reporter: Jonathan Santilli
>Priority: Major
>
> Kafka version: 1.1.0
> Zookeeper version: 3.4.12
> 4 Kafka Brokers
> 4 Zookeeper servers
>  
> In one of the 4 brokers of the cluster, we detect the following error:
> [2018-07-14 04:38:23,784] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,509] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,510] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,513] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_2:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_2:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,954] INFO [Partition TOPIC_NAME-PARTITION-# broker=1|#* 
> broker=1] Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition)
>  [2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,445] INFO EventThread shut down for session: 
> 0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. 
> (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session 
> to 
> *ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*.
>  (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO Initiating client connection, 
> connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*
>  sessionTimeout=6000 
> watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 
> (org.apache.zookeeper.ZooKeeper)
>  [2018-07-14 04:38:26,465] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,477] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,484] INFO Session establishment complete on server 
> *ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout 
> = 6000 (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,496] *INFO Creating /brokers/ids/1* (is it secure? 
> false) (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,500] INFO Processing notification(s) to /config/changes 
> (kafka.common.ZkNodeChangeNotificationListener)
>  *[2018-07-14 04:38:26,547] ERROR Error while creating ephemeral at 
> /brokers/ids/1, node already exists and owner '216186131422332301' does not 
> match current session '288330817911521280' 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)*
>  [2018-07-14 04:38:26,547] *INFO Result of znode creation at /brokers/ids/1 
> is: NODEEXISTS* (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,559] ERROR Uncaught exception in scheduled task 
> 'isr-expiration' (kafka.utils.KafkaScheduler)
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode 
> = Session expired for 
> /brokers/topics/*TOPIC_NAME*/partitions/*PARTITION-#*/state
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
>  at 

[jira] [Commented] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID

2018-07-22 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16552070#comment-16552070
 ] 

Jonathan Santilli commented on KAFKA-7165:
--

Yes, [~omkreddy], it seems to be related. Do you know the status of this? I 
mean, is somebody working on it? If not, I would like to collaborate, if 
feasible.

 

Cheers!

 

> Error while creating ephemeral at /brokers/ids/BROKER_ID
> 
>
> Key: KAFKA-7165
> URL: https://issues.apache.org/jira/browse/KAFKA-7165
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 1.1.0
>Reporter: Jonathan Santilli
>Priority: Major
>
> Kafka version: 1.1.0
> Zookeeper version: 3.4.12
> 4 Kafka Brokers
> 4 Zookeeper servers
>  
> In one of the 4 brokers of the cluster, we detect the following error:
> [2018-07-14 04:38:23,784] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,509] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,510] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:24,513] INFO Unable to read additional data from server 
> sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_2:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,287] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_2:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:25,954] INFO [Partition *TOPIC_NAME-PARTITION-#* broker=1] 
> Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition)
>  [2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, 
> session 0x3000c2420cb458d has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,445] INFO EventThread shut down for session: 
> 0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. 
> (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session 
> to 
> *ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*.
>  (kafka.zookeeper.ZooKeeperClient)
>  [2018-07-14 04:38:26,459] INFO Initiating client connection, 
> connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*
>  sessionTimeout=6000 
> watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 
> (org.apache.zookeeper.ZooKeeper)
>  [2018-07-14 04:38:26,465] INFO Opening socket connection to server 
> *ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,477] INFO Socket connection established to 
> *ZOOKEEPER_SERVER_1:PORT*, initiating session 
> (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,484] INFO Session establishment complete on server 
> *ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout 
> = 6000 (org.apache.zookeeper.ClientCnxn)
>  [2018-07-14 04:38:26,496] *INFO Creating /brokers/ids/1* (is it secure? 
> false) (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,500] INFO Processing notification(s) to /config/changes 
> (kafka.common.ZkNodeChangeNotificationListener)
>  *[2018-07-14 04:38:26,547] ERROR Error while creating ephemeral at 
> /brokers/ids/1, node already exists and owner '216186131422332301' does not 
> match current session '288330817911521280' 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)*
>  [2018-07-14 04:38:26,547] *INFO Result of znode creation at /brokers/ids/1 
> is: NODEEXISTS* (kafka.zk.KafkaZkClient)
>  [2018-07-14 04:38:26,559] ERROR Uncaught exception in scheduled task 
> 'isr-expiration' (kafka.utils.KafkaScheduler)
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode 
> = Session expired for 
> /brokers/topics/*TOPIC_NAME*/partitions/*PARTITION-#*/state
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
>  at 

[jira] [Updated] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID

2018-07-15 Thread Jonathan Santilli (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Santilli updated KAFKA-7165:
-
Description: 
Kafka version: 1.1.0

Zookeeper version: 3.4.12

4 Kafka Brokers

4 Zookeeper servers

 

In one of the 4 brokers of the cluster, we detect the following error:

[2018-07-14 04:38:23,784] INFO Unable to read additional data from server 
sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
 [2018-07-14 04:38:24,509] INFO Opening socket connection to server 
*ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL (unknown 
error) (org.apache.zookeeper.ClientCnxn)
 [2018-07-14 04:38:24,510] INFO Socket connection established to 
*ZOOKEEPER_SERVER_1:PORT*, initiating session (org.apache.zookeeper.ClientCnxn)
 [2018-07-14 04:38:24,513] INFO Unable to read additional data from server 
sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
 [2018-07-14 04:38:25,287] INFO Opening socket connection to server 
*ZOOKEEPER_SERVER_2:PORT*. Will not attempt to authenticate using SASL (unknown 
error) (org.apache.zookeeper.ClientCnxn)
 [2018-07-14 04:38:25,287] INFO Socket connection established to 
*ZOOKEEPER_SERVER_2:PORT*, initiating session (org.apache.zookeeper.ClientCnxn)
 [2018-07-14 04:38:25,954] INFO [Partition *TOPIC_NAME-PARTITION-#* broker=1] 
Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition)
 [2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, 
session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn)
 [2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, 
session 0x3000c2420cb458d has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
 [2018-07-14 04:38:26,445] INFO EventThread shut down for session: 
0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn)
 [2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. 
(kafka.zookeeper.ZooKeeperClient)
 [2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session to 
*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*.
 (kafka.zookeeper.ZooKeeperClient)
 [2018-07-14 04:38:26,459] INFO Initiating client connection, 
connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*
 sessionTimeout=6000 
watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 
(org.apache.zookeeper.ZooKeeper)
 [2018-07-14 04:38:26,465] INFO Opening socket connection to server 
*ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL (unknown 
error) (org.apache.zookeeper.ClientCnxn)
 [2018-07-14 04:38:26,477] INFO Socket connection established to 
*ZOOKEEPER_SERVER_1:PORT*, initiating session (org.apache.zookeeper.ClientCnxn)
 [2018-07-14 04:38:26,484] INFO Session establishment complete on server 
*ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout = 
6000 (org.apache.zookeeper.ClientCnxn)
 [2018-07-14 04:38:26,496] *INFO Creating /brokers/ids/1* (is it secure? false) 
(kafka.zk.KafkaZkClient)
 [2018-07-14 04:38:26,500] INFO Processing notification(s) to /config/changes 
(kafka.common.ZkNodeChangeNotificationListener)
 *[2018-07-14 04:38:26,547] ERROR Error while creating ephemeral at 
/brokers/ids/1, node already exists and owner '216186131422332301' does not 
match current session '288330817911521280' 
(kafka.zk.KafkaZkClient$CheckedEphemeral)*
 [2018-07-14 04:38:26,547] *INFO Result of znode creation at /brokers/ids/1 is: 
NODEEXISTS* (kafka.zk.KafkaZkClient)
 [2018-07-14 04:38:26,559] ERROR Uncaught exception in scheduled task 
'isr-expiration' (kafka.utils.KafkaScheduler)

org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = 
Session expired for /brokers/topics/*TOPIC_NAME*/partitions/*PARTITION-#*/state
 at org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
 at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
 at kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:465)
 at kafka.zk.KafkaZkClient.conditionalUpdatePath(KafkaZkClient.scala:621)
 at kafka.utils.ReplicationUtils$.updateLeaderAndIsr(ReplicationUtils.scala:33)
 at 
kafka.cluster.Partition.kafka$cluster$Partition$$updateIsr(Partition.scala:669)
 at kafka.cluster.Partition$$anonfun$4.apply$mcZ$sp(Partition.scala:513)
 at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:504)
 at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:504)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
 at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
 at kafka.cluster.Partition.maybeShrinkIsr(Partition.scala:503)
 at 

[jira] [Created] (KAFKA-7165) Error while creating ephemeral at /brokers/ids/BROKER_ID

2018-07-15 Thread Jonathan Santilli (JIRA)
Jonathan Santilli created KAFKA-7165:


 Summary: Error while creating ephemeral at /brokers/ids/BROKER_ID
 Key: KAFKA-7165
 URL: https://issues.apache.org/jira/browse/KAFKA-7165
 Project: Kafka
  Issue Type: Bug
  Components: core, zkclient
Affects Versions: 1.1.0
Reporter: Jonathan Santilli


Kafka version: 1.1.0

Zookeeper version: 3.4.12

4 Kafka Brokers

4 Zookeeper servers

 

In one of the 4 brokers of the cluster, we detect the following error:

[2018-07-14 04:38:23,784] INFO Unable to read additional data from server 
sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-07-14 04:38:24,509] INFO Opening socket connection to server 
*ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL (unknown 
error) (org.apache.zookeeper.ClientCnxn)
[2018-07-14 04:38:24,510] INFO Socket connection established to 
*ZOOKEEPER_SERVER_1:PORT*, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-07-14 04:38:24,513] INFO Unable to read additional data from server 
sessionid 0x3000c2420cb458d, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-07-14 04:38:25,287] INFO Opening socket connection to server 
*ZOOKEEPER_SERVER_2:PORT*. Will not attempt to authenticate using SASL (unknown 
error) (org.apache.zookeeper.ClientCnxn)
[2018-07-14 04:38:25,287] INFO Socket connection established to 
*ZOOKEEPER_SERVER_2:PORT*, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-07-14 04:38:25,954] INFO [Partition *TOPIC_NAME-PARTITION-#* broker=1] 
Shrinking ISR from 1,3,4,2 to 1,4,2 (kafka.cluster.Partition)
[2018-07-14 04:38:26,444] WARN Unable to reconnect to ZooKeeper service, 
session 0x3000c2420cb458d has expired (org.apache.zookeeper.ClientCnxn)
[2018-07-14 04:38:26,444] INFO Unable to reconnect to ZooKeeper service, 
session 0x3000c2420cb458d has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
[2018-07-14 04:38:26,445] INFO EventThread shut down for session: 
0x3000c2420cb458d (org.apache.zookeeper.ClientCnxn)
[2018-07-14 04:38:26,446] INFO [ZooKeeperClient] Session expired. 
(kafka.zookeeper.ZooKeeperClient)
[2018-07-14 04:38:26,459] INFO [ZooKeeperClient] Initializing a new session to 
*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*.
 (kafka.zookeeper.ZooKeeperClient)
[2018-07-14 04:38:26,459] INFO Initiating client connection, 
connectString=*ZOOKEEPER_SERVER_1:PORT*,*ZOOKEEPER_SERVER_2:PORT*,*ZOOKEEPER_SERVER_3:PORT*,*ZOOKEEPER_SERVER_4:PORT*
 sessionTimeout=6000 
watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@44821a96 
(org.apache.zookeeper.ZooKeeper)
[2018-07-14 04:38:26,465] INFO Opening socket connection to server 
*ZOOKEEPER_SERVER_1:PORT*. Will not attempt to authenticate using SASL (unknown 
error) (org.apache.zookeeper.ClientCnxn)
[2018-07-14 04:38:26,477] INFO Socket connection established to 
*ZOOKEEPER_SERVER_1:PORT*, initiating session (org.apache.zookeeper.ClientCnxn)
[2018-07-14 04:38:26,484] INFO Session establishment complete on server 
*ZOOKEEPER_SERVER_1:PORT*, sessionid = 0x4005b59eb6a, negotiated timeout = 
6000 (org.apache.zookeeper.ClientCnxn)
[2018-07-14 04:38:26,496] *INFO Creating /brokers/ids/1* (is it secure? false) 
(kafka.zk.KafkaZkClient)
[2018-07-14 04:38:26,500] INFO Processing notification(s) to /config/changes 
(kafka.common.ZkNodeChangeNotificationListener)
*[2018-07-14 04:38:26,547] ERROR Error while creating ephemeral at 
/brokers/ids/1, node already exists and owner '216186131422332301' does not 
match current session '288330817911521280' 
(kafka.zk.KafkaZkClient$CheckedEphemeral)*
[2018-07-14 04:38:26,547] *INFO Result of znode creation at /brokers/ids/1 is: 
NODEEXISTS* (kafka.zk.KafkaZkClient)
[2018-07-14 04:38:26,559] ERROR Uncaught exception in scheduled task 
'isr-expiration' (kafka.utils.KafkaScheduler)

org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = 
Session expired for /brokers/topics/*TOPIC_NAME*/partitions/*PARTITION-#*/state
at org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:465)
at kafka.zk.KafkaZkClient.conditionalUpdatePath(KafkaZkClient.scala:621)
at kafka.utils.ReplicationUtils$.updateLeaderAndIsr(ReplicationUtils.scala:33)
at 
kafka.cluster.Partition.kafka$cluster$Partition$$updateIsr(Partition.scala:669)
at kafka.cluster.Partition$$anonfun$4.apply$mcZ$sp(Partition.scala:513)
at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:504)
at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:504)
at
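
For reference, the NODEEXISTS / owner-mismatch error in the log above can be 
illustrated with a minimal sketch against the plain ZooKeeper client API. This 
is not Kafka's actual KafkaZkClient code; the connect string, session timeout, 
broker id, and class name below are placeholders chosen for the example:
{code:java}
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class EphemeralOwnerCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical connect string and session timeout, for illustration only.
        ZooKeeper zk = new ZooKeeper("zookeeper-1:2181", 6000, event -> { });
        String brokerPath = "/brokers/ids/1";
        try {
            // A broker registers itself with an ephemeral znode tied to its session.
            zk.create(brokerPath, "broker-1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                      CreateMode.EPHEMERAL);
        } catch (KeeperException.NodeExistsException e) {
            // The znode may still be owned by the previous, expired session.
            Stat stat = zk.exists(brokerPath, false);
            if (stat != null && stat.getEphemeralOwner() != zk.getSessionId()) {
                // Corresponds to: "node already exists and owner '...' does not
                // match current session '...'" in the broker log above.
                System.err.println("Owner " + stat.getEphemeralOwner()
                        + " does not match current session " + zk.getSessionId());
            }
        } finally {
            zk.close();
        }
    }
}
{code}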