[jira] [Commented] (KAFKA-6490) JSON SerializationException Stops Connect

2018-02-06 Thread Prasanna Subburaj (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355012#comment-16355012
 ] 

Prasanna Subburaj commented on KAFKA-6490:
--

[~ewencp]: Thanks for giving me permissions. 

I am interested in working on this improvement, and yes, we need to discuss the 
dead letter queue further.

After creating the page I will start the discussion thread on the mailing list.

> JSON SerializationException Stops Connect
> -
>
> Key: KAFKA-6490
> URL: https://issues.apache.org/jira/browse/KAFKA-6490
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: William R. Speirs
>Assignee: Prasanna Subburaj
>Priority: Major
> Attachments: KAFKA-6490_v1.patch
>
>
> If you configure KafkaConnect to parse JSON messages, and you send it a 
> non-JSON message, the SerializationException message will bubble up to the 
> top, and stop KafkaConnect. While I understand sending non-JSON to a JSON 
> serializer is a bad idea, I think that a single malformed message stopping 
> all of KafkaConnect is even worse.
> The data exception is thrown here: 
> [https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L305]
>  
> From the call here: 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L476]
> This bubbles all the way up to the top, and KafkaConnect simply stops with 
> the message: {{ERROR WorkerSinkTask\{id=elasticsearch-sink-0} Task threw an 
> uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:172)}}
> Thoughts on adding a {{try/catch}} around the {{for}} loop in 
> WorkerSinkTask's {{convertMessages}} so messages that don't properly parse 
> are logged, but simply ignored? This way KafkaConnect can keep working even 
> when it encounters a message it cannot decode?
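
For illustration, a minimal, self-contained sketch of the skip-and-log idea from the quoted description (it uses the public JsonConverter API; the SkipMalformedDemo class and the loop structure are assumptions for the example, not the actual WorkerSinkTask code):

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.json.JsonConverter;

public class SkipMalformedDemo {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        // isKey = false: configure the converter for record values, schemaless JSON
        converter.configure(Collections.singletonMap("schemas.enable", "false"), false);

        List<byte[]> batch = Arrays.asList(
                "{\"ok\":true}".getBytes(),
                "not json".getBytes());

        List<SchemaAndValue> converted = new ArrayList<>();
        for (byte[] raw : batch) {
            try {
                converted.add(converter.toConnectData("my-topic", raw));
            } catch (DataException e) {
                // Log and skip the malformed record instead of letting the
                // exception bubble up and stop the whole task.
                System.err.println("Skipping malformed record: " + e.getMessage());
            }
        }
        System.out.println(converted.size() + " record(s) converted");
    }
}
{code}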



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6490) JSON SerializationException Stops Connect

2018-02-06 Thread Prasanna Subburaj (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prasanna Subburaj reassigned KAFKA-6490:


Assignee: Prasanna Subburaj

> JSON SerializationException Stops Connect
> -
>
> Key: KAFKA-6490
> URL: https://issues.apache.org/jira/browse/KAFKA-6490
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: William R. Speirs
>Assignee: Prasanna Subburaj
>Priority: Major
> Attachments: KAFKA-6490_v1.patch
>
>
> If you configure KafkaConnect to parse JSON messages, and you send it a 
> non-JSON message, the SerializationException message will bubble up to the 
> top, and stop KafkaConnect. While I understand sending non-JSON to a JSON 
> serializer is a bad idea, I think that a single malformed message stopping 
> all of KafkaConnect is even worse.
> The data exception is thrown here: 
> [https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L305]
>  
> From the call here: 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L476]
> This bubbles all the way up to the top, and KafkaConnect simply stops with 
> the message: {{ERROR WorkerSinkTask\{id=elasticsearch-sink-0} Task threw an 
> uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:172)}}
> Thoughts on adding a {{try/catch}} around the {{for}} loop in 
> WorkerSinkTask's {{convertMessages}} so messages that don't properly parse 
> are logged, but simply ignored? This way KafkaConnect can keep working even 
> when it encounters a message it cannot decode?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6533) Kafka log cleaner stopped due to "cannot allocate memory" error

2018-02-06 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354994#comment-16354994
 ] 

huxihx commented on KAFKA-6533:
---

[~law1] It seems there is no way to restart only the background thread. "Cannot 
allocate memory" means there is insufficient memory for the Java Runtime 
Environment to continue.

> Kafka log cleaner stopped due to "cannot allocate memory" error
> ---
>
> Key: KAFKA-6533
> URL: https://issues.apache.org/jira/browse/KAFKA-6533
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Law
>Priority: Major
>
> Hi,
> I am on Kafka 0.10.2.0 and have an issue where the log cleaner is running 
> okay but suddenly stops because of a "cannot allocate memory" error.
> Here is the error from log-cleaner.log file:
> [2018-02-04 02:57:41,343] INFO [kafka-log-cleaner-thread-0],
>     Log cleaner thread 0 cleaned log __consumer_offsets-35 (dirty section 
> = [31740820448, 31740820448])
>     100.1 MB of log processed in 1.5 seconds (67.5 MB/sec).
>     Indexed 100.0 MB in 0.8 seconds (131.8 Mb/sec, 51.2% of total time)
>     Buffer utilization: 0.0%
>     Cleaned 100.1 MB in 0.7 seconds (138.2 Mb/sec, 48.8% of total time)
>     Start size: 100.1 MB (771,501 messages)
>     End size: 0.1 MB (501 messages)
>     99.9% size reduction (99.9% fewer messages)
>  (kafka.log.LogCleaner)
> [2018-02-04 02:57:41,348] INFO Cleaner 0: Beginning cleaning of log 
> __consumer_offsets-15. (kafka.log.LogCleaner)
> [2018-02-04 02:57:41,348] INFO Cleaner 0: Building offset map for 
> __consumer_offsets-15... (kafka.log.LogCleaner)
> [2018-02-04 02:57:41,359] INFO Cleaner 0: Building offset map for log 
> __consumer_offsets-15 for 1 segments in offset range [19492717509, 
> 19493524087). (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,067] INFO Cleaner 0: Offset map for log 
> __consumer_offsets-15 complete. (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,067] INFO Cleaner 0: Cleaning log __consumer_offsets-15 
> (cleaning prior to Sun Feb 04 02:57:34 GMT 2018, discarding tombstones prior 
> to Sat Feb 03 02:53:31 GMT 2018)... (k
> [2018-02-04 02:57:42,068] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-15 (largest timestamp Sat Sep 02 15:26:15 GMT 2017) into 
> 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,078] INFO Cleaner 0: Swapping in cleaned segment 0 for 
> segment(s) 0 in log __consumer_offsets-15. (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,078] INFO Cleaner 0: Cleaning segment 2148231985 in log 
> __consumer_offsets-15 (largest timestamp Thu Sep 28 15:50:19 GMT 2017) into 
> 2148231985, discarding deletes. (kafka.
> [2018-02-04 02:57:42,080] INFO Cleaner 0: Swapping in cleaned segment 
> 2148231985 for segment(s) 2148231985 in log __consumer_offsets-15. 
> (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,081] INFO Cleaner 0: Cleaning segment 4296532622 in log 
> __consumer_offsets-15 (largest timestamp Tue Oct 24 10:33:20 GMT 2017) into 
> 4296532622, discarding deletes. (kafka.
> [2018-02-04 02:57:42,083] INFO Cleaner 0: Swapping in cleaned segment 
> 4296532622 for segment(s) 4296532622 in log __consumer_offsets-15. 
> (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,083] INFO Cleaner 0: Cleaning segment 6444525822 in log 
> __consumer_offsets-15 (largest timestamp Mon Nov 20 11:33:30 GMT 2017) into 
> 6444525822, discarding deletes. (kafka.
> [2018-02-04 02:57:42,085] INFO Cleaner 0: Swapping in cleaned segment 
> 6444525822 for segment(s) 6444525822 in log __consumer_offsets-15. 
> (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,086] INFO Cleaner 0: Cleaning segment 8592045249 in log 
> __consumer_offsets-15 (largest timestamp Sat Dec 16 06:35:53 GMT 2017) into 
> 8592045249, discarding deletes. (kafka.
> [2018-02-04 02:57:42,088] INFO Cleaner 0: Swapping in cleaned segment 
> 8592045249 for segment(s) 8592045249 in log __consumer_offsets-15. 
> (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,088] INFO Cleaner 0: Cleaning segment 10739582585 in log 
> __consumer_offsets-15 (largest timestamp Wed Dec 27 21:15:44 GMT 2017) into 
> 10739582585, discarding deletes. (kafk
> [2018-02-04 02:57:42,091] INFO Cleaner 0: Swapping in cleaned segment 
> 10739582585 for segment(s) 10739582585 in log __consumer_offsets-15. 
> (kafka.log.LogCleaner)
> [2018-02-04 02:57:42,096] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> java.io.FileNotFoundException: 
> /kafka/broker1-logs/__consumer_offsets-15/012887210320.log.cleaned 
> (Cannot allocate memory)
>     at java.io.RandomAccessFile.open0(Native Method)
>     at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
>     at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
>     at 
> 

[jira] [Commented] (KAFKA-6490) JSON SerializationException Stops Connect

2018-02-06 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354959#comment-16354959
 ] 

Ewen Cheslack-Postava commented on KAFKA-6490:
--

[~prasanna1433] I've given you wiki permissions; you should be able to create a 
page now.

The dead letter queue is something I've specifically heard from a number of 
users, so it's definitely in demand. The list I gave is based on a ton of real 
user feedback, so I feel pretty confident that it is both a) covering important 
use cases and b) comprehensive enough to address the vast majority of use 
cases. But I'm of course open to discussion of the options. I suspect *more* 
options would be the result rather than removing some. If you want to take on 
this improvement, we can of course discuss further in the KIP thread :)

 

With respect to the version, new features should almost universally be worked 
on in trunk – older release branches are reserved for bug fixes. In this case, 
since we just cut 1.1 branches, this would be a candidate for 1.2 / 2.0 and can 
simply be developed against trunk.

> JSON SerializationException Stops Connect
> -
>
> Key: KAFKA-6490
> URL: https://issues.apache.org/jira/browse/KAFKA-6490
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: William R. Speirs
>Priority: Major
> Attachments: KAFKA-6490_v1.patch
>
>
> If you configure KafkaConnect to parse JSON messages, and you send it a 
> non-JSON message, the SerializationException message will bubble up to the 
> top, and stop KafkaConnect. While I understand sending non-JSON to a JSON 
> serializer is a bad idea, I think that a single malformed message stopping 
> all of KafkaConnect is even worse.
> The data exception is thrown here: 
> [https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L305]
>  
> From the call here: 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L476]
> This bubbles all the way up to the top, and KafkaConnect simply stops with 
> the message: {{ERROR WorkerSinkTask\{id=elasticsearch-sink-0} Task threw an 
> uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:172)}}
> Thoughts on adding a {{try/catch}} around the {{for}} loop in 
> WorkerSinkTask's {{convertMessages}} so messages that don't properly parse 
> are logged, but simply ignored? This way KafkaConnect can keep working even 
> when it encounters a message it cannot decode?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6490) JSON SerializationException Stops Connect

2018-02-06 Thread Prasanna Subburaj (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354957#comment-16354957
 ] 

Prasanna Subburaj commented on KAFKA-6490:
--

[~ewencp]: Thanks for the feedback. What you are mentioning makes sense: we should 
give users options to choose from, because each use case is different. I feel the 
discard-and-log option can be provided to the user, but I am skeptical about the 
dead letter queue.

Also, which version should this bug be worked on?

Can I please get access to the Confluence wiki 
([https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]) 
as well?

> JSON SerializationException Stops Connect
> -
>
> Key: KAFKA-6490
> URL: https://issues.apache.org/jira/browse/KAFKA-6490
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: William R. Speirs
>Priority: Major
> Attachments: KAFKA-6490_v1.patch
>
>
> If you configure KafkaConnect to parse JSON messages, and you send it a 
> non-JSON message, the SerializationException message will bubble up to the 
> top, and stop KafkaConnect. While I understand sending non-JSON to a JSON 
> serializer is a bad idea, I think that a single malformed message stopping 
> all of KafkaConnect is even worse.
> The data exception is thrown here: 
> [https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L305]
>  
> From the call here: 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L476]
> This bubbles all the way up to the top, and KafkaConnect simply stops with 
> the message: {{ERROR WorkerSinkTask\{id=elasticsearch-sink-0} Task threw an 
> uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:172)}}
> Thoughts on adding a {{try/catch}} around the {{for}} loop in 
> WorkerSinkTask's {{convertMessages}} so messages that don't properly parse 
> are logged, but simply ignored? This way KafkaConnect can keep working even 
> when it encounters a message it cannot decode?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6490) JSON SerializationException Stops Connect

2018-02-06 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354945#comment-16354945
 ] 

Ewen Cheslack-Postava commented on KAFKA-6490:
--

A change in behavior like that would definitely require a KIP – existing users 
would not expect this at all.

Connect started with the current behavior because for many users losing data is 
worse than suffering some downtime. However, it's clear some alternatives are 
warranted; this question comes up from time to time on mailing lists. Generally 
there are only a few options that seem to make sense:
 * Stop processing (current behavior) and log
 * Log and retry (really only makes sense for unusual edge cases where data got 
corrupted in flight between Kafka and Connect)
 * Discard and log (I care about uptime more than a bit of lost data)
 * Dead letter queue (or some other fallback handler)

The retry case is probably the least important here as it will rarely make a 
difference, so the other 3 are the ones I think we'd want to implement. A KIP 
for this should be straightforward, though the implementation will require care 
to make sure we handle all places errors can occur (in the producer/consumer, 
during deserialization, during transformations, etc).
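
For illustration, here is a rough sketch of what the dead letter queue option could look like from a task's point of view (the DLQ class, method, and producer settings below are assumptions for the example, not an existing Connect API): the raw bytes of a record that fails conversion are forwarded to a side topic and processing continues.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Hypothetical fallback handler, not part of Kafka Connect.
public class DeadLetterQueueSketch implements AutoCloseable {
    private final KafkaProducer<byte[], byte[]> dlqProducer;
    private final String dlqTopic;

    public DeadLetterQueueSketch(String bootstrapServers, String dlqTopic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        this.dlqProducer = new KafkaProducer<>(props);
        this.dlqTopic = dlqTopic;
    }

    // Instead of failing the task, forward the raw key/value of a record that
    // could not be converted to the dead letter topic and keep processing.
    public void handleBadRecord(byte[] rawKey, byte[] rawValue) {
        dlqProducer.send(new ProducerRecord<>(dlqTopic, rawKey, rawValue));
    }

    @Override
    public void close() {
        dlqProducer.close();
    }
}
{code}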

> JSON SerializationException Stops Connect
> -
>
> Key: KAFKA-6490
> URL: https://issues.apache.org/jira/browse/KAFKA-6490
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: William R. Speirs
>Priority: Major
> Attachments: KAFKA-6490_v1.patch
>
>
> If you configure KafkaConnect to parse JSON messages, and you send it a 
> non-JSON message, the SerializationException message will bubble up to the 
> top, and stop KafkaConnect. While I understand sending non-JSON to a JSON 
> serializer is a bad idea, I think that a single malformed message stopping 
> all of KafkaConnect is even worse.
> The data exception is thrown here: 
> [https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L305]
>  
> From the call here: 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L476]
> This bubbles all the way up to the top, and KafkaConnect simply stops with 
> the message: {{ERROR WorkerSinkTask\{id=elasticsearch-sink-0} Task threw an 
> uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:172)}}
> Thoughts on adding a {{try/catch}} around the {{for}} loop in 
> WorkerSinkTask's {{convertMessages}} so messages that don't properly parse 
> are logged, but simply ignored? This way KafkaConnect can keep working even 
> when it encounters a message it cannot decode?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3832) Kafka Connect's JSON Converter never outputs a null value

2018-02-06 Thread Prasanna Subburaj (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354924#comment-16354924
 ] 

Prasanna Subburaj commented on KAFKA-3832:
--

[~kkonstantine], [~wicknicks] Please advise on this issue.

> Kafka Connect's JSON Converter never outputs a null value
> -
>
> Key: KAFKA-3832
> URL: https://issues.apache.org/jira/browse/KAFKA-3832
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Assignee: Prasanna Subburaj
>Priority: Major
>  Labels: newbie
>
> Kafka Connect's JSON Converter will never output a null value when 
> {{enableSchemas=true}}. This means that when a connector outputs a 
> {{SourceRecord}} with a null value, the JSON Converter will always produce a 
> message value with:
> {code:javascript}
> {
>   "schema": null,
>   "payload": null
> }
> {code}
> And, this means that while Kafka log compaction will always be able to remove 
> earlier messages with the same key, log compaction will never remove _all_ of 
> the messages with the same key. 
> The JSON Converter's {{fromConnectData(...)}} should always return null when 
> it is supplied a null value.
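
A self-contained sketch of the behavior the description asks for, written as a hypothetical wrapper (not the actual fix, which would change {{JsonConverter}} itself): a null Connect value is mapped to a null message, i.e. a real tombstone, instead of the schema/payload envelope shown above.

{code:java}
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.Converter;

// Hypothetical wrapper, not part of Kafka: a null Connect value becomes a null
// (tombstone) message instead of a JSON envelope with null schema and payload,
// so log compaction can eventually remove all messages for the key.
public class TombstoneAwareJsonConverter implements Converter {
    private final JsonConverter delegate = new JsonConverter();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        delegate.configure(configs, isKey);
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // The proposed fix: always return null for a null value.
        return value == null ? null : delegate.fromConnectData(topic, schema, value);
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        return delegate.toConnectData(topic, value);
    }
}
{code}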



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-3832) Kafka Connect's JSON Converter never outputs a null value

2018-02-06 Thread Prasanna Subburaj (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prasanna Subburaj reassigned KAFKA-3832:


Assignee: Prasanna Subburaj

> Kafka Connect's JSON Converter never outputs a null value
> -
>
> Key: KAFKA-3832
> URL: https://issues.apache.org/jira/browse/KAFKA-3832
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Assignee: Prasanna Subburaj
>Priority: Major
>  Labels: newbie
>
> Kafka Connect's JSON Converter will never output a null value when 
> {{enableSchemas=true}}. This means that when a connector outputs a 
> {{SourceRecord}} with a null value, the JSON Converter will always produce a 
> message value with:
> {code:javascript}
> {
>   "schema": null,
>   "payload": null
> }
> {code}
> And, this means that while Kafka log compaction will always be able to remove 
> earlier messages with the same key, log compaction will never remove _all_ of 
> the messages with the same key. 
> The JSON Converter's {{fromConnectData(...)}} should always return null when 
> it is supplied a null value.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6490) JSON SerializationException Stops Connect

2018-02-06 Thread Prasanna Subburaj (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354908#comment-16354908
 ] 

Prasanna Subburaj edited comment on KAFKA-6490 at 2/7/18 3:41 AM:
--

Thanks for your response [~mjsax]. I think we need a similar feature in Connect 
because right now, if we get a malformed JSON message, the connector will fail and 
will not process any additional messages that come after it. Can you also please 
add me to the contributors list?


was (Author: prasanna1433):
Thanks for your response [~mjsax]. I think we need a similar feature in Connect 
because right now, if we get a malformed JSON message, the connector will fail and 
will not process any additional messages that come after it. I can work on a KIP 
for solving this issue if the folks you tagged agree with what I am saying. Can 
you also please add me to the contributors list?

> JSON SerializationException Stops Connect
> -
>
> Key: KAFKA-6490
> URL: https://issues.apache.org/jira/browse/KAFKA-6490
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: William R. Speirs
>Priority: Major
> Attachments: KAFKA-6490_v1.patch
>
>
> If you configure KafkaConnect to parse JSON messages, and you send it a 
> non-JSON message, the SerializationException message will bubble up to the 
> top, and stop KafkaConnect. While I understand sending non-JSON to a JSON 
> serializer is a bad idea, I think that a single malformed message stopping 
> all of KafkaConnect is even worse.
> The data exception is thrown here: 
> [https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L305]
>  
> From the call here: 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L476]
> This bubbles all the way up to the top, and KafkaConnect simply stops with 
> the message: {{ERROR WorkerSinkTask\{id=elasticsearch-sink-0} Task threw an 
> uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:172)}}
> Thoughts on adding a {{try/catch}} around the {{for}} loop in 
> WorkerSinkTask's {{convertMessages}} so messages that don't properly parse 
> are logged, but simply ignored? This way KafkaConnect can keep working even 
> when it encounters a message it cannot decode?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6490) JSON SerializationException Stops Connect

2018-02-06 Thread Prasanna Subburaj (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354908#comment-16354908
 ] 

Prasanna Subburaj commented on KAFKA-6490:
--

Thanks for your response [~mjsax]. I think we need a similar feature in Connect 
because right now, if we get a malformed JSON message, the connector will fail and 
will not process any additional messages that come after it. I can work on a KIP 
for solving this issue if the folks you tagged agree with what I am saying. Can 
you also please add me to the contributors list?

> JSON SerializationException Stops Connect
> -
>
> Key: KAFKA-6490
> URL: https://issues.apache.org/jira/browse/KAFKA-6490
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: William R. Speirs
>Priority: Major
> Attachments: KAFKA-6490_v1.patch
>
>
> If you configure KafkaConnect to parse JSON messages, and you send it a 
> non-JSON message, the SerializationException message will bubble up to the 
> top, and stop KafkaConnect. While I understand sending non-JSON to a JSON 
> serializer is a bad idea, I think that a single malformed message stopping 
> all of KafkaConnect is even worse.
> The data exception is thrown here: 
> [https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L305]
>  
> From the call here: 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L476]
> This bubbles all the way up to the top, and KafkaConnect simply stops with 
> the message: {{ERROR WorkerSinkTask\{id=elasticsearch-sink-0} Task threw an 
> uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:172)}}
> Thoughts on adding a {{try/catch}} around the {{for}} loop in 
> WorkerSinkTask's {{convertMessages}} so messages that don't properly parse 
> are logged, but simply ignored? This way KafkaConnect can keep working even 
> when it encounters a message it cannot decode?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6539) KafkaConsumer endlessly try to connect to a dead broker, ignoring brokers alive

2018-02-06 Thread Song Younghwan (JIRA)
Song Younghwan created KAFKA-6539:
-

 Summary: KafkaConsumer endlessly try to connect to a dead broker, 
ignoring brokers alive
 Key: KAFKA-6539
 URL: https://issues.apache.org/jira/browse/KAFKA-6539
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 1.0.0
 Environment: Java 8
Brokers on CentOS 7.4
Consumers on Windows 10
Reporter: Song Younghwan
 Attachments: consumer.log

I am considering using Kafka at my company, so I am currently doing failover testing.

Conditions:
 * org.apache.kafka:kafka-clients:1.0.0
 * New consumer using bootstrap.servers, a consumer group and a group 
coordinator
 * num. brokers = 3 (id #1, #2, #3)
 * Topic num. partitions = 3, replication factor = 3
 * offsets.topic.replication.factor = 3

Reproduction Step:
 # Run consumers in the same consumer group, each of them subscribing to a topic
 # Kill (kill -9) #1, #2 broker simultaneously (only #3 online)
 # Consumers eventually connect to #3 broker
 # Start #1, #2 broker again after a while (#1, #2, #3 online)
 # Kill (kill -9) #2, #3 broker simultaneously (only #1 online)
 # *{color:#FF}Now consumers endlessly try to connect to #3 broker 
only{color}*
 # Start #2 broker again after a while (#1, #2 online)
 # *{color:#FF}Consumers still blindly try to connect to #3 broker{color}*

Expectation:

Consumers successfully connect to #1 broker after step 5.

Record:

I attached a consumer log file with TRACE log level. Related events below:
 * 12:03:13 kills #1, #2 broker simultaneously
 * 12:03:42 starts #1, #2 broker again
 * 12:04:01 kills #2, #3 broker simultaneously
 * 12:04:42 starts #2 broker again

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2018-02-06 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354889#comment-16354889
 ] 

Bill Bejeck commented on KAFKA-2967:


+1 for RST from me.

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, so we can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6524) kafka mirror can't producer internal topic

2018-02-06 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354876#comment-16354876
 ] 

huxihx commented on KAFKA-6524:
---

[~omkreddy] Could you set `client.id=__admin_client` in producer.config and 
retry?

> kafka mirror can't producer internal topic 
> ---
>
> Key: KAFKA-6524
> URL: https://issues.apache.org/jira/browse/KAFKA-6524
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 1.0.0
>Reporter: Ahmed Madkour
>Priority: Minor
>
> We are using kafka-mirror-maker.sh to consume data from a 3-broker Kafka 
> cluster and produce the data to another single-broker Kafka cluster.
> We want to include internal topics so we added the following in the consumer 
> configuration
> exclude.internal.topics=false
> We keep receiving the following errors:
> {code:java}
> org.apache.kafka.common.errors.InvalidTopicException: The request attempted 
> to perform an operation on an invalid topic.
>  ERROR Error when sending message to topic __consumer_offsets with key: 43 
> bytes, value: 28 bytes with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> {code}
> It seems that the producer can't access the internal topic __consumer_offsets.
> Any way to fix that?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6534) Consumer.poll may not trigger rebalance in time when there is a task migration

2018-02-06 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6534:
-
Issue Type: Bug  (was: Improvement)

> Consumer.poll may not trigger rebalance in time when there is a task migration
> --
>
> Key: KAFKA-6534
> URL: https://issues.apache.org/jira/browse/KAFKA-6534
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> When Streams detects a task migration event in one of its threads, today it 
> always has that thread call {{consumer.poll}}, hoping it will trigger the 
> rebalance and hence clean up the records buffered from the partitions that are 
> no longer owned. However, because the rebalance is based on heartbeat 
> responses, which leave a window for a race, the rebalance is not always 
> guaranteed to be triggered when a task migration happens. As a result, the 
> records buffered in the consumer may not be cleaned up and may later be 
> processed by Streams, which then realizes they no longer belong to the thread, 
> causing:
> {code:java}
> java.lang.IllegalStateException: Record's partition does not belong to this 
> partition-group.
> {code}
> Note this issue is only relevant when EOS is turned on, and based on the default 
> heartbeat.interval.ms value (3 sec), the race likelihood should not be high.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6534) Consumer.poll may not trigger rebalance in time when there is a task migration

2018-02-06 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-6534:


Assignee: Guozhang Wang

> Consumer.poll may not trigger rebalance in time when there is a task migration
> --
>
> Key: KAFKA-6534
> URL: https://issues.apache.org/jira/browse/KAFKA-6534
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> When Streams detects a task migration event in one of its threads, today it 
> always has that thread call {{consumer.poll}}, hoping it will trigger the 
> rebalance and hence clean up the records buffered from the partitions that are 
> no longer owned. However, because the rebalance is based on heartbeat 
> responses, which leave a window for a race, the rebalance is not always 
> guaranteed to be triggered when a task migration happens. As a result, the 
> records buffered in the consumer may not be cleaned up and may later be 
> processed by Streams, which then realizes they no longer belong to the thread, 
> causing:
> {code:java}
> java.lang.IllegalStateException: Record's partition does not belong to this 
> partition-group.
> {code}
> Note this issue is only relevant when EOS is turned on, and based on the default 
> heartbeat.interval.ms value (3 sec), the race likelihood should not be high.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2018-02-06 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354856#comment-16354856
 ] 

Guozhang Wang commented on KAFKA-2967:
--

I'm +1.

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, so we can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2018-02-06 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354853#comment-16354853
 ] 

Matthias J. Sax commented on KAFKA-2967:


Big +1 for RST from my side.

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, so we can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6490) JSON SerializationException Stops Connect

2018-02-06 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354852#comment-16354852
 ] 

Matthias J. Sax commented on KAFKA-6490:


I am not too familiar with the details of Connect. However, it sounds like this 
change might require a KIP. There was a similar issue for Streams, and we added a 
config so people can choose to resume or fail for this case: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers]

\cc  [~wicknicks] [~rhauch] [~kkonstantine]
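
For reference, the Streams-side mechanism from KIP-161 is driven by a single config; a sketch of its use (Connect would need an analogous, but separate, configuration):

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class DeserializationHandlerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // KIP-161: log and skip records that cannot be deserialized instead of
        // failing the application (the default handler logs and fails).
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                LogAndContinueExceptionHandler.class.getName());
        // ... build the topology and start KafkaStreams with these properties.
    }
}
{code}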

> JSON SerializationException Stops Connect
> -
>
> Key: KAFKA-6490
> URL: https://issues.apache.org/jira/browse/KAFKA-6490
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: William R. Speirs
>Priority: Major
> Attachments: KAFKA-6490_v1.patch
>
>
> If you configure KafkaConnect to parse JSON messages, and you send it a 
> non-JSON message, the SerializationException message will bubble up to the 
> top, and stop KafkaConnect. While I understand sending non-JSON to a JSON 
> serializer is a bad idea, I think that a single malformed message stopping 
> all of KafkaConnect is even worse.
> The data exception is thrown here: 
> [https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L305]
>  
> From the call here: 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L476]
> This bubbles all the way up to the top, and KafkaConnect simply stops with 
> the message: {{ERROR WorkerSinkTask\{id=elasticsearch-sink-0} Task threw an 
> uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:172)}}
> Thoughts on adding a {{try/catch}} around the {{for}} loop in 
> WorkerSinkTask's {{convertMessages}} so messages that don't properly parse 
> are logged, but simply ignored? This way KafkaConnect can keep working even 
> when it encounters a message it cannot decode?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6514) Add API version as a tag for the RequestsPerSec metric

2018-02-06 Thread Allen Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354831#comment-16354831
 ] 

Allen Wang commented on KAFKA-6514:
---

[~ijuma] Since we don't change the name of the metric but just add a tag, it 
shouldn't break existing monitoring systems. So I am wondering why a KIP is 
still required in this case. 

For example, I found that when upgrading to Kafka 0.10, the "records-consumed-rate" 
metric gained an added "topic" tag, which makes sense to me. 

I would prefer keeping the same metric name since people are already familiar 
with it and the name makes sense. It would also have minimal impact on the 
storage required on a typical metric aggregation system. 

 

> Add API version as a tag for the RequestsPerSec metric
> --
>
> Key: KAFKA-6514
> URL: https://issues.apache.org/jira/browse/KAFKA-6514
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Allen Wang
>Priority: Major
>
> After we upgrade the broker to a new version, one important insight is to see 
> how many clients have been upgraded, so that we can switch the message format 
> once most of the clients have also been updated to the new version, minimizing 
> the performance penalty. 
> RequestsPerSec with the version tag will give us that insight.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6472) WordCount example code error

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354817#comment-16354817
 ] 

ASF GitHub Bot commented on KAFKA-6472:
---

joel-hamill opened a new pull request #126: KAFKA-6472 - Fix WordCount example 
code error
URL: https://github.com/apache/kafka-site/pull/126
 
 
   https://issues.apache.org/jira/browse/KAFKA-6472
   
   - related https://github.com/apache/kafka/pull/4538
   
   CC: @guozhangwang 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> WordCount example code error
> 
>
> Key: KAFKA-6472
> URL: https://issues.apache.org/jira/browse/KAFKA-6472
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 0.11.0.2
>Reporter: JONYhao
>Assignee: Joel Hamill
>Priority: Trivial
>  Labels: newbie
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> This is a "(" missed in the WordCount example tutorial
> [https://kafka.apache.org/10/documentation/streams/tutorial]
> at the end of the page ,line 31
> 31   {{.to(}}{{"streams-wordcount-output"}}{{, Produced.with(Serdes.String(), 
> Serdes.Long());}}
> {{should be }}
> {{31 }}{{.to(}}{{"streams-wordcount-output"}}{{, 
> Produced.with(Serdes.String(), Serdes.Long()));}}{{}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6472) WordCount example code error

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354816#comment-16354816
 ] 

ASF GitHub Bot commented on KAFKA-6472:
---

joel-hamill opened a new pull request #4538: KAFKA-6472 - Fix WordCount example 
code error
URL: https://github.com/apache/kafka/pull/4538
 
 
   https://issues.apache.org/jira/browse/KAFKA-6472
   
   CC: @guozhangwang 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> WordCount example code error
> 
>
> Key: KAFKA-6472
> URL: https://issues.apache.org/jira/browse/KAFKA-6472
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 0.11.0.2
>Reporter: JONYhao
>Assignee: Joel Hamill
>Priority: Trivial
>  Labels: newbie
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> This is a "(" missed in the WordCount example tutorial
> [https://kafka.apache.org/10/documentation/streams/tutorial]
> at the end of the page ,line 31
> 31   {{.to(}}{{"streams-wordcount-output"}}{{, Produced.with(Serdes.String(), 
> Serdes.Long());}}
> {{should be }}
> {{31 }}{{.to(}}{{"streams-wordcount-output"}}{{, 
> Produced.with(Serdes.String(), Serdes.Long()));}}{{}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6490) JSON SerializationException Stops Connect

2018-02-06 Thread Prasanna Subburaj (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354805#comment-16354805
 ] 

Prasanna Subburaj commented on KAFKA-6490:
--

[~mjsax] Can you please help [~wspeirs] with this ticket ?

 
 

> JSON SerializationException Stops Connect
> -
>
> Key: KAFKA-6490
> URL: https://issues.apache.org/jira/browse/KAFKA-6490
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: William R. Speirs
>Priority: Major
> Attachments: KAFKA-6490_v1.patch
>
>
> If you configure KafkaConnect to parse JSON messages, and you send it a 
> non-JSON message, the SerializationException message will bubble up to the 
> top, and stop KafkaConnect. While I understand sending non-JSON to a JSON 
> serializer is a bad idea, I think that a single malformed message stopping 
> all of KafkaConnect is even worse.
> The data exception is thrown here: 
> [https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L305]
>  
> From the call here: 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L476]
> This bubbles all the way up to the top, and KafkaConnect simply stops with 
> the message: {{ERROR WorkerSinkTask\{id=elasticsearch-sink-0} Task threw an 
> uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:172)}}
> Thoughts on adding a {{try/catch}} around the {{for}} loop in 
> WorkerSinkTask's {{convertMessages}} so messages that don't properly parse 
> are logged, but simply ignored? This way KafkaConnect can keep working even 
> when it encounters a message it cannot decode?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6430) Improve Kafka GZip compression performance

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354798#comment-16354798
 ] 

ASF GitHub Bot commented on KAFKA-6430:
---

ying-zheng opened a new pull request #4537: KAFKA-6430: Add buffer between Java 
data stream and gzip stream
URL: https://github.com/apache/kafka/pull/4537
 
 
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve Kafka GZip compression performance
> --
>
> Key: KAFKA-6430
> URL: https://issues.apache.org/jira/browse/KAFKA-6430
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, compression, core
>Reporter: Ying Zheng
>Priority: Minor
>
> To compress messages, Kafka uses DataOutputStream on top of GZIPOutputStream:
>   new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
> To decompress messages, Kafka uses DataInputStream on top of GZIPInputStream:
>new DataInputStream(new GZIPInputStream(buffer));
> This is very straightforward, but actually inefficient. For each message, in 
> addition to the key and value data, Kafka has to write about 30 metadata 
> bytes (the exact number varies slightly across Kafka versions), including the 
> magic byte, checksum, timestamp, offset, key length, value length, etc. For 
> each of these bytes, the Java DataOutputStream has to call write(byte) once. 
> Here is the awkward writeInt() method in DataOutputStream, which writes 4 
> bytes separately in big-endian order. 
> {code}
> public final void writeInt(int v) throws IOException {
> out.write((v >>> 24) & 0xFF);
> out.write((v >>> 16) & 0xFF);
> out.write((v >>>  8) & 0xFF);
> out.write((v >>>  0) & 0xFF);
> incCount(4);
> }
> {code}
> Unfortunately, GZIPOutputStream does not implement the write(byte) method. 
> Instead, it only provides a write(byte[], offset, len) method, which calls 
> the corresponding JNI zlib function. The write(byte) calls from 
> DataOutputStream are translated into write(byte[], offset, len) calls in a 
> very inefficient way: (Oracle JDK 1.8 code)
> {code}
> class DeflaterOutputStream {
> public void write(int b) throws IOException {
> byte[] buf = new byte[1];
> buf[0] = (byte)(b & 0xff);
> write(buf, 0, 1);
> }
> public void write(byte[] b, int off, int len) throws IOException {
> if (def.finished()) {
> throw new IOException("write beyond end of stream");
> }
> if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
> throw new IndexOutOfBoundsException();
> } else if (len == 0) {
> return;
> }
> if (!def.finished()) {
> def.setInput(b, off, len);
> while (!def.needsInput()) {
> deflate();
> }
> }
> }
> }
> class GZIPOutputStream extends DeflaterOutputStream {
> public synchronized void write(byte[] buf, int off, int len)
> throws IOException
> {
> super.write(buf, off, len);
> crc.update(buf, off, len);
> }
> }
> class Deflater {
> private native int deflateBytes(long addr, byte[] b, int off, int len, int 
> flush);
> }
> class CRC32 {
> public void update(byte[] b, int off, int len) {
> if (b == null) {
> throw new NullPointerException();
> }
> if (off < 0 || len < 0 || off > b.length - len) {
> throw new ArrayIndexOutOfBoundsException();
> }
> crc = updateBytes(crc, b, off, len);
> }
> private native static int updateBytes(int crc, byte[] b, int off, int 
> len);
> }
> {code}
> For each metadata byte, the code above has to allocate a single-byte array, 
> acquire several locks, and call two native JNI methods (Deflater.deflateBytes 
> and CRC32.updateBytes). Each Kafka message has about 30 such metadata bytes.
> The call stack of Deflater.deflateBytes():
> 
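
The linked pull request's fix is essentially to add a buffer between the data stream and the gzip stream, so the single-byte metadata writes are coalesced before reaching the native deflater; a minimal sketch of the idea (not the exact patch):

{code:java}
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

public class BufferedGzipSketch {
    // Before: every DataOutputStream.write(byte) turns into a one-byte JNI
    // deflate call plus a one-byte CRC update.
    static DataOutputStream unbuffered(OutputStream sink, int bufferSize) throws IOException {
        return new DataOutputStream(new GZIPOutputStream(sink, bufferSize));
    }

    // After: a BufferedOutputStream coalesces the small metadata writes, so the
    // native deflater and CRC update see large chunks instead of single bytes.
    static DataOutputStream buffered(OutputStream sink, int bufferSize) throws IOException {
        return new DataOutputStream(
                new BufferedOutputStream(new GZIPOutputStream(sink, bufferSize), bufferSize));
    }
}
{code}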

[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2018-02-06 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354762#comment-16354762
 ] 

Ewen Cheslack-Postava commented on KAFKA-2967:
--

I'd like to raise this once again and see what people think of the current 
state. The docs have grown increasingly hacky, especially with the streams docs 
sort of ending up on their own page, weirdly integrated with the table of 
contents, javascript hacks to get things rendered properly client-side, and 
what I consider a mess of SSIs.

I'd like to propose a solution that I think addresses most of the concerns from 
the thread. I had some offline discussion with some other folks and we 
discussed a few different options, including markdown, which I assumed would be 
the most popular for people writing docs. We concluded that the support from 
just simple markdown wasn't going to be good enough and that usually you end up 
having to adopt some layer on top of it anyway (and have to learn all the 
details of that tool). Depending on what you choose, the ability to integrate 
with the rest of the site well has varying levels of difficulty.

 

I'd like to propose an alternative that Gwen mentioned originally – use 
sphinx/rst. 
 * Theming to both reuse existing styling and layout is trivial.
 * It's a full docs solution and has all the doodads and plugins we'd need as 
the docs continue to grow.
 * Migrate progressively, and only migrate docs for now. We'll just be 
compiling down to html anyway, so we can move progressively, and can choose 
which parts of the site we want to live under sphinx. Of course there is one 
caveat that the initial conversion will probably require a big chunk of work 
since the docs are currently just one giant page and we'd probably want it 
broken up from the get-go. But html content could still easily be used and 
linked to.
 * Can easily escape hatch back to raw html if we ever need it / want more 
flexibility in layout / etc. It's a trivial command to do so.
 * Maybe most importantly, I'm prepared to take on the work of setting up the 
build & doing initial conversion.

The main drawback is that not everyone will be familiar with rst. As I 
mentioned, this is actually a problem with even markdown as you usually need 
additional tooling on top of the raw syntax so I don't see it as a big problem. 
The other annoyance is that while we can include html snippets (e.g. configs 
and metrics autogenerated docs) directly, it'll look a lot better if we 
actually convert the autogeneration to spit out rst instead. We don't have too 
many of these and it's just another method to generate the output, so probably 
best to just do that at the same time as initial conversion.

 

I think it is easy to bikeshed on something like this (everyone has their 
favorite tool), but does anybody have any objections to this proposal? I'd like 
to be able to move forward with this soon as I know dealing with the current 
form of docs has been a constant pain - there are fewer Connect docs than there 
should be (and would be if it wasn't so hard to add them) and Streams is 
constantly fighting with the html docs. If nobody pipes up in a few days, I'll 
plan to execute and we can take the rest of the discussion to a PR.

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, so we can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6538) Enhance ByteStore exceptions with more context information

2018-02-06 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-6538:
--

 Summary: Enhance ByteStore exceptions with more context information
 Key: KAFKA-6538
 URL: https://issues.apache.org/jira/browse/KAFKA-6538
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 1.2.0
Reporter: Matthias J. Sax


In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and to 
only have concrete key/value types in outer layers/wrappers of the stores.

For this reason, the innermost {{RocksDBStore}} cannot provide useful error 
messages anymore if a put/get/delete operation fails, as it only handles plain 
bytes.

Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with 
information about which key/value the operation failed for, added in the 
wrapping stores (KeyValueStore, WindowStore, and SessionStore).

Cf https://github.com/apache/kafka/pull/4518 that cleans up {{RocksDBStore}} 
exceptions.
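
A rough sketch of what the wrapping layer could do (illustrative only; the class and method below are not the actual Streams internals):

{code:java}
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.state.KeyValueStore;

// Illustrative wrapper: it adds the typed key to the error, since the inner
// bytes store can only report raw bytes.
public class ContextualPutSketch<K, V> {
    private final KeyValueStore<Bytes, byte[]> inner;
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;
    private final String topic;

    public ContextualPutSketch(KeyValueStore<Bytes, byte[]> inner,
                               Serializer<K> keySerializer,
                               Serializer<V> valueSerializer,
                               String topic) {
        this.inner = inner;
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.topic = topic;
    }

    public void put(K key, V value) {
        try {
            inner.put(Bytes.wrap(keySerializer.serialize(topic, key)),
                      valueSerializer.serialize(topic, value));
        } catch (ProcessorStateException e) {
            // Surface which key failed instead of an opaque byte-level error.
            throw new ProcessorStateException("Failed to put key [" + key + "] into state store", e);
        }
    }
}
{code}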



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2423) Introduce Scalastyle

2018-02-06 Thread Ray Chiang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354661#comment-16354661
 ] 

Ray Chiang commented on KAFKA-2423:
---

[~granthenke], let me know if I can take this one over.

> Introduce Scalastyle
> 
>
> Key: KAFKA-2423
> URL: https://issues.apache.org/jira/browse/KAFKA-2423
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Grant Henke
>Priority: Major
>
> This is similar to Checkstyle (which we already use), but for Scala:
> http://www.scalastyle.org/



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6514) Add API version as a tag for the RequestsPerSec metric

2018-02-06 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354625#comment-16354625
 ] 

Ismael Juma commented on KAFKA-6514:


Thanks for the JIRA [~allenxwang]. Agreed that this would be useful. A couple 
of comments/questions:
 # For compatibility reasons, we may consider keeping the metric without a 
version and adding new metrics with the version tag.
 # We need a simple KIP since this affects metrics, which are public interfaces.
 # Given that the KIP freeze for 1.1.0 was on 23 January 
([https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75957546]), 
this would have to target the next release.

> Add API version as a tag for the RequestsPerSec metric
> --
>
> Key: KAFKA-6514
> URL: https://issues.apache.org/jira/browse/KAFKA-6514
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Allen Wang
>Priority: Major
>
> After we upgrade the broker to a new version, one important insight is to see 
> how many clients have been upgraded, so that we can switch the message format 
> once most of the clients have also been updated to the new version, minimizing 
> the performance penalty. 
> RequestsPerSec with the version tag will give us that insight.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity

2018-02-06 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354610#comment-16354610
 ] 

Matthias J. Sax commented on KAFKA-6535:


I don't think we should do this by default. I am just wondering if we should 
allow users to "tell" Kafka Streams to purge data?

> Set default retention ms for Streams repartition topics to infinity
> ---
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity to allow records with 
> old timestamps to be pushed to them (think: bootstrapping, re-processing) and 
> just rely on the purging API to keep their storage small.
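
For reference, the purging API referred to here is the AdminClient deleteRecords call added by KIP-204; a small sketch of its use (the repartition topic name and offset below are made up):

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class PurgeRepartitionTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Delete everything before offset 12345 in partition 0 of a
            // (made-up) repartition topic; Streams would do this internally
            // once the records have been fully consumed.
            TopicPartition tp = new TopicPartition("my-app-repartition", 0);
            admin.deleteRecords(
                    Collections.singletonMap(tp, RecordsToDelete.beforeOffset(12345L)))
                 .all()
                 .get();
        }
    }
}
{code}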



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6536) Streams quickstart pom.xml is missing versions for a bunch of plugins

2018-02-06 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6536:
-
Labels: newbie  (was: )

> Streams quickstart pom.xml is missing versions for a bunch of plugins
> -
>
> Key: KAFKA-6536
> URL: https://issues.apache.org/jira/browse/KAFKA-6536
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0, 0.11.0.2, 1.0.1
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>  Labels: newbie
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> There are a bunch of plugins being used that maven helpfully warns you about 
> being unversioned:
> {code:java}
> > [INFO] Scanning for projects...
> > [WARNING]
> > [WARNING] Some problems were encountered while building the effective model 
> > for org.apache.kafka:streams-quickstart-java:maven-archetype:1.0.1
> > [WARNING] 'build.plugins.plugin.version' for 
> > org.apache.maven.plugins:maven-shade-plugin is missing. @ 
> > org.apache.kafka:streams-quickstart:1.0.1, 
> > /Users/ewencp/kafka.git/.release_work_dir/kafka/streams/quickstart/pom.xml, 
> > line 64, column 21
> > [WARNING] 'build.plugins.plugin.version' for 
> > com.github.siom79.japicmp:japicmp-maven-plugin is missing. @ 
> > org.apache.kafka:streams-quickstart:1.0.1, 
> > /Users/ewencp/kafka.git/.release_work_dir/kafka/streams/quickstart/pom.xml, 
> > line 74, column 21
> > [WARNING]
> > [WARNING] Some problems were encountered while building the effective model 
> > for org.apache.kafka:streams-quickstart:pom:1.0.1
> > [WARNING] 'build.plugins.plugin.version' for 
> > org.apache.maven.plugins:maven-shade-plugin is missing. @ line 64, column 21
> > [WARNING] 'build.plugins.plugin.version' for 
> > com.github.siom79.japicmp:japicmp-maven-plugin is missing. @ line 74, 
> > column 21
> > [WARNING]
> > [WARNING] It is highly recommended to fix these problems because they 
> > threaten the stability of your build.
> > [WARNING]
> > [WARNING] For this reason, future Maven versions might no longer support 
> > building such malformed projects.{code}
> Unversioned dependencies are dangerous as they make the build 
> non-reproducible. In fact, a released version may become very difficult to 
> build as the user would have to track down the working versions of the 
> plugins. This seems particularly bad for the quickstart as it's likely to be 
> copy/pasted into people's own projects.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity

2018-02-06 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354548#comment-16354548
 ] 

Guozhang Wang commented on KAFKA-6535:
--

I think we cannot safely do that for user topics, since these topics may be 
shared by multiple applications, and some of them may not even be written with 
Streams. Admittedly, such sharing may not be common in practice, but we still 
cannot assume it never happens.

> Set default retention ms for Streams repartition topics to infinity
> ---
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity to allow any records to 
> be pushed to them with old timestamps (think: bootstrapping, re-processing) 
> and just rely on the purging API to keep their storage small.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4750) KeyValueIterator returns null values

2018-02-06 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354546#comment-16354546
 ] 

Guozhang Wang commented on KAFKA-4750:
--

As a summary of this resolution:

{code}
Here are the new rules for handling nulls:
* in the interface store, put(key, null) is handled normally and the value 
serde is applied to the null value.
* in the innermost store, null bytes after serialization are always treated as 
deletes.
* in the innermost store, range queries returning iterators never return null 
bytes.
* in the interface store, if null bytes are returned by get(key), the serde is 
skipped and a null object is returned.
{code}
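
As an editorial illustration only (not code from this ticket), the rules above 
are observable from the DSL, e.g. via a materialized KTable and the new 
TopologyTestDriver; application id, topic names and serdes are assumptions:
{code:java}
// Sketch: a null value is serialized to null bytes and deletes the entry in the
// inner store; get() then returns null and range() skips the deleted entry.
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class NullHandlingSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.table("input", Materialized.as("store"));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "null-handling-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
        try {
            ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
            driver.pipeInput(factory.create("input", "a", "1"));
            driver.pipeInput(factory.create("input", "b", "2"));
            // Null value -> null bytes -> treated as a delete in the innermost store.
            driver.pipeInput(factory.create("input", "a", (String) null));

            KeyValueStore<String, String> store = driver.getKeyValueStore("store");
            System.out.println(store.get("a")); // null: the entry was deleted
            try (KeyValueIterator<String, String> it = store.range("a", "z")) {
                // The iterator skips deleted entries, so no null values are returned.
                it.forEachRemaining(kv -> System.out.println(kv.key + " -> " + kv.value));
            }
        } finally {
            driver.close();
        }
    }
}
{code}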

> KeyValueIterator returns null values
> 
>
> Key: KAFKA-4750
> URL: https://issues.apache.org/jira/browse/KAFKA-4750
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.10.2.1, 0.11.0.0
>Reporter: Michal Borowiecki
>Assignee: Evgeny Veretennikov
>Priority: Major
>  Labels: newbie
> Fix For: 1.1.0
>
> Attachments: DeleteTest.java
>
>
> The API for the ReadOnlyKeyValueStore.range method promises that the returned 
> iterator will not return null values. However, after upgrading from 0.10.0.0 
> to 0.10.1.1 we found null values are returned, causing NPEs on our side.
> I found this happens after removing entries from the store, and I noticed a 
> resemblance to the SAMZA-94 defect. The problem seems to be the same as 
> there: when deleting entries with a serializer that does not return null when 
> null is passed in, the state store doesn't actually delete that key/value 
> pair, but the iterator will return a null value for that key.
> When I modified our serializer to return null when null is passed in, the 
> problem went away. However, I believe this should be fixed in Kafka Streams, 
> perhaps with a similar approach as in SAMZA-94.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4750) KeyValueIterator returns null values

2018-02-06 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4750:
-
Fix Version/s: 1.1.0

> KeyValueIterator returns null values
> 
>
> Key: KAFKA-4750
> URL: https://issues.apache.org/jira/browse/KAFKA-4750
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.10.2.1, 0.11.0.0
>Reporter: Michal Borowiecki
>Assignee: Evgeny Veretennikov
>Priority: Major
>  Labels: newbie
> Fix For: 1.1.0
>
> Attachments: DeleteTest.java
>
>
> The API for the ReadOnlyKeyValueStore.range method promises that the returned 
> iterator will not return null values. However, after upgrading from 0.10.0.0 
> to 0.10.1.1 we found null values are returned, causing NPEs on our side.
> I found this happens after removing entries from the store, and I noticed a 
> resemblance to the SAMZA-94 defect. The problem seems to be the same as 
> there: when deleting entries with a serializer that does not return null when 
> null is passed in, the state store doesn't actually delete that key/value 
> pair, but the iterator will return a null value for that key.
> When I modified our serializer to return null when null is passed in, the 
> problem went away. However, I believe this should be fixed in Kafka Streams, 
> perhaps with a similar approach as in SAMZA-94.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns

2018-02-06 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354332#comment-16354332
 ] 

Matthias J. Sax commented on KAFKA-5253:


I double-checked {{TopologyTestDriver}} and it is not able to handle Pattern 
subscriptions. The fix should be fairly easy; however, it requires a public API 
change and thus a KIP.
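
For context, an editorial sketch of the kind of test the fix should enable 
against the public TopologyTestDriver; serdes and config values are 
assumptions, and today this fails because the driver cannot resolve which 
topics match the pattern:
{code:java}
// Illustrative only: a Pattern-based source tested with the public TopologyTestDriver.
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class PatternSourceSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(Pattern.compile("topic-source-\\d")).to("topic-sink");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pattern-source-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
        try {
            ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
            // Pipe a record to a topic that matches the pattern and read it back from the sink.
            driver.pipeInput(factory.create("topic-source-3", "A", "aa"));
            System.out.println(driver.readOutput("topic-sink",
                new StringDeserializer(), new StringDeserializer()));
        } finally {
            driver.close();
        }
    }
}
{code}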

> TopologyTestDriver must handle streams created with patterns
> 
>
> Key: KAFKA-5253
> URL: https://issues.apache.org/jira/browse/KAFKA-5253
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Wim Van Leuven
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> *Context*
>  -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to 
> unit test topologies while developing KStreams apps.
> One such topology uses a Pattern to consume from multiple topics at once.
> *Problem*
>  The unit test of the topology fails because -KStreamTestDriver- 
> TopologyTestDriver fails to deal with Patterns properly.
> *Example*
>  Underneath is a unit test explaining what I understand should happen, but is 
> failing.
> Explicitly adding a source topic matching the topic pattern, generates an 
> exception as the topology builder explicitly checks overlapping topic names 
> and patterns, in any order of adding pattern and topic. So, it is intended 
> behaviour.
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
> // -- setup stream pattern
> final KStream source = 
> builder.stream(Pattern.compile("topic-source-\\d"));
> source.to("topic-sink");
> // -- setup processor to capture results
> final MockProcessorSupplier processorSupplier = new 
> MockProcessorSupplier<>();
> source.process(processorSupplier);
> // -- add source to stream data from
> //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), 
> "topic-source-3");
> // -- build test driver
> driver = new KStreamTestDriver(builder);
> driver.setTime(0L);
> // -- test
> driver.process("topic-source-3", "A", "aa");
> // -- validate
> // no exception was thrown
> assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
> *Solution*
>  If anybody can help in defining the solution, I can create a pull request 
> for this change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns

2018-02-06 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5253:
---
Affects Version/s: (was: 0.10.2.1)
   1.1.0

> TopologyTestDriver must handle streams created with patterns
> 
>
> Key: KAFKA-5253
> URL: https://issues.apache.org/jira/browse/KAFKA-5253
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Wim Van Leuven
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> *Context*
>  -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to 
> unit test topologies while developing KStreams apps.
> One such topology uses a Pattern to consume from multiple topics at once.
> *Problem*
>  The unit test of the topology fails because -KStreamTestDriver- 
> TopologyTestDriver fails to deal with Patterns properly.
> *Example*
>  Underneath is a unit test explaining what I understand should happen, but is 
> failing.
> Explicitly adding a source topic matching the topic pattern, generates an 
> exception as the topology builder explicitly checks overlapping topic names 
> and patterns, in any order of adding pattern and topic. So, it is intended 
> behaviour.
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
> // -- setup stream pattern
> final KStream source = 
> builder.stream(Pattern.compile("topic-source-\\d"));
> source.to("topic-sink");
> // -- setup processor to capture results
> final MockProcessorSupplier processorSupplier = new 
> MockProcessorSupplier<>();
> source.process(processorSupplier);
> // -- add source to stream data from
> //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), 
> "topic-source-3");
> // -- build test driver
> driver = new KStreamTestDriver(builder);
> driver.setTime(0L);
> // -- test
> driver.process("topic-source-3", "A", "aa");
> // -- validate
> // no exception was thrown
> assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
> *Solution*
>  If anybody can help in defining the solution, I can create a pull request 
> for this change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns

2018-02-06 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5253:
---
Description: 
*Context*
 -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to 
unit test topologies while developing KStreams apps.

One such topology uses a Pattern to consume from multiple topics at once.

*Problem*
 The unit test of the topology fails because -KStreamTestDriver- 
TopologyTestDriver fails to deal with Patterns properly.

*Example*
 Underneath is a unit test explaining what I understand should happen, but is 
failing.

Explicitly adding a source topic matching the topic pattern generates an 
exception, as the topology builder explicitly checks for overlapping topic 
names and patterns, in any order of adding pattern and topic. So, it is 
intended behaviour.
{code:java}
@Test
public void shouldProcessFromSourcesThatDoMatchThePattern() {
// -- setup stream pattern
final KStream source = 
builder.stream(Pattern.compile("topic-source-\\d"));
source.to("topic-sink");

// -- setup processor to capture results
final MockProcessorSupplier processorSupplier = new 
MockProcessorSupplier<>();
source.process(processorSupplier);

// -- add source to stream data from
//builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), 
"topic-source-3");

// -- build test driver
driver = new KStreamTestDriver(builder);
driver.setTime(0L);

// -- test
driver.process("topic-source-3", "A", "aa");

// -- validate
// no exception was thrown
assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
}
{code}
*Solution*
 If anybody can help in defining the solution, I can create a pull request for 
this change.

  was:
*Context*
KStreamTestDriver is being used to unit test topologies while developing 
KStreams apps. 

One such topology uses a Pattern to consume from multiple topics at once. 

*Problem*
The unit test of the topology fails because KStreamTestDriver fails to deal 
with Patterns properly.

*Example*
Underneath is a unit test explaining what I understand should happen, but is 
failing. 

Explicitly adding a source topic matching the topic pattern, generates an 
exception as the topology builder explicitly checks overlapping topic names and 
patterns, in any order of adding pattern and topic. So, it is intended 
behaviour.

{code:java}
@Test
public void shouldProcessFromSourcesThatDoMatchThePattern() {
// -- setup stream pattern
final KStream source = 
builder.stream(Pattern.compile("topic-source-\\d"));
source.to("topic-sink");

// -- setup processor to capture results
final MockProcessorSupplier processorSupplier = new 
MockProcessorSupplier<>();
source.process(processorSupplier);

// -- add source to stream data from
//builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), 
"topic-source-3");

// -- build test driver
driver = new KStreamTestDriver(builder);
driver.setTime(0L);

// -- test
driver.process("topic-source-3", "A", "aa");

// -- validate
// no exception was thrown
assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
}
{code}

*Solution*
If anybody can help in defining the solution, I can create a pull request for 
this change. 


> TopologyTestDriver must handle streams created with patterns
> 
>
> Key: KAFKA-5253
> URL: https://issues.apache.org/jira/browse/KAFKA-5253
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Wim Van Leuven
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> *Context*
>  -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to 
> unit test topologies while developing KStreams apps.
> One such topology uses a Pattern to consume from multiple topics at once.
> *Problem*
>  The unit test of the topology fails because -KStreamTestDriver- 
> TopologyTestDriver fails to deal with Patterns properly.
> *Example*
>  Underneath is a unit test explaining what I understand should happen, but is 
> failing.
> Explicitly adding a source topic matching the topic pattern, generates an 
> exception as the topology builder explicitly checks overlapping topic names 
> and patterns, in any order of adding pattern and topic. So, it is intended 
> behaviour.
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
> // -- setup stream pattern
> final KStream source = 
> 

[jira] [Updated] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns

2018-02-06 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5253:
---
Summary: TopologyTestDriver must handle streams created with patterns  
(was: KStreamTestDriver must handle streams created with patterns)

> TopologyTestDriver must handle streams created with patterns
> 
>
> Key: KAFKA-5253
> URL: https://issues.apache.org/jira/browse/KAFKA-5253
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 0.10.2.1
>Reporter: Wim Van Leuven
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> *Context*
> KStreamTestDriver is being used to unit test topologies while developing 
> KStreams apps. 
> One such topology uses a Pattern to consume from multiple topics at once. 
> *Problem*
> The unit test of the topology fails because KStreamTestDriver fails to deal 
> with Patterns properly.
> *Example*
> Underneath is a unit test explaining what I understand should happen, but is 
> failing. 
> Explicitly adding a source topic matching the topic pattern, generates an 
> exception as the topology builder explicitly checks overlapping topic names 
> and patterns, in any order of adding pattern and topic. So, it is intended 
> behaviour.
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
> // -- setup stream pattern
> final KStream source = 
> builder.stream(Pattern.compile("topic-source-\\d"));
> source.to("topic-sink");
> // -- setup processor to capture results
> final MockProcessorSupplier processorSupplier = new 
> MockProcessorSupplier<>();
> source.process(processorSupplier);
> // -- add source to stream data from
> //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), 
> "topic-source-3");
> // -- build test driver
> driver = new KStreamTestDriver(builder);
> driver.setTime(0L);
> // -- test
> driver.process("topic-source-3", "A", "aa");
> // -- validate
> // no exception was thrown
> assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
> *Solution*
> If anybody can help in defining the solution, I can create a pull request for 
> this change. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5253) KStreamTestDriver must handle streams created with patterns

2018-02-06 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5253:
---
Labels: beginner needs-kip newbie  (was: )

> KStreamTestDriver must handle streams created with patterns
> ---
>
> Key: KAFKA-5253
> URL: https://issues.apache.org/jira/browse/KAFKA-5253
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 0.10.2.1
>Reporter: Wim Van Leuven
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> *Context*
> KStreamTestDriver is being used to unit test topologies while developing 
> KStreams apps. 
> One such topology uses a Pattern to consume from multiple topics at once. 
> *Problem*
> The unit test of the topology fails because KStreamTestDriver fails to deal 
> with Patterns properly.
> *Example*
> Underneath is a unit test explaining what I understand should happen, but is 
> failing. 
> Explicitly adding a source topic matching the topic pattern, generates an 
> exception as the topology builder explicitly checks overlapping topic names 
> and patterns, in any order of adding pattern and topic. So, it is intended 
> behaviour.
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
> // -- setup stream pattern
> final KStream source = 
> builder.stream(Pattern.compile("topic-source-\\d"));
> source.to("topic-sink");
> // -- setup processor to capture results
> final MockProcessorSupplier processorSupplier = new 
> MockProcessorSupplier<>();
> source.process(processorSupplier);
> // -- add source to stream data from
> //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), 
> "topic-source-3");
> // -- build test driver
> driver = new KStreamTestDriver(builder);
> driver.setTime(0L);
> // -- test
> driver.process("topic-source-3", "A", "aa");
> // -- validate
> // no exception was thrown
> assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
> *Solution*
> If anybody can help in defining the solution, I can create a pull request for 
> this change. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

2018-02-06 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6474:
---
Labels: beginner newbie  (was: )

> Rewrite test to use new public TopologyTestDriver
> -
>
> Key: KAFKA-6474
> URL: https://issues.apache.org/jira/browse/KAFKA-6474
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: beginner, newbie
>
> With KIP-247 we added the public TopologyTestDriver. We should rewrite our 
> own tests to use this new test driver and remove the two classes 
> ProcessorTopologyTestDriver and KStreamTestDriver.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-5889) MetricsTest is flaky

2018-02-06 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu reopened KAFKA-5889:
---

As of commit 332e698ac9c74ce29317021b03a54512c92ac8b3, I got:
{code}
kafka.metrics.MetricsTest > testMetricsLeak FAILED
java.lang.AssertionError: expected:<1421> but was:<1424>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at 
kafka.metrics.MetricsTest$$anonfun$testMetricsLeak$1.apply$mcVI$sp(MetricsTest.scala:68)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at kafka.metrics.MetricsTest.testMetricsLeak(MetricsTest.scala:66)
{code}

> MetricsTest is flaky
> 
>
> Key: KAFKA-5889
> URL: https://issues.apache.org/jira/browse/KAFKA-5889
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Major
>
> The following appeared in several recent builds (e.g. 
> https://builds.apache.org/job/kafka-trunk-jdk7/2758) :
> {code}
> kafka.metrics.MetricsTest > testMetricsLeak FAILED
> java.lang.AssertionError: expected:<1216> but was:<1219>
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.failNotEquals(Assert.java:834)
> at org.junit.Assert.assertEquals(Assert.java:645)
> at org.junit.Assert.assertEquals(Assert.java:631)
> at 
> kafka.metrics.MetricsTest$$anonfun$testMetricsLeak$1.apply$mcVI$sp(MetricsTest.scala:68)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
> at kafka.metrics.MetricsTest.testMetricsLeak(MetricsTest.scala:66)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6184) report a metric of the lag between the consumer offset and the start offset of the log

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354323#comment-16354323
 ] 

ASF GitHub Bot commented on KAFKA-6184:
---

junrao closed pull request #4191: KAFKA-6184: report a metric of the lag 
between the consumer offset ...
URL: https://github.com/apache/kafka/pull/4191
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 5dc0b26f8fb..55b7bbb9a8c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -44,6 +44,7 @@
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.BufferSupplier;
@@ -578,6 +579,11 @@ private long endTimestamp() {
 if (partitionLag != null)
 
this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);
 
+Long lead = 
subscriptions.partitionLead(partitionRecords.partition);
+if (lead != null) {
+
this.sensors.recordPartitionLead(partitionRecords.partition, lead);
+}
+
 return partRecords;
 } else {
 // these records aren't next in line based on the last 
consumed position, ignore them
@@ -860,6 +866,11 @@ private PartitionRecords 
parseCompletedFetch(CompletedFetch completedFetch) {
 subscriptions.updateHighWatermark(tp, 
partition.highWatermark);
 }
 
+if (partition.logStartOffset >= 0) {
+log.trace("Updating log start offset for partition {} to 
{}", tp, partition.logStartOffset);
+subscriptions.updateLogStartOffset(tp, 
partition.logStartOffset);
+}
+
 if (partition.lastStableOffset >= 0) {
 log.trace("Updating last stable offset for partition {} to 
{}", tp, partition.lastStableOffset);
 subscriptions.updateLastStableOffset(tp, 
partition.lastStableOffset);
@@ -934,7 +945,7 @@ private PartitionRecords parseCompletedFetch(CompletedFetch 
completedFetch) {
 
 @Override
 public void onAssignment(Set assignment) {
-sensors.updatePartitionLagSensors(assignment);
+sensors.updatePartitionLagAndLeadSensors(assignment);
 }
 
 public static Sensor throttleTimeSensor(Metrics metrics, 
FetcherMetricsRegistry metricsRegistry) {
@@ -1250,6 +1261,7 @@ protected void increment(int bytes, int records) {
 private final Sensor recordsFetched;
 private final Sensor fetchLatency;
 private final Sensor recordsFetchLag;
+private final Sensor recordsFetchLead;
 
 private Set assignedPartitions;
 
@@ -1276,6 +1288,9 @@ private FetchManagerMetrics(Metrics metrics, 
FetcherMetricsRegistry metricsRegis
 
 this.recordsFetchLag = metrics.sensor("records-lag");
 
this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), 
new Max());
+
+this.recordsFetchLead = metrics.sensor("records-lead");
+
this.recordsFetchLead.add(metrics.metricInstance(metricsRegistry.recordsLeadMin),
 new Min());
 }
 
 private void recordTopicFetchMetrics(String topic, int bytes, int 
records) {
@@ -1311,16 +1326,37 @@ private void recordTopicFetchMetrics(String topic, int 
bytes, int records) {
 recordsFetched.record(records);
 }
 
-private void updatePartitionLagSensors(Set 
assignedPartitions) {
+private void updatePartitionLagAndLeadSensors(Set 
assignedPartitions) {
 if (this.assignedPartitions != null) {
 for (TopicPartition tp : this.assignedPartitions) {
-if (!assignedPartitions.contains(tp))
+if (!assignedPartitions.contains(tp)) {
 metrics.removeSensor(partitionLagMetricName(tp));
+metrics.removeSensor(partitionLeadMetricName(tp));
+}
 }
 }
 this.assignedPartitions = assignedPartitions;
 }
 
+private void recordPartitionLead(TopicPartition tp, long lead) {
+

[jira] [Resolved] (KAFKA-6184) report a metric of the lag between the consumer offset and the start offset of the log

2018-02-06 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6184.

   Resolution: Fixed
Fix Version/s: 1.2.0

The PR is merged to trunk.
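
As an editorial note, once released the new lead metrics should be visible 
through KafkaConsumer#metrics() alongside the existing records-lag metrics. The 
sketch below simply filters on the "records-lead" sensor name used in the PR; 
the exact released metric names may differ:
{code:java}
// Editorial sketch: print any consumer metrics whose name mentions "records-lead".
// Metric names are assumptions; check the released documentation for the exact names.
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LeadMetricsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "lead-metrics-sketch");
        try (KafkaConsumer<String, String> consumer =
                 new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
            for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
                if (entry.getKey().name().contains("records-lead")) {
                    System.out.println(entry.getKey().name() + " = " + entry.getValue().metricValue());
                }
            }
        }
    }
}
{code}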

> report a metric of the lag between the consumer offset and the start offset 
> of the log
> --
>
> Key: KAFKA-6184
> URL: https://issues.apache.org/jira/browse/KAFKA-6184
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 1.0.0
>Reporter: Jun Rao
>Assignee: huxihx
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> Currently, the consumer reports a metric of the lag between the high 
> watermark of a log and the consumer offset. It will be useful to report a 
> similar lag metric between the consumer offset and the start offset of the 
> log. If this latter lag gets close to 0, it's an indication that the consumer 
> may lose data soon.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6526) Update controller to handle changes to unclean.leader.election.enable

2018-02-06 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-6526:
--
Fix Version/s: 1.2.0

Since it is too late to merge this for 1.1, moving this out to the next release.
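
For context, an editorial sketch of how the cluster-wide default can be updated 
dynamically through the admin client (the bootstrap address is a placeholder); 
the point of this JIRA is that the controller should pick up such an update 
without requiring a controller change:
{code:java}
// Editorial sketch: set the cluster-wide default dynamically via the admin client.
// Note that alterConfigs is not incremental, so it replaces other dynamically set
// cluster-wide broker configs; the kafka-configs.sh tool is the usual way to do this.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class UpdateDefaultUncleanLeaderElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // An empty resource name targets the cluster-wide (default) broker config.
            ConfigResource clusterDefault = new ConfigResource(ConfigResource.Type.BROKER, "");
            Config config = new Config(Collections.singleton(
                new ConfigEntry("unclean.leader.election.enable", "true")));
            admin.alterConfigs(Collections.singletonMap(clusterDefault, config)).all().get();
        }
    }
}
{code}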

> Update controller to handle changes to unclean.leader.election.enable
> -
>
> Key: KAFKA-6526
> URL: https://issues.apache.org/jira/browse/KAFKA-6526
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.2.0
>
>
> At the moment, updates to the default unclean.leader.election.enable use the 
> same code path as updates to topic overrides. This requires a controller 
> change for the new value to take effect. It would be good if we can update 
> the controller to handle the change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6526) Update controller to handle changes to unclean.leader.election.enable

2018-02-06 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-6526:
--
Issue Type: Improvement  (was: Sub-task)
Parent: (was: KAFKA-6240)

> Update controller to handle changes to unclean.leader.election.enable
> -
>
> Key: KAFKA-6526
> URL: https://issues.apache.org/jira/browse/KAFKA-6526
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> At the moment, updates to the default unclean.leader.election.enable use the 
> same code path as updates to topic overrides. This requires a controller 
> change for the new value to take effect. It would be good if we can update 
> the controller to handle the change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5971) Broker keeps running even though not registered in ZK

2018-02-06 Thread Andrej Urvantsev (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354265#comment-16354265
 ] 

Andrej Urvantsev commented on KAFKA-5971:
-

I think that we faced the same problem. Running Kafka 1.0.0
{noformat}
[2018-02-05 19:43:23,460] INFO Unable to read additional data from server 
sessionid 0x101aa15a51e000a, likely server has closed socket, closing socket co
nnection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,561] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-02-05 19:43:23,584] INFO Opening socket connection to server 
172.30.35.44/172.30.35.44:2181. Will not attempt to authenticate using SASL 
(unknown e
rror) (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,673] INFO Socket connection established to 
172.30.35.44/172.30.35.44:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,700] WARN Unable to reconnect to ZooKeeper service, 
session 0x101aa15a51e000a has expired (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,700] INFO zookeeper state changed (Expired) 
(org.I0Itec.zkclient.ZkClient)
[2018-02-05 19:43:23,700] INFO Unable to reconnect to ZooKeeper service, 
session 0x101aa15a51e000a has expired, closing socket connection (org.apache.zoo
keeper.ClientCnxn)
[2018-02-05 19:43:23,700] INFO Initiating client connection, 
connectString=172.30.34.209:2181,172.30.36.237:2181,172.30.35.44:2181 
sessionTimeout=6000 wa
tcher=org.I0Itec.zkclient.ZkClient@6572421 (org.apache.zookeeper.ZooKeeper)
[2018-02-05 19:43:23,703] INFO Opening socket connection to server 
172.30.34.209/172.30.34.209:2181. Will not attempt to authenticate using SASL 
(unknown
 error) (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,704] INFO Socket connection established to 
172.30.34.209/172.30.34.209:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,704] INFO EventThread shut down for session: 
0x101aa15a51e000a (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,705] INFO Unable to read additional data from server 
sessionid 0x0, likely server has closed socket, closing socket connection and a
ttempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:23,805] INFO Waiting for keeper state SyncConnected 
(org.I0Itec.zkclient.ZkClient)
[2018-02-05 19:43:23,805] INFO Waiting for keeper state SyncConnected 
(org.I0Itec.zkclient.ZkClient)
[2018-02-05 19:43:24,523] INFO Opening socket connection to server 
172.30.36.237/172.30.36.237:2181. Will not attempt to authenticate using SASL 
(unknown
 error) (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:24,523] INFO Socket connection established to 
172.30.36.237/172.30.36.237:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:24,524] INFO Unable to read additional data from server 
sessionid 0x0, likely server has closed socket, closing socket connection and a
ttempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:24,886] INFO Socket connection established to 
172.30.35.44/172.30.35.44:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:25,460] INFO Unable to read additional data from server 
sessionid 0x0, likely server has closed socket, closing socket connection and 
attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:27,327] INFO Opening socket connection to server 
172.30.34.209/172.30.34.209:2181. Will not attempt to authenticate using SASL 
(unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:27,328] INFO Socket connection established to 
172.30.34.209/172.30.34.209:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:27,332] INFO Session establishment complete on server 
172.30.34.209/172.30.34.209:2181, sessionid = 0x101b18ebf3b0001, negotiated 
timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-02-05 19:43:27,332] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-02-05 19:43:27,334] INFO re-registering broker info in ZK for broker 3523 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-02-05 19:43:27,334] INFO Creating /brokers/ids/3523 (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2018-02-05 19:43:27,340] INFO Result of znode creation is: NODEEXISTS 
(kafka.utils.ZKCheckedEphemeral)
[2018-02-05 19:43:27,341] ERROR Error handling event ZkEvent[New session event 
sent to kafka.server.KafkaHealthcheck$SessionExpireListener@493b2c1d] 
(org.I0Itec.zkclient.ZkEventThread)
java.lang.RuntimeException: A broker is already registered on the path 
/brokers/ids/3523. This probably indicates that you either have configured a 
brokerid that is already in use, or else you have shutdown this broker and 
restarted it faster than the zookeeper timeout so it appears to be 
re-registering.
at 

[jira] [Updated] (KAFKA-3665) Default ssl.endpoint.identification.algorithm should be https

2018-02-06 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3665:
--
Fix Version/s: 2.0.0

> Default ssl.endpoint.identification.algorithm should be https
> -
>
> Key: KAFKA-3665
> URL: https://issues.apache.org/jira/browse/KAFKA-3665
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Ismael Juma
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.0.0
>
>
> The default `ssl.endpoint.identification.algorithm` is `null` which is not a 
> secure default (man in the middle attacks are possible).
> We should probably use `https` instead. A more conservative alternative would 
> be to update the documentation instead of changing the default.
> A paper on the topic (thanks to Ryan Pridgeon for the reference): 
> http://www.cs.utexas.edu/~shmat/shmat_ccs12.pdf
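
As an editorial aside, until the default changes clients can opt in to hostname 
verification explicitly; broker address and truststore settings below are 
placeholders:
{code:java}
// Illustrative client-side configuration: enable hostname verification explicitly.
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

public class HostnameVerificationExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker.example.com:9093");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
        // Without this line the current default leaves the server hostname unverified.
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https");
        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            // use the producer ...
        }
    }
}
{code}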



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-3665) Default ssl.endpoint.identification.algorithm should be https

2018-02-06 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram reassigned KAFKA-3665:
-

Assignee: Rajini Sivaram

> Default ssl.endpoint.identification.algorithm should be https
> -
>
> Key: KAFKA-3665
> URL: https://issues.apache.org/jira/browse/KAFKA-3665
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Ismael Juma
>Assignee: Rajini Sivaram
>Priority: Major
>
> The default `ssl.endpoint.identification.algorithm` is `null` which is not a 
> secure default (man in the middle attacks are possible).
> We should probably use `https` instead. A more conservative alternative would 
> be to update the documentation instead of changing the default.
> A paper on the topic (thanks to Ryan Pridgeon for the reference): 
> http://www.cs.utexas.edu/~shmat/shmat_ccs12.pdf



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2018-02-06 Thread Nick Travers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354259#comment-16354259
 ] 

Nick Travers commented on KAFKA-4669:
-

Chiming in again to note that we're still running into this issue 
intermittently. The failure mode is the same, with a BufferUnderflowException 
and stack trace similar to what I posted above.

For some additional context, when this occurs it ultimately leads to a JVM that 
cannot exit, as it is waiting on a latch that will never be counted down. 
Here's the hung thread:
{code:java}
"async-message-sender-0" #120 daemon prio=5 os_prio=0 tid=0x7f30b4003000 
nid=0x195a1 waiting on condition [0x7f3105ce1000]
   java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@9/Native Method)
- parking to wait for  <0x0007852b1b68> (a 
java.util.concurrent.CountDownLatch$Sync)
at 
java.util.concurrent.locks.LockSupport.park(java.base@9/LockSupport.java:194)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@9/AbstractQueuedSynchronizer.java:871)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@9/AbstractQueuedSynchronizer.java:1024)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@9/AbstractQueuedSynchronizer.java:1331)
at 
java.util.concurrent.CountDownLatch.await(java.base@9/CountDownLatch.java:232)
at 
org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61)
at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at com.squareup.core.concurrent.Futures2.getAll(Futures2.java:462)
at 
com.squareup.kafka.ng.producer.KafkaProducer.sendMessageBatch(KafkaProducer.java:214)
- locked <0x000728c71998> (a 
com.squareup.kafka.ng.producer.KafkaProducer)
at 
com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.sendBatch(BufferedKafkaProducer.java:585)
at 
com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.run(BufferedKafkaProducer.java:536)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@9/ThreadPoolExecutor.java:1167)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@9/ThreadPoolExecutor.java:641)
at java.lang.Thread.run(java.base@9/Thread.java:844)
{code}
[Here is the 
latch|https://github.com/apache/kafka/blob/0.11.0.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java#L34]
 that is still open in the ProduceRequestResult. I assume that the network 
thread is responsible for closing that, but if that thread crashes for whatever 
reason, it never gets a chance to call CountDownLatch#countDown.

Arguably, we should probably be using a combination of daemon threads and the 
timed version of Future#get (a rough sketch follows below), but it _feels_ like 
something that could be fixed in the producer client, even if it's just for the 
sake of ensuring that failed ProduceRequestResults can be GC'd eventually, 
which can't happen if another thread is hung waiting on the latch.

cc: [~rsivaram] [~hachikuji]
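
A minimal sketch of that client-side mitigation, assuming a 30-second bound is 
acceptable; names and the timeout are placeholders, and it does not fix the 
underlying producer bug:
{code:java}
// Bound the wait with the timed Future#get instead of blocking forever.
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BoundedSend {
    static RecordMetadata sendWithTimeout(Producer<String, String> producer,
                                          ProducerRecord<String, String> record) throws Exception {
        Future<RecordMetadata> future = producer.send(record);
        try {
            return future.get(30, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            // If the I/O thread died, the latch behind this future may never be counted
            // down, so give up instead of hanging the caller forever.
            throw new IllegalStateException("Timed out waiting for ack from broker", e);
        }
    }
}
{code}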

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Assignee: Rajini Sivaram
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1, 1.0.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked 0.10 code and think this bug does exist in 0.10 versions.
> A real case: first, a correlation-id mismatch exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at 

[jira] [Commented] (KAFKA-4641) Improve test coverage of StreamsThread

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354236#comment-16354236
 ] 

ASF GitHub Bot commented on KAFKA-4641:
---

guozhangwang opened a new pull request #4531: KAFKA-4641: Add more unit test 
for stream thread
URL: https://github.com/apache/kafka/pull/4531
 
 
   Before the patch, jacoco coverage test:
   
   
   Element | Missed Instructions | Cov. | Missed Branches | Cov. | Missed | Cxty | Missed | Lines | Missed | Methods | Missed | Classes
   -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
   Total | 3,386 of 22,177 | 84% | 336 of 1,639 | 79% | 350 | 1,589 | 526 | 4,451 | 103 | 768 | 1 | 102
   StreamThread |   | 77% |   | 76% | 27 | 102 | 48 | 299 | 1 | 31 | 0 | 1
   
   After the patch:
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve test coverage of StreamsThread
> --
>
> Key: KAFKA-4641
> URL: https://issues.apache.org/jira/browse/KAFKA-4641
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Damian Guy
>Priority: Minor
>  Labels: newbie
>
> Some methods in {{StreamThread}} have little or no coverage.
> In particular:
> {{maybeUpdateStandbyTasks}} has little to no coverage
> Committing of StandbyTasks in {{commitAll}}
> {{maybePunctuate}}
> {{commitOne}} - no tests for exceptions
> {{unAssignChangeLogPartitions}} - no tests for exceptions
> {{addStreamsTask}} - no tests for exceptions
> {{runLoop}}
> Please see coverage report attached to parent



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6504) Connect: Some per-task-metrics not working

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354172#comment-16354172
 ] 

ASF GitHub Bot commented on KAFKA-6504:
---

hachikuji closed pull request #4514: KAFKA-6504: Fix creation of a sensor to be 
specific to a metric group so it is not shared
URL: https://github.com/apache/kafka/pull/4514
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 69614948147..4ee8ad6a808 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -677,34 +677,34 @@ public SinkTaskMetricsGroup(ConnectorTaskId id, 
ConnectMetrics connectMetrics) {
 // prevent collisions by removing any previously created metrics 
in this group.
 metricGroup.close();
 
-sinkRecordRead = metricGroup.metrics().sensor("sink-record-read");
+sinkRecordRead = metricGroup.sensor("sink-record-read");
 
sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new 
Rate());
 
sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadTotal), new 
Total());
 
-sinkRecordSend = metricGroup.metrics().sensor("sink-record-send");
+sinkRecordSend = metricGroup.sensor("sink-record-send");
 
sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendRate), new 
Rate());
 
sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendTotal), new 
Total());
 
-sinkRecordActiveCount = 
metricGroup.metrics().sensor("sink-record-active-count");
+sinkRecordActiveCount = 
metricGroup.sensor("sink-record-active-count");
 
sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCount),
 new Value());
 
sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCountMax),
 new Max());
 
sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCountAvg),
 new Avg());
 
-partitionCount = metricGroup.metrics().sensor("partition-count");
+partitionCount = metricGroup.sensor("partition-count");
 
partitionCount.add(metricGroup.metricName(registry.sinkRecordPartitionCount), 
new Value());
 
-offsetSeqNum = metricGroup.metrics().sensor("offset-seq-number");
+offsetSeqNum = metricGroup.sensor("offset-seq-number");
 
offsetSeqNum.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSeqNum), 
new Value());
 
-offsetCompletion = 
metricGroup.metrics().sensor("offset-commit-completion");
+offsetCompletion = metricGroup.sensor("offset-commit-completion");
 
offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionRate),
 new Rate());
 
offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionTotal),
 new Total());
 
-offsetCompletionSkip = 
metricGroup.metrics().sensor("offset-commit-completion-skip");
+offsetCompletionSkip = 
metricGroup.sensor("offset-commit-completion-skip");
 
offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipRate),
 new Rate());
 
offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipTotal),
 new Total());
 
-putBatchTime = metricGroup.metrics().sensor("put-batch-time");
+putBatchTime = metricGroup.sensor("put-batch-time");
 
putBatchTime.add(metricGroup.metricName(registry.sinkRecordPutBatchTimeMax), 
new Max());
 
putBatchTime.add(metricGroup.metricName(registry.sinkRecordPutBatchTimeAvg), 
new Avg());
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index a172cdb45f0..473e2359578 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -509,7 +509,7 @@ public SourceTaskMetricsGroup(ConnectorTaskId id, 
ConnectMetrics connectMetrics)
 
pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeMax), new 
Max());
 
pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeAvg), new 
Avg());
 
-  

[jira] [Created] (KAFKA-6537) Duplicate consumers after consumer group rebalance

2018-02-06 Thread Michael Golovanov (JIRA)
Michael Golovanov created KAFKA-6537:


 Summary: Duplicate consumers after consumer group rebalance
 Key: KAFKA-6537
 URL: https://issues.apache.org/jira/browse/KAFKA-6537
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.10.1.1
Reporter: Michael Golovanov


*_Deployment description_*

Kafka brokers have been deployed on ten (10) nodes. The Zookeeper cluster has 
seven (7) nodes. The Kafka broker nodes are shared with the Zookeeper nodes on 
bare-metal hosts.

The broker/Zookeeper host OS is Red Hat 7 and the JVM is Java 8. The host names 
are grid48, grid237 and grid251.

We have one topic with six (6) partitions. Kafka consumers are deployed on 
three (3) hosts. Each host has two (2) consumers. All consumers belong to a 
single group.

*_Error description_*

After starting all consumers, the topic's partitions were balanced evenly:

grid237 owns partitions 0,1 (0 - consumer thread-0, 1 - consumer thread-1)
 grid251 owns partitions 2,3 (2 - consumer thread-0, 3 - consumer thread-1)
 grid48 owns partitions 4,5 (4 - consumer thread-0, 5 - consumer thread-1)

After some period of time we see chaotic revokes and reassignments of 
partitions between consumers, and then, according to the logs, all partitions 
are assigned to one consumer on one node, grid48.

In reality, all partitions are read not only by the thread-1 consumer but also 
by thread-0 on grid48, and all messages from the topic partitions are 
duplicated. Consumer thread-0 tries to commit the message offset and gets a 
commit error, while the thread-1 consumer commits the offset successfully.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6528) Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize

2018-02-06 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-6528.
---
   Resolution: Fixed
Fix Version/s: 1.1.0

> Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize
> --
>
> Key: KAFKA-6528
> URL: https://issues.apache.org/jira/browse/KAFKA-6528
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.1.0
>
>
> {code:java}
> java.lang.AssertionError: expected:<108> but was:<123>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.stopAndVerifyProduceConsume(DynamicBrokerReconfigurationTest.scala:755)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyThreadPoolResize$1(DynamicBrokerReconfigurationTest.scala:443)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:451){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6528) Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353653#comment-16353653
 ] 

ASF GitHub Bot commented on KAFKA-6528:
---

rajinisivaram closed pull request #4526: KAFKA-6528: Fix transient test failure 
in testThreadPoolResize
URL: https://github.com/apache/kafka/pull/4526
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 6d88d8de8b2..312123c3ab6 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -70,9 +70,7 @@ abstract class AbstractFetcherManager(protected val name: 
String, clientId: Stri
   def resizeThreadPool(newSize: Int): Unit = {
 def migratePartitions(newSize: Int): Unit = {
   fetcherThreadMap.foreach { case (id, thread) =>
-val removedPartitions = 
thread.partitionStates.partitionStates.asScala.map { case state =>
-  state.topicPartition -> new 
BrokerAndInitialOffset(thread.sourceBroker, state.value.fetchOffset)
-}.toMap
+val removedPartitions = thread.partitionsAndOffsets
 removeFetcherForPartitions(removedPartitions.keySet)
 if (id.fetcherId >= newSize)
   thread.shutdown()
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 925c33095a2..39a70321a6e 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -312,6 +312,12 @@ abstract class AbstractFetcherThread(name: String,
 finally partitionMapLock.unlock()
   }
 
+  private[server] def partitionsAndOffsets: Map[TopicPartition, 
BrokerAndInitialOffset] = inLock(partitionMapLock) {
+partitionStates.partitionStates.asScala.map { case state =>
+  state.topicPartition -> new BrokerAndInitialOffset(sourceBroker, 
state.value.fetchOffset)
+}.toMap
+  }
+
 }
 
 object AbstractFetcherThread {
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index b7f0ae863a2..cb2ac5244a0 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -69,6 +69,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
 
   private val servers = new ArrayBuffer[KafkaServer]
   private val numServers = 3
+  private val numPartitions = 10
   private val producers = new ArrayBuffer[KafkaProducer[String, String]]
   private val consumers = new ArrayBuffer[KafkaConsumer[String, String]]
   private val adminClients = new ArrayBuffer[AdminClient]()
@@ -122,7 +123,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
 TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, 
OffsetConfig.DefaultOffsetsTopicNumPartitions,
   replicationFactor = numServers, servers, 
servers.head.groupCoordinator.offsetsTopicConfigs)
 
-TestUtils.createTopic(zkClient, topic, numPartitions = 10, 
replicationFactor = numServers, servers)
+TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = 
numServers, servers)
 createAdminClient(SecurityProtocol.SSL, SecureInternal)
 
 TestMetricsReporter.testReporters.clear()
@@ -203,7 +204,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
   @Test
   def testKeyStoreAlter(): Unit = {
 val topic2 = "testtopic2"
-TestUtils.createTopic(zkClient, topic2, numPartitions = 10, 
replicationFactor = numServers, servers)
+TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = 
numServers, servers)
 
 // Start a producer and consumer that work with the current truststore.
 // This should continue working while changes are made
@@ -241,7 +242,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
 verifyProduceConsume(producer, consumer, 10, topic2)
 
 // Verify that all messages sent with retries=0 while keystores were being 
altered were consumed
-stopAndVerifyProduceConsume(producerThread, consumerThread, 
mayFailRequests = false)
+stopAndVerifyProduceConsume(producerThread, consumerThread)
   }
 
   @Test
@@ -282,7 +283,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet